首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在另一个文件中使用Asyncio循环引用

在另一个文件中使用Asyncio循环引用
EN

Stack Overflow用户
提问于 2018-03-28 11:42:51
回答 2查看 1.1K关注 0票数 1

嗨,我有下面的脚本,它侦听Google消息,在回调中,我将消息传递给另一个具有循环参数的脚本。

在第二个脚本中,我有一个具有依赖未来函数的任务。但未来的函数永远不会被调用。

代码语言:javascript
复制
import os, time
import base64, json
import asyncio
from google.cloud import pubsub_v1
from app.libs.pubsub.pubsub_connect import PubsubConnect
from app.config import config
from evergage.scripts.evga_process import initEvga
from app.utils import log

LOG = log.get_logger()

PROJECT_NAME = config.get('pubsub', 'project_name')
SUBSCRIPTION = config.get('pubsub', 'insights_subscription')


class PubsubConsumer(PubsubConnect):

    _subscriber = None

    def __init__(self, loop):
        self._subscriber = self.getClientService()
        #loop.create_task( self.getMessages(loop) )
        self.getMessages(loop)


    def getMessages(self, loop):

        def consumeMessageTask(message):
            #loop.create_task( self.callback(message, loop) )
            loop.call_soon( self.callback(message, loop) )


        subscription_path = self._subscriber.subscription_path( PROJECT_NAME, SUBSCRIPTION )
        flow_control = pubsub_v1.types.FlowControl(max_messages=10)
        self._subscriber.subscribe(subscription_path, callback=consumeMessageTask, flow_control=flow_control)

        #loop.run_forever()


    def callback(self, message, loop):

        pubSubMsg = message.data.decode("utf-8")
        pubSubMsg = json.loads( base64.urlsafe_b64decode( pubSubMsg) )

        if pubSubMsg['verb']['display'] == 'evga_upsert':
            loop.create_task( initEvga(pubSubMsg, loop) )

        message.ack()


asyncLoop = asyncio.get_event_loop()
test = PubsubConsumer(asyncLoop)
asyncLoop.run_forever()
asyncLoop.close()

第二文件evga_process.py

代码语言:javascript
复制
import os, time
import base64, json
import asyncio
import functools
from app.config import config
from app.utils import log

LOG = log.get_logger()
MKTO_BATCH = {
    'count': 0,
    'total': 5,
    'data': []
}


async def getMarketoData(cookies):
    #await asyncio.sleep(0.1)
    tmpData = []

    for cookie in cookies:
        tmpData.append("test-" + cook)

    return tmpData


def initSalesforce(future, param1):
    result = future.result()
    LOG.info( param1 )
    LOG.info( str(result) )
    pass


def initEvga(pubSubMsg, loop):
    pubSubID = pubSubMsg['id']
    upsertInfo = pubSubMsg['object']['definition']['description']

    MKTO_BATCH['data'].append(pubSubID)

    MKTO_BATCH['count'] += 1

    if MKTO_BATCH['count'] == MKTO_BATCH['total']:
        mktoTask = loop.create_task( getMarketoData(MKTO_BATCH['data']) )
        mktoTask.add_done_callback( functools.partial( initSalesforce, "myparam1") )

        MKTO_BATCH['count'] = 0
        MKTO_BATCH['data'] = []

我试过使用简单的代码,但它不起作用。只有当我像这样打电话的时候才行

代码语言:javascript
复制
loop = asyncio.get_event_loop()
task = loop.create_task( test_task(loop) )
initEvga(loop)
task.add_done_callback(got_result)

我在这里有遗漏什么吗?

单个文件中的更新脚本

代码语言:javascript
复制
async def heartbeat1():
    while True:
        await asyncio.sleep(1)
        print("heartbeat 1")

async def heartbeat2():
    while True:
        await asyncio.sleep(1)
        print("heartbeat 2")


