为了对网络流进行一些分析,我创建了一个简短的Python脚本。
我有一个中型网络的目录(掩码从 /8 到 /30,约 1 万条记录),其中为我的每个网络提供了若干信息(物理站点、VRF(路由实例)等)。
然后,我有一些庞大的流数据(百万流),其中包含精确的(/32掩码)源和目标IP。
我的主要目标是识别我的/32 IP属于哪个网络,然后识别我的目的地VRF与我的源VRF相同的流。
import xlsxwriter
import csv
from progress.bar import Bar
from ipaddress import IPv4Address, IPv4Network
import operator
import timeit
import os
# Module-level memoization cache: maps an already-looked-up IPv4Address to the
# matching reference network entry (filled lazily by AnalyzedFlow.findSubnet),
# so repeated endpoints across millions of flows cost one dict lookup.
AnalysedDictionnary = dict()
class Flow:
    """A single captured flow: source/destination IPs, destination port, hit count.

    Defines value-based __eq__/__hash__ over all four fields so that a list of
    Flow objects can be de-duplicated via set().  Without them (the original
    behavior), set() compares by object identity and removes nothing, making
    the dedup step in FlowCapture a silent no-op.
    """

    def __init__(self, srcIp: IPv4Address, dstIp: IPv4Address, port: int, nbHits: int):
        self.srcIp = srcIp
        self.dstIp = dstIp
        self.port = port
        self.nbHits = nbHits

    def _key(self):
        # Tuple of all fields: the flow's value identity.
        return (self.srcIp, self.dstIp, self.port, self.nbHits)

    def __eq__(self, other):
        return isinstance(other, Flow) and self._key() == other._key()

    def __hash__(self):
        return hash(self._key())

    def __repr__(self):
        return "<Flow srcIp:%s dstIp:%s port:%s nbHits:%s>" % (self.srcIp, self.dstIp, self.port, self.nbHits)

    def __str__(self):
        return "From str method of Flow: srcIp is %s, dstIp is %s, port is %s, nbHits is %s," % (self.srcIp, self.dstIp, self.port, self.nbHits)
class EntityNetwork:
    """One reference-inventory entry: a known network and its site/VRF metadata.

    Attributes mirror the CSV inventory columns; MyIPv4Network holds the
    parsed ipaddress.IPv4Network object.
    """

    def __init__(self, siteId, siteName, vrfId, vrfName, IPV4Networks):
        # Site and VRF identifiers/labels, straight from the inventory row.
        self.siteId = siteId
        self.siteName = siteName
        self.vrfId = vrfId
        self.vrfName = vrfName
        # The subnet this entry describes.
        self.MyIPv4Network = IPV4Networks
class InventoryNetworks:
    """Reference catalogue of all currently known networks, loaded from a CSV file."""

    def __init__(self, filePath=''):
        self.filePath = filePath
        self.MyEntityNetwork = []
        # Parse the CSV inventory: one EntityNetwork per row.
        with open(filePath) as csvfile:
            for row in csv.DictReader(csvfile, delimiter=','):
                self.MyEntityNetwork.append(
                    EntityNetwork(row['Site ID'], row['Site'], row['VRF (VRF ID)'],
                                  row['VRF Description'], IPv4Network(row['Subnet'])))
        # Reverse-sort by the network object itself. IPv4Network ordering is
        # (network_address, netmask), so in reverse order every subnet precedes
        # any supernet that contains it — a linear scan can stop at the first
        # overlapping entry and already has the most specific match.
        self.MyEntityNetwork.sort(key=operator.attrgetter('MyIPv4Network'), reverse=True)
class FlowCapture:
    """All flows parsed from one capture/log file.

    Each input line is comma-separated; field 7 is the source IP, field 8 the
    destination IP and field 25 the port (TODO confirm against the log format).
    """

    def __init__(self, filePath=''):
        self.filePath = filePath
        self.myFlows = []
        # Stream the file line by line inside a with-block: the original used a
        # bare open() that was never closed, and readlines() which loads the
        # whole (potentially huge) capture into memory at once.
        with open(filePath, "r") as capture:
            for line in capture:
                fields = line.split(",")
                # Every flow starts with nbHits=1; duplicates are collapsed below.
                self.myFlows.append(Flow(IPv4Address(fields[7]), IPv4Address(fields[8]), fields[25], 1))
        # NOTE(review): this dedup only has an effect if Flow defines
        # __eq__/__hash__; otherwise set() compares object identity.
        self.myFlows = list(set(self.myFlows))
