
Writing to multiple Kafka topics in Apache Beam?

Stack Overflow user
Asked on 2020-07-07 02:14:20

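I am running a simple word-count program that uses a Kafka topic (fed by a producer) as the input source and then applies a ParDo to compute the word counts. Now I need help writing the words to different topics based on their frequency. For example, all words with an even frequency should go to topic 1 and the rest to topic 2.

Can anyone help me with an example?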


1 Answer

Stack Overflow user

Answered on 2020-07-07 14:38:37

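This can be done with KafkaIO's writeRecords() method, which takes ProducerRecord elements. Build each element with new ProducerRecord<>("topic_name", key, value), so that the record itself names the topic it should be written to.

Here is the code: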

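// Imports assumed by the classes below (Beam Java SDK, Beam examples, kafka-clients).
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.producer.ProducerRecord;

    /** Splits each input line into words and emits every non-empty word. */
    static class ExtractWordsFn extends DoFn<String, String> {
        private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
        private final Distribution lineLenDist =
                Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

        @ProcessElement
        public void processElement(@Element String element, OutputReceiver<String> receiver) {
            lineLenDist.update(element.length());
            if (element.trim().isEmpty()) {
                emptyLines.inc();
            }
            String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
            for (String word : words) {
                if (!word.isEmpty()) {
                    receiver.output(word);
                }
            }
        }
    }

    /** Turns a (word, count) pair into a ProducerRecord whose topic depends on the count. */
    public static class FormatAsTextFn
            extends SimpleFunction<KV<String, Long>, ProducerRecord<String, String>> {
        @Override
        public ProducerRecord<String, String> apply(KV<String, Long> input) {
            // Even counts go to topic "test", odd counts go to topic "copy".
            String topic = input.getValue() % 2 == 0 ? "test" : "copy";
            String value = input.getKey() + " " + input.getValue();
            return new ProducerRecord<>(topic, input.getKey(), value);
        }
    }

    /** Counts the occurrences of each word in the input lines. */
    public static class CountWords
            extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
            PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
            return words.apply(Count.perElement());
        }
    }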

The pipeline that wires these classes together:
// p is an existing Pipeline object.
p.apply("ReadLines", KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("copy") // use withTopics(List<String>) to read from multiple topics
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
        .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
        .withLogAppendTime()
        .withReadCommitted()
        .commitOffsetsInFinalize()
        .withProcessingTime()
        .withoutMetadata()) // PCollection<KV<Long, String>>
    .apply(Values.<String>create()) // keep only the message values
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(new CountWords()) // PCollection<KV<String, Long>>
    .apply(MapElements.via(new FormatAsTextFn())) // PCollection<ProducerRecord<String, String>>
    .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
    // No withTopic(...) here: each ProducerRecord carries its own destination topic.
    .apply("WriteCounts", KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("localhost:9092")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));
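
The even/odd routing rule in FormatAsTextFn can also be pulled out into a plain helper, so it can be unit-tested without a Beam runner or a Kafka broker. The following is only a minimal sketch under that assumption: the class name TopicRouter and its methods are hypothetical and not part of Beam or Kafka, and the topic names "test" and "copy" are the ones used in the answer above.

```java
/** Hypothetical helper (not a Beam or Kafka class): picks a topic from a word count. */
final class TopicRouter {
    // Topic names taken from the answer's FormatAsTextFn.
    static final String EVEN_TOPIC = "test";
    static final String ODD_TOPIC = "copy";

    private TopicRouter() {}

    /** Even counts map to EVEN_TOPIC, all other counts map to ODD_TOPIC. */
    static String topicFor(long count) {
        return count % 2 == 0 ? EVEN_TOPIC : ODD_TOPIC;
    }

    /** Builds the record value in the same "word count" form the answer uses. */
    static String valueFor(String word, long count) {
        return word + " " + count;
    }

    public static void main(String[] args) {
        // Print the routing decision for two sample (word, count) pairs.
        System.out.println(topicFor(4) + " <- " + valueFor("beam", 4));
        System.out.println(topicFor(3) + " <- " + valueFor("kafka", 3));
    }
}
```

With such a helper, the body of FormatAsTextFn.apply could reduce to building new ProducerRecord<>(TopicRouter.topicFor(count), word, TopicRouter.valueFor(word, count)), which keeps the topic names in one place if more routing rules are added later.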
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/62762035
