首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >网络流量分析

网络流量分析
EN

Code Review用户
提问于 2020-09-16 19:43:55
回答 1查看 69关注 0票数 1

为了对网络流进行一些分析,我创建了一个简短的Python脚本。

我有一份中等规模的网络目录(大约1万条记录,掩码从/8到/30),其中为每个网络提供了若干信息(物理站点、VRF(路由实例)等)。

然后,我有一些庞大的流数据(百万流),其中包含精确的(/32掩码)源和目标IP。

我的主要目标是识别我的/32 IP属于哪个网络,然后识别我的目的地VRF与我的源VRF相同的流。

代码语言:python
复制
import xlsxwriter
import csv
from progress.bar import Bar
from ipaddress import IPv4Address, IPv4Network
import operator
import timeit
import os

# Module-level memo used by AnalyzedFlow.findSubnet: maps an address that has
# already been looked up to the inventory entry that was found for it.
AnalysedDictionnary = {}

class Flow:
    """One observed network flow: source address, destination address and port.

    Two flows are equal (and hash alike) when their source, destination and
    port are equal, so a ``set`` of flows collapses duplicates.  ``nbHits`` is
    a counter attached to the flow and is deliberately not part of its
    identity.

    Attributes are plain and mutable; do not modify ``srcIp``, ``dstIp`` or
    ``port`` while the flow is stored in a set or used as a dict key.
    """

    def __init__(self, srcIp: IPv4Address, dstIp: IPv4Address, port: int, nbHits: int):
        self.srcIp = srcIp
        self.dstIp = dstIp
        # Stored exactly as given; FlowCapture in this file passes the raw
        # text field from the log rather than an int.
        self.port = port
        self.nbHits = nbHits

    def _identity(self):
        """Return the tuple that defines equality and hashing."""
        return (self.srcIp, self.dstIp, self.port)

    def __eq__(self, other):
        if not isinstance(other, Flow):
            return NotImplemented
        return self._identity() == other._identity()

    def __hash__(self):
        return hash(self._identity())

    def __repr__(self):
        return "<Flow srcIp:%s dstIp:%s port:%s nbHits:%s>" % (self.srcIp, self.dstIp, self.port, self.nbHits)

    def __str__(self):
        return "From str method of Flow: srcIp is %s, dstIp is %s, port is %s, nbHits is %s," % (self.srcIp, self.dstIp, self.port, self.nbHits)

class EntityNetwork:
    """Inventory record tying one network to its site and VRF.

    Holds the site identifier and name, the VRF identifier and name, and the
    network object itself (exposed as ``MyIPv4Network``).
    """

    def __init__(self, siteId, siteName, vrfId, vrfName, IPV4Networks):
        # Site the network belongs to.
        self.siteId, self.siteName = siteId, siteName
        # VRF the network belongs to.
        self.vrfId, self.vrfName = vrfId, vrfName
        # The network itself; note the attribute name differs from the parameter.
        self.MyIPv4Network = IPV4Networks

class InventoryNetworks:
    """Reference inventory of all currently known networks.

    The inventory is read from a comma-separated file with a header row that
    must contain the columns ``Site ID``, ``Site``, ``VRF (VRF ID)``,
    ``VRF Description`` and ``Subnet``.  Each row becomes an EntityNetwork in
    ``MyEntityNetwork``.

    The list is sorted in descending ``IPv4Network`` order.  Networks compare
    by network address first and by netmask second, so among nested networks
    that contain the same address the most specific one comes first.

    Raises whatever ``open`` raises for an unreadable path, ``KeyError`` for a
    missing column, and ``ValueError`` from ``IPv4Network`` for an invalid
    ``Subnet`` value.
    """

    def __init__(self, filePath=''):
        self.filePath = filePath
        # newline='' is what the csv module asks for, so that newlines embedded
        # in quoted fields are interpreted by the csv reader itself.
        with open(filePath, newline='') as csvfile:
            reader = csv.DictReader(csvfile, delimiter=',')
            self.MyEntityNetwork = [
                EntityNetwork(
                    line['Site ID'],
                    line['Site'],
                    line['VRF (VRF ID)'],
                    line['VRF Description'],
                    IPv4Network(line['Subnet']),
                )
                for line in reader
            ]
        self.MyEntityNetwork.sort(key=operator.attrgetter('MyIPv4Network'), reverse=True)
            

