首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将目录复制到多个目录- I/O性能问题

将目录复制到多个目录- I/O性能问题
EN

Code Review用户
提问于 2016-04-12 20:09:24
回答 2查看 66关注 0票数 2

我正在研究下面的python复制实用程序,它应该在windows & linux上工作,但是我正在寻找一种更有效的方法来优化我的I/O校正,因为我的目标位置也依赖于网络.我计算了代码中的实用程序执行时间因子。

代码语言:javascript
复制
#!/usr/bin/python

"""
Pythonic implementation of multi-target copy (Parallel Copy).    
"""

import Queue
import threading
import time
import os, os.path
import sys
import shutil, hashlib

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, queue, idFlag):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.idFlag = idFlag

    def run(self):
        if debugFlag:
            print "**** Starting %s" % self.name
        process_data(self.name, self.queue, self.idFlag)
        if debugFlag:
            print "**** Ending %s" % self.name

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    sha512_hash = hashlib.sha512()
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf:
            statinfo = os.stat(src_filepath)
            block_size = 100 * (2 ** 20)  # Magic number: 100 * 1MB blocks
            nb_blocks = (statinfo.st_size / block_size) + 1
            cnt_blocks = 0

            l = len(src_filepath.split('\\'))
            target_file_path = os.path.join(target_filename, src_filepath.split('\\')[l - 1])

            while True:
                block = sf.read(block_size)
                sha512_hash.update(block)   # Todo a blockwise copy
                if not block: break
                cnt_blocks = cnt_blocks + 1
                with open(target_filename, "a") as tf:
                    tf.write(block)
                tf.close()

            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
            sf.close()
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

def delete_folder(target_path):
    """
    Deletes a folder, if it already exists
    @param target_path: Relative path of the directory to delete
    """
    if (os.path.exists(target_path) or os.path.isdir(target_path)):
        print "Directory %s already exists.. deleting..." % target_path
        try:
            shutil.rmtree(target_path)
        except OSError:
            os.remove(target_path)

def process_data(threadName, queue, idFlag):
    while not exitFlag:
        if not workQueue.empty():
            (sfile, dfile) = queue.get()
            copy_files_concurrantly(sfile, dfile, idFlag)
        time.sleep(0.5)

