Suppose I have a simple but costly function that stores some result to a file:
import time

def costly_function(filename):
    time.sleep(10)
    with open(filename, 'w') as f:
        f.write("I am done!")

Now, suppose I want to schedule a number of these tasks in dask, which then picks these requests up asynchronously and runs the functions one by one. I am setting up a dask client object...
import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # my attempt at sequential job processing
client = dask.distributed.Client(cluster)

...and then scheduling the jobs interactively (from IPython):
>>> client.schedule(costly_function, "result1.txt")
>>> client.schedule(costly_function, "result2.txt")
>>> client.schedule(costly_function, "result3.txt")

The problem I am running into is that these tasks do not run consecutively but in parallel, which in my particular case causes concurrency issues.
So my question is: what is the correct way to set up a job queue like the one I described above in dask?
Posted on 2019-12-05 13:58:40
OK, I think I may have a solution (though feel free to come up with a better one). It requires modifying the previous costly function slightly:
import time

def costly_function(filename, prev_job=None):
    time.sleep(10)
    with open(filename, 'w') as f:
        f.write("I am done!")
import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # sequential job processing
client = dask.distributed.Client(cluster)

Then, in an interactive context, you would write the following:
>>> future = client.submit(costly_function, "result1.txt")
>>> future = client.submit(costly_function, "result2.txt", prev_job=future)
>>> future = client.submit(costly_function, "result3.txt", prev_job=future)

https://stackoverflow.com/questions/59195701
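The chaining above works because dask treats a future passed as an argument as a dependency: each submission waits for `prev_job` to complete before it starts. If all you need is a strictly sequential queue and not dask's other features, a single-worker executor from the standard library gives the same ordering guarantee. A minimal sketch (the filenames are illustrative, and the sleep is shortened; results are collected in a list instead of files so the ordering is visible):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def costly_function(filename, results):
    # Stand-in for the slow job; records the filename so we can observe ordering.
    time.sleep(0.1)
    results.append(filename)

results = []
# max_workers=1 guarantees that submitted jobs run one at a time,
# in submission order, with no concurrency between them.
with ThreadPoolExecutor(max_workers=1) as executor:
    for name in ["result1.txt", "result2.txt", "result3.txt"]:
        executor.submit(costly_function, name, results)
# Leaving the with-block waits for all queued jobs to finish.
print(results)
```

This is the standard-library `concurrent.futures` module, not dask; it runs in-process and is enough when the goal is simply "one job at a time, in order".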