首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在faust中使用并发?

如何在faust中使用并发?
EN

Stack Overflow用户
提问于 2019-04-05 03:56:23
回答 1查看 1.2K关注 0票数 2

我正在与faust合作,并希望利用并发功能。列出的示例并没有很好地演示并发的用法。

我想做的是,阅读kafka producer和unnest json的内容。然后,发货被发送到一个进程,以计算账单等。我应该一次发送10个发货到一个函数,该函数进行计算。为此,我使用并发,以便10个发货量可以并发计算。

代码语言:javascript
复制
import faust
import time
import json
from typing import List
import asyncio

class Items(faust.Record):
    name: str
    billing_unit: str
    billing_qty: int


class Shipments(faust.Record, serializer="json"):
    shipments: List[Items]
    ship_type: str
    shipping_service: str
    shipped_at: str


app = faust.App('ships_app', broker='kafka://localhost:9092', )
ship_topic = app.topic('test_shipments', value_type=Shipments)


@app.agent(value_type=str, concurrency=10)
async def mytask(records):
# task that does some other activity
    async for record in records:
        print(f'received....{record}')
        time.sleep(5)


@app.agent(ship_topic)
async def process_shipments(shipments):
    # async for ships in stream.take(100, within=10):
    async for ships in shipments:
        data = ships.items
        uid = faust.uuid()
        for item in data:
            item_uuid = faust.uuid()
            print(f'{uid}, {item_uuid}, {ships.ship_type}, {ships.shipping_service}, {ships.shipped_at}, {item.name}, {item.billing_unit}, {item.billing_qty}')
            await mytask.send(value=("{} -- {}".format(uid, item_uuid)))

            # time.sleep(2)
        # time.sleep(10)


if __name__ == '__main__':
    app.main()
EN

回答 1

Stack Overflow用户

发布于 2019-11-13 05:27:03

好了,我知道它是怎么工作的了。您给出的示例的问题实际上是time.sleep位,而不是并发位。下面是两个愚蠢的例子,展示了代理如何在并发和不并发的情况下工作。

代码语言:javascript
复制
import faust
import asyncio

app = faust.App(
    'example_app',
    broker="kafka://localhost:9092",
    value_serializer='raw',
)

t = app.topic('topic_1')

# @app.agent(t, concurrency=1)
# async def my_task(tasks):
#   async for my_task in tasks:
#       val = my_task.decode('utf-8')
#       if (val == "Meher"):
#           # This will print out second because there is only one thread.
#           # It'll take 5ish seconds and print out right after Waldo
#           print("Meher's a jerk.")
#       else:
#           await asyncio.sleep(5)
#           # Since there's only one thread running this will effectively
#           # block the agent.
#           print(f"Where did {val} go?")

@app.agent(t, concurrency=2)
async def my_task2(tasks):
    async for my_task in tasks:
        val = my_task.decode('utf-8')
        if (val == "Meher"):
            # This will print out first even though the Meher message is 
            # received second. 
            print("Meher's a jerk.")
        else:
            await asyncio.sleep(5)
            # Because this will be sleeping and there are two threads available.
            print(f"Where did {val} go?")

# ===============================
# In another process run

from kafka import KafkaProducer

p = KafkaProducer()
p.send('topic_1', b'Waldo'); p.send('topic_1', b'Meher')
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55523954

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档