首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何检测SSTP流量?

如何检测SSTP流量?
EN

Stack Overflow用户
提问于 2017-01-08 14:40:28
回答 1查看 865关注 0票数 3

我正在编写一个程序,它应该检测VPN流量。据我所读到的关于这个主题的文章,似乎隧道协议的检测就像防火墙规则一样容易,使用它们的专用端口:

PPTP:端口1723/TCP

OpenVPN: 1194端口

L2TP:端口1701/UDP

我的问题在于SSTP,因为它使用的是广泛使用的端口443。

所以我有两个问题:

  1. 我是不是太天真了,以为我只能通过端口检测到那些VPN隧道协议?
  2. 有没有人知道如何检测SSTP,并将其流量与使用TLS/SSL (甚至使用DPI)的其他类型/应用程序区别开来?

我正在附加一段Python代码,它检测上述端口中的通信

代码语言:javascript
复制
import dpkt
import socket

# -------------------------- Globals

# VPN PORTS
import common
import dal

protocols_strs = {"pp2e_gre": "1723/TCP PP2P_GRE_PORT",
                  "openvpn": "1194 OPENVPN_PORT",
                  "ike": "500/UDP IKE_PORT",
                  "l2tp_ipsec": "1701/UDP L2TP_IPSEC_PORT"
                  }

port_protocols = {1723: 'pp2e_gre',
                  1194: 'openvpn',
                  500: 'ike',
                  1701: 'l2tp_ipsec'
                  }

# Dict of sets holding the protocols sessions
protocol_sessions = {"pp2e_gre": [],
                     "openvpn": [],
                     "ike": [],
                     "l2tp_ipsec": []}


# -------------------------- Functions
def is_bidirectional(five_tuple, protocol):
    """
    Given a tuple and protocol check if the connection is bidirectional in the protocol
    :param five_tuple:
    :return: True of the connection is bidirectional False otherwise
    """
    src_ip = five_tuple['src_ip']
    dest_ip = five_tuple['dest_ip']

    # Filter the sessions the five tuple's ips spoke in
    ike_sessions = filter(lambda session: (session['src_ip'] == src_ip and session['dest_ip'] == dest_ip)
                                          or
                                          (session['dest_ip'] == src_ip and session['src_ip'] == dest_ip),
                          protocol_sessions[protocol])
    # Return true if 2 session (1 for each direction) were found
    return len(ike_sessions) == 2


def print_alert(timestamp, protocol, five_tuple):
    """
    Print alert description to std
    :param timestamp:
    :param protocol:
    :param five_tuple:
    :return:
    """
    print timestamp, ":\t detected port %s communication (%s:%s ---> %s:%s)" % \
                     (protocol, five_tuple['src_ip'], five_tuple['src_port'], five_tuple['dest_ip'],
                      five_tuple['dest_port'])


def pp2e_gre_openvpn_ike_handler(five_tuple):
    # Get protocol
    protocol = five_tuple['protocol']

    # Clear old sessions in db
    dal.remove_old_sessions(five_tuple['timestamp'], 'vpn_sessions')

    # Clear old sessions in cache
    protocol_sessions[protocol] = common.clear_old_sessions(five_tuple, protocol_sessions[protocol])

    # If session already exists - return
    if common.check_if_session_exists(five_tuple, protocol_sessions[protocol]):
        session_to_update = common.get_session(five_tuple, protocol_sessions[protocol])
        session_to_update['timestamp'] = five_tuple['timestamp']
        return

    # Update DB
    dal.upsert_vpn_session(five_tuple)

    # Add to cache
    protocol_sessions[protocol].append(five_tuple)

    # Print alert
    print_alert(five_tuple['timestamp'], protocols_strs[protocol], five_tuple)


def l2tp_ipsec_handler(five_tuple):
    if five_tuple in protocol_sessions['l2tp_ipsec']:
        return

    # If bi-directional IKE protocol performed earlier - alert
    if not is_bidirectional(five_tuple, 'ike'):
        return

    protocol_sessions['l2tp_ipsec'].append(five_tuple)
    print_alert(five_tuple['timestamp'], protocols_strs['l2tp_ipsec'], five_tuple)


# -------------------------- VPN ports jump tables
tcp_vpn_ports = {1723: pp2e_gre_openvpn_ike_handler,
                 1194: pp2e_gre_openvpn_ike_handler}

udp_vpn_ports = {500: pp2e_gre_openvpn_ike_handler,
                 1701: l2tp_ipsec_handler,
                 1194: pp2e_gre_openvpn_ike_handler}


# -------------------------- Functions
def process_packet(timestamp, packet):
    """
    Given a packet process it for detecting a VPN communication

    :param packet:
    :param timestamp:
    :return:
    """
    # Parse the input
    eth_frame = dpkt.ethernet.Ethernet(packet)

    # Check if IP
    if eth_frame.type != dpkt.ethernet.ETH_TYPE_IP:
        return

    # If not IP return
    ip_frame = eth_frame.data

    # if TCP or UDP
    if ip_frame.p not in (dpkt.ip.IP_PROTO_TCP, dpkt.ip.IP_PROTO_UDP):
        return

    # Extract L3 frame
    frame = ip_frame.data

    # Extract ports
    frame_ports = (frame.sport, frame.dport)

    # get VPN ports in session
    detected_ports = set(tcp_vpn_ports).intersection(frame_ports)

    # If TCP VPN port was not detected return
    if not detected_ports:
        return

    # Get detected port
    port = detected_ports.pop()

    # Translate port to str
    protocol_str = port_protocols[port]

    # Choose handler by port
    handler = tcp_vpn_ports[port]

    # Extract 5-tuple parameters from frames
    five_tuple = {'src_ip': socket.inet_ntoa(ip_frame.src),
                  'dest_ip': socket.inet_ntoa(ip_frame.dst),
                  'src_port': frame.sport,
                  'dest_port': frame.dport,
                  'protocol': protocol_str,
                  'timestamp': timestamp}

    # Invoke the chosen handler
    handler(five_tuple)
EN

回答 1

Stack Overflow用户

发布于 2017-01-08 15:02:02

  1. “我是不是太天真了,认为我只能通过端口检测到那些VPN隧道协议?”

官场 OpenVPN端口号为1194,但在1到65535之间的任何端口号都可以工作。如果不提供‘端口’选项,将使用1194。

因此,如果您的代码正在寻找1194通信量,根据字典条目,您将只捕获默认的Open流。

  1. HTTPS协议的通道对消息进行加密。所以我看不出你是如何识别这个流量的,因为它是加密的。(来源
票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41533864

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档