我是 Python 编程的新手,想按照文章中的表格加载数据,但不知道如何把 NSL-KDD 数据集的训练集和测试集划分为五个类别('normal'、'dos'、'R2L'、'probe'、'u2r')。

我已经在 GitHub 上查看了许多预处理 NSL-KDD 数据集的代码,想把数据分为五组('normal'、'dos'、'R2L'、'probe'、'u2r'),但仍然找不到合适的代码。有谁可以帮我?我真的需要帮助。
发布于 2020-05-27 00:56:17
这段代码用于加载数据集,你可以从中找到如何把标签划分为五个类别的方法。(原文附注:PCA algorithm is used for visualization purposes. It's also used later as preprocessing for Gaussian Mixture clustering.)
import os
import math
import itertools
import multiprocessing
import pandas
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from time import time
from collections import OrderedDict
# IPython/Jupyter magic, not plain Python syntax.
# NOTE(review): presumably this snippet was exported from a notebook - confirm before running it as a script.
%matplotlib inline
# Timestamp captured at start-up; nothing in the visible code reads it again.
gt0 = time()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
# Spark configuration: local master sized to the machine's CPU count,
# a fixed application name, 8g of driver memory and a default parallelism
# equal to the CPU count.
conf = SparkConf()
conf.setMaster(f"local[{multiprocessing.cpu_count()}]")
conf.setAppName("PySpark NSL-KDD")
conf.set("spark.driver.memory", "8g")
conf.set("spark.default.parallelism", f"{multiprocessing.cpu_count()}")

# Get (or create) the SparkContext for this configuration, set its log
# level, and build the SQLContext on top of it.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel('INFO')
sqlContext = SQLContext(sc)
from pyspark.sql.types import *
from pyspark.sql.functions import udf, split, col
import pyspark.sql.functions as sql
# Relative paths of the three NSL-KDD files (20% training subset, full
# training set, test set), all located in the "NSL_KDD_Dataset" directory.
(
    train20_nsl_kdd_dataset_path,
    train_nsl_kdd_dataset_path,
    test_nsl_kdd_dataset_path,
) = (
    os.path.join("NSL_KDD_Dataset", file_name)
    for file_name in ("KDDTrain+_20Percent.txt", "KDDTrain+.txt", "KDDTest+.txt")
)
# Names assigned to the columns of each record: 41 feature columns
# (positions 0-40) followed by the "labels" column.
col_names = np.array([
    "duration", "protocol_type", "service", "flag", "src_bytes",
    "dst_bytes", "land", "wrong_fragment", "urgent", "hot", "num_failed_logins",
    "logged_in", "num_compromised", "root_shell", "su_attempted", "num_root",
    "num_file_creations", "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count", "serror_rate",
    "srv_serror_rate", "rerror_rate", "srv_rerror_rate", "same_srv_rate",
    "diff_srv_rate", "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate", "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate", "labels",
])

# Positions (within col_names) of the features grouped as nominal and binary.
nominal_inx = [1, 2, 3]
binary_inx = [6, 11, 13, 14, 20, 21]
# Every other feature position. sorted() makes the order explicit instead of
# relying on set iteration order, which the language does not guarantee.
numeric_inx = sorted(set(range(41)) - set(nominal_inx) - set(binary_inx))

# Column-name lists for each feature group, in ascending position order.
nominal_cols = col_names[nominal_inx].tolist()
binary_cols = col_names[binary_inx].tolist()
numeric_cols = col_names[numeric_inx].tolist()
# Function to load dataset and divide it into partitions (8 by default)
def load_dataset(path, num_partitions=8):
    """Read a comma-separated text file into a typed Spark DataFrame.

    Each line is split on ',' and the resulting fields are named after
    ``col_names``. The columns 'protocol_type', 'service', 'flag' and
    'labels' are cast to strings; every other named column is cast to double.

    Args:
        path: Location of the text file, passed to ``sc.textFile``.
        num_partitions: Partition count passed as the second argument of
            ``sc.textFile``. Defaults to 8, the value that used to be
            hard-coded.

    Returns:
        A DataFrame containing exactly the columns listed in ``col_names``,
        in that order.
    """
    # The only columns kept as text; deriving the casts from col_names keeps
    # the selected schema in lock-step with the declared column list.
    string_cols = {'protocol_type', 'service', 'flag', 'labels'}
    names = col_names.tolist()

    dataset_rdd = sc.textFile(path, num_partitions).map(lambda line: line.split(','))
    typed_cols = [
        col(name).cast(StringType() if name in string_cols else DoubleType())
        for name in names
    ]
    return dataset_rdd.toDF(names).select(*typed_cols)
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import StringIndexer
from pyspark import keyword_only
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
# Lookup table from each raw label to one of five classes: 'normal' plus the
# four attack categories 'DoS', 'Probe', 'R2L' and 'U2R'. It is built from
# per-category groups so every category's members can be read at a glance.
attack_dict = {
    attack: category
    for category, attacks in (
        ('normal', ('normal',)),
        ('DoS', (
            'back', 'land', 'neptune', 'pod', 'smurf', 'teardrop',
            'mailbomb', 'apache2', 'processtable', 'udpstorm',
        )),
        ('Probe', (
            'ipsweep', 'nmap', 'portsweep', 'satan', 'mscan', 'saint',
        )),
        ('R2L', (
            'ftp_write', 'guess_passwd', 'imap', 'multihop', 'phf', 'spy',
            'warezclient', 'warezmaster', 'sendmail', 'named',
            'snmpgetattack', 'snmpguess', 'xlock', 'xsnoop', 'worm',
        )),
        ('U2R', (
            'buffer_overflow', 'loadmodule', 'perl', 'rootkit',
            'httptunnel', 'ps', 'sqlattack', 'xterm',
        )),
    )
    for attack in attacks
}
# Spark UDF that looks up a raw label in attack_dict and returns the mapped
# category. A label that is missing from attack_dict raises KeyError inside
# the lambda. No return type is passed to udf() here.
attack_mapping_udf = udf(lambda v: attack_dict[v])
class Labels2Converter(Transformer):
    """Transformer that adds a two-class 'labels2' column.

    Values of 'labels' matched by the pattern '^(?!normal).*$' are replaced
    with 'attack'; other values are carried over unchanged.
    """

    @keyword_only
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        # The negative look-ahead matches every string that does not start
        # with "normal".
        two_class_label = sql.regexp_replace(col('labels'), '^(?!normal).*$', 'attack')
        return dataset.withColumn('labels2', two_class_label)
class Labels5Converter(Transformer):
    """Transformer that adds a 'labels5' column.

    The new column is the result of applying ``attack_mapping_udf`` to the
    'labels' column.
    """

    @keyword_only
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        category = attack_mapping_udf(col('labels'))
        return dataset.withColumn('labels5', category)
# Class names for the two-class and five-class label sets.
# NOTE(review): presumably listed in the index order the StringIndexer stages
# assign on the training data - confirm against the fitted model's labels.
labels2 = ['normal', 'attack']
labels5 = ['normal', 'DoS', 'Probe', 'R2L', 'U2R']

# Name of the indexed label column selected for use further on.
labels_col = 'labels2_index'

# Indexers that turn the string label columns into numeric index columns.
labels2_indexer = StringIndexer(
    inputCol="labels2",
    outputCol="labels2_index",
)
labels5_indexer = StringIndexer(
    inputCol="labels5",
    outputCol="labels5_index",
)

# Pipeline order: add both derived label columns first, then index them.
labels_mapping_pipeline = Pipeline(
    stages=[
        Labels2Converter(),
        Labels5Converter(),
        labels2_indexer,
        labels5_indexer,
    ]
)
# Load the training set and time the whole step
t0 = time()
train_df = load_dataset(train_nsl_kdd_dataset_path)

# Fit the label-mapping pipeline on the raw training data
labels_mapping_model = labels_mapping_pipeline.fit(train_df)

# Add the derived label columns and an 'id' column, then mark the result for caching
train_df = (
    labels_mapping_model
    .transform(train_df)
    .withColumn('id', sql.monotonically_increasing_id())
    .cache()
)

print(f"Number of examples in train set: {train_df.count()}")
print(f"Time: {time() - t0:.2f}s")
# Loading test data
t0 = time()
test_df = load_dataset(test_nsl_kdd_dataset_path)
# Transforming labels column and adding id column. The mapping model fitted
# on the training data is applied as-is; it is not refitted on the test set.
test_df = labels_mapping_model.transform(test_df).withColumn('id', sql.monotonically_increasing_id())
test_df = test_df.cache()
print(f"Number of examples in test set: {test_df.count()}")
print(f"Time: {time() - t0:.2f}s")
# Source: https://stackoverflow.com/questions/61778582
复制相似问题