我已经复制-粘贴了这个脚本,它工作得很好。我从文萨布那里得到了所有的事件:
import logging
from azure.eventhub import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
partition_context.update_checkpoint(event)
with client:
client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive(on_event=on_event, partition_id='0')然而,一旦我试图从事件中获取任何值,我就会得到一个OWNERSHIP_LOST异常,并且脚本不会检索任何东西。
我试过event.body.KEY,event.KEY,json.loads(event)['body']['key'],json.loads(event)['key']。无论我对events做什么,都会引起OWNERSHIP_LOST。
你知道解决方法吗?或者你碰巧知道我哪里做错了?
发布于 2020-04-24 02:26:53
接收事件时的错误或on_event中的错误可能会导致OWNERSHIP_LOST。你能试试event.body_as_json()'key‘吗?
发布于 2020-12-09 07:57:50
EventData.body返回bytes or Generator[bytes]。
在以您想要的方式提取内容之前,您需要以适当的方式将这些原始字节反序列化为字符串/对象。
EventData对象有两个助手方法body_as_str和body_as_json,前者尝试解码原始字节,返回字符串,后者尝试将原始字节解码为字符串,然后尝试加载字符串,返回json对象。
解析原始接收到的字节取决于您发送的数据,如果您可以提供示例数据,这将对调试很有帮助。
https://stackoverflow.com/questions/61385284
复制相似问题