我试着用春季卡夫卡从溪流中消费唱片。记录在avro架构中。由于主题上有两个分区,所以我使用kafka并发作为2来并行地使用来自分区的记录。但是,它似乎引起了一些问题。
在处理之前,我正在记录从分区接收到的记录,以确保我们不会得到重复(不同分区中的相同键)。
配置:
@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KAFKA_CONCURRENCY);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // manual async committ
//factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO)
return factory;
}代码:
@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {
try {
//System.out.println(record);
if (record.value().get("enrollmentType").toString().matches("ACH|VCP|CHK"))
{
prov_tin_number = record.value().get("providerTinNumber").toString();
//prov_tin_type = record.value().get("providerTINType").toString();
enroll_type = record.value().get("enrollmentType").toString();
vcp_prov_choice_ind = record.value().get("vcpProvChoiceInd").toString();
error_flag = "";
}
System.out.println("coming from stream :" + prov_tin_number + " into offset " + record.offset() + " and partition " + record.partition());
acknowledgement.acknowledge();
}catch (Exception ex) {
System.out.println(record);
System.out.println(ex.getMessage());
}
}代码的输出:
示例:
coming from stream :018601027 into offset 500428 and partition 0
coming from stream :018601027 into offset 499553 and partition 1从上面的输出来看,相同的记录正在进入不同的偏移量和分区,并在使用者端造成重复。但是,情况并非如此,当我试图使用命令行读取记录时,我的输出低于:
root@fast-data-dev bin $ kafka-avro-console-consumer --topic kaas.pe.enrollment.csp.ts2 --bootstrap-server kaas-test-ctc-a.optum.com:443 --consumer.config
/data/test/client-test-ssl.properties **--partition 1 --offset 499553** --property schema.registry.url="http://kaas-test-schema-registry.com" --max-messages 1
{"**providerTinNumber":"018601027"**,"providerTINType":"TIN","enrollmentType":"ACH","vcpProvChoiceInd":{"string":"null"},"usrDefFld1":null,"usrDefFld2":null,"usrDefFld3":null,"usrDefFld4":null,"usrDefFld5":null,"usrDefFld6":null,"usrDefFld7":null,"usrDefFld8":null,"usrDefFld9":null,"usrDefFld10":null}
Processed a total of 1 messages
root@fast-data-dev bin $ kafka-avro-console-consumer --topic kaas.pe.enrollment.csp.ts2 --bootstrap-server kaas-test-ctc-a.optum.com:443 --consumer.config
/data/test/client-test-ssl.properties **--partition 0 --offset 500428** --property schema.registry.url="http://kaas-test-schema-registry.com" --max-messages 1
{"**providerTinNumber":"024580061"**,"providerTINType":"TIN","enrollmentType":"ACH","vcpProvChoiceInd":{"string":"null"},"usrDefFld1":null,"usrDefFld2":null,"usrDefFld3":null,"usrDefFld4":null,"usrDefFld5":null,"usrDefFld6":null,"usrDefFld7":null,"usrDefFld8":null,"usrDefFld9":null,"usrDefFld10":null}
Processed a total of 1 messages对于不同的偏移量和分区,我们确实有不同的值。它是显而易见的,我的代码中有一些错误,它不是发生在一个记录上,而是发生在多个记录上。
完整的Spring引导日志:
00:26:11.507 [restartedMain] INFO com.emerald.peconsumer.ApplicationRun - Started ApplicationRun in 2.896 seconds (JVM running for 12.571)
00:26:13.357 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
00:26:13.357 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
00:26:13.359 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
00:26:13.359 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
00:26:13.521 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:13.521 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:15.196 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:15.197 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
00:26:30.504 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Finished assignment for group at generation 77: {consumer-csp-prov-emerald-test-2-d2f920dc-a52a-4ed4-aa0f-1e3ef268a4fc=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@1f9e0b89, consumer-csp-prov-emerald-test-1-242f32f2-b823-4946-be1f-a6c584a0f3ce=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@91e5bc9}
00:26:30.815 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Successfully joined group with generation 77
00:26:30.815 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Successfully joined group with generation 77
00:26:30.818 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-0
00:26:30.818 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-1
00:26:31.133 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500428, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1559.uhc.com:9093 (id: 69 rack: null), epoch=37}}
00:26:31.133 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499553, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1562.uhc.com:9093 (id: 72 rack: null), epoch=36}}
00:26:31.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer - csp-prov-emerald-test: partitions assigned: [kaas.pe.enrollment.csp.ts2-0]
00:26:31.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer - csp-prov-emerald-test: partitions assigned: [kaas.pe.enrollment.csp.ts2-1]
coming from stream :018601027 into offset 500428 and partition 0
coming from stream :018601027 into offset 499553 and partition 1更新:
我试着用一个使用者线程和两个使用者线程再次打印记录。
来自2个使用者线程的输出:
你可以观察到行为的随机性。
重复记录:相同的记录正在进入两个不同的分区
第一次运行:
coming from stream :018601027 into offset 500428 and partition 0 <-- duplicate
coming from stream :018601027 into offset 499553 and partition 1 <-- duplicate
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :090341206 into offset 500430 and partition 0 <-- duplicate
coming from stream :090341206 into offset 499555 and partition 1 <-- duplicate
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :107422748 into offset 499561 and partition 1
coming from stream :113423162 into offset 500436 and partition 0
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :113424057 into offset 500437 and partition 0
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1第二轮:
coming from stream :024580061 into offset 499553 and partition 1 <-- duplicate
coming from stream :024580061 into offset 500428 and partition 0 <-- duplicate
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :090341206 into offset 500430 and partition 0
coming from stream :031866294 into offset 499555 and partition 1
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :113423162 into offset 500436 and partition 0 <-- duplicate
coming from stream :113423162 into offset 499561 and partition 1 <-- duplicate
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :113424057 into offset 500437 and partition 0
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1来自一个使用者线程的输出:
如果使用一个使用者线程,则没有重复。这些记录正在按预期打印出来。这是否意味着spring并发参数不可靠?那么,如何将使用者应用程序缩放为并行处理记录?
coming from stream :018601027 into offset 499553 and partition 1
coming from stream :027400400 into offset 499554 and partition 1
coming from stream :031866294 into offset 499555 and partition 1
coming from stream :052407795 into offset 499556 and partition 1
coming from stream :086586131 into offset 499557 and partition 1
coming from stream :090443768 into offset 499558 and partition 1
coming from stream :090465976 into offset 499559 and partition 1
coming from stream :094324212 into offset 499560 and partition 1
coming from stream :107422748 into offset 499561 and partition 1
coming from stream :004582777 into offset 499562 and partition 1
coming from stream :018601027 into offset 499563 and partition 1
coming from stream :027400400 into offset 499564 and partition 1
coming from stream :031866294 into offset 499565 and partition 1
coming from stream :052407795 into offset 499566 and partition 1
coming from stream :086586131 into offset 499567 and partition 1
coming from stream :090443768 into offset 499568 and partition 1
coming from stream :090465976 into offset 499569 and partition 1
coming from stream :094324212 into offset 499570 and partition 1
coming from stream :107422748 into offset 499571 and partition 1
coming from stream :272626998 into offset 499572 and partition 1
coming from stream :024580061 into offset 500428 and partition 0
coming from stream :072461600 into offset 500429 and partition 0
coming from stream :090341206 into offset 500430 and partition 0
coming from stream :113423162 into offset 500431 and partition 0
coming from stream :113424057 into offset 500432 and partition 0
coming from stream :024580061 into offset 500433 and partition 0
coming from stream :072461600 into offset 500434 and partition 0
coming from stream :090341206 into offset 500435 and partition 0
coming from stream :113423162 into offset 500436 and partition 0
coming from stream :113424057 into offset 500437 and partition 0发布于 2020-11-06 14:23:55
您很可能在从侦听器调用的代码中遇到线程安全问题;在使用多个线程时,必须使用字段,除非您使用同步逻辑来保护它们。
例如:
public class NotThreadSafe {
String someValue;
void processRecord(ConsumerRecord<?, ?> record) {
this.someValue = record.value();
doSomeMoreWork();
}
void doSomeMoreWork() {
...
doSomethingWith(this.someValue);
}
}当有多个线程时,一个线程可能从另一个线程中看到someValue。
https://stackoverflow.com/questions/64704149
复制相似问题