class AnalyzedFlow:
    """A Flow enriched with the reference network of its source and destination IPs."""

    def findSubnet(self, subnet):
        """Return the most specific known EntityNetwork containing *subnet*
        (an IPv4Address), or None when no reference network matches.

        Results — including misses — are memoized in the module-level
        AnalysedDictionnary, so each distinct endpoint is resolved once.
        """
        if subnet in AnalysedDictionnary:
            return AnalysedDictionnary[subnet]
        # Build the host network once; the original rebuilt
        # IPv4Network(IPv4Address(subnet)) on every loop iteration.
        hostNetwork = IPv4Network(subnet)
        matchedSubnet = None
        for entity in self.refNetworks.MyEntityNetwork:
            if entity.MyIPv4Network.overlaps(hostNetwork):
                # The inventory is sorted so that a subnet precedes any supernet
                # containing it: the first overlap is already the most specific
                # match, and the vestigial maxMask bookkeeping is unnecessary.
                matchedSubnet = entity
                break
        # The original left matchedSubnet unbound (UnboundLocalError) when no
        # network matched; cache and return None so callers can handle unknowns.
        AnalysedDictionnary[subnet] = matchedSubnet
        return matchedSubnet

    def __init__(self, flow: Flow, refNetworks: InventoryNetworks):
        self.srcIp = flow.srcIp
        self.dstIp = flow.dstIp
        self.port = flow.port
        self.nbHits = flow.nbHits
        self.refNetworks = refNetworks
        # Resolve both endpoints against the reference inventory.
        self.srcSubnet = self.findSubnet(self.srcIp)
        self.dstSubnet = self.findSubnet(self.dstIp)
# Ports of interest for the "AdFlows" report.
adPort = ['10','12','14','443']

# Both input files live next to this script.
base_dir = os.path.dirname(os.path.abspath(__file__))

# Reference inventory of known networks.
ReferenceNetworks = InventoryNetworks(os.path.join(base_dir, "InventorySheet.csv"))

# Raw captured flows.
CapturedFlows = FlowCapture(os.path.join(base_dir, "Logs_Equipment.txt"))

# Enrich each captured flow with its source/destination networks.
AnalysedFlows = []
print('its going on')
t0 = timeit.default_timer()
# [:100] limits the run to 100 flows so test iterations stay short.
sample = CapturedFlows.myFlows[:100]
bar = Bar('Processing', max=len(sample))
for capturedFlow in sample:
    AnalysedFlows.append(AnalyzedFlow(capturedFlow, ReferenceNetworks))
    bar.next()
bar.finish()
print('we analysed in :', timeit.default_timer() - t0)

# Flows that stay inside a single VRF, and flows hitting the watched ports.
intraVrfFlows = [af for af in AnalysedFlows if af.srcSubnet.vrfId == af.dstSubnet.vrfId]
adFlows = [af for af in AnalysedFlows if af.port in adPort]

# Output workbook: one worksheet per report.
workbook = xlsxwriter.Workbook('flowAnalysis.xlsx')
worksheetIntraVrfFlows = workbook.add_worksheet('IntraVrfFlows')
worksheetAdFlows = workbook.add_worksheet('AdFlows')
# Start from the first cell. Rows and columns are zero indexed.
def writeSheetIndex(excelSheet):
    """Write the fixed header row (row 0) of a report worksheet.

    Column order must match populateSheetFlow's data rows.
    """
    headers = ('Source IP', 'Source Subnet', 'Source VRF', 'Source Site',
               'Port',
               'Destination IP', 'Destination Subnet', 'Destination VRF', 'Destination Site')
    for col, title in enumerate(headers):
        excelSheet.write(0, col, title)
