首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python run_forever()和任务

Python run_forever()和任务
EN

Stack Overflow用户
提问于 2018-04-18 17:51:24
回答 1查看 1.1K关注 0票数 2

我修改了这段代码,以便在异步Python: PubSub中使用Google https://github.com/cloudfind/google-pubsub-asyncio

代码语言:javascript
复制
import asyncio
import datetime
import functools
import os

from google.cloud import pubsub
from google.gax.errors import RetryError
from grpc import StatusCode

async def message_producer():
    """ Publish messages which consist of the current datetime """
    while True:
        await asyncio.sleep(0.1)


async def proc_message(message):
    await asyncio.sleep(0.1)
    print(message)
    message.ack()


def main():
    """ Main program """
    loop = asyncio.get_event_loop()

    topic = "projects/{project_id}/topics/{topic}".format(
        project_id=PROJECT, topic=TOPIC)
    subscription_name = "projects/{project_id}/subscriptions/{subscription}".format(
        project_id=PROJECT, subscription=SUBSCRIPTION)

    subscription = make_subscription(
        topic, subscription_name)

    def create_proc_message_task(message):
        """ Callback handler for the subscription; schedule a task on the event loop """
        print("Task created!")
        task = loop.create_task(proc_message(message))

    subscription.open(create_proc_message_task)
    # Produce some messages to consume

    loop.create_task(message_producer())

    print("Subscribed, let's do this!")
    loop.run_forever()


def make_subscription(topic, subscription_name):
    """ Make a publisher and subscriber client, and create the necessary resources """
    subscriber = pubsub.SubscriberClient()
    try:
        subscriber.create_subscription(subscription_name, topic)
    except:
        pass
    subscription = subscriber.subscribe(subscription_name)

    return subscription


if __name__ == "__main__":
    main()

我基本上删除了发布代码,只使用订阅代码。但是,最初我没有包括loop.create_task(message_producer())行。我认为任务是按照他们应该做的那样创建的,但是他们从来没有真正运行过自己。只有当我添加上述行时,代码才会正确执行并运行所有创建的任务。是什么导致了这种行为?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-04-19 04:40:08

PubSub正在从另一个线程调用create_proc_message_task回调。因为create_task非线程安全,所以只能从运行事件循环的线程(通常是主线程)调用它。若要更正此问题,将不再需要将loop.create_task(proc_message(message))替换为asyncio.run_coroutine_threadsafe(proc_message(message), loop)message_producer

至于为什么message_producer似乎要修复代码,请考虑run_coroutine_threadsafecreate_task做了两件额外的事情

  • 它以线程安全的方式运行,因此事件循环数据结构在并发执行时不会损坏。
  • 它确保事件循环在尽可能快的机会中醒来,以便它能够处理新的任务。

在您的示例中,create_task将任务添加到循环的可运行队列(没有任何锁定),但未能确保唤醒,因为在事件循环线程中运行时不需要这样做。然后,message_producer会强制循环定期唤醒,这也是它检查和执行可运行任务的时候。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49906034

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档