我必须使用来自reactive-kafka模块的代码:https://github.com/akka/reactive-kafka/blob/master/README.md
我的代码是这样开头的:
val kafka = new ReactiveKafka()
val kafkaIdpsMsgs: Publisher[StringKafkaMessage] = kafka.consume(
ConsumerProperties(
brokerList = kafkaHosts,
zooKeeperHost = zkHosts,
topic = "test",
groupId = "idps-translator",
decoder = new StringDecoder()
).readFromEndOfStream())
val kafkaSamples: Subscriber[String] = kafka.publish(ProducerProperties(
brokerList = kafkaHosts,
topic = "test",
encoder = new StringEncoder()
))我想产生一条消息(由发布者)。我必须编写什么代码才能实现它?
发布于 2017-04-20 19:02:08
val done = Source(1 to 100)
.map(_.toString)
.map { elem =>
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.runWith(Producer.plainSink(producerSettings))您可以阅读以下文档:http://doc.akka.io/docs/akka-stream-kafka/current/producer.html
https://stackoverflow.com/questions/39094967
复制相似问题