
Kafka consumer message commit issue

Stack Overflow user
Asked 2020-05-30 18:20:11
1 answer, 1.4K views, 0 followers, 0 votes

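I'm new to Kafka.

Kafka version: 2.3.1

I am trying to consume Kafka messages from two topics using Spring. I haven't configured much beyond the Kafka binder and a few simple settings, shown below. Whenever the log message "Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery" appears, messages that were already processed get processed again. I'm not sure what is happening.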

spring.cloud.stream.kafka.binder.brokers: xxxxx:9094
spring:
  cloud:
    stream:
      default:
        group: bbb-bl-kyc
      bindings:
        input:
          destination: bbb.core.sar.blul.events,bbb.core.sar.bluloc.events
          contentType: application/json
          consumer:
            headerMode: embeddedHeaders  

spring.kafka.consumer.properties.spring.json.trusted.packages: "*"
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
#Custom Serializer configurations to secure data
spring.cloud.stream.kafka.binder.configuration:
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
  value.serializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArraySerializer
  value.deserializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArrayDeserializer
  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer



2020-05-29 07:01:11.389  INFO 1 --- [container-0-C-1] p.a.b.k.service.KYCOrchestrationService  : Done with Customer xxxx MS call response handling  Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-29 07:01:11.393  INFO 1 --- [container-0-C-1] p.a.b.kyc.service.DMSIntegrationService  : Message written to the DMS topic successfully 159073553171893
2020-05-29 07:01:11.394  INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService    : Message written to Admin console Application Log topic successfully  Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-30 17:21:13.140  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:13.122  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:14.522  INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:14.692  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:15.151  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-4-f5a03efd-75cd-425b-94e1-efd3d728d7ca is not valid.
2020-05-30 17:21:15.152  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.173  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions revoked: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.141  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-2-52012bae-1b22-4211-b107-803fb3765720 is not valid.
2020-05-30 17:21:15.175  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:15.176  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions revoked: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.200  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.537  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.blul.events-0 to offset 4.
2020-05-30 17:21:18.538  INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.bluloc.events-0 to offset 0.
2020-05-30 17:21:18.621  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions assigned: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:18.625  INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : bbb-bl-kyc: partitions assigned: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:18.822  INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener  : Initiating KYC Orchestration 159071814927374
2020-05-30 17:21:18.826  INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener  : Initiating KYC Orchestration null
2020-05-30 17:21:18.928  INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService    : Message written to Admin console Application topic successfully Confm Id: null Appln Id: XQZ58K3H3XZADTAT

1 Answer

Stack Overflow user

Answered 2020-05-30 18:59:30

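With a mostly default consumer configuration, you get at-least-once delivery semantics.

When the group coordinator is temporarily unavailable, your consumer cannot commit the offsets of the messages it has processed. After it rejoins the group, the consumer processes those same messages again (because their offsets were never committed), which produces duplicates.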
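To make the mechanism concrete, here is a minimal in-memory sketch of the process-then-commit cycle described above. It does not use the Kafka client at all: the class `AtLeastOnceDemo`, its method `pollAndProcess`, and the `coordinatorAvailable` flag are hypothetical names invented for this illustration, and a plain list stands in for a topic partition. It is a simplified model under those assumptions, not how the binder or the consumer is actually implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory model of one partition read by one consumer group (no Kafka client involved). */
class AtLeastOnceDemo {

    /** Last committed position: the offset of the next record the group should read. */
    private long committedOffset = 0;

    /** Every record handed to the application, in order, including replays. */
    final List<String> processed = new ArrayList<>();

    /**
     * Processes all records from the committed position onward, then tries to commit.
     * When coordinatorAvailable is false the commit is skipped, which models a lost
     * offset commit: the work is done, but the group's position does not advance.
     */
    void pollAndProcess(List<String> partitionLog, boolean coordinatorAvailable) {
        long position = committedOffset;
        while (position < partitionLog.size()) {
            processed.add(partitionLog.get((int) position));
            position++;
        }
        if (coordinatorAvailable) {
            committedOffset = position;
        }
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        List<String> partitionLog = Arrays.asList("event-0", "event-1", "event-2");
        AtLeastOnceDemo consumer = new AtLeastOnceDemo();

        consumer.pollAndProcess(partitionLog, false); // records processed, commit lost
        consumer.pollAndProcess(partitionLog, true);  // after rejoining: same records again

        // prints [event-0, event-1, event-2, event-0, event-1, event-2]
        System.out.println(consumer.processed);
    }
}
```

In this model, every record is delivered at least once, and the duplicates appear exactly when processing succeeded but the commit did not. If duplicates are a problem for the application, the usual approach is to make the processing idempotent, for example by skipping records whose identifier has already been handled.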

You can find more details about the GroupCoordinator and delivery semantics here.

Votes: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/62106878
