首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka Streams RoundRobinPartitioner

Kafka Streams RoundRobinPartitioner
EN

Stack Overflow用户
提问于 2020-01-08 19:42:38
回答 1查看 631关注 0票数 1

我写了一个kafka streams代码,使用kafka 2.4 kafka客户端版本和kafka 2.2服务器版本。我有50个分区在我的主题和内部主题。

我的kafka流代码有selectKey() DSL操作,我有两百万条使用相同密钥的记录。在流配置中,我已经完成了

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

这样我就可以使用具有完全相同的键的不同分区。如果我没有像预期的那样使用循环调度,我的所有消息都会转到同一个分区。

直到现在一切都还好,但我意识到了;当我使用RoundRobinPartitioner类时,我的消息大约有40个分区。10分区处于空闲状态。我想知道我错过了什么?它应该使用其中的50条,大约200万条记录,对吗?

代码语言:javascript
复制
      final KStream<String, IdListExportMessage> exportedDeviceIdsStream =
            builder.stream("deviceIds");

        // k: appId::deviceId, v: device
        final KTable<String, Device> deviceTable = builder.table(
            "device",
            Consumed.with(Serdes.String(), deviceSerde)
        );
            // Some DSL operations
            .join(
                deviceTable,
                (exportedDevice, device) -> {
                    exportedDevice.setDevice(device);

                    return exportedDevice;
                },
                Joined.with(Serdes.String(), exportedDeviceSerde, deviceSerde)
            )
            .selectKey((deviceId, exportedDevice) -> exportedDevice.getDevice().getId())
            .to("bulk_consumer");

代码语言:javascript
复制
   props.put(StreamsConfig.STATE_DIR_CONFIG, /tmp/kafka-streams);
   props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
   props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
   props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
   props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
   props.put("num.stream.threads", 10);
   props.put("application.id", applicationId);

RoundRobinPartitioner.java

代码语言:javascript
复制
public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}
EN

回答 1

Stack Overflow用户

发布于 2020-01-09 14:38:28

您不能使用ProducerConfig.PARTITIONER_CLASS_CONFIG配置更改分区--这只适用于普通生产者。

在Kafka Streams中,您需要实现接口StreamsPartitioner,并将您的实现传递给相应的运算符,例如to("topic", Produced.streamPartitioner(new MyPartitioner())

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59645127

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档