
Consumer application causing duplicates when reading records from multiple partitions

Stack Overflow user
Asked 2020-11-05 19:32:59
1 answer · Viewed 280 times · 1 vote

I am trying to consume records from a stream using Spring Kafka. The records use an Avro schema. Since the topic has two partitions, I set the Kafka concurrency to 2 so records from both partitions are consumed in parallel. However, this seems to be causing some issues.

Before processing, I log each record received from a partition to make sure we are not getting duplicates (the same key from different partitions).

Configuration:

    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory(){
        
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
        
        return new DefaultKafkaConsumerFactory<>(props);    
    }
    
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> 
    kafkaListenerContainerFactory() {
 
      ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      factory.setConcurrency(KAFKA_CONCURRENCY);
      factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // manual immediate commit
      //factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO)
      return factory;
  }

Listener code:

@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {

    try {
        //System.out.println(record);
        if (record.value().get("enrollmentType").toString().matches("ACH|VCP|CHK")) {
            prov_tin_number     = record.value().get("providerTinNumber").toString();
            //prov_tin_type     = record.value().get("providerTINType").toString();
            enroll_type         = record.value().get("enrollmentType").toString();
            vcp_prov_choice_ind = record.value().get("vcpProvChoiceInd").toString();
            error_flag          = "";
        }

        System.out.println("coming from stream :" + prov_tin_number + " into offset " + record.offset() + " and partition " + record.partition());

        acknowledgement.acknowledge();

    } catch (Exception ex) {
        System.out.println(record);
        System.out.println(ex.getMessage());
    }
}

Output of the code:

Example:

coming from stream :018601027 into offset 500428 and partition 0
coming from stream :018601027 into offset 499553 and partition 1

From the output above, the same record appears at different offsets in different partitions, causing duplicates on the consumer side. However, that is not actually the case: when I read the records at those offsets from the command line, my output is below:

root@fast-data-dev bin $ kafka-avro-console-consumer --topic kaas.pe.enrollment.csp.ts2 --bootstrap-server kaas-test-ctc-a.optum.com:443 --consumer.config
/data/test/client-test-ssl.properties --partition 1 --offset 499553 --property schema.registry.url="http://kaas-test-schema-registry.com" --max-messages 1
{"providerTinNumber":"018601027","providerTINType":"TIN","enrollmentType":"ACH","vcpProvChoiceInd":{"string":"null"},"usrDefFld1":null,"usrDefFld2":null,"usrDefFld3":null,"usrDefFld4":null,"usrDefFld5":null,"usrDefFld6":null,"usrDefFld7":null,"usrDefFld8":null,"usrDefFld9":null,"usrDefFld10":null}
Processed a total of 1 messages
root@fast-data-dev bin $ kafka-avro-console-consumer --topic kaas.pe.enrollment.csp.ts2 --bootstrap-server kaas-test-ctc-a.optum.com:443 --consumer.config
/data/test/client-test-ssl.properties --partition 0 --offset 500428 --property schema.registry.url="http://kaas-test-schema-registry.com" --max-messages 1
{"providerTinNumber":"024580061","providerTINType":"TIN","enrollmentType":"ACH","vcpProvChoiceInd":{"string":"null"},"usrDefFld1":null,"usrDefFld2":null,"usrDefFld3":null,"usrDefFld4":null,"usrDefFld5":null,"usrDefFld6":null,"usrDefFld7":null,"usrDefFld8":null,"usrDefFld9":null,"usrDefFld10":null}
Processed a total of 1 messages

There are indeed different values at those offsets and partitions. Clearly there is some bug in my code, and it is not happening for just one record but for multiple records.

Full Spring Boot log:

00:26:11.507 [restartedMain] INFO  com.emerald.peconsumer.ApplicationRun - Started ApplicationRun in 2.896 seconds (JVM running for 12.571)
00:26:13.357 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
00:26:13.357 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
00:26:13.359 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
00:26:13.359 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
00:26:13.521 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:13.521 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:15.196 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:15.197 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:30.504 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Finished assignment for group at generation 77: {consumer-csp-prov-emerald-test-2-d2f920dc-a52a-4ed4-aa0f-1e3ef268a4fc=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@1f9e0b89, consumer-csp-prov-emerald-test-1-242f32f2-b823-4946-be1f-a6c584a0f3ce=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@91e5bc9}
00:26:30.815 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Successfully joined group with generation 77
00:26:30.815 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Successfully joined group with generation 77
00:26:30.818 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-0
00:26:30.818 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-1
00:26:31.133 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500428, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1559.uhc.com:9093 (id: 69 rack: null), epoch=37}}
00:26:31.133 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499553, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1562.uhc.com:9093 (id: 72 rack: null), epoch=36}}
00:26:31.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.springframework.kafka.listener.KafkaMessageListenerContainer - csp-prov-emerald-test: partitions assigned: [kaas.pe.enrollment.csp.ts2-0]
00:26:31.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.springframework.kafka.listener.KafkaMessageListenerContainer - csp-prov-emerald-test: partitions assigned: [kaas.pe.enrollment.csp.ts2-1]
coming from stream :018601027 into offset 500428 and partition 0
coming from stream :018601027 into offset 499553 and partition 1

Update:

I tried printing the records again, once with one consumer thread and once with two consumer threads.

Output with 2 consumer threads:

You can observe the randomness of the behavior.

Duplicate records: the same record appears in two different partitions.

First run:

coming from stream :018601027 into offset 500428 and partition 0 <-- duplicate 
coming from stream :018601027 into offset 499553 and partition 1 <-- duplicate
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :090341206 into offset 500430 and partition 0 <-- duplicate
coming from stream :090341206 into offset 499555 and partition 1 <-- duplicate
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :107422748 into offset 499561 and partition 1
coming from stream :113423162 into offset 500436 and partition 0
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :113424057 into offset 500437 and partition 0
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1

Second run:

coming from stream :024580061 into offset 499553 and partition 1 <-- duplicate
coming from stream :024580061 into offset 500428 and partition 0 <-- duplicate
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :090341206 into offset 500430 and partition 0
coming from stream :031866294 into offset 499555 and partition 1
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :113423162 into offset 500436 and partition 0 <-- duplicate
coming from stream :113423162 into offset 499561 and partition 1 <-- duplicate
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :113424057 into offset 500437 and partition 0
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1

Output with one consumer thread:

With a single consumer thread there are no duplicates; the records are printed as expected. Does this mean the Spring concurrency parameter is unreliable? If so, how do I scale the consumer application to process records in parallel?

coming from stream :018601027 into offset 499553 and partition 1
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :031866294 into offset 499555 and partition 1
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :107422748 into offset 499561 and partition 1
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1
coming from stream :024580061 into offset 500428 and partition 0
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :090341206 into offset 500430 and partition 0
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :113423162 into offset 500436 and partition 0
coming from stream :113424057 into offset 500437 and partition 0

1 answer

Stack Overflow user

Accepted answer

Answered 2020-11-06 14:23:55

You most likely have a thread-safety problem in the code called from the listener; when using multiple threads, you must not use fields unless you protect them with synchronization.

For example:

public class NotThreadSafe {

    String someValue; // shared field: both container threads write here

    void processRecord(ConsumerRecord<?, ?> record) {
        this.someValue = (String) record.value(); // cast added so the sketch type-checks
        doSomeMoreWork();
    }

    void doSomeMoreWork() {
        ...
        doSomethingWith(this.someValue); // may read a value set by the other thread
    }

}

With multiple threads, one thread can see a someValue that was written by another thread.
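The fix is to keep all per-record state in local variables (or method parameters) instead of instance fields. A minimal, runnable sketch below illustrates the idea outside of Kafka; the names (`process`, the TIN values) are illustrative stand-ins, not from the question's codebase. Two threads play the role of the two listener containers, and because every value derived from a "record" lives in a local variable, no value can bleed from one thread to the other:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadSafeListenerDemo {

    // Stand-in for the listener body: everything derived from the record
    // is held in local variables, so concurrent calls cannot overwrite
    // each other's state.
    static String process(String providerTinNumber, String enrollmentType) {
        String tin = providerTinNumber;   // local, not a field
        String type = enrollmentType;     // local, not a field
        return tin + "/" + type;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> results = new ConcurrentHashMap<>();

        // Two threads mimic the two listener containers (one per partition).
        Thread p0 = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                results.put("p0-" + i, process("024580061", "ACH"));
            }
        });
        Thread p1 = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                results.put("p1-" + i, process("018601027", "VCP"));
            }
        });
        p0.start(); p1.start();
        p0.join(); p1.join();

        // Every result matches its own thread's input: no cross-thread bleed.
        boolean ok = results.entrySet().stream().allMatch(e ->
                e.getKey().startsWith("p0")
                        ? e.getValue().equals("024580061/ACH")
                        : e.getValue().equals("018601027/VCP"));
        System.out.println(ok ? "no cross-thread bleed" : "state leaked");
        // prints "no cross-thread bleed"
    }
}
```

Applied to the question's listener, this means replacing the fields prov_tin_number, enroll_type, vcp_prov_choice_ind, and error_flag with local variables inside the listener method (or synchronizing access to them, at the cost of serializing the two threads).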

Votes: 0
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/64704149
