首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡消费者空白记录

卡夫卡消费者空白记录
EN

Stack Overflow用户
提问于 2020-07-08 09:10:37
回答 1查看 2.1K关注 0票数 0

关于这个话题有很多问题,但是,这不是一个重复的问题!

我面临的问题是,我试图用Java14和Kafka2.5.0设置一个SpringBoot项目,而我的消费者返回一个空的记录列表。这里的大多数答案都表明了一些被遗忘的属性,比如频繁投票或设置偏移模式到最早

我看不出与docs.confluent.io有任何逻辑上的区别,尽管我的配置设置似乎有些不合常规(请参阅下面代码段中的jaas.conf设置)。

代码语言:javascript
复制
@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaConsumer<Long, MyClass> consumerConfigs() {
        Properties config = new Properties();

        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);

        System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");

        return new KafkaConsumer<>(config);
    }
}

不过,这是可行的。我没有任何例外(卡夫卡,或其他),并且连接是建立的。

代码语言:javascript
复制
// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};

这里是我实际投票的地方:

代码语言:javascript
复制
try {
            KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
            consumer.subscribe(Collections.singletonList(inputTopic));

            int count = 0;
            Long start = System.currentTimeMillis();
            Long end = System.currentTimeMillis();

            while (end - start < 900_000) { 
                // boolean would be set to true in production
                ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> {
                    MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
                    System.out.println(result);
                });
               
                consumer.commitSync();

                System.out.println("visualize number of loops made: " + ++count);
                end = System.currentTimeMillis();
            }
        } catch (KafkaException e) {
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

为了找出问题,我增加了指纹和其他杂乱的东西。我以调试模式运行我的程序,并将断点放在下面一行:

代码语言:javascript
复制
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());

因此,我每秒钟都会看到一条印有计数的线条,就像人们所预料的那样。但是由于我的使用者不返回任何记录,所以它从不进入forEach,因此也不会触发我的断点。

我肯定可以在云中看到我的主题,有两个分区。消息是在一个稳定的流中生成的,所以我知道我应该能够获得一些信息。

我知道连接到集群需要一段时间,但是在当前时间设置为四分之一小时的情况下,我至少应该收到一些信息,对吗?作为另一种选择,我尝试将consumer.subscribe()切换到consumer.assign()方法,我指定了自己的TopicPartition,将使用者设置为consumer.seekToBeginning()。它跑得很好,但也什么也没回。

在最常见的例子中,另一件事是我使用了自己的类。因此,我没有使用KafkaConsumer<String, String>,而是根据本教程实现了自定义(反)序列化器。

会不会是我的配置设置?投票超时有什么问题吗?(反序列化),还是其他完全的东西?我真的找不出为什么我会得到零记录的任何理由。如有任何反馈,将不胜感激!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-07-10 16:05:47

问题解决了。这不是任何你可以确定从我张贴的问题,然而,我想澄清一些事情,如果其他人发现他自己被相似的配置。

  1. 验证接收到的密码确实是正确的。Facepalm

正是如此,我以为他正在与集群建立连接,但我的循环继续打印计数,因为执行.poll(Duration.ofMillis(1000))方法时,->检查他是否可以在给定的超时内连接,如果连接失败,->会继续返回零条记录。没有抛出错误。通常,在2秒左右之后,应该已经建立了一个连接。

  1. 检查到数据库的连接。

您永远不希望应用程序停止,这就是为什么我设计了myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())方法来记录所有错误,但是所有的异常都会被捕获。直到我检查了日志,我才意识到我访问远程数据库的权限是不合理的。

  1. 所谓的TimeStamp,应该反序列化为java.util.Date

错误地解析了它引发的异常,但是我的方法返回了null。正如这个答案中的所有评论一样,这个答案也归结为在这样的设置中缺乏经验。您会发现下面的修正类可以作为一个工作示例(但并不完全是最佳实践)。

KafkaConfig:

代码语言:javascript
复制
@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaConsumer<Long, MyClass> consumerConfigs() {
        Properties config = new Properties();

        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
        config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);

        System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");

        return new KafkaConsumer<>(config);
    }
}

投票方法的主体:

代码语言:javascript
复制
            KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
            consumer.subscribe(Collections.singletonList(inputTopic));

            while (true) {
                ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> {
                    MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
                    System.out.println(result);
                });
                consumer.commitSync();
            }

带有反序列化器的MyClass的小示例:

代码语言:javascript
复制
@Data
@Slf4J
public class MyClass implements Deserializer<MyClass> {

    @JsonProperty("UNIQUE_KEY")
    private Long uniqueKey;
    @JsonProperty("EVENT_TIMESTAMP")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
    private Date eventTimestamp;
    @JsonProperty("SOME_OTHER_FIELD")
    private String someOtherField;

@Override
    public MyClass deserialize(String s, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        MyClass event = null;
        try {
            event = mapper
                    .registerModule(new JavaTimeModule())
                    .readValue(bytes, MyClass.class);
        } catch (Exception e) {
            log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
        }
        return event;
    }
}

我希望这能为将来的其他人服务。我从挫折和错误中学到了很多东西。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62791353

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档