class FlowCapture:
    """The distinct flows observed in a comma-separated equipment log.

    Each non-blank line of the file is split on ``,``; field 7 is parsed as
    the source address, field 8 as the destination address and field 25 is
    kept as the port text.  Lines describing the same (source, destination,
    port) triple produce a single Flow, kept in first-seen order in
    ``myFlows``.

    Raises ``IndexError`` for a line with fewer than 26 fields and
    ``ValueError`` (from ``IPv4Address``) for an invalid address.
    """

    def __init__(self, filePath=''):
        self.filePath = filePath
        # Keyed on the raw text triple so de-duplication does not depend on
        # how Flow objects compare, and duplicates are never re-parsed.
        uniqueFlows = {}
        # The context manager closes the file; iterating it streams the log
        # instead of loading every line into memory at once.
        with open(filePath, "r") as logFile:
            for line in logFile:
                if not line.strip():
                    continue  # tolerate blank lines (e.g. a trailing newline)
                fields = line.split(",")
                # strip() drops the line terminator when the port happens to
                # be the last field, so it can be compared with plain
                # strings such as '443'.
                key = (fields[7], fields[8], fields[25].strip())
                if key not in uniqueFlows:
                    uniqueFlows[key] = Flow(IPv4Address(key[0]), IPv4Address(key[1]), key[2], 1)
        self.myFlows = list(uniqueFlows.values())

class AnalyzedFlow:
    """A flow together with the inventory networks of its two endpoints.

    ``srcSubnet`` and ``dstSubnet`` hold the inventory entry found for the
    source and destination address, or ``None`` when no inventory network
    contains that address.
    """

    def __init__(self, flow: Flow, refNetworks: InventoryNetworks):
        self.srcIp = flow.srcIp
        self.dstIp = flow.dstIp
        self.port = flow.port
        self.nbHits = flow.nbHits
        self.refNetworks = refNetworks
        self.srcSubnet = self.findSubnet(self.srcIp)
        self.dstSubnet = self.findSubnet(self.dstIp)

    def findSubnet(self, subnet):
        """Return the inventory entry whose network contains ``subnet``.

        ``subnet`` is a single IPv4 address (anything ``IPv4Address`` accepts).
        The first entry of ``self.refNetworks.MyEntityNetwork`` that contains
        the address is returned; the inventory is expected to be ordered so
        that this first hit is also the most specific network.  Returns
        ``None`` when no entry contains the address.

        Results, including misses, are memoised in the module-level
        ``AnalysedDictionnary`` so each distinct address is searched once.
        """
        # Check if we already analyzed this address and saved the result.
        if subnet in AnalysedDictionnary:
            return AnalysedDictionnary[subnet]

        # Parse once, outside the loop, instead of building a /32 network on
        # every iteration.
        address = IPv4Address(subnet)
        matchedSubnet = None
        for entity in self.refNetworks.MyEntityNetwork:
            if address in entity.MyIPv4Network:
                matchedSubnet = entity
                break

        AnalysedDictionnary[subnet] = matchedSubnet
        return matchedSubnet


# List of interesting ports (kept as text: flow ports are compared as strings).
adPort = ['10','12','14','443']

# Create our inventory of networks from the reference file next to this script.
script_dir = os.path.dirname(os.path.abspath(__file__))
rel_path = "InventorySheet.csv"
abs_file_path = os.path.join(script_dir, rel_path)
ReferenceNetworks = InventoryNetworks(abs_file_path)

# Create our flows from the equipment log next to this script.
rel_path = "Logs_Equipment.txt"
abs_file_path = os.path.join(script_dir, rel_path)
CapturedFlows = FlowCapture(abs_file_path)

