关于这个话题有很多问题,但是,这不是一个重复的问题!
我面临的问题是,我试图用Java14和Kafka2.5.0设置一个SpringBoot项目,而我的消费者返回一个空的记录列表。这里的大多数答案都表明了一些被遗忘的属性,比如频繁投票或设置偏移模式到最早。
我看不出与docs.confluent.io有任何逻辑上的区别,尽管我的配置设置似乎有些不合常规(请参阅下面代码段中的jaas.conf设置)。
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
return new KafkaConsumer<>(config);
}
}不过,这是可行的。我没有任何例外(卡夫卡,或其他),并且连接是建立的。
// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};这里是我实际投票的地方:
try {
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
int count = 0;
Long start = System.currentTimeMillis();
Long end = System.currentTimeMillis();
while (end - start < 900_000) {
// boolean would be set to true in production
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
System.out.println("visualize number of loops made: " + ++count);
end = System.currentTimeMillis();
}
} catch (KafkaException e) {
e.printStackTrace();
} catch (Exception e) {
System.out.println(e.getMessage());
}为了找出问题,我增加了指纹和其他杂乱的东西。我以调试模式运行我的程序,并将断点放在下面一行:
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());因此,我每秒钟都会看到一条印有计数的线条,就像人们所预料的那样。但是由于我的使用者不返回任何记录,所以它从不进入forEach,因此也不会触发我的断点。
我肯定可以在云中看到我的主题,有两个分区。消息是在一个稳定的流中生成的,所以我知道我应该能够获得一些信息。
我知道连接到集群需要一段时间,但是在当前时间设置为四分之一小时的情况下,我至少应该收到一些信息,对吗?作为另一种选择,我尝试将consumer.subscribe()切换到consumer.assign()方法,我指定了自己的TopicPartition,将使用者设置为consumer.seekToBeginning()。它跑得很好,但也什么也没回。
在最常见的例子中,另一件事是我使用了自己的类。因此,我没有使用KafkaConsumer<String, String>,而是根据本教程实现了自定义(反)序列化器。
会不会是我的配置设置?投票超时有什么问题吗?(反序列化),还是其他完全的东西?我真的找不出为什么我会得到零记录的任何理由。如有任何反馈,将不胜感激!
发布于 2020-07-10 16:05:47
问题解决了。这不是任何你可以确定从我张贴的问题,然而,我想澄清一些事情,如果其他人发现他自己被相似的配置。
正是如此,我以为他正在与集群建立连接,但我的循环继续打印计数,因为执行.poll(Duration.ofMillis(1000))方法时,->检查他是否可以在给定的超时内连接,如果连接失败,->会继续返回零条记录。没有抛出错误。通常,在2秒左右之后,应该已经建立了一个连接。
您永远不希望应用程序停止,这就是为什么我设计了myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())方法来记录所有错误,但是所有的异常都会被捕获。直到我检查了日志,我才意识到我访问远程数据库的权限是不合理的。
错误地解析了它引发的异常,但是我的方法返回了null。正如这个答案中的所有评论一样,这个答案也归结为在这样的设置中缺乏经验。您会发现下面的修正类可以作为一个工作示例(但并不完全是最佳实践)。
KafkaConfig:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
return new KafkaConsumer<>(config);
}
}投票方法的主体:
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
while (true) {
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
}带有反序列化器的MyClass的小示例:
@Data
@Slf4J
public class MyClass implements Deserializer<MyClass> {
@JsonProperty("UNIQUE_KEY")
private Long uniqueKey;
@JsonProperty("EVENT_TIMESTAMP")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
private Date eventTimestamp;
@JsonProperty("SOME_OTHER_FIELD")
private String someOtherField;
@Override
public MyClass deserialize(String s, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
MyClass event = null;
try {
event = mapper
.registerModule(new JavaTimeModule())
.readValue(bytes, MyClass.class);
} catch (Exception e) {
log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
}
return event;
}
}我希望这能为将来的其他人服务。我从挫折和错误中学到了很多东西。
https://stackoverflow.com/questions/62791353
复制相似问题