我用的是分布式的达斯克。
我重新启动了我的分布式网络(4名工作人员),然后提交一个函数,将文本从s3桶中的文件(大小为25 Mb)中读取到一个dask包中,然后计算dask集合。然后,我将计算的结果收集到本地进程,最后删除该结果和相关的未来。
在这个往返结束时: 1)存储在分布式网络中的字节比开始时高出大约100 Mb (即4x文件大小)。2)此外,我还可以看到,这些“额外”字节仅驻留在4个工人中的2个。
我可以通过重新启动分布式网络(即client.restart())来清除这些“额外的”字节,但是该解决方案“在野外”并不适用,在这里,我希望这个过程能够在传入的文件中持续运行。
请看下面的代码。我调用execute函数开始往返。
我指的是这个链接:http://distributed.dask.org/en/latest/memory.html#clearing-data
import dask.bag as db
from dask.distributed import Client
class TestClass:
def __init__(self,
client):
self.client = client
def execute(self,
s3url):
def remote_load():
return db.read_text(s3url).compute()
future = self.client.submit(func=remote_load).result()
self.client.cancel(future)
del future
if __name__ == "__main__":
client = Client("scheduler address")
test = TestClass(client)
test.execute("s3 url of my file")我所期望的是,上面的代码将导致在往返结束时存储的字节与开始存储的字节相同。
但是在一次往返旅行之后,存储的字节增加了100 Mb。在第二次往返之后,它会增加一些(但比另外增加100 Mb)。诸若此类。
因此,存储的字节继续增长。
有人能说明一下发生了什么事吗?或者提出解决方案?
非常感谢!
发布于 2019-06-16 08:13:26
我怀疑这里使用的一些库,比如boto3或s3fs,都有一些内部缓存。
这是在两台机器上得到的,而不仅仅是一台,因为您提交了一个dask作业来运行您的一名员工,而不仅仅是您的客户。我建议更换这个
def remote_load():
return db.read_text(s3url).compute()
future = self.client.submit(func=remote_load).result()有了这个
db.read_text(s3url).compute() https://stackoverflow.com/questions/56611623
复制相似问题