首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从concurrent.futures获取进度更新

从concurrent.futures获取进度更新
EN

Stack Overflow用户
提问于 2020-06-12 19:32:50
回答 2查看 574关注 0票数 4

我想从单独的线程或进程(以较快的为准)复制一个文件,这样就不会阻塞主线程。

我也想偶尔更新一下进度。

使用“常规”工作线程,我可以将进度状态推送到Queue,并从主(UI)线程检查它。

我该如何使用concurrent.futures来解决这个问题呢?

EN

回答 2

Stack Overflow用户

发布于 2020-06-21 19:36:19

替代方法

用于运行后台线程/任务的background_runner装饰器/注释。background_pools用于保存当前正在运行的线程及其进度。__context控制着进度。

代码语言:javascript
复制
import threading
from collections import defaultdict
import time

background_pools = defaultdict(lambda: {})

def background_runner(_func=None, *, pool_max=1, pool_name='default'):
    def main_wrapper(task): # It is our internal decorator, and (task) is our decorated function.
        pool_size=pool_max
        global background_pools

        # It will return empty array if pool is not found.
        pool = background_pools[pool_name]

        print("Pool name is:",pool_name)
        print("Pool size is:",pool_size)

        def task_wrapper(*args, **kwargs): # It is the replacement or Decorated version of aur (task) or (_func)
            def task_in_thread():
                thread_id = threading.current_thread().ident
                context = {}
                pool[thread_id] = { "thread": threading.current_thread(), "context":context}
                try:
                    return task(*args, **kwargs, __context=context)
                finally:
                    try: 
                        del pool[thread_id]
                    except:
                        pass    

            if len(pool.keys()) < pool_size:
                threading.Thread(target=task_in_thread).start()
                print("Task:'{}' is in process.".format(pool_name))
            else:
                print(f"Only { pool_size } task:{pool_name} can run at a time.")
        return task_wrapper
    if _func is None:
        # decorator is used with named arguments.
        return main_wrapper                
    else:
        # decorator is used without arguments.
        return main_wrapper(_func)

使用time.sleep测试background_runner装饰器。__context用于更新进度。

代码语言:javascript
复制
@background_runner(pool_max=3, pool_name='sleep_test')
def sleep_test(__context={}):
    __context['time'] = 0
    for index in range(0, 20):
        time.sleep(2)
        __context['time'] += 2

测试方法的调用

代码语言:javascript
复制
sleep_test()
time.sleep(10) 
print(background_pools)
sleep_test()
time.sleep(10)
print(background_pools)
time.sleep(10)
sleep_test()
sleep_test()
print(background_pools)
time.sleep(10)
print(background_pools)
票数 2
EN

Stack Overflow用户

发布于 2020-06-21 20:02:23

因为复制文件是受I/O限制的,所以轻量级线程是您应该使用的全部,而不是通过创建进程的开销。您也可以创建第二个线程来单独监视复制的进度,就像我在这里所做的那样,但这是可选的。

代码语言:javascript
复制
import os
import queue
import concurrent.futures

CHUNKSIZE=1000000

def file_copyer(in_file, out_file, q):
    with open(in_file, 'rb') as f_in, open(out_file, 'wb') as f_out:
        in_size = f_in.seek(0, os.SEEK_END)
        f_in.seek(0, os.SEEK_SET)
        data_read = 0
        while True:
            data = f_in.read(CHUNKSIZE)
            if data == b'':
                break
            f_out.write(data)
            data_read += len(data)
            percentage = int(round((data_read / in_size) * 100, 0))
            q.put(percentage)

def progress_thread(q):
    while True:
        percentage = q.get()
        print(f'{percentage}% complete.')
        if percentage == 100:
            return

# this version uses a progress bar:
def progress_bar_thread(q):
    import sys
    WIDTH=40
    while True:
        percentage = q.get()
        x = int(percentage/100 * WIDTH)
        sys.stdout.write("Copying [%s%s] %i%%/100%% complete\r" % ("#"*x, "."*(WIDTH-x), percentage))
        sys.stdout.flush()
        if percentage == 100:
            return

def main():
    q = queue.Queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        f1 = executor.submit(file_copyer, 'IMG_1413.jpg', 'temp.jpg', q)
        f2 = executor.submit(progress_thread, q)
        #f2 = executor.submit(progress_bar_thread, q)            
    f1.result() # wait for completion
    f2.result() # wait for completion
    executor.shutdown()

main()

函数file_copyer以离散的块大小将输入文件复制到输出文件,并在每次写入后计算已完成的百分比,并将该值写入作为参数传递的队列。这些完成百分比值既可以由主线程从队列中读取,也可以由在其自己的线程中运行的progress_thread从队列中读取。由于progress_thread除了监视进度之外什么也不做,因此它可以执行阻塞调用q.get,并等待下一个完成百分比值到达队列。如果主线程正在执行监视,那么它可能应该发出非阻塞q.get_nowait调用。当检索到100 (百分比)的值时,进度监视线程可以返回,因为这意味着复制已经完成。

应该注意的是,如果只有两个定义良好的线程(如果监控是由主线程完成的,则只有一个线程),那么也可以放弃使用concurrent.futures模块,如果需要线程池,这是很好的选择,但这里并不是这样。只需使用threading.Thread类即可。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62343419

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档