class PubsubConsumer(PubsubConnect):

    _subscriber = None

    def __init__(self, loop):
        self._subscriber = self.getClientService()

    def getMessages(self, loop):

        def consumeMessageTask(message):
            #loop.create_task( self.callback(message, loop) )
            #loop.call_soon( self.callback(message, loop) )
            PubsubConsumer.callback(message, loop)

        subscription_path = self._subscriber.subscription_path( PROJECT_NAME, SUBSCRIPTION )

        #subClient.subscribe(subscription_path, callback=PubsubConnect.callback)
        # Limit the subscriber to only have ten outstanding messages at a time.
        flow_control = pubsub_v1.types.FlowControl(max_messages=10)
        self._subscriber.subscribe(subscription_path, callback=consumeMessageTask, flow_control=flow_control)

        #loop.run_forever()

    @staticmethod
    def callback(message, loop):
        time.sleep(1)
        pubSubMsg = message.data.decode("utf-8")
        pubSubMsg = json.loads( base64.urlsafe_b64decode( pubSubMsg) )

        loop.create_task( heartbeat2() )

        message.ack()

loop = asyncio.get_event_loop()

loop.create_task( heartbeat1() )

psClient = PubsubConsumer(loop)
psClient.getMessages(loop)

loop.run_forever()
loop.close()

heartbeat1()和heartbeat2()都在调用。但是,如果删除heartbeat1()调用,那么heartbeat2()也不会触发。

原因何在?如果我始终保持heartbeat1()运行,它会产生任何问题吗?

谢谢你,Bala

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-03-28 21:20:53

这一行肯定是不正确的:

代码语言:javascript
复制
mktoTask.add_done_callback( functools.partial( initSalesforce, "myparam1")

您不能向add_done_callback发送异步函数,后者需要一个常规函数。调用时,异步/ coroutine函数返回coroutine对象,而不执行内部的任何代码。要实际执行代码,coroutine对象必须提交给事件循环。在您的示例中,它将被删除,而不会在函数中执行代码。(这很像调用生成器,但从不将返回的迭代器传递给将从中提取值的东西。)

你需要的是:

代码语言:javascript
复制
async def initEvga(pubSubMsg):
    ...
    if MKTO_BATCH['count'] == MKTO_BATCH['total']:
        await getMarketoData(MKTO_BATCH['data'])
        await initSalesforce("myparam1")
    ...

既然initEvga是一个协同器,那么使用以下方法从callback开始它:

代码语言:javascript
复制
loop.create_task(initEvga(pubSubMsg))

...or,如果从不同的线程调用回调:

代码语言:javascript
复制
asyncio.run_coroutine_threadsafe(initEvga(pubSubMsg), loop)

(不需要将loop传递给异步函数,它们总是可以通过调用asyncio.get_event_loop()获得正在运行的事件循环。)

最后,loop.call_soon(self.callback(message, loop))没有意义,因为call_soon接受要调用的函数,而上面的代码使用self.callback()的结果调用它,该结果立即被调用。你要么需要:

代码语言:javascript
复制
self.callback(message, loop)

或者类似于:

代码语言:javascript
复制
loop.call_soon(self.callback, message, loop)
# or loop.call_soon_threadsafe if called from a different thread

因为在这两种情况下,callback必须是短的和非阻塞的,所以推迟它似乎没有好处,所以我选择第一个变体。

票数 2
EN

Stack Overflow用户

发布于 2018-03-30 14:09:30

那些寻找答案的人。这是最后的剧本。

代码语言:javascript
复制
async def initEvga(pubSubMsg):
    pubSubID = pubSubMsg['id']
    print(pubSubID)
    return pubSubMsg['id']


class PubsubConsumer(PubsubConnect):

    _subscriber = None

    def __init__(self):
        self._subscriber = self.getClientService()

    def getMessages(self, loop):

        def consumeMessageTask(message):
            PubsubConsumer.callback(message, loop)

        subscription_path = self._subscriber.subscription_path( PROJECT_NAME, SUBSCRIPTION )
        flow_control = pubsub_v1.types.FlowControl(max_messages=10)
        self._subscriber.subscribe(subscription_path, callback=consumeMessageTask, flow_control=flow_control)

        loop.run_forever()

    @staticmethod
    def callback(message, loop):
        pubSubMsg = message.data.decode("utf-8")
        pubSubMsg = json.loads( base64.urlsafe_b64decode( pubSubMsg) )
        asyncio.run_coroutine_threadsafe(initEvga(pubSubMsg), loop)
        message.ack()


loop = asyncio.get_event_loop()
psClient = PubsubConsumer()
psClient.getMessages(loop)
loop.close()
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49533612

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档