我在晚上的blob存储检查中遇到了问题。如果在获取客户端时没有设置checkpoint_store,则我的应用程序运行良好。每当我尝试设置checkpoint_store变量并运行我的代码时,它都会引发以下异常:
EventProcessor实例“xxxxxxxxxxx”,它是甚至是暴徒消费者组的实例。负载平衡和所有权声明时发生错误.例外是KeyError('ownerid')。在xxxx秒后重试
我唯一能找到的github条目甚至提到了这种错误是这一个,但是问题本身从未得到解决,问题解决的人最终使用了不同的库。
我正在使用的相关库是azure-eventhub和azure-eventhub checkpointstoreblob-aio。
下面是我正在使用的代码(我用本教程作为指南)的相关片段:
import asyncio
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStoreasync def on_event(partition_context, event):
await partition_context.update_checkpoint(event)
#<do stuff with event data>checkpoint_store = BlobCheckpointStore.from_connection_string(blob_connection_string, container_name)
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=input_eventhub_name, checkpoint_store=checkpoint_store)
async def main():
async with client:
await client.receive(
on_event=on_event,
)
print("Terminated.")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())问题似乎完全与blob存储检查点有关;如果我在创建消费者客户端时注释掉'checkpoint_store=checkpoint_store‘,一切运行都没有问题。
与blob存储的连接看起来很好,正如我所做的一些挖掘,并发现在blob存储中创建了一些文件夹“检查点”和“所有权”:blob存储快照,后者在其元数据中包含一些带有“ownerid”的文件:所有者文件元数据。
也就是说,钥匙确实存在。我认为正在发生的是,EventProcessor试图获取这些小块的所有权元数据,但不知怎么没有这样做。如果有人知道如何解决这个问题,我会非常感激的!
发布于 2020-08-11 21:14:35
这看起来像是从其中一个小块中检索"ownerid“的问题。你能帮我个忙来测试一下这些场景吗?
"owner_id": blob.metadata.get("ownerid"),发布于 2021-01-12 20:03:02
根本原因是,当在启用了数据湖(分层命名空间)的list_blobs存储blob上调用存储sdk的v2功能时,不仅会得到每个分区的检查点/所有权,还会得到不包含元数据的父blob节点。
为了更好地说明这一点,假设我们有以下blob结构:
- fullqualifiednamespace (directory)
- eventhubname (directory)
- $default (directory)
- ownership (directory)
- 0 (blob)
- 1 (blob)
...在启用了数据湖(分层命名空间)的v2存储中,当代码使用前缀{<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/ownership搜索blobs时,当我们试图提取信息时,{<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/ownership目录本身也将被返回,其中不包含导致KeyError的元数据。
有一个bug修复版本的校验点商店problem,请升级到最新的版本,看看它是否解决了您的问题。
如果你还有更多的问题请告诉我。
链接:
同步:https://pypi.org/project/azure-eventhub-checkpointstoreblob/1.1.2/
对于异步:https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/1.1.2/
github杂志:https://github.com/Azure/azure-sdk-for-python/issues/13060
https://stackoverflow.com/questions/63354884
复制相似问题