我知道达斯克在这样的批处理模式下工作得很好
def load(filename):
...
def clean(data):
...
def analyze(sequence_of_data):
...
def store(result):
with open(..., 'w') as f:
f.write(result)
dsk = {'load-1': (load, 'myfile.a.data'),
'load-2': (load, 'myfile.b.data'),
'load-3': (load, 'myfile.c.data'),
'clean-1': (clean, 'load-1'),
'clean-2': (clean, 'load-2'),
'clean-3': (clean, 'load-3'),
'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
'store': (store, 'analyze')}
from dask.multiprocessing import get
get(dsk, 'store') # executes in parallel发布于 2015-11-27 15:37:32
编辑:见下面更新的答案
不是
dask中的当前任务调度程序需要一个计算图。它不支持对此图进行动态添加或删除。调度器的设计目的是在少量内存中评估大图;提前了解整个图对此至关重要。
但是,这并不能阻止创建具有不同属性的其他调度程序。这里有一个简单的解决方案,就是在一台机器上使用像conncurrent.futures这样的模块,在多台机器上使用distributed。
实际上是的
分布式调度程序现在完全异步运行,您可以在计算过程中提交任务、等待几个任务、提交更多任务、取消任务、添加/删除工作人员等。有几种方法可以做到这一点,但最简单的可能是这里简要描述的新的concurrent.futures接口:
http://dask.pydata.org/en/latest/futures.html
https://stackoverflow.com/questions/33952313
复制相似问题