我正在按键对流进行分组,并试图按分组键聚合值。我在跟踪流.开发人员.指南
我在withValueSerde上有个错误。上面写着:
The method withValueSerde(Serde<Object>) in the type Materialized<Object,Object,StateStore> is not applicable for the arguments (Serde<Long>)
代码:
KStream<String, String> inputStream = builder.stream("input_topic");
KStream<String, Integer> transformedStream = inputStream.map(
(key, value) -> KeyValue.pair(getKey(value), getValue(value)));
KGroupedStream<String, Integer> groupedStream = transformedStream.groupByKey();
KTable<String, Long> aggregatedStream = groupedStream.aggregate(() -> 0L,
(aggKey, newValue, aggValue) -> aggValue + newValue,
Materialized.as("aggregated-stream-store").withValueSerde(Serdes.Long()));发布于 2018-06-26 18:36:13
您需要指定泛型类型。Java不能自动推断它们(如果您查看错误消息,它只是表示表示未知类型的Materialized<Object,Object,StateStore> ):
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store")
.withValueSerde(Serdes.Long())https://stackoverflow.com/questions/51040555
复制相似问题