首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Azure Eventhubs (Python):启用检查点时使用blob存储的检查点-- EventProcessor中的键错误问题

Azure Eventhubs (Python):启用检查点时使用blob存储的检查点-- EventProcessor中的键错误问题
EN

Stack Overflow用户
提问于 2020-08-11 08:58:32
回答 2查看 1.7K关注 0票数 4

我在晚上的blob存储检查中遇到了问题。如果在获取客户端时没有设置checkpoint_store,则我的应用程序运行良好。每当我尝试设置checkpoint_store变量并运行我的代码时,它都会引发以下异常:

EventProcessor实例“xxxxxxxxxxx”,它是甚至是暴徒消费者组的实例。负载平衡和所有权声明时发生错误.例外是KeyError('ownerid')。在xxxx秒后重试

我唯一能找到的github条目甚至提到了这种错误是这一个,但是问题本身从未得到解决,问题解决的人最终使用了不同的库。

我正在使用的相关库是azure-eventhub和azure-eventhub checkpointstoreblob-aio。

下面是我正在使用的代码(我用本教程作为指南)的相关片段:

代码语言:javascript
复制
import asyncio
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
代码语言:javascript
复制
async def on_event(partition_context, event):
    await partition_context.update_checkpoint(event)
    #<do stuff with event data>
代码语言:javascript
复制
checkpoint_store = BlobCheckpointStore.from_connection_string(blob_connection_string, container_name)
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=input_eventhub_name, checkpoint_store=checkpoint_store)

async def main():
  async with client:
    await client.receive(
      on_event=on_event,
    )
    print("Terminated.")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

问题似乎完全与blob存储检查点有关;如果我在创建消费者客户端时注释掉'checkpoint_store=checkpoint_store‘,一切运行都没有问题。

与blob存储的连接看起来很好,正如我所做的一些挖掘,并发现在blob存储中创建了一些文件夹“检查点”和“所有权”:blob存储快照,后者在其元数据中包含一些带有“ownerid”的文件:所有者文件元数据

也就是说,钥匙确实存在。我认为正在发生的是,EventProcessor试图获取这些小块的所有权元数据,但不知怎么没有这样做。如果有人知道如何解决这个问题,我会非常感激的!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-08-11 21:14:35

这看起来像是从其中一个小块中检索"ownerid“的问题。你能帮我个忙来测试一下这些场景吗?

  1. 从blob容器中删除所有内容并重试。
  2. 如果问题仍然存在,您能检查每个blob是否都有元数据"ownerid“吗?
  3. 如果问题仍然存在,您能用下面的代码替换库版本1.1.0中文件azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio.py的第144行并重试吗?
代码语言:javascript
复制
"owner_id": blob.metadata.get("ownerid"),
票数 2
EN

Stack Overflow用户

发布于 2021-01-12 20:03:02

根本原因是,当在启用了数据湖(分层命名空间)的list_blobs存储blob上调用存储sdk的v2功能时,不仅会得到每个分区的检查点/所有权,还会得到不包含元数据的父blob节点。

为了更好地说明这一点,假设我们有以下blob结构:

代码语言:javascript
复制
- fullqualifiednamespace (directory)
  - eventhubname (directory)
    - $default (directory)
        - ownership (directory)
          - 0 (blob)
          - 1 (blob)
          ...

在启用了数据湖(分层命名空间)的v2存储中,当代码使用前缀{<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/ownership搜索blobs时,当我们试图提取信息时,{<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/ownership目录本身也将被返回,其中不包含导致KeyError的元数据。

有一个bug修复版本的校验点商店problem,请升级到最新的版本,看看它是否解决了您的问题。

如果你还有更多的问题请告诉我。

链接:

同步:https://pypi.org/project/azure-eventhub-checkpointstoreblob/1.1.2/

对于异步:https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/1.1.2/

github杂志:https://github.com/Azure/azure-sdk-for-python/issues/13060

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63354884

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档