def populateSheetFlow(excelSheet, analyzedFlows):
    """Write one data row per analyzed flow, starting under the header row."""
    nrow = 1
    for flow in analyzedFlows:
        src, dst = flow.srcSubnet, flow.dstSubnet
        row_values = (str(flow.srcIp), src.MyIPv4Network.with_prefixlen, src.vrfName, src.siteName,
                      flow.port,
                      str(flow.dstIp), dst.MyIPv4Network.with_prefixlen, dst.vrfName, dst.siteName)
        for col, value in enumerate(row_values):
            excelSheet.write(nrow, col, value)
        # Only advance when the source site is set: a flow with an empty site
        # name gets overwritten by the next one. NOTE(review): this looks like
        # an intentional filter for unknown sites, but confirm — otherwise
        # rows are silently lost.
        if flow.srcSubnet.siteName:
            nrow = nrow + 1
# Emit both result sheets: intra-VRF flows and flows on the watched ports.
# (The original wrapped these calls in a pointless `if True :` block.)
writeSheetIndex(worksheetIntraVrfFlows)
populateSheetFlow(worksheetIntraVrfFlows, intraVrfFlows)
writeSheetIndex(worksheetAdFlows)
populateSheetFlow(worksheetAdFlows, adFlows)
workbook.close()
我发现的前两大改进是:
还有其他地方可以提高我的脚本的性能吗?到目前为止,我的测量结果大致是:100 行约 6 秒,1 千行约 30 秒,1 万行约 120 秒(一次处理的行越多,我的字典缓存就越有用)。
这还不算太糟糕,但我担心要花多长时间来分析一亿行,所以我宁愿事先找到我能找到的所有优化。
发布于 2020-09-16 22:30:07
我认为接下来要研究的是AnalyzedFlow类findSubnet方法中的这个部分:
for flow in self.refNetworks.MyEntityNetwork:
if flow.MyIPv4Network.overlaps(IPv4Network(IPv4Address(subnet))):
if flow.MyIPv4Network.prefixlen > maxMask:
首先,您正在构建 IPv4Network(IPv4Address(subnet)),每次循环迭代都会构建一次。相反,可以做类似这样的事情:
subnetNetwork = IPv4Network(IPv4Address(subnet))
for flow in self.refNetworks.MyEntityNetwork:
if flow.MyIPv4Network.overlaps(subnetNetwork):
if flow.MyIPv4Network.prefixlen > maxMask:
第二点更复杂。您的网络列表是有序的,但如果换一种数据结构,它可能——取决于您的子网分布——加快速度。
例如,假设您的 10,000 个网络中,有几百个匹配模式 10.x.y.z/某掩码,几百个匹配模式 11.x.y.z/某掩码,几百个匹配模式 183.x.y.z/某掩码,以此类推。如果您的输入地址是 183.22.15.4,理想情况下,您不想检查那些 10.* 或 11.* 的网络,而只检查以 183 开头的那个网络“桶”。
构建它可能需要一段时间(如果需要,我可以尝试帮助您),但我会根据网络地址的第一个八位字节(octet)对子网进行分桶,这样您的 InventoryNetworks 中保存的就不再是一个列表,而是一个 dict。
class InventoryNetworks:
# "Reference all current Known Networks"
def __init__(self, filePath=''):
self.filePath = filePath
self.MyEntityNetwork = dict()
# "Read File, parse file and populate a dict of IPV4Networks"
with open(filePath) as csvfile:
reader = csv.DictReader(csvfile, delimiter=',')
for line in reader:
entityNetwork = EntityNetwork(line['Site ID'], line['Site'], line['VRF (VRF ID)'], line['VRF Description'], IPv4Network(line['Subnet']))
exploded = entityNetwork.MyIPv4Network.network_address.exploded
firstOctet = exploded[0:exploded.index(".")]
prev = MyEntityNetwork[firstOctet]
if prev is None:
MyEntityNetwork[firstOctet] = [ entityNetwork ]
else:
prev.append(entityNetwork)
MyEntityNetwork[firstOctet] = prev
for octet in MyEntityNetwork.keys():
items = MyEntityNetwork[octet]
items.sort(key=operator.attrgetter('MyIPv4Network'), reverse=True)
MyEntityNetwork[octet] = items接下来,您可以做一些类似于在第一个八进制上匹配的搜索子网的操作,这将提高匹配速度。我还没有包括这段代码,我希望如果你需要的话,你可以把它算出来。
您甚至可以进一步嵌套,并在您的dict中添加其他级别,例如
MyEntityNetwork['183']['22']['15'] = [ list of items that match ]

https://codereview.stackexchange.com/questions/249454
复制相似问题