我正在研究下面的python复制实用程序,它应该在windows & linux上工作,但是我正在寻找一种更有效的方法来优化我的I/O校正,因为我的目标位置也依赖于网络.我计算了代码中的实用程序执行时间因子。
#!/usr/bin/python
"""
Pythonic implementation of multi-target copy (Parallel Copy).
"""
import Queue
import threading
import time
import os, os.path
import sys
import shutil, hashlib
exitFlag = 0
class myThread(threading.Thread):
def __init__(self, threadID, name, queue, idFlag):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.queue = queue
self.idFlag = idFlag
def run(self):
if debugFlag:
print "**** Starting %s" % self.name
process_data(self.name, self.queue, self.idFlag)
if debugFlag:
print "**** Ending %s" % self.name
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
sha512_hash = hashlib.sha512()
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf:
statinfo = os.stat(src_filepath)
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
nb_blocks = (statinfo.st_size / block_size) + 1
cnt_blocks = 0
l = len(src_filepath.split('\\'))
target_file_path = os.path.join(target_filename, src_filepath.split('\\')[l - 1])
while True:
block = sf.read(block_size)
sha512_hash.update(block) # Todo a blockwise copy
if not block: break
cnt_blocks = cnt_blocks + 1
with open(target_filename, "a") as tf:
tf.write(block)
tf.close()
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
sf.close()
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)
def delete_folder(target_path):
"""
Deletes a folder, if it already exists
@param target_path: Relative path of the directory to delete
"""
if (os.path.exists(target_path) or os.path.isdir(target_path)):
print "Directory %s already exists.. deleting..." % target_path
try:
shutil.rmtree(target_path)
except OSError:
os.remove(target_path)
def process_data(threadName, queue, idFlag):
while not exitFlag:
if not workQueue.empty():
(sfile, dfile) = queue.get()
copy_files_concurrantly(sfile, dfile, idFlag)
time.sleep(0.5)
def queue_mt(argv):
"""
Implementation to do multi-target copy (recursive) of directories
@param argv: Arguments passed at command-line
"""
desc = "Recursively copies the files to destination directories."
syntax = "\nUsage:\n c4.py cp -L -R <src-dir> <target-dir>\n c4.py cp -L -R <src-dir> -t <target-dir1> <target-dir2>..."
options = "\n\n cp\t\t\tCopy operation to perform.\n -L\t\t\tDisplay running logs.(Optional)\n -R\t\t\tRecursively copy source files to target.\n <src-dir>\t\tSpecify source directory to copy.\n <target-dir>\tSpecify target directory to copy."
win = "\n\n Windows: c4.py cp -R d:\src-dir\*.* e:\dst-dir (OR) c4.py cp -R d:\src-dir\*.* -t d:\dst-dir1 e:\dst-dir2"
linux = "\n Linux: c4.py cp -R /src-dir/*.* /dst-dir (OR) c4.py cp -R /src-dir/*.* -t /dst-dir1 /dst-dir2"
cmd_usage = desc + syntax + options + win + linux
# Displays the command-usage incase of incorrect arguments specified
if len(argv) < 4:
print cmd_usage
sys.exit(2)
global threadID, workQueue, debugFlag
threads, threadList, threadID, debugFlag, cnt = [], [], 1, False, 0
stime = time.time()
# Perform single source to single target directory copy
if ((len(argv) == 4) and (("-R" in argv[1]) or ("-r" in argv[1]))) or ((len(argv) == 5) and (("-R" in argv[2]) or ("-r" in argv[2]))):
if (len(argv) == 4):
src_path, dest_path = argv[2], argv[3]
if (len(argv) == 5) and ("-L" in argv[1]):
debugFlag = True
src_path, dest_path = argv[3], argv[4]
if src_path.endswith('/*') or src_path.endswith('\*'):
src_path = src_path[:-2]
if src_path.endswith('/*.*') or src_path.endswith('\*.*'):
src_path = src_path[:-4]
# Computing the file-count recursively traversing the directory
# Excludes the count of number of directories
fcnt = sum([len(f) for r, d, f in os.walk(src_path)])
print "File)s) count in source directory: %d" % fcnt
cnt = fcnt * 1
workQueue = Queue.Queue(cnt)
# Fill the Queue
for root, subfolder, filenames in os.walk(src_path):
newDir = os.path.join(dest_path, root[1 + len(src_path):])
if not os.path.exists(newDir):
os.makedirs(newDir)
else:
delete_folder(newDir)
for filename in filenames:
sfpath = str(os.path.join(root, filename))
dfpath = str(os.path.join(newDir, filename))
workQueue.put((sfpath, dfpath))
if debugFlag:
print "***** Added to Q... %s | %s" % (sfpath, dfpath)
elif ((len(argv) > 4) and (("-t" in argv[3]) or ("-t" in argv[4]))):
if ("-L" in argv[1]):
debugFlag = True
src_path, st = argv[3], 5
else:
src_path, st = argv[2], 4
if src_path.endswith('/*') or src_path.endswith('\*'):
src_path = src_path[:-2]
if src_path.endswith('/*.*') or src_path.endswith('\*.*'):
src_path = src_path[:-4]
# Computing the file-count recursively traversing the directory
# Excludes the count of number of directories
fcnt = sum([len(f) for r, d, f in os.walk(src_path)])
if ("-L" in argv[1]):
dst = (len(argv) - 5)
else:
dst = (len(argv) - 4)
print "File(s) count in source directory:%d | Destination directories count:%s" % (fcnt, dst)
cnt = fcnt * dst
workQueue = Queue.Queue(cnt)
# Fill the Queue
for root, subfolder, filenames in os.walk(src_path):
for i in range(st, (len(argv))):
dest_path = argv[i]
newDir = os.path.join(dest_path, root[1 + len(src_path):])
if not os.path.exists(newDir):
os.makedirs(newDir)
else:
delete_folder(newDir)
for filename in filenames:
sfpath = str(os.path.join(root, filename))
dfpath = str(os.path.join(newDir, filename))
workQueue.put((sfpath, dfpath))
if debugFlag:
print "***** Added to Q... %s | %s" % (sfpath, dfpath)
print "\nGenerating c4id's for source directory files only...\n"
# Create new threads
max_threads = 100
if cnt > max_threads:
cnt = max_threads
for i in range(1, cnt+1):
s = 'Thread'+str(i)
threadList.append(s)
if debugFlag:
print "***** ThreadsList: %s" % str(threadList)
for tName in threadList:
thread = myThread(threadID, tName, workQueue, idFlag=True)
thread.start()
threads.append(thread)
threadID += 1
# Wait for queue to empty
while not workQueue.empty():
pass
# Notify threads its time to exit
global exitFlag
exitFlag = 1
# Wait for all threads to complete
for t in threads:
t.join()
if debugFlag:
print "\nUtility Exec time: %s sec" %(time.time() - stime)
if __name__ == '__main__':
queue_mt(sys.argv[1:])发布于 2016-04-12 20:48:46
让我从copy_files_concurrantly开始。
除了更新之外,不使用sha512_hash变量。它可以走了。nb_blocks,cnt_blocks,stat_info,target_file_path,l也是如此。
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
while True:
block = sf.read(block_size)
if not block: break
with open(target_filename, "a") as tf:
tf.write(block)
tf.close()
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
sf.close()
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)这样,代码的可读性就会提高一点,因为您只剩下代码的功能部分。
另一件令人担忧的事情是使用with块并手动关闭文件。with块在代码离开with块时关闭文件。让我们去掉显式的close。
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
while True:
block = sf.read(block_size)
if not block: break
with open(target_filename, "a") as tf:
tf.write(block)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)另外要注意的是,target_file一直在被打开和关闭。我们可以做得更好!
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
while True:
block = sf.read(block_size)
if not block: break
tf.write(block)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)最后一个改进是:使用partial和iter:
while True:
block = sf.read(block_size)
if not block:
break
...可以写成
from functools import partial
for block in iter(partial(sf.read, block_size), ''):
...留给我们
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
for block in iter(partial(sf.read, block_size), ''):
tf.write(block)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)另一个(非常!)导入建议是将print语句向上移高一点。
from functools import partial
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
try:
with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
for block in iter(partial(sf.read, block_size), ''):
tf.write(block)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)特别是如果文件很大,并且需要在网络上传输,那么了解该文件正在处理的内容是很好的。
现在,我假设"a"现在是一个bug,因为文件不再被截断。假设我们可以用"w"代替它,我们可以写:
import shutil
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
try:
shutil.copy(src_filepath, target_filename)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)做完这件事。
在剩下的代码中,可能还有很多可以进一步清理的地方,但我只想简单地介绍一下如何修复copy_files_concurrantly。
发布于 2016-04-13 11:09:24
有一件事我不太明白:为什么process_data在myThread类之外?我的意思是,您给它整个属性集作为参数;对它只有一个调用:run方法的内部。
如果您这样做是为了在global exitFlag中使用queue_mt,那么这是个坏主意。全局被认为是错误的做法,并且有更好的方法来同步线程之间的信息。例如,您可以使用eventS:
def queue_mt(argv):
...
# Create new threads
cnt = min(100, cnt) ## No need to do a compare-and-set explicitly here
threadList = ['Thread{}'.format(i) for i in range(1, cnt+1)] ## List comprehension is better than loop + append
if debugFlag:
print "***** ThreadsList:", threadList
exit = threading.Event()
for threadID, tName in enumerate(threadList): ## Let python do the indexing for you
thread = myThread(threadID, tName, workQueue, exit, idFlag=True)
thread.start()
threads.append(thread)
# Wait for queue to empty
while not workQueue.empty():
pass
# Notify threads its time to exit
exit.set()
# Wait for all threads to complete
for t in threads:
t.join()
if debugFlag:
print "\nUtility Exec time: %s sec" %(time.time() - stime)在继续之前,这里有一些小提示:
cnt = max_thread if cnt > max_thread else cnt (对我来说)比你写的更清楚。cnt = min(cnt, max_thread)更好。不过,您可能希望将MAX_THREAD = 100定义为文件顶部的常量。threadList = [] +循环+ threadList.append(...)是一种糟糕的做法:使用列表理解。format,您将对格式有更多的控制。enumerate。因此,我们添加了一个Event作为myThread的第四个参数,让我们来处理它。同时,给这个类取一个有意义的名字:
class ThreadedFileCopy(threading.Thread):
def __init__(self, threadID, name, queue, event, idFlag):
supe(ThreadedFileCopy, self).__init__()
self.threadID = threadID
self.name = name
self.queue = queue
self.stop = event
self.idFlag = idFlag
def run(self):
if debugFlag:
print "**** Starting %s" % self.name
while not self.stop.is_set():
if not workQueue.empty():
(sfile, dfile) = self.queue.get()
copy_files_concurrantly(sfile, dfile, self.idFlag)
time.sleep(0.5)
if debugFlag:
print "**** Ending %s" % self.name这让我觉得…这个全球workQueue从何而来?哦,有一个global workQueue在queue_mt。然后构建将workQueue作为第三个参数传递的线程。因此,基本上,workQueue是self.queue;为什么不首先使用它呢?
此外,查看它的使用情况,在创建线程之前填充队列。它们只是消费者,应该在队列为空时立即停止,但您不需要任何同步机制,因为您可以知道队列何时在线程中直接为空:使用get_nowait并处理异常以优雅地终止:
class ThreadedFileCopy(threading.Thread):
def __init__(self, threadID, name, queue, idFlag):
supe(ThreadedFileCopy, self).__init__()
self.threadID = threadID
self.name = name
self.queue = queue
self.idFlag = idFlag
def run(self):
if debugFlag:
print "**** Starting %s" % self.name
try:
while True:
(sfile, dfile) = self.queue.get_nowait()
copy_files_concurrantly(sfile, dfile, self.idFlag)
except queue.Empty:
# Continue gracefully when every file has been handled
pass
if debugFlag:
print "**** Ending %s" % self.name所以,原来我们根本不需要那个Event。我也删除了time.sleep(0.5),我不明白你为什么把它放在第一位,如果表现重要。
现在,让我们回到queue_mt的用法。由于每个线程现在在队列为空时停止自身,因此不需要在此函数中监视该线程。我们还可以改进线程的创建方式,减少无用数据的存储量。
MAX_THREADS = 100
def queue_mt(argv):
...
# Create new threads
threadList = [
ThreadedFileCopy(i, 'Thread{}'.format(i+1), workQueue, True)
for i in range(min(MAX_THREADS, cnt))
]
if debugFlag:
print "***** ThreadsList:", threadList
for thread in threadList:
thread.start()
# Wait for all threads to complete
for thread in threadList:
thread.join()
if debugFlag:
print "\nUtility Exec time: %s sec" %(time.time() - stime)这样,您不需要同时存储名称和线程,因为后者包含前者。它还允许您更快地构建线程列表。哦,在我做这件事的时候,不要比需要变量的位置(比如threads, threadList, threadID, debugFlag, cnt = [], [], 1, False, 0)更早地声明变量。Python是一种动态语言,如果您在某一时刻碰巧需要一个变量,只需在此时执行赋值即可。
但是,现在有一个问题,对print "***** ThreadsList:", threadList的调用对列表中的线程进行了毫无帮助的重新表示。在构建列表的字符串表示时,Python将字符串列表转换为单个字符串没有任何困难。但是现在它必须把一个线程列表转换成一个字符串,而且它也不知道哦,把一个线程转换成一个字符串。让我们通过将__repr__方法添加到线程对象中来帮助它:
class ThreadedFileCopy(threading.Thread):
def __init__(self, threadID, name, queue, idFlag):
supe(ThreadedFileCopy, self).__init__()
self.threadID = threadID
self.name = name
self.queue = queue
self.idFlag = idFlag
def __repr__(self):
return self.name
def run(self):
if debugFlag:
print "**** Starting %s" % self.name
try:
while True:
(sfile, dfile) = self.queue.get_nowait()
copy_files_concurrantly(sfile, dfile, self.idFlag)
except queue.Empty:
# Continue gracefully when every file has been handled
pass
if debugFlag:
print "**** Ending %s" % self.name这就是,和以前一样的输出。
最后一个注意事项,我们删除了对global workQueue的需求,完全消除了对threadID的需求,尽量避免对debugFlag使用全局值。您可以将它作为线程的参数传递,并将其称为一天。
我还可以告诉您如何管理命令行解析和在同一个函数中执行逻辑是非常糟糕的,您应该先解析命令行,然后用正确的参数集调用执行逻辑的函数。我也可以告诉您argparse或getopt生成使用消息和解析命令行选项(…)但我会让别人知道的。
https://codereview.stackexchange.com/questions/125504
复制相似问题