首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >简单抗病毒

简单的防病毒程序
EN

Code Review用户
提问于 2020-03-23 01:33:14
回答 1查看 379关注 0票数 6

我用Python编写了一个简单的防病毒程序,希望能得到一些关于后续可以实现哪些功能的想法,同时也希望大家对代码做一次常规的代码审查。

到目前为止,它包含一个FileScanner,用于对照已知病毒哈希数据库检查文件;还有一个网络扫描器,用于对照可能存在恶意行为的IP地址数据库检查网络连接。

我很好奇我能做些什么来推进这个项目。

代码语言:python
复制
#!/usr/bin/env python

import os
import re
import time
import psutil
import hashlib
import sqlite3
import requests
import threading
from bs4 import BeautifulSoup
from argparse import ArgumentParser


WINDOWS = os.name == 'nt'
if WINDOWS:
    from win10toast import ToastNotifier


class DB(object):
    # TODO: Log the URLS it's grabbed hashes from
    # And check the logged urls and skip over logged urls
    # when calling the self.update() function
    def __init__(self, db_fp='data.db'):
        self.db_fp = db_fp
        self.connect()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def __repr__(self):
        return "<SQLite3 Database: {}>".format(self.db_fp)

    def connect(self):
        self.conn = sqlite3.connect(self.db_fp)
        self.cur = self.conn.cursor()

    def close(self):
        self.conn.commit()
        self.cur.close()
        self.conn.close()

    def create_tables(self):
        self.cur.execute('CREATE TABLE IF NOT EXISTS virus_md5_hashes(md5_hash TEXT NOT NULL UNIQUE)')
        self.cur.execute('CREATE TABLE IF NOT EXISTS processed_virusshare_urls(url TEXT NOT NULL UNIQUE)')
        self.cur.execute('CREATE TABLE IF NOT EXISTS high_risk_ips(ip TEXT NOT NULL UNIQUE)')
        self.conn.commit()

    def drop_tables(self):
        self.cur.execute('DROP TABLE IF EXISTS virus_md5_hashes')
        self.cur.execute('DROP TABLE IF EXISTS processed_virusshare_urls')
        self.cur.execute('DROP TABLE IF EXISTS high_risk_ips')
        self.conn.commit()

    def add(self, table, value):
        try:
            sql = f"INSERT INTO {table} VALUES (?)"
            self.cur.execute(sql, (value,))
        except sqlite3.IntegrityError as e:
            if 'UNIQUE' in str(e):
                pass # Do nothing if trying to add a duplicate value
            else:
                raise e

    def exists(self, vname, table, value):
        sql = f"SELECT {vname} FROM {table} WHERE {vname} = (?)"
        self.cur.execute(sql, (value,))
        return self.cur.fetchone() is not None

    def reset(self):
        '''
        reformats the database, think of it as a fresh-install
        '''
        # self.drop_tables() # This is soooo slow
        self.close()
        os.remove(self.db_fp)
        self.connect()
        self.update()

    def update(self):
        self.create_tables()
        self.update_md5_hashes()
        self.update_high_risk_ips()

    def update_md5_hashes(self):
        '''
        updates the sqlite database of known virus md5 hashes
        '''
        urls = self.get_virusshare_urls()
        for n, url in enumerate(urls):
            reprint(f"Downloading known virus hashes {n+1}/{len(urls)}")
            if not self.exists('url', 'processed_virusshare_urls', url):
                for md5_hash in self.get_virusshare_hashes(url):
                    self.add('virus_md5_hashes', md5_hash)
                self.add('processed_virusshare_urls', url)
            self.conn.commit()
        print()

    def get_virusshare_urls(self) -> list:
        '''
        returns a list of virusshare.com urls containing md5 hashes
        '''
        r = requests.get('https://virusshare.com/hashes.4n6')
        soup = BeautifulSoup(r.content, 'html.parser')
        return ["https://virusshare.com/{}".format(a['href']) for a in soup.find_all('a')][6:-2]

    def get_virusshare_hashes(self, url) -> str:
        '''
        parses all the md5 hashes from a valid virusshare.com url
        '''
        r = requests.get(url)
        return r.text.splitlines()[6:]

    def update_high_risk_ips(self):
        sources = [
            'https://blocklist.greensnow.co/greensnow.txt',
            'https://cinsscore.com/list/ci-badguys.txt',
            'http://danger.rulez.sk/projects/bruteforceblocker/blist.php',
            'https://malc0de.com/bl/IP_Blacklist.txt',
            'https://rules.emergingthreats.net/blockrules/compromised-ips.txt',
            'https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt',
            'https://check.torproject.org/cgi-bin/TorBulkExitList.py?ip=1.1.1.1',
            'https://feodotracker.abuse.ch/blocklist/?download=ipblocklist',
            'https://hosts.ubuntu101.co.za/ips.list',
            'https://lists.blocklist.de/lists/all.txt',
            'https://myip.ms/files/blacklist/general/latest_blacklist.txt',
            'https://pgl.yoyo.org/adservers/iplist.php?format=&showintro=0',
            'https://ransomwaretracker.abuse.ch/downloads/RW_IPBL.txt',
            'https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/stopforumspam_7d.ipset',
            'https://www.dan.me.uk/torlist/?exit',
            'https://www.malwaredomainlist.com/hostslist/ip.txt',
            'https://www.maxmind.com/es/proxy-detection-sample-list',
            'https://www.projecthoneypot.org/list_of_ips.php?t=d&rss=1',
            'http://www.unsubscore.com/blacklist.txt',
        ]
        for n, source in enumerate(sources):
            reprint(f"Downloading ips list: {n+1}/{len(sources)}")
            try:
                r = requests.get(source)
                for ip in re.findall(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}', r.text):
                    self.add('high_risk_ips', ip)
            except requests.exceptions.RequestException:
                print(f"Exception at {source}")
        print()


