首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka producer.send()由producer.close()停止

Kafka producer.send()由producer.close()停止
EN

Stack Overflow用户
提问于 2016-05-27 10:40:01
回答 1查看 2.9K关注 0票数 0

我试图发送一个字计数问题的输出(以火花- scala)在一个名为"test“的kafka主题上。见下文“守则”:

代码语言:javascript
复制
val Dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

val lines = Dstream.map(f => f._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.foreachRDD(
      rdd => rdd.foreach(
        f =>
          {
            val sendProps = new Properties()
            sendProps.put("metadata.broker.list", brokers)
            sendProps.put("serializer.class", "kafka.serializer.StringEncoder")
            sendProps.put("producer.type", "async")

            val config = new ProducerConfig(sendProps)
            val producer = new Producer[String, String](config)
            producer.send(new KeyedMessage[String, String]"test", f._1 + " " +f._2))
            producer.close();

          })) 

问题是输出中有些词是随机缺失的。我还注意到如果我删除声明

代码语言:javascript
复制
producer.close()

没有数据丢失。

这是否意味着producer.close()在将数据放入缓冲区之前中断了producer.send(),因为缓冲区中没有将特定的元组发送给使用者?如果是的话,我该如何在不损失数据的情况下关闭生产者?

上面的是我最初的问题,通过淡水河谷的回答解决了。

现在,当我再次更改producer.type属性时-数据会随机丢失。

代码语言:javascript
复制
sendProps.put("producer.type", "sync")

为了澄清,producer.send正在运行我需要在输出主题中输入的所有单词。但是,一些词在输出卡夫卡主题中却没有显示出来。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-05-27 10:55:32

这太奇怪了。close()方法应该等待发送完成,这就是引入close(time)方法的原因:正如你在这里看到的

所以,我使用Java7。rdd.foreach是否在它内部的每个分区上运行?或者是对每个元组进行操作(就像我认为的那样)?

如果是后者,你能试试rdd.foreachPartition (请参考以下内容)吗?因为您正在为您所使用的每一行创建一个生成器,我担心这可能会导致问题(虽然理论上不应该)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37481329

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档