This sample comes from azure-sdk-for-python and shows a consumer for Azure Event Hubs.
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
An example to show receiving events from an Event Hub with checkpoint store asynchronously.
In the `receive` method of `EventHubConsumerClient`:
If no partition id is specified, the checkpoint_store is used for load-balance and checkpoint.
If partition id is specified, the checkpoint_store can only be used for checkpoint.
"""
import asyncio
import os

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name"  # Please make sure the blob container resource exists.


async def on_event(partition_context, event):
    # Put your code here.
    print("Received event from partition: {}.".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)


async def receive(client):
    """
    Without specifying partition_id, the receive will try to receive events from all partitions and if provided with
    a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances
    which also try to receive events from all partitions and use the same storage resource.
    """
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # With specified partition_id, load-balance will be disabled, for example:
    # await client.receive(on_event=on_event, partition_id='0')


async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,  # For load-balancing and checkpoint. Leave None for no load-balancing.
    )
    async with client:
        await receive(client)

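
if __name__ == '__main__':
    asyncio.run(main())

Is there a way to pass additional arguments to the on_event function? For example, in this consumer I receive messages from a producer. Suppose I need to insert some data into a database, so I need to pass in an open database connection. How should I go about doing that?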
All the samples in the repository look the same: on_event takes these two parameters, and frankly I don't know where they come from.
Thanks.
Posted on 2020-10-07 23:49:03
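on_event(partition_context, event) is a callback. You pass it to client.receive to tell the client "when an event arrives, handle it this way." on_event therefore has to follow a fixed signature, so that the receiving client can invoke it whenever an event arrives. Even if you could pass extra arguments to on_event directly, it would not help you, because you are not the one calling it.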
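To make the callback mechanics concrete, here is a minimal, self-contained sketch. `FakePartitionContext` and `fake_receive` are hypothetical stand-ins, not part of azure-eventhub; they only mimic the fact that the receiver, rather than your code, calls the callback with two positional arguments. The sketch also shows that extra data can still reach the callback if it is bound in advance with `functools.partial`. Whether the real `client.receive` accepts such a wrapped callable is an assumption to verify against the SDK version you use.

```python
import asyncio
import functools


class FakePartitionContext:
    """Hypothetical stand-in for the SDK's partition context."""

    def __init__(self, partition_id):
        self.partition_id = partition_id


async def fake_receive(on_event, events):
    """Hypothetical stand-in for client.receive: it owns the call to on_event."""
    context = FakePartitionContext("0")
    for event in events:
        # The receiver supplies exactly these two positional arguments.
        await on_event(context, event)


async def on_event(partition_context, event, db_connection):
    # db_connection is the extra argument bound in advance by functools.partial.
    db_connection.append((partition_context.partition_id, event))


async def demo():
    db_connection = []  # A plain list stands in for an open database connection.
    handler = functools.partial(on_event, db_connection=db_connection)
    await fake_receive(handler, ["event-a", "event-b"])
    return db_connection


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The receiver still sees a callable that takes two arguments; the third one travels with the callable itself, so no global state is needed.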
Here is what I suggest. Declare a variable outside on_event, initialize it in some setup function, and then access it inside on_event, like this:
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
An example to show receiving events from an Event Hub with checkpoint store asynchronously.
In the `receive` method of `EventHubConsumerClient`:
If no partition id is specified, the checkpoint_store is used for load-balance and checkpoint.
If partition id is specified, the checkpoint_store can only be used for checkpoint.
"""
import asyncio
import os

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name"  # Please make sure the blob container resource exists.
MY_DATABASE_CONNECTION = None


async def on_event(partition_context, event):
    # Put your code here.
    # Use MY_DATABASE_CONNECTION as needed
    print("Received event from partition: {}.".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)


async def receive(client):
    """
    Without specifying partition_id, the receive will try to receive events from all partitions and if provided with
    a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances
    which also try to receive events from all partitions and use the same storage resource.
    """
    # Without this declaration the assignment below would only create a local variable.
    global MY_DATABASE_CONNECTION
    MY_DATABASE_CONNECTION = ...  # Placeholder: initialize your database connection here.
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # With specified partition_id, load-balance will be disabled, for example:
    # await client.receive(on_event=on_event, partition_id='0')


async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,  # For load-balancing and checkpoint. Leave None for no load-balancing.
    )
    async with client:
        await receive(client)


if __name__ == '__main__':
    asyncio.run(main())

https://stackoverflow.com/questions/64245343
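As an alternative to a module-level variable, the shared resource can live on an object whose bound method serves as the callback. The sketch below is only an illustration under stated assumptions: `EventHandler` and `FakePartitionContext` are hypothetical names, a plain list stands in for the database connection, and the loop in `demo` imitates what the SDK would do when it delivers events.

```python
import asyncio


class EventHandler:
    """Keeps the shared resource on the instance instead of in a global."""

    def __init__(self, db_connection):
        self.db_connection = db_connection

    async def on_event(self, partition_context, event):
        # As a bound method, this still takes the two arguments the receiver passes.
        self.db_connection.append(event)
        await partition_context.update_checkpoint(event)


class FakePartitionContext:
    """Hypothetical stand-in for the SDK's partition context."""

    def __init__(self):
        self.checkpoints = []

    async def update_checkpoint(self, event):
        self.checkpoints.append(event)


async def demo():
    handler = EventHandler(db_connection=[])  # The list stands in for a real connection.
    context = FakePartitionContext()
    # In the real program the SDK would make these calls after:
    #     await client.receive(on_event=handler.on_event, ...)
    for event in ["event-a", "event-b"]:
        await handler.on_event(context, event)
    return handler.db_connection, context.checkpoints
```

Because `handler.on_event` already carries `self`, the receiver can call it with just the partition context and the event, while the handler reaches the connection through `self.db_connection`.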