class FileScanner(object):
    def __init__(self):
        self._bad_files = []

    def get_files_recursively(self, folder) -> str:
        '''
        :param folder: directory to resursively check for binary files
        :return: generator of all binary files (str == full path)
        '''
        for folder_name, sub_folder, filenames in os.walk(folder):
            for f in filenames:
                f = f"{folder_name}/{f}"
                yield f

    def get_md5(self, fp) -> str:
        '''
        :param fp: full path to a file
        :return: the md5 hash of a file
        '''
        md5_hash = hashlib.md5()
        with open(fp, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                md5_hash.update(chunk)
        return md5_hash.hexdigest()

    def compare_against_database(self, fp):
        if is_binary(fp):
            with DB() as db: # db connection has to be called within the same thread accessing the db uhg.jpg
                md5_hash = self.get_md5(fp)
                if db.exists('md5_hash', 'virus_md5_hashes', md5_hash):
                    self._bad_files.append(fp)

    def scan(self, folder, max_threads=10):
        start_time = time.time()
        fp_gen = self.get_files_recursively(folder)
        count = 0
        try:
            while True:
                if threading.active_count() < max_threads:
                    fp = next(fp_gen)
                    t = threading.Thread(target=self.compare_against_database, args=(fp, ))
                    t.start()
                    count += 1
                    s = f'Scanning Files - Threads: {threading.active_count()}    Files Scanned: {count}     '
                    reprint(s)
                else:
                    time.sleep(0.01)
        except OSError:
            print(f"OSError: Bad file descriptor: {fp} {' ' * len(fp)}")
        except StopIteration:
            end_time = time.time()
            reprint(' ' * len(s))
            print(f"scanned {count} files in {round(end_time - start_time, 2)} seconds")
            for f in self._bad_files:
                print(f"INFECTED - {f}")


class NetworkScanner(threading.Thread):
    def __init__(self, timer=1):
        self._timer = timer
        self._running = True
        self.update_current_connections()
        self._displayed_notifications = []
        threading.Thread.__init__(self)

    def update_current_connections(self):
        self._current_connections = psutil.net_connections()

    def scan(self):
        with DB() as db:
            for conn in self._current_connections:
                if conn.status != "NONE" or conn.status != "CLOSE_WAIT":
                    if db.exists('ip', 'high_risk_ips', conn.laddr.ip):
                        self.notify(conn.laddr.ip, conn.laddr.port, conn.pid)
                    if conn.raddr:
                        if db.exists('ip', 'high_risk_ips', conn.raddr.ip):
                            self.notify(conn.raddr.ip, conn.raddr.port, conn.pid)

    def notify(self, ip, port, pid, duration=10):
        title, body = "High Risk Connection", f"{psutil.Process(pid).name()}\n{ip}:{port} - {pid}"
        if body not in self._displayed_notifications:
            if WINDOWS:
                ToastNotifier().show_toast(title, body, duration=duration, threaded=True)
                self._displayed_notifications.append(body)
            else:
                print("{} {}".format(title, body))
                self._displayed_notifications.append(body)

    def run(self):
        print('[+] Network Scanner Initialized')
        while self._running:
            self.update_current_connections()
            self.scan()
            time.sleep(self._timer)

    def stop(self):
        print('[-] Network Scanner Stopping')
        self._running = False


def is_binary(fp, chunksize=1024) -> bool:
    """Return true if the given filename is binary.
    @raise EnvironmentError: if the file does not exist or cannot be accessed.
    @attention: found @ http://bytes.com/topic/python/answers/21222-determine-file-type-binary-text on 6/08/2010
    @author: Trent Mick <TrentM@ActiveState.com>
    @author: Jorge Orpinel <jorge@orpinel.com>"""
    try:
        with open(fp, 'rb') as f:
            while True:
                chunk = f.read(chunksize)
                if b'\0' in chunk: # found null byte
                    return True
                if len(chunk) < chunksize:
                    break
    except PermissionError:
        print(f"Permission Error: {fp} {' ' * len(fp)}")
    return False

def reprint(s):
    print(s, end='')
    print('\r' * len(s), end='')

def parse_args():
    parser = ArgumentParser()
    parser.add_argument('path', default=os.getcwd(), type=str, help="path to scan")
    parser.add_argument('-u', '--update', action="store_true", default=False, help="updates database of virus definitions & high risk IP's")
    parser.add_argument('-t', '--threads', default=20, type=int, help="max threads for file scanner")
    return parser.parse_args()


def Main():
    # Testing for now
    args = parse_args()
    if args.update:
        with DB() as db:
            print('[+] Updating database')
            db.update()
    nsc = NetworkScanner()
    nsc.start()
    FileScanner().scan(args.path, args.threads)
    nsc.stop()


if __name__ == '__main__':
    Main()
EN

回答 1

Code Review用户

回答已采纳

发布于 2020-03-23 07:37:55

要快速提升数据库性能,一个办法是利用SQLite可以一次插入多行这一特性。在你的代码中,IP和哈希值的写入都可以这样处理,所以这非常有用:

代码语言:python
复制
def add_multiple(self, table, values):
    sql = f"INSERT OR IGNORE INTO {table} VALUES (?)"
    self.cur.executemany(sql, [(value,) for value in values])

注意,我使用了INSERT OR IGNORE来忽略已经存在的行。还请注意,这条语句容易受到SQL注入攻击,因为恶意的table值可以让这条语句执行任意操作(您原来的写法也有同样的问题)。在这里要避免这个问题相当容易:因为您知道所有合法的表名,所以只需显式地把它们列入白名单即可。

代码语言:python
复制
def __init__(self, ...):
    ...
    self.tables = {"virus_md5_hashes",
                   "processed_virusshare_urls",
                   "high_risk_ips"}

def add(self, table, value):
    if table not in self.tables:
        raise ValueError("This table does not exist")
    sql = f"INSERT OR IGNORE INTO {table} VALUES (?)"
    self.cur.execute(sql, (value,))

def add_multiple(self, table, values):
    if table not in self.tables:
        raise ValueError("This table does not exist")
    sql = f"INSERT OR IGNORE INTO {table} VALUES (?)"
    self.cur.executemany(sql, [(value,) for value in values])

要用上批量插入,只需稍微修改一下更新函数:

代码语言:python
复制
def update_md5_hashes(self):
    '''
    updates the sqlite database of known virus md5 hashes
    '''
    for n, url in enumerate(self.virusshare_urls):
        reprint(f"Downloading known virus hashes {n+1}/{len(urls)}")
        if not self.exists('url', 'processed_virusshare_urls', url):
            self.add_multiple('virus_md5_hashes', self.get_virusshare_hashes(url))
            self.add('processed_virusshare_urls', url)
            self.conn.commit()
    print()

IP_ADDRESS = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')

def update_high_risk_ips(self):
    for n, source in enumerate(self.ip_blacklists):
        reprint(f"Downloading ips list: {n+1}/{len(sources)}")
        try: 
            r = requests.get(source)
            self.add_many('high_risk_ips', IP_ADDRESS.findall(r.text))
        except requests.exceptions.RequestException:
            print(f"Exception at {source}")
    print()

我还会把virusshare的URL和IP黑名单来源都作为类的属性(attribute),这样在需要时就可以在运行时修改它们。如果您不希望它们被修改、但仍然希望可以访问,也可以把它们定义成只读的property。

请注意,在第一个函数中您确实调用了self.conn.commit(我把它移到了if下面,因为如果什么都没有写入,就不需要提交),但在后一个函数中却没有调用。这可能是一个bug。

票数 3
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/239297

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档