首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KafkaStreams如何实例化ConsumerRecordFactory?

KafkaStreams如何实例化ConsumerRecordFactory?
EN

Stack Overflow用户
提问于 2018-11-15 14:14:03
回答 1查看 981关注 0票数 1

我正在尝试使用Kafka提供的ConsumerRecordFactory (主要是合流医生 )来测试流应用程序,下面是我到目前为止拥有的代码:

代码语言:javascript
复制
// Properties of the application
Properties streamsConfiguration = new Properties();

// Give the Streams application a unique name.  The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");

// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
        "input-topic",
        new StringSerializer(),
        new IntegerSerializer()
);

// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);

我的问题是,当我编译我的代码时,我会得到以下错误:

代码语言:javascript
复制
Error:(70, 52) java: reference to create is ambiguous
  both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
  and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match

因此,我理解kafka定义了泛型方法create(K,V,long),当我用非泛型类型创建工厂时,我创建了一个与第一个方法相冲突的新方法。

我的问题是如何实例化我的ConsumerRecordFactory

我试图使我的工厂在ConsumerRecordFactory<Object, Integer>中更通用,但是推断的类型不匹配。我找不到其他例子,汇合的github kafka-streams-示例似乎不使用ConsumerRecordFactory,而这就是答案似乎使用与文档相同的代码。

(我知道问题更多的是关于java,而不是卡夫卡流,但是我认为用apache-kafka-streams标记它是一个很好的方法来接触习惯于ConsumerRecordFactory的人)

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-11-15 14:32:10

以下代码中存在一些问题:

代码语言:javascript
复制
// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
        "input-topic",
        new StringSerializer(),
        new IntegerSerializer() );

// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);
  1. 您已经将valueType定义为ConsumerRecordFactory中的Integer,但在create()方法中,您将传递Long类型值。
  2. factory.create()返回一个ConsumerRecord而不是ConsumerRecordFactory

关于方法的模糊性,你是对的。因此,避免这个问题,使用如下:

代码语言:javascript
复制
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
        new StringSerializer(),
        new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte[], byte[]> record = factory.create("input-topic","key", 42);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53321384

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档