首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka producer -发送消息列表

Kafka producer -发送消息列表
EN

Stack Overflow用户
提问于 2018-07-09 14:26:16
回答 1查看 1.2K关注 0票数 0

我需要发送几批消息,并确保每批中的所有消息都在同一批中一起到达消费者。

例如,假设我需要发送5批/组400条消息,每组包含80条消息,需要在消费者端进行同一批消费。

我使用spark structured-streaming来消费消息。

我读过类似的问题,但我仍然对正确的方法感到困惑。

生产者是否应该将所有消息(每批)放在一个列表中,并将该列表发送给kafka?

有没有其他更好的方法?

谢谢

EN

回答 1

Stack Overflow用户

发布于 2018-07-09 14:48:36

这可以通过创建一个有5个分区的主题来实现,这样您就可以将每种类型的批量消息发送到每个分区

代码语言:javascript
复制
ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value)
Creates a record to be sent to a specified topic and partition

我们可以创建5个消费者,并将每个消费者分配给每个分区,但我不确定每个消费者poll()是否会一次提取该分区中的所有消息

手动分区分配。here doc

例如:如果进程维护与该分区相关的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上维护的分区的记录。

如果进程本身高度可用,并将在失败时重新启动(可能使用YARN、Mesos或AWS工具等集群管理框架,或者作为流处理框架的一部分)。在这种情况下,Kafka不需要检测故障并重新分配分区,因为消费进程将在另一台机器上重新启动。

要使用此模式,而不是使用subscribe订阅主题,只需调用assign(Collection),其中包含要使用的分区的完整列表。

代码语言:javascript
复制
 String topic = "foo";
 TopicPartition partition0 = new TopicPartition(topic, 0);
 TopicPartition partition1 = new TopicPartition(topic, 1);
 consumer.assign(Arrays.asList(partition0, partition1));
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51239314

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档