我有一个由byte[]键指定的主题,我想重新划分它,并在消息体的一个字段中用另一个键处理这个主题。
我发现有KGroupedStream和groupby函数。但是它要求将聚合函数转换为KTable/KStream。我不需要骨料。我只想重新划分并处理输出。
发布于 2019-09-05 05:03:36
(Kafka流2.5.x或更高)
不确定这是否完全合乎情理,但是它可以工作,并且重新分区主题是自动创建的,并且具有正确的分区数wrt stream。
KTable emptyTable = someTable.filter((k, v) -> false);
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.leftJoin(emptyTable, (v, Null) -> v, ...);编辑
这种方法显然成了一个复杂的令人憎恶的东西,在2020年8月卡夫卡流2.6.0被引入并出现KStream.repartition()时,应该受到大量的否决和鞭打。
因此,对于流版本2.6.x+,您必须使用
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.repartition();发布于 2018-03-31 04:07:34
是的你可以。您设置了一个新键,然后再通过另一个主题来传递数据。
// repartition() will create the required topic automatically for your,
// with the same number of partitions as your input topic;
//
// it's also possible to set the number of partitions explicitly to scale in/out
// via `repartitioned(Repartitioned.numberOfPartitions(...))`
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.repartition();
// older versions:
//
// using `through()` you need to create the use topic manually,
// before you start your application
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.through("topic-name");注意,在以所需数量的分区启动应用程序之前,需要创建在through()中使用的主题。
发布于 2022-05-19 18:45:15
在KStream接口上有一个重新分区()的方法,允许您基于Serdes和StreamPartitioner重新划分主题,而不是映射/selectingKey(),再加上应用一个贯通或重新分区。
https://stackoverflow.com/questions/49578931
复制相似问题