# Create our analyzed flows, timing the whole resolution pass.
AnalysedFlows = []
print('its going on')
start_time = timeit.default_timer()

# [:100] limits the run to 100 flows so testing time is shorter.
# The slice is taken once and reused for both the progress bar and the loop.
flowsToAnalyse = CapturedFlows.myFlows[:100]
bar = Bar('Processing', max=len(flowsToAnalyse))
for flow in flowsToAnalyse:
    AnalysedFlows.append(AnalyzedFlow(flow, ReferenceNetworks))
    bar.next()
bar.finish()
elapsed = timeit.default_timer() - start_time
print('we analysed in :', elapsed)

# Flows whose two endpoints sit in the same VRF.  An endpoint with no known
# network cannot be compared, so such flows are left out instead of failing
# on the attribute access.
intraVrfFlows = [
    flow for flow in AnalysedFlows
    if flow.srcSubnet is not None
    and flow.dstSubnet is not None
    and flow.srcSubnet.vrfId == flow.dstSubnet.vrfId
]

# Flows using one of the interesting ports.
adFlows = [flow for flow in AnalysedFlows if flow.port in adPort]

# Create a workbook and add one worksheet per report.
workbook = xlsxwriter.Workbook('flowAnalysis.xlsx')
worksheetIntraVrfFlows = workbook.add_worksheet('IntraVrfFlows')
worksheetAdFlows = workbook.add_worksheet('AdFlows')

# Start from the first cell. Rows and columns are zero indexed.
def writeSheetIndex(excelSheet):
    """Write the nine column titles into row 0 of ``excelSheet``.

    ``excelSheet`` only needs a ``write(row, col, value)`` method; titles go
    to columns 0 through 8, left to right.
    """
    titles = (
        'Source IP',
        'Source Subnet',
        'Source VRF',
        'Source Site',
        'Port',
        'Destination IP',
        'Destination Subnet',
        'Destination VRF',
        'Destination Site',
    )
    for column, title in enumerate(titles):
        excelSheet.write(0, column, title)

def populateSheetFlow(excelSheet, analyzedFlows):
    """Write one row per flow into ``excelSheet``, starting at row 1.

    Columns 0-8 receive: source IP, source subnet, source VRF name, source
    site name, port, destination IP, destination subnet, destination VRF
    name, destination site name.

    A flow is skipped when its source subnet is ``None`` or has an empty site
    name.  The check happens before anything is written, so a skipped flow
    never leaves a stray row behind.  When the destination subnet is ``None``
    the three destination-subnet columns are written as empty strings.

    ``excelSheet`` only needs a ``write(row, col, value)`` method.
    """
    nrow = 1
    for flow in analyzedFlows:
        source = flow.srcSubnet
        if source is None or not source.siteName:
            continue

        destination = flow.dstSubnet
        if destination is None:
            destinationCells = ('', '', '')
        else:
            destinationCells = (
                destination.MyIPv4Network.with_prefixlen,
                destination.vrfName,
                destination.siteName,
            )

        rowValues = (
            str(flow.srcIp),
            source.MyIPv4Network.with_prefixlen,
            source.vrfName,
            source.siteName,
            flow.port,
            str(flow.dstIp),
        ) + destinationCells

        for column, value in enumerate(rowValues):
            excelSheet.write(nrow, column, value)
        nrow += 1


# Fill each report sheet (header row first, then the data rows) in the same
# order as before, then save the workbook to disk.
for sheet, flows in (
    (worksheetIntraVrfFlows, intraVrfFlows),
    (worksheetAdFlows, adFlows),
):
    writeSheetIndex(sheet)
    populateSheetFlow(sheet, flows)
workbook.close()

我发现的前两大改进是:

  • 按掩码大小对我的网络进行排序,这样我就可以在找到匹配时立即停止搜索。
  • 创建一个字典,保存我所有的IP/网络匹配结果,这样如果我已经为某个IP查找过一次,就不必再次遍历整个网络目录了。

