我试图发送一个字计数问题的输出(以火花- scala)在一个名为"test“的kafka主题上。见下文“守则”:
val Dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val lines = Dstream.map(f => f._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD(
rdd => rdd.foreach(
f =>
{
val sendProps = new Properties()
sendProps.put("metadata.broker.list", brokers)
sendProps.put("serializer.class", "kafka.serializer.StringEncoder")
sendProps.put("producer.type", "async")
val config = new ProducerConfig(sendProps)
val producer = new Producer[String, String](config)
producer.send(new KeyedMessage[String, String]"test", f._1 + " " +f._2))
producer.close();
})) 问题是输出中有些词是随机缺失的。我还注意到如果我删除声明
producer.close()没有数据丢失。
这是否意味着producer.close()在将数据放入缓冲区之前中断了producer.send(),因为缓冲区中没有将特定的元组发送给使用者?如果是的话,我该如何在不损失数据的情况下关闭生产者?
上面的是我最初的问题,通过淡水河谷的回答解决了。
现在,当我再次更改producer.type属性时-数据会随机丢失。
sendProps.put("producer.type", "sync")为了澄清,producer.send正在运行我需要在输出主题中输入的所有单词。但是,一些词在输出卡夫卡主题中却没有显示出来。
发布于 2016-05-27 10:55:32
这太奇怪了。close()方法应该等待发送完成,这就是引入close(time)方法的原因:正如你在这里看到的。
所以,我使用Java7。rdd.foreach是否在它内部的每个分区上运行?或者是对每个元组进行操作(就像我认为的那样)?
如果是后者,你能试试rdd.foreachPartition (请参考以下内容)吗?因为您正在为您所使用的每一行创建一个生成器,我担心这可能会导致问题(虽然理论上不应该)。
https://stackoverflow.com/questions/37481329
复制相似问题