首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >消费者之间的kafka消息分发

消费者之间的kafka消息分发
EN

Stack Overflow用户
提问于 2019-03-02 04:05:12
回答 2查看 1.3K关注 0票数 2

我有一个非常简单的kafka用例,我面临着两个分区之间的消息分发问题。

我在topic上有2个分区,每个分区都有2个使用者。我可以看到更多的消息进入了特定的分区,并且只有一个使用者正在处理消息,而另一个订阅了较少消息的分区的使用者则永远处于空闲状态。两个使用者具有相同的组id。我不能在这个问题上实现水平扩展。

下面是我正在添加的关键配置。

代码语言:javascript
复制
kafka.session.timeout.ms=10000
kafka.auto.commit=false
kafka.maxpoll.interval.ms=50000
kafka.request.timeout.ms=15000
kafka.maxpoll.records=100

**PS:**名称来自我的prop文件,与真实的kafka属性名称不完全匹配。我需要大的最大轮询间隔,以便在一次处理大块。有没有猜想我需要在配置中添加什么或更改它?

EN

回答 2

Stack Overflow用户

发布于 2019-03-02 12:45:47

正如在其他答案中提到的,kafka使用键的散列来决定分区。这可能是您的键不是均匀分布的,在这种情况下,您可以定义自己的策略,在生成记录时按生产者选择分区。创建一个自定义的partitoner类,并实现其分区方法,如下所示。

代码语言:javascript
复制
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;

public class CustomPartitioner implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("We expect all messages to have a key");
        // Your logic to decide partition based on key
        return 0;// Here return thr partition decided based on key
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
        // TODO Auto-generated method stub

    }
}

在生产者配置中添加以下内容

代码语言:javascript
复制
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getCanonicalName());
property 
票数 1
EN

Stack Overflow用户

发布于 2019-03-02 04:49:53

Kafka生产者:生产者根据记录的键将记录发送到分区。Java的缺省分区程序使用记录的键的散列来选择分区,如果记录没有键,则使用循环策略。因此,为了更具伸缩性,请始终对消息使用唯一键

生产者将数据发布到他们选择的主题。生产者负责选择将哪个记录分配给主题中的哪个分区。这可以简单地以循环方式完成,以平衡负载,也可以根据某种语义划分函数(例如,基于记录中的某个键)来完成。更多关于如何在一秒钟内使用分区的信息!

如果记录具有相同的键,则它们将以相同的分区结束

您还可以将记录发送到特定分区

代码语言:javascript
复制
public ProducerRecord(String topic,
          Integer partition,
          K key,
          V value)

创建一条要发送到指定主题和分区

的记录

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54951561

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档