还有其他地方可以提高我的脚本的性能吗?到目前为止,我测得的耗时是:100行6秒,1k行30秒,10k行120秒(行数越多,我的字典就越有用)。

这还不算太糟糕,但我担心要花多长时间来分析一亿行,所以我宁愿事先找到我能找到的所有优化。

EN

回答 1

Code Review用户

发布于 2020-09-16 22:30:07

我认为接下来要研究的是AnalyzedFlow类findSubnet方法中的这个部分:

代码语言:python
复制
for flow in self.refNetworks.MyEntityNetwork:
        if flow.MyIPv4Network.overlaps(IPv4Network(IPv4Address(subnet))):
            if flow.MyIPv4Network.prefixlen > maxMask:

首先,您在每次循环迭代中都重新构建了一次 IPv4Network(IPv4Address(subnet))。更好的做法是把它提到循环之外,例如:

代码语言:python
复制
subnetNetwork = IPv4Network(IPv4Address(subnet))
for flow in self.refNetworks.MyEntityNetwork:
    if flow.MyIPv4Network.overlaps(subnetNetwork):
        if flow.MyIPv4Network.prefixlen > maxMask:

第二点更复杂。您的列表已经排好序,但如果换一种数据结构,查找可能会更快(具体取决于您的子网分布)。

例如,假设在您的10,000个网络中,有几百个都匹配模式 10.x.y.z/某个掩码,几百个都匹配模式 11.x.y.z/某个掩码,几百个都匹配模式 183.x.y.z/某个掩码,以此类推。如果您的输入地址是 183.22.15.4,理想情况下您就不必检查 10.* 或 11.* 的网络,而只需检查以 183 开头的网络所在的“桶”。

构建它可能需要一些时间,如果您需要,我可以尝试帮您;我的做法是根据网络地址的第一个八位组(octet)对网络进行分组,这样您的InventoryNetworks保存的就不再是一个列表,而是一个dict。

代码语言:python
复制
class InventoryNetworks:
    """Reference inventory of known networks, bucketed by first octet.

    The inventory is read from a comma-separated file with a header row that
    must contain the columns ``Site ID``, ``Site``, ``VRF (VRF ID)``,
    ``VRF Description`` and ``Subnet``.

    ``MyEntityNetwork`` is a dict mapping the first octet of each network
    address (as text, e.g. ``'183'``) to the list of EntityNetwork entries
    whose network address starts with that octet.  Each list is sorted in
    descending ``IPv4Network`` order.

    NOTE(review): a network shorter than /8 spans several first octets but is
    stored only under the octet of its network address; this layout assumes
    every prefix is /8 or longer.
    """

    def __init__(self, filePath=''):
        self.filePath = filePath
        self.MyEntityNetwork = dict()
        # Read the file, parse it and populate the dict of network buckets.
        # newline='' is what the csv module asks for when opening its input.
        with open(filePath, newline='') as csvfile:
            reader = csv.DictReader(csvfile, delimiter=',')
            for line in reader:
                entityNetwork = EntityNetwork(line['Site ID'], line['Site'], line['VRF (VRF ID)'], line['VRF Description'], IPv4Network(line['Subnet']))
                exploded = entityNetwork.MyIPv4Network.network_address.exploded
                firstOctet = exploded.partition(".")[0]
                # setdefault creates the bucket on first use, so a new octet
                # no longer raises KeyError.
                self.MyEntityNetwork.setdefault(firstOctet, []).append(entityNetwork)
        # Lists are sorted in place; there is no need to reassign them.
        for items in self.MyEntityNetwork.values():
            items.sort(key=operator.attrgetter('MyIPv4Network'), reverse=True)

接下来,在搜索子网时可以先按第一个八位组找到对应的分组,再在该分组中进行匹配,这样可以提高匹配速度。我没有给出这部分代码,希望您在需要时能够自行实现。

您甚至可以进一步嵌套,并在您的dict中添加其他级别,例如

代码语言:python
复制
MyEntityNetwork['183']['22']['15'] = [ list of items that match ]
票数 3
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/249454

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档