此代码分别创建两个KStream实例,它们都是从同一个主题读取的:
final KStream<String, String> inputStream1 =
builder.stream(INPUT_TOPIC, consumed);
final KStream<String, String> inputStream2 =
builder.stream(INPUT_TOPIC, consumed);
final KStream<String, String> mappedStream1 = inputStream1
.peek((k, v) -> System.out.println("1: " + k + " -> " + v))
.mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<String, String> mappedStream2 = inputStream2
.peek((k, v) -> System.out.println("2: " + k + " -> " + v))
.mapValues((ValueMapper<String, String>) String::toUpperCase);
mappedStream1.to(OUTPUT_TOPIC_1, produced);
mappedStream2.to(OUTPUT_TOPIC_2, produced);拓扑如下所示:只有一个源定义,然后使用两次
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-PEEK-0000000002, KSTREAM-PEEK-0000000004
Processor: KSTREAM-PEEK-0000000002 (stores: [])
--> KSTREAM-MAPVALUES-0000000003
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-PEEK-0000000004 (stores: [])
--> KSTREAM-MAPVALUES-0000000005
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
--> KSTREAM-SINK-0000000006
<-- KSTREAM-PEEK-0000000002
Processor: KSTREAM-MAPVALUES-0000000005 (stores: [])
--> KSTREAM-SINK-0000000007
<-- KSTREAM-PEEK-0000000004
Sink: KSTREAM-SINK-0000000006 (topic: output-1)
<-- KSTREAM-MAPVALUES-0000000003
Sink: KSTREAM-SINK-0000000007 (topic: output-2)
<-- KSTREAM-MAPVALUES-0000000005现在我的问题是:假设StreamBuilder只创建一个源(=同一主题的一个使用者),这是否总是安全的?
换句话说:是否总是保证--给定一个具有多个分区的主题-- inputStream1和inputStream2会看到相同的记录?
还是将其改写为这样的东西,使其变得明确:
final KStream<String, String> inputStream =
builder.stream(INPUT_TOPIC, consumed);
final KStream<String, String> mappedStream1 = inputStream
.peek((k, v) -> System.out.println("1: " + k + " -> " + v))
.mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<String, String> mappedStream2 = inputStream
.peek((k, v) -> System.out.println("2: " + k + " -> " + v))
.mapValues((ValueMapper<String, String>) String::toUpperCase);更新
第二个版本的结果是这个拓扑:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-PEEK-0000000001, KSTREAM-PEEK-0000000003
Processor: KSTREAM-PEEK-0000000001 (stores: [])
--> KSTREAM-MAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-PEEK-0000000003 (stores: [])
--> KSTREAM-MAPVALUES-0000000004
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-PEEK-0000000001
Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
--> KSTREAM-SINK-0000000006
<-- KSTREAM-PEEK-0000000003
Sink: KSTREAM-SINK-0000000005 (topic: output-1)
<-- KSTREAM-MAPVALUES-0000000002
Sink: KSTREAM-SINK-0000000006 (topic: output-2)
<-- KSTREAM-MAPVALUES-0000000004发布于 2022-01-27 17:17:17
builder将是相同的,相同的application.id
不能代表拓扑,但是考虑到Consumer级别的流,group.id是基于application.id构建的,因此您的使用者组对于这两个流都是相同的。
对于一个输入主题,只有一个使用者实例(两者之间)能够使用该输入主题。
这将解释为什么只有一个源;因此,您不需要具有相同参数的额外builder.stream()调用。
https://stackoverflow.com/questions/70844904
复制相似问题