def queue_mt(argv):
    """
    Implementation to do multi-target copy (recursive) of directories
    @param argv: Arguments passed at command-line 
    """
    desc = "Recursively copies the files to destination directories."
    syntax = "\nUsage:\n c4.py cp -L -R <src-dir> <target-dir>\n c4.py cp -L -R <src-dir> -t <target-dir1> <target-dir2>..."
    options = "\n\n    cp\t\t\tCopy operation to perform.\n    -L\t\t\tDisplay running logs.(Optional)\n    -R\t\t\tRecursively copy source files to target.\n    <src-dir>\t\tSpecify source directory to copy.\n    <target-dir>\tSpecify target directory to copy."
    win = "\n\n  Windows: c4.py cp -R d:\src-dir\*.* e:\dst-dir  (OR)  c4.py cp -R d:\src-dir\*.* -t d:\dst-dir1 e:\dst-dir2"
    linux = "\n  Linux: c4.py cp -R /src-dir/*.* /dst-dir  (OR)  c4.py cp -R /src-dir/*.* -t /dst-dir1 /dst-dir2"

    cmd_usage = desc + syntax + options + win + linux

    # Displays the command-usage incase of incorrect arguments specified 
    if len(argv) < 4:
        print cmd_usage
        sys.exit(2)

    global threadID, workQueue, debugFlag
    threads, threadList, threadID, debugFlag, cnt = [], [], 1, False, 0
    stime = time.time()

    # Perform single source to single target directory copy
    if ((len(argv) == 4) and (("-R" in argv[1]) or ("-r" in argv[1]))) or ((len(argv) == 5) and (("-R" in argv[2]) or ("-r" in argv[2]))):
        if (len(argv) == 4):
            src_path, dest_path = argv[2], argv[3]
        if (len(argv) == 5) and ("-L" in argv[1]):
            debugFlag = True
            src_path, dest_path = argv[3], argv[4]
        if src_path.endswith('/*') or src_path.endswith('\*'):
            src_path = src_path[:-2]
        if src_path.endswith('/*.*') or src_path.endswith('\*.*'):
            src_path = src_path[:-4]

        # Computing the file-count recursively traversing the directory
        # Excludes the count of number of directories
        fcnt = sum([len(f) for r, d, f in os.walk(src_path)])
        print "File)s) count in source directory: %d" % fcnt
        cnt = fcnt * 1
        workQueue = Queue.Queue(cnt)

        # Fill the Queue
        for root, subfolder, filenames in os.walk(src_path):
            newDir = os.path.join(dest_path, root[1 + len(src_path):])
            if not os.path.exists(newDir):
                os.makedirs(newDir)
            else:
                delete_folder(newDir)
            for filename in filenames:
                sfpath = str(os.path.join(root, filename))
                dfpath = str(os.path.join(newDir, filename))
                workQueue.put((sfpath, dfpath))
                if debugFlag:
                    print "***** Added to Q... %s | %s" % (sfpath, dfpath)

    elif ((len(argv) > 4) and (("-t" in argv[3]) or ("-t" in argv[4]))):
        if ("-L" in argv[1]):
            debugFlag = True
            src_path, st = argv[3], 5
        else:
            src_path, st = argv[2], 4
        if src_path.endswith('/*') or src_path.endswith('\*'):
            src_path = src_path[:-2]
        if src_path.endswith('/*.*') or src_path.endswith('\*.*'):
            src_path = src_path[:-4]

        # Computing the file-count recursively traversing the directory
        # Excludes the count of number of directories
        fcnt = sum([len(f) for r, d, f in os.walk(src_path)])
        if ("-L" in argv[1]):
            dst = (len(argv) - 5)
        else:
            dst = (len(argv) - 4)
        print "File(s) count in source directory:%d | Destination directories count:%s" % (fcnt, dst)
        cnt = fcnt * dst
        workQueue = Queue.Queue(cnt)

        # Fill the Queue
        for root, subfolder, filenames in os.walk(src_path):
            for i in range(st, (len(argv))):
                dest_path = argv[i]
                newDir = os.path.join(dest_path, root[1 + len(src_path):])
                if not os.path.exists(newDir):
                    os.makedirs(newDir)
                else:
                    delete_folder(newDir)
                for filename in filenames:
                    sfpath = str(os.path.join(root, filename))
                    dfpath = str(os.path.join(newDir, filename))
                    workQueue.put((sfpath, dfpath))
                    if debugFlag:
                        print "***** Added to Q... %s | %s" % (sfpath, dfpath)

    print "\nGenerating c4id's for source directory files only...\n"
    # Create new threads
    max_threads = 100
    if cnt > max_threads:
        cnt = max_threads
    for i in range(1, cnt+1):
        s = 'Thread'+str(i)
        threadList.append(s)
    if debugFlag:
        print "***** ThreadsList: %s" % str(threadList)
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue, idFlag=True)
        thread.start()
        threads.append(thread)
        threadID += 1

    # Wait for queue to empty
    while not workQueue.empty():
        pass

    # Notify threads its time to exit
    global exitFlag
    exitFlag = 1

    # Wait for all threads to complete
    for t in threads:
        t.join()

    if debugFlag:
        print "\nUtility Exec time: %s sec" %(time.time() - stime)

if __name__ == '__main__':
    queue_mt(sys.argv[1:])
EN

回答 2

Code Review用户

发布于 2016-04-12 20:48:46

让我从copy_files_concurrantly开始。

除了更新之外,不使用sha512_hash变量。它可以走了。nb_blockscnt_blocksstat_infotarget_file_pathl也是如此。

代码语言:javascript
复制
def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf:
            block_size = 100 * (2 ** 20)  # Magic number: 100 * 1MB blocks

            while True:
                block = sf.read(block_size)
                if not block: break
                with open(target_filename, "a") as tf:
                    tf.write(block)
                tf.close()

            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
            sf.close()
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

这样,代码的可读性就会提高一点,因为您只剩下代码的功能部分。

另一件令人担忧的事情是使用with块并手动关闭文件。with块在代码离开with块时关闭文件。让我们去掉显式的close

