I want to copy a file from a separate thread or process (whichever is faster) so that the main thread is not blocked.
I also want occasional progress updates.
With a "regular" worker thread, I can push the progress status onto a Queue and check it from the main (UI) thread.
How can I solve this with concurrent.futures?
Posted on 2020-06-21 19:36:19
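For reference, the "regular" worker-thread-plus-Queue pattern the question describes might look like the following sketch (the copy itself is simulated with sleeps, and the `get_nowait` loop stands in for a UI event-loop poll):

```python
import queue
import threading
import time

def copy_worker(q):
    # Simulated copy: a real worker would read/write chunks here,
    # putting a completion percentage on the queue after each chunk.
    for pct in (25, 50, 75, 100):
        time.sleep(0.01)
        q.put(pct)

progress = queue.Queue()
worker = threading.Thread(target=copy_worker, args=(progress,))
worker.start()

seen = []
while True:
    try:
        pct = progress.get_nowait()  # non-blocking check from the main (UI) thread
    except queue.Empty:
        time.sleep(0.005)  # a real UI would run its event loop here instead
        continue
    seen.append(pct)
    if pct == 100:
        break
worker.join()
print(seen)
```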
Alternative approach
A background_runner decorator for running background threads/tasks. background_pools holds the currently running threads and their progress. __context carries the progress state.
import threading
from collections import defaultdict
import time

background_pools = defaultdict(lambda: {})

def background_runner(_func=None, *, pool_max=1, pool_name='default'):
    def main_wrapper(task):  # Internal decorator; (task) is the decorated function.
        pool_size = pool_max
        global background_pools
        # Returns an empty dict if the pool is not found.
        pool = background_pools[pool_name]
        print("Pool name is:", pool_name)
        print("Pool size is:", pool_size)
        def task_wrapper(*args, **kwargs):  # Replacement (decorated) version of (task) / (_func).
            def task_in_thread():
                thread_id = threading.current_thread().ident
                context = {}
                pool[thread_id] = {"thread": threading.current_thread(), "context": context}
                try:
                    return task(*args, **kwargs, __context=context)
                finally:
                    try:
                        del pool[thread_id]
                    except KeyError:
                        pass
            if len(pool.keys()) < pool_size:
                threading.Thread(target=task_in_thread).start()
                print("Task:'{}' is in process.".format(pool_name))
            else:
                print(f"Only {pool_size} task:{pool_name} can run at a time.")
        return task_wrapper
    if _func is None:
        # Decorator is used with named arguments.
        return main_wrapper
    else:
        # Decorator is used without arguments.
        return main_wrapper(_func)

Testing the background_runner decorator with time.sleep. __context is used to update the progress.
@background_runner(pool_max=3, pool_name='sleep_test')
def sleep_test(__context={}):
    __context['time'] = 0
    for index in range(0, 20):
        time.sleep(2)
        __context['time'] += 2

Calling the test method:
sleep_test()
time.sleep(10)
print(background_pools)
sleep_test()
time.sleep(10)
print(background_pools)
time.sleep(10)
sleep_test()
sleep_test()
print(background_pools)
time.sleep(10)
print(background_pools)

Posted on 2020-06-21 20:02:23
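As a footnote to the decorator approach above: the main thread reads progress by inspecting background_pools directly. A condensed, self-contained sketch of that polling pattern (timings shortened for illustration; the registration logic mirrors what task_in_thread does inside the decorator):

```python
import threading
import time
from collections import defaultdict

background_pools = defaultdict(dict)

def task_in_thread():
    # Mirrors the decorator's task_in_thread: register a per-thread
    # context dict in the pool, then update it as work progresses.
    context = {'time': 0}
    background_pools['sleep_test'][threading.current_thread().ident] = {
        'thread': threading.current_thread(),
        'context': context,
    }
    for _ in range(5):
        time.sleep(0.01)
        context['time'] += 2

worker = threading.Thread(target=task_in_thread)
worker.start()
time.sleep(0.03)  # let the task make some progress

# The main (UI) thread reads progress out of the shared pools dict.
snapshot = [entry['context']['time']
            for entry in background_pools['sleep_test'].values()]
print(snapshot)
worker.join()
```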
Since copying a file is I/O-bound, a lightweight thread is all you should be using, rather than incurring the overhead of creating a process. You can also create a second thread that separately monitors the progress of the copy, as I have done here, but that is optional.
import os
import queue
import concurrent.futures

CHUNKSIZE = 1000000

def file_copyer(in_file, out_file, q):
    with open(in_file, 'rb') as f_in, open(out_file, 'wb') as f_out:
        in_size = f_in.seek(0, os.SEEK_END)
        f_in.seek(0, os.SEEK_SET)
        data_read = 0
        while True:
            data = f_in.read(CHUNKSIZE)
            if data == b'':
                break
            f_out.write(data)
            data_read += len(data)
            percentage = int(round((data_read / in_size) * 100, 0))
            q.put(percentage)

def progress_thread(q):
    while True:
        percentage = q.get()
        print(f'{percentage}% complete.')
        if percentage == 100:
            return

# this version uses a progress bar:
def progress_bar_thread(q):
    import sys
    WIDTH = 40
    while True:
        percentage = q.get()
        x = int(percentage / 100 * WIDTH)
        sys.stdout.write("Copying [%s%s] %i%%/100%% complete\r" % ("#" * x, "." * (WIDTH - x), percentage))
        sys.stdout.flush()
        if percentage == 100:
            return

def main():
    q = queue.Queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        f1 = executor.submit(file_copyer, 'IMG_1413.jpg', 'temp.jpg', q)
        f2 = executor.submit(progress_thread, q)
        #f2 = executor.submit(progress_bar_thread, q)
        f1.result()  # wait for completion
        f2.result()  # wait for completion
        executor.shutdown()
main()

The function file_copyer copies the input file to the output file in discrete chunk sizes, computes the percentage completed after each write, and puts that value on the queue passed as an argument. These completion-percentage values can be read off the queue either by the main thread or by progress_thread running in its own thread. Since progress_thread does nothing but monitor the progress, it can issue the blocking call q.get and wait for the next completion-percentage value to arrive on the queue. If the main thread is doing the monitoring, it should probably issue non-blocking q.get_nowait calls. When a value of 100 (percent) is retrieved, the progress-monitoring thread can return, since that means the copy has completed.
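The main-thread variant with q.get_nowait mentioned above could look like this sketch (the copy is simulated with a stand-in for file_copyer so the example is self-contained):

```python
import queue
import time
import concurrent.futures

def copier_sim(q):
    # Stand-in for file_copyer: reports completion percentages to the queue.
    for pct in (20, 40, 60, 80, 100):
        time.sleep(0.01)
        q.put(pct)

q = queue.Queue()
last = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(copier_sim, q)
    while last < 100:
        try:
            last = q.get_nowait()  # non-blocking poll from the main thread
            print(f'{last}% complete.')
        except queue.Empty:
            time.sleep(0.005)  # the main thread is free to do other work here
    future.result()
```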
It should be noted that with only two well-defined threads (or just one, if the monitoring is done by the main thread), one could also forgo the concurrent.futures module, which is great when you need a thread pool, but that is not the case here. Simply use the threading.Thread class instead.
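A plain threading.Thread version of the same program, reusing the file_copyer and progress_thread functions from above (the sample input file and its name, in.bin, are generated here only so the sketch runs on its own):

```python
import os
import queue
import threading

CHUNKSIZE = 1000000

def file_copyer(in_file, out_file, q):
    # Same logic as above: copy in chunks, reporting percent complete.
    with open(in_file, 'rb') as f_in, open(out_file, 'wb') as f_out:
        in_size = f_in.seek(0, os.SEEK_END)
        f_in.seek(0, os.SEEK_SET)
        data_read = 0
        while True:
            data = f_in.read(CHUNKSIZE)
            if data == b'':
                break
            f_out.write(data)
            data_read += len(data)
            q.put(int(round(data_read / in_size * 100)))

def progress_thread(q):
    while True:
        percentage = q.get()
        print(f'{percentage}% complete.')
        if percentage == 100:
            return

# Create a small sample input file so the example is self-contained.
with open('in.bin', 'wb') as f:
    f.write(os.urandom(50000))

q = queue.Queue()
copier = threading.Thread(target=file_copyer, args=('in.bin', 'out.bin', q))
monitor = threading.Thread(target=progress_thread, args=(q,))
copier.start()
monitor.start()
copier.join()
monitor.join()
```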
https://stackoverflow.com/questions/62343419