我用Python编写了一个简单的防病毒程序,并且希望得到更多的想法来实现,以及一般的代码审查。
到目前为止,它有一个FileScanner来检查一个已知的病毒哈希数据库,还有一个网络扫描器来检查一个可能存在恶意IP地址的数据库。
我很好奇我能做些什么来推进这个项目。
#!/usr/bin/env python
import os
import re
import time
import psutil
import hashlib
import sqlite3
import requests
import threading
from bs4 import BeautifulSoup
from argparse import ArgumentParser
WINDOWS = os.name == 'nt'
if WINDOWS:
from win10toast import ToastNotifier
class DB(object):
# TODO: Log the URLS it's grabbed hashes from
# And check the logged urls and skip over logged urls
# when calling the self.update() function
def __init__(self, db_fp='data.db'):
self.db_fp = db_fp
self.connect()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def __repr__(self):
return "<SQLite3 Database: {}>".format(self.db_fp)
def connect(self):
self.conn = sqlite3.connect(self.db_fp)
self.cur = self.conn.cursor()
def close(self):
self.conn.commit()
self.cur.close()
self.conn.close()
def create_tables(self):
self.cur.execute('CREATE TABLE IF NOT EXISTS virus_md5_hashes(md5_hash TEXT NOT NULL UNIQUE)')
self.cur.execute('CREATE TABLE IF NOT EXISTS processed_virusshare_urls(url TEXT NOT NULL UNIQUE)')
self.cur.execute('CREATE TABLE IF NOT EXISTS high_risk_ips(ip TEXT NOT NULL UNIQUE)')
self.conn.commit()
def drop_tables(self):
self.cur.execute('DROP TABLE IF EXISTS virus_md5_hashes')
self.cur.execute('DROP TABLE IF EXISTS processed_virusshare_urls')
self.cur.execute('DROP TABLE IF EXISTS high_risk_ips')
self.conn.commit()
def add(self, table, value):
try:
sql = f"INSERT INTO {table} VALUES (?)"
self.cur.execute(sql, (value,))
except sqlite3.IntegrityError as e:
if 'UNIQUE' in str(e):
pass # Do nothing if trying to add a duplicate value
else:
raise e
def exists(self, vname, table, value):
sql = f"SELECT {vname} FROM {table} WHERE {vname} = (?)"
self.cur.execute(sql, (value,))
return self.cur.fetchone() is not None
def reset(self):
'''
reformats the database, think of it as a fresh-install
'''
# self.drop_tables() # This is soooo slow
self.close()
os.remove(self.db_fp)
self.connect()
self.update()
def update(self):
self.create_tables()
self.update_md5_hashes()
self.update_high_risk_ips()
def update_md5_hashes(self):
'''
updates the sqlite database of known virus md5 hashes
'''
urls = self.get_virusshare_urls()
for n, url in enumerate(urls):
reprint(f"Downloading known virus hashes {n+1}/{len(urls)}")
if not self.exists('url', 'processed_virusshare_urls', url):
for md5_hash in self.get_virusshare_hashes(url):
self.add('virus_md5_hashes', md5_hash)
self.add('processed_virusshare_urls', url)
self.conn.commit()
print()
def get_virusshare_urls(self) -> list:
'''
returns a list of virusshare.com urls containing md5 hashes
'''
r = requests.get('https://virusshare.com/hashes.4n6')
soup = BeautifulSoup(r.content, 'html.parser')
return ["https://virusshare.com/{}".format(a['href']) for a in soup.find_all('a')][6:-2]
def get_virusshare_hashes(self, url) -> str:
'''
parses all the md5 hashes from a valid virusshare.com url
'''
r = requests.get(url)
return r.text.splitlines()[6:]
def update_high_risk_ips(self):
sources = [
'https://blocklist.greensnow.co/greensnow.txt',
'https://cinsscore.com/list/ci-badguys.txt',
'http://danger.rulez.sk/projects/bruteforceblocker/blist.php',
'https://malc0de.com/bl/IP_Blacklist.txt',
'https://rules.emergingthreats.net/blockrules/compromised-ips.txt',
'https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt',
'https://check.torproject.org/cgi-bin/TorBulkExitList.py?ip=1.1.1.1',
'https://feodotracker.abuse.ch/blocklist/?download=ipblocklist',
'https://hosts.ubuntu101.co.za/ips.list',
'https://lists.blocklist.de/lists/all.txt',
'https://myip.ms/files/blacklist/general/latest_blacklist.txt',
'https://pgl.yoyo.org/adservers/iplist.php?format=&showintro=0',
'https://ransomwaretracker.abuse.ch/downloads/RW_IPBL.txt',
'https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/stopforumspam_7d.ipset',
'https://www.dan.me.uk/torlist/?exit',
'https://www.malwaredomainlist.com/hostslist/ip.txt',
'https://www.maxmind.com/es/proxy-detection-sample-list',
'https://www.projecthoneypot.org/list_of_ips.php?t=d&rss=1',
'http://www.unsubscore.com/blacklist.txt',
]
for n, source in enumerate(sources):
reprint(f"Downloading ips list: {n+1}/{len(sources)}")
try:
r = requests.get(source)
for ip in re.findall(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}', r.text):
self.add('high_risk_ips', ip)
except requests.exceptions.RequestException:
print(f"Exception at {source}")
print()
class FileScanner(object):
def __init__(self):
self._bad_files = []
def get_files_recursively(self, folder) -> str:
'''
:param folder: directory to resursively check for binary files
:return: generator of all binary files (str == full path)
'''
for folder_name, sub_folder, filenames in os.walk(folder):
for f in filenames:
f = f"{folder_name}/{f}"
yield f
def get_md5(self, fp) -> str:
'''
:param fp: full path to a file
:return: the md5 hash of a file
'''
md5_hash = hashlib.md5()
with open(fp, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()
def compare_against_database(self, fp):
if is_binary(fp):
with DB() as db: # db connection has to be called within the same thread accessing the db uhg.jpg
md5_hash = self.get_md5(fp)
if db.exists('md5_hash', 'virus_md5_hashes', md5_hash):
self._bad_files.append(fp)
def scan(self, folder, max_threads=10):
start_time = time.time()
fp_gen = self.get_files_recursively(folder)
count = 0
try:
while True:
if threading.active_count() < max_threads:
fp = next(fp_gen)
t = threading.Thread(target=self.compare_against_database, args=(fp, ))
t.start()
count += 1
s = f'Scanning Files - Threads: {threading.active_count()} Files Scanned: {count} '
reprint(s)
else:
time.sleep(0.01)
except OSError:
print(f"OSError: Bad file descriptor: {fp} {' ' * len(fp)}")
except StopIteration:
end_time = time.time()
reprint(' ' * len(s))
print(f"scanned {count} files in {round(end_time - start_time, 2)} seconds")
for f in self._bad_files:
print(f"INFECTED - {f}")
class NetworkScanner(threading.Thread):
def __init__(self, timer=1):
self._timer = timer
self._running = True
self.update_current_connections()
self._displayed_notifications = []
threading.Thread.__init__(self)
def update_current_connections(self):
self._current_connections = psutil.net_connections()
def scan(self):
with DB() as db:
for conn in self._current_connections:
if conn.status != "NONE" or conn.status != "CLOSE_WAIT":
if db.exists('ip', 'high_risk_ips', conn.laddr.ip):
self.notify(conn.laddr.ip, conn.laddr.port, conn.pid)
if conn.raddr:
if db.exists('ip', 'high_risk_ips', conn.raddr.ip):
self.notify(conn.raddr.ip, conn.raddr.port, conn.pid)
def notify(self, ip, port, pid, duration=10):
title, body = "High Risk Connection", f"{psutil.Process(pid).name()}\n{ip}:{port} - {pid}"
if body not in self._displayed_notifications:
if WINDOWS:
ToastNotifier().show_toast(title, body, duration=duration, threaded=True)
self._displayed_notifications.append(body)
else:
print("{} {}".format(title, body))
self._displayed_notifications.append(body)
def run(self):
print('[+] Network Scanner Initialized')
while self._running:
self.update_current_connections()
self.scan()
time.sleep(self._timer)
def stop(self):
print('[-] Network Scanner Stopping')
self._running = False
def is_binary(fp, chunksize=1024) -> bool:
"""Return true if the given filename is binary.
@raise EnvironmentError: if the file does not exist or cannot be accessed.
@attention: found @ http://bytes.com/topic/python/answers/21222-determine-file-type-binary-text on 6/08/2010
@author: Trent Mick <TrentM@ActiveState.com>
@author: Jorge Orpinel <jorge@orpinel.com>"""
try:
with open(fp, 'rb') as f:
while True:
chunk = f.read(chunksize)
if b'\0' in chunk: # found null byte
return True
if len(chunk) < chunksize:
break
except PermissionError:
print(f"Permission Error: {fp} {' ' * len(fp)}")
return False
def reprint(s):
print(s, end='')
print('\r' * len(s), end='')
def parse_args():
parser = ArgumentParser()
parser.add_argument('path', default=os.getcwd(), type=str, help="path to scan")
parser.add_argument('-u', '--update', action="store_true", default=False, help="updates database of virus definitions & high risk IP's")
parser.add_argument('-t', '--threads', default=20, type=int, help="max threads for file scanner")
return parser.parse_args()
def Main():
# Testing for now
args = parse_args()
if args.update:
with DB() as db:
print('[+] Updating database')
db.update()
nsc = NetworkScanner()
nsc.start()
FileScanner().scan(args.path, args.threads)
nsc.stop()
if __name__ == '__main__':
Main()发布于 2020-03-23 07:37:55
数据库的一个快速性能提升是使用可以同时插入多个行的事实。它可以在代码中用于IP和散列,所以这非常有用:
def add_multiple(self, table, values):
sql = f"INSERT OR IGNORE INTO {table} VALUES (?)"
self.cur.executemany(sql, [(value,) for value in values])注意,我使用INSERT OR IGNORE来忽略已经存在的行。请注意,此命令容易受到SQL-注入的影响,因为table的恶意值可以在此命令中执行任何操作(与您的命令相同)。在这种情况下,避免这种情况应该相当容易,因为您知道所有合法的表名,所以只需显式地将它们白名单。
def __init__(self, ...):
...
self.tables = {"virus_md5_hashes",
"processed_virusshare_urls",
"high_risk_ips"}
def add(self, table, value):
if table not in self.tables:
raise ValueError("This table does not exist")
sql = f"INSERT OR IGNORE INTO {table} VALUES (?)"
self.cur.execute(sql, (value,))
def add_multiple(self, table, values):
if table not in self.tables:
raise ValueError("This table does not exist")
sql = f"INSERT OR IGNORE INTO {table} VALUES (?)"
self.cur.executemany(sql, [(value,) for value in values])要使多个insert工作,只需稍微修改更新功能:
def update_md5_hashes(self):
'''
updates the sqlite database of known virus md5 hashes
'''
for n, url in enumerate(self.virusshare_urls):
reprint(f"Downloading known virus hashes {n+1}/{len(urls)}")
if not self.exists('url', 'processed_virusshare_urls', url):
self.add_multiple('virus_md5_hashes', self.get_virusshare_hashes(url))
self.add('processed_virusshare_urls', url)
self.conn.commit()
print()
IP_ADDRESS = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')
def update_high_risk_ips(self):
for n, source in enumerate(self.ip_blacklists):
reprint(f"Downloading ips list: {n+1}/{len(sources)}")
try:
r = requests.get(source)
self.add_many('high_risk_ips', IP_ADDRESS.findall(r.text))
except requests.exceptions.RequestException:
print(f"Exception at {source}")
print()我还会将您的virusshare URL和黑名单中的IP源作为类的一个属性,以便您可以在运行时更改它(如果需要的话)。如果您不喜欢更改这些属性,但仍然希望它们是可访问的,您也可以将它们设置为属性。
请注意,在第一个函数中,您确实有一个self.conn.commit (我将它移到if下,如果您没有执行任何操作,则不需要提交),但在后者中没有。这可能是个窃听器。
https://codereview.stackexchange.com/questions/239297
复制相似问题