我使用的是一个普通的Dask-Kubernetes设置,它有两个工作者和一个调度器,用来迭代一些JSON文件的行(并应用一些简单起见没有出现在这里的函数)。我只看到一个工人在工作,而我希望看到他们中的两个。
希望重新分区会有所帮助,我已经尝试了不同的bag.repartition(num)值,它们返回不同的行数,但它们不会改变任何关于工作进程不平衡和只集中在一个工作进程上的内存消耗的问题。
我想我不理解分区和工作进程之间的关系,并且我在Dask文档中找不到任何与此相关的内容。任何帮助或指示都是非常欢迎的!
import dask.bag as db
def grep_buildings():
base = "https://usbuildingdata.blob.core.windows.net/usbuildings-v1-1/"
b = db.text.read_text(f"{base}/Alabama.zip")
# b = b.repartition(2)
lines = b.take(3_000_000)
return lines
len(grep_buildings())发布于 2020-09-25 03:59:14
在您的示例中,您是在文件上打开的,并且它是压缩的
db.text.read_text(f"{base}/Alabama.zip")Dask能够并行打开和处理多个文件,每个文件至少有一个分区。Dask还能够将单个文件拆分成块( blocksize参数);但这只适用于未压缩的数据。原因是,对于整个文件压缩方法,到达未压缩流中某个点的唯一方法是从头开始读取,因此每个分区都将读取大部分数据。
最后,当您从单个分区开始时,重新分区对您没有帮助:您需要读取整个文件,然后才能将数据拆分为下游任务的片段。
https://stackoverflow.com/questions/64046785
复制相似问题