首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >异步计算dask数组块(Dask + FastAPI)

异步计算dask数组块(Dask + FastAPI)
EN

Stack Overflow用户
提问于 2020-03-02 16:28:22
回答 1查看 869关注 0票数 3

我正在构建一个FastAPI应用程序,该应用程序将为Dask的块提供服务。我想把FastAPI的异步功能达斯克分布的异步操作能力结合起来。下面是一个mcve,它演示了我试图在应用程序的服务器和客户端做什么:

服务器端:

代码语言:javascript
复制
import time

import dask.array as da
import numpy as np
import uvicorn
from dask.distributed import Client
from fastapi import FastAPI

app = FastAPI()
# create a dask array that we can serve
data = da.from_array(np.arange(0, 1e6, dtype=np.int), chunks=100)


async def _get_block(block_id):
    """return one block of the dask array as a list"""
    block_data = data.blocks[block_id].compute()
    return block_data.tolist()


@app.get("/")
async def get_root():
    time.sleep(1)
    return {"Hello": "World"}


@app.get("/{block_id}")
async def get_block(block_id: int):
    time.sleep(1)  # so we can test concurrency
    my_list = await _get_block(block_id)
    return {"block": my_list}


if __name__ == "__main__":
    client = Client(n_workers=2)
    print(client)
    print(client.cluster.dashboard_link)
    uvicorn.run(app, host="0.0.0.0", port=9000, log_level="debug")

客户端

代码语言:javascript
复制
import dask
import requests
from dask.distributed import Client

client = Client()

responses = [
    dask.delayed(requests.get, pure=False)(f"http://127.0.0.1:9000/{i}") for i in range(10)
]
dask.compute(responses)

在这个设置中,compute()调用在_get_block中是“阻塞”的,每次只计算一个块。我尝试过Client(asynchronous=True)client.compute(dask.compute(responses)的各种组合,但没有任何改进。是否有可能await计算的达克阵列?

EN

回答 1

Stack Overflow用户

发布于 2020-03-02 16:50:30

这条线

代码语言:javascript
复制
block_data = data.blocks[block_id].compute()

是个阻拦电话。如果您使用client.compute(data.blocks[block_id]),您将得到一个可以与您的IOLoop一起使用的未来,只要Dask使用相同的循环即可。

请注意,接收服务器非常希望以这种方式工作(它也希望为数组和其他数据类型按块流数据)。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60492963

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档