代码语言:javascript
复制
def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf:
            block_size = 100 * (2 ** 20)  # Magic number: 100 * 1MB blocks

            while True:
                block = sf.read(block_size)
                if not block: break
                with open(target_filename, "a") as tf:
                    tf.write(block)

            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

另外要注意的是,target_file一直在被打开和关闭。我们可以做得更好!

代码语言:javascript
复制
def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
            block_size = 100 * (2 ** 20)  # Magic number: 100 * 1MB blocks

            while True:
                block = sf.read(block_size)
                if not block: break
                tf.write(block)

            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

最后一个改进是:使用partialiter

代码语言:javascript
复制
while True:
    block = sf.read(block_size)
    if not block:
        break
    ...

可以写成

代码语言:javascript
复制
from functools import partial

for block in iter(partial(sf.read, block_size), ''):
     ...

留给我们

代码语言:javascript
复制
def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
            block_size = 100 * (2 ** 20)  # Magic number: 100 * 1MB blocks

            for block in iter(partial(sf.read, block_size), ''):
                tf.write(block)

            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

另一个(非常!)导入建议是将print语句向上移高一点。

代码语言:javascript
复制
from functools import partial

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    try:
        with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
            block_size = 100 * (2 ** 20)  # Magic number: 100 * 1MB blocks

            for block in iter(partial(sf.read, block_size), ''):
                tf.write(block)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

特别是如果文件很大,并且需要在网络上传输,那么了解该文件正在处理的内容是很好的。

现在,我假设"a"现在是一个bug,因为文件不再被截断。假设我们可以用"w"代替它,我们可以写:

代码语言:javascript
复制
import shutil

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    try:
        shutil.copy(src_filepath, target_filename)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

做完这件事。

在剩下的代码中,可能还有很多可以进一步清理的地方,但我只想简单地介绍一下如何修复copy_files_concurrantly

票数 2
EN

Code Review用户

发布于 2016-04-13 11:09:24

有一件事我不太明白:为什么process_datamyThread类之外?我的意思是,您给它整个属性集作为参数;对它只有一个调用:run方法的内部。

如果您这样做是为了在global exitFlag中使用queue_mt,那么这是个坏主意。全局被认为是错误的做法,并且有更好的方法来同步线程之间的信息。例如,您可以使用eventS

代码语言:javascript
复制
def queue_mt(argv):

    ...

    # Create new threads
    cnt = min(100, cnt) ## No need to do a compare-and-set explicitly here
    threadList = ['Thread{}'.format(i) for i in range(1, cnt+1)] ## List comprehension is better than loop + append
    if debugFlag:
        print "***** ThreadsList:", threadList

    exit = threading.Event()
    for threadID, tName in enumerate(threadList): ## Let python do the indexing for you
        thread = myThread(threadID, tName, workQueue, exit, idFlag=True)
        thread.start()
        threads.append(thread)

    # Wait for queue to empty
    while not workQueue.empty():
        pass

    # Notify threads its time to exit
    exit.set()

    # Wait for all threads to complete
    for t in threads:
        t.join()

    if debugFlag:
        print "\nUtility Exec time: %s sec" %(time.time() - stime)

在继续之前,这里有一些小提示:

  • cnt = max_thread if cnt > max_thread else cnt (对我来说)比你写的更清楚。cnt = min(cnt, max_thread)更好。不过,您可能希望将MAX_THREAD = 100定义为文件顶部的常量。
  • threadList = [] +循环+ threadList.append(...)是一种糟糕的做法:使用列表理解。
  • 字符串连接是构建字符串的一种糟糕的方法,使用format,您将对格式有更多的控制。
  • 您几乎不需要在Python中维护显式索引。如果需要的话,在迭代集合时使用enumerate

因此,我们添加了一个Event作为myThread的第四个参数,让我们来处理它。同时,给这个类取一个有意义的名字:

代码语言:javascript
复制
class ThreadedFileCopy(threading.Thread):
    def __init__(self, threadID, name, queue, event, idFlag):
        supe(ThreadedFileCopy, self).__init__()
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.stop = event
        self.idFlag = idFlag

    def run(self):
        if debugFlag:
            print "**** Starting %s" % self.name
        while not self.stop.is_set():
            if not workQueue.empty():
                (sfile, dfile) = self.queue.get()
                copy_files_concurrantly(sfile, dfile, self.idFlag)
            time.sleep(0.5)
        if debugFlag:
            print "**** Ending %s" % self.name

