首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >producer.send不接受KeyedMessage类型

producer.send不接受KeyedMessage类型
EN

Stack Overflow用户
提问于 2016-07-26 06:27:37
回答 1查看 2.1K关注 0票数 0

我试图运行这段代码,但它不能工作,因为producer.send()不接受KeyedMessage类型。

我尝试导入kafka.javaapi.producer.Producer而不是kafka.producer.Producer;但仍然不起作用

代码是:

代码语言:javascript
复制
package sources;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

//import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
//import kafka.producer.Producer;

public class ProducerCode {

    private static Producer<Integer, String> producer;
    private static final String topic= "mytopic";

    public void initialize() {
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", "localhost:9092");
        producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        producerProps.put("request.required.acks", "1");
        // ProducerConfig producerConfig = new ProducerConfig(producerProps);
        // have a change here **
        producer = new Producer<Integer, String>(new ProducerConfig(producerProps));
    }

    public void publishMesssage() throws Exception{            
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));               
        while (true){
            System.out.print("Enter message to send to kafka broker (Press 'Y' to close producer): ");
            String msg = null;
            msg = reader.readLine(); // Read message from console
            //Define topic name and message
            KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String>(topic, msg);

            producer.send(keyedMsg);
            // producer.send(keyedMsg); // This publishes message on given topic

            if("Y".equals(msg)){ break; }
            System.out.println("--> Message [" + msg + "] sent.Check message on Consumer's program console");
         }
         return;
    }

    public static void main(String[] args) throws Exception  {

        KafkaProducer kafkaProducer = new KafkaProducer();
        // Initialize producer
        kafkaProducer.initialize();            
        // Publish message
        kafkaProducer.publishMesssage();
        //Close the producer
        producer.close();
    }
}
EN

回答 1

Stack Overflow用户

发布于 2016-07-26 18:38:37

您必须对构造函数ProducerRecord(String topic, K key, V value)使用ProducerRecord (而不是KeyedMessage)

代码语言:javascript
复制
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));

请参阅https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38578361

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档