首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache Heron中的Kafka集成

Apache Heron中的Kafka集成
EN

Stack Overflow用户
提问于 2018-09-29 07:09:09
回答 1查看 486关注 0票数 2

我正试图把卡夫卡和苍鹭拓扑整合起来。然而,我无法找到任何最新版本的Heron (0.17.5)的例子。有没有什么可以分享的例子,或者如何实施卡夫卡喷口和卡夫卡螺栓的建议?

编辑1:

我相信KafkaSpoutKafkaBolt在Heron被刻意地反对,以让位给新的Streamlet 。目前,我想看看是否可以使用Streamlet构建一个KafkaSourceKafkaSink。但是,当我试图在KafkaConsumer源代码中创建一个时,我会得到以下异常。

代码语言:javascript
复制
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.consumer.KafkaConsumer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.twitter.heron.api.utils.Utils.serialize(Utils.java:97)

编辑2:

修正了上述问题。我在构造函数中初始化KafkaConsumer,这是错误的。在setup()方法中初始化相同的方法修复了它。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-30 10:36:53

我用Heron的Streamlet成功地完成了这个任务。我在这里发同样的帖子。希望它能帮助其他面临同样问题的人。

Kafka源

代码语言:javascript
复制
public class KafkaSource implements Source {

    private String streamName;

    private Consumer<String, String> kafkaConsumer;
    private List<String> kafkaTopic;

    private static final Logger LOGGER = Logger.getLogger("KafkaSource");

    @Override
    public void setup(Context context) {

        this.streamName = context.getStreamName();

        kafkaTopic = Arrays.asList(KafkaProperties.KAFKA_TOPIC);

        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaProperties.BOOTSTRAP_SERVERS);
        props.put("group.id", KafkaProperties.CONSUMER_GROUP_ID);
        props.put("enable.auto.commit", KafkaProperties.ENABLE_AUTO_COMMIT);
        props.put("auto.commit.interval.ms", KafkaProperties.AUTO_COMMIT_INTERVAL_MS);
        props.put("session.timeout.ms", KafkaProperties.SESSION_TIMEOUT);
        props.put("key.deserializer", KafkaProperties.KEY_DESERIALIZER);
        props.put("value.deserializer", KafkaProperties.VALUE_DESERIALIZER);
        props.put("auto.offset.reset", KafkaProperties.AUTO_OFFSET_RESET);
        props.put("max.poll.records", KafkaProperties.MAX_POLL_RECORDS);
        props.put("max.poll.interval.ms", KafkaProperties.MAX_POLL_INTERVAL_MS);

        this.kafkaConsumer = new KafkaConsumer<>(props);

        kafkaConsumer.subscribe(kafkaTopic);
    }

    @Override
    public Collection get() {

        List<String> kafkaRecords = new ArrayList<>();

        ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE);

        for (ConsumerRecord<String, String> record : records) {
            String rVal = record.value();
            kafkaRecords.add(rVal);
        }

        return kafkaRecords;
    }

    @Override
    public void cleanup() {
        kafkaConsumer.wakeup();
    }
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52566042

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档