这让我觉得…这个全球workQueue从何而来?哦,有一个global workQueuequeue_mt。然后构建将workQueue作为第三个参数传递的线程。因此,基本上,workQueueself.queue;为什么不首先使用它呢?

此外,查看它的使用情况,在创建线程之前填充队列。它们只是消费者,应该在队列为空时立即停止,但您不需要任何同步机制,因为您可以知道队列何时在线程中直接为空:使用get_nowait并处理异常以优雅地终止:

代码语言:javascript
复制
class ThreadedFileCopy(threading.Thread):
    def __init__(self, threadID, name, queue, idFlag):
        supe(ThreadedFileCopy, self).__init__()
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.idFlag = idFlag

    def run(self):
        if debugFlag:
            print "**** Starting %s" % self.name
        try:
            while True:
                (sfile, dfile) = self.queue.get_nowait()
                copy_files_concurrantly(sfile, dfile, self.idFlag)
        except queue.Empty:
            # Continue gracefully when every file has been handled
            pass
        if debugFlag:
            print "**** Ending %s" % self.name

所以,原来我们根本不需要那个Event。我也删除了time.sleep(0.5),我不明白你为什么把它放在第一位,如果表现重要。

现在,让我们回到queue_mt的用法。由于每个线程现在在队列为空时停止自身,因此不需要在此函数中监视该线程。我们还可以改进线程的创建方式,减少无用数据的存储量。

代码语言:javascript
复制
MAX_THREADS = 100

def queue_mt(argv):

    ...

    # Create new threads
    threadList = [
        ThreadedFileCopy(i, 'Thread{}'.format(i+1), workQueue, True)
        for i in range(min(MAX_THREADS, cnt))
    ]

    if debugFlag:
        print "***** ThreadsList:", threadList

    for thread in threadList:
        thread.start()

    # Wait for all threads to complete
    for thread in threadList:
        thread.join()

    if debugFlag:
        print "\nUtility Exec time: %s sec" %(time.time() - stime)

这样,您不需要同时存储名称和线程,因为后者包含前者。它还允许您更快地构建线程列表。哦,在我做这件事的时候,不要比需要变量的位置(比如threads, threadList, threadID, debugFlag, cnt = [], [], 1, False, 0)更早地声明变量。Python是一种动态语言,如果您在某一时刻碰巧需要一个变量,只需在此时执行赋值即可。

但是,现在有一个问题,对print "***** ThreadsList:", threadList的调用对列表中的线程进行了毫无帮助的重新表示。在构建列表的字符串表示时,Python将字符串列表转换为单个字符串没有任何困难。但是现在它必须把一个线程列表转换成一个字符串,而且它也不知道哦,把一个线程转换成一个字符串。让我们通过将__repr__方法添加到线程对象中来帮助它:

代码语言:javascript
复制
class ThreadedFileCopy(threading.Thread):
    def __init__(self, threadID, name, queue, idFlag):
        supe(ThreadedFileCopy, self).__init__()
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.idFlag = idFlag

    def __repr__(self):
        return self.name

    def run(self):
        if debugFlag:
            print "**** Starting %s" % self.name
        try:
            while True:
                (sfile, dfile) = self.queue.get_nowait()
                copy_files_concurrantly(sfile, dfile, self.idFlag)
        except queue.Empty:
            # Continue gracefully when every file has been handled
            pass
        if debugFlag:
            print "**** Ending %s" % self.name

这就是,和以前一样的输出。

最后一个注意事项,我们删除了对global workQueue的需求,完全消除了对threadID的需求,尽量避免对debugFlag使用全局值。您可以将它作为线程的参数传递,并将其称为一天。

我还可以告诉您如何管理命令行解析和在同一个函数中执行逻辑是非常糟糕的,您应该先解析命令行,然后用正确的参数集调用执行逻辑的函数。我也可以告诉您argparsegetopt生成使用消息和解析命令行选项(…)但我会让别人知道的。

票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/125504

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档