Here is my code. The first bean reads messages from Topic.TRANSACTION_RAW, splits each message in two, and sends the halves to Topic.TRANSACTION_INTERNAL. The second bean groups and aggregates them, materializing the result into the state store StateStore.BALANCE. The last bean is meant to expose a ReadOnlyKeyValueStore that reads state from that store.
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<String, BankTransaction> alphaBankKStream(StreamsBuilder streamsBuilder) {
        JsonSerde<BankTransaction> valueSerde = new JsonSerde<>(BankTransaction.class);
        KStream<String, BankTransaction> stream = streamsBuilder.stream(Topic.TRANSACTION_RAW,
                Consumed.with(Serdes.String(), valueSerde));
        stream.flatMap((k, v) -> {
            List<BankTransactionInternal> txInternals = BankTransactionInternal.splitBankTransaction(v);
            List<KeyValue<String, BankTransactionInternal>> result = new LinkedList<>();
            result.add(KeyValue.pair(v.getFromAccount(), txInternals.get(0)));
            result.add(KeyValue.pair(v.getToAccount(), txInternals.get(1)));
            return result;
        }).filter((k, v) -> !Constants.EXTERNAL_ACCOUNT.equalsIgnoreCase(k))
          .to(Topic.TRANSACTION_INTERNAL, Produced.with(Serdes.String(), new JsonSerde<>()));
        return stream;
    }
    @Bean
    public KStream<String, BankTransactionInternal> alphaBankInternalKStream(StreamsBuilder streamsBuilder) {
        JsonSerde<BankTransactionInternal> valueSerde = new JsonSerde<>(BankTransactionInternal.class);
        KStream<String, BankTransactionInternal> stream = streamsBuilder.stream(Topic.TRANSACTION_INTERNAL,
                Consumed.with(Serdes.String(), valueSerde));
        KGroupedStream<String, Double> groupedByAccount = stream
                .map((k, v) -> KeyValue.pair(k, v.getAmount()))
                .groupBy((account, amount) -> account, Grouped.with(Serdes.String(), Serdes.Double()));
        groupedByAccount.reduce(Double::sum,
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(StateStore.BALANCE)
                        .withValueSerde(Serdes.Double()));
        return stream;
    }
    @Bean
    public ReadOnlyKeyValueStore<String, Double> balanceStateStore(StreamsBuilderFactoryBean defaultKafkaStreamsBuilder) {
        if (defaultKafkaStreamsBuilder == null) {
            System.out.println("... defaultKafkaStreamsBuilder is null ...");
        }
        if (defaultKafkaStreamsBuilder.getKafkaStreams() == null) {
            System.out.println("... defaultKafkaStreamsBuilder.getKafkaStreams() is null ...");
            // this one got printed
        }
        ReadOnlyKeyValueStore<String, Double> store = defaultKafkaStreamsBuilder.getKafkaStreams().store(
                StateStore.BALANCE,
                QueryableStoreTypes.keyValueStore());
        return store;
    }
}

I always get a NullPointerException on defaultKafkaStreamsBuilder.getKafkaStreams().
Do you know what is going wrong here? Thanks!
Posted on 2021-04-27 03:31:14
if (defaultKafkaStreamsBuilder.getKafkaStreams() == null) {
    System.out.println("... defaultKafkaStreamsBuilder.getKafkaStreams() is null ...");
    // this one got printed
}

This is not something you can do during the bean definition phase.
Look at its JavaDocs:
/**
* Get a managed by this {@link StreamsBuilderFactoryBean} {@link KafkaStreams} instance.
* @return KafkaStreams managed instance;
* may be null if this {@link StreamsBuilderFactoryBean} hasn't been started.
* @since 1.1.4
*/
public synchronized KafkaStreams getKafkaStreams() {

Since you call this method too early, before the lifecycle start phase, you end up with that error.
You should rethink your logic and use SmartLifecycle.start() in the target service where you want to use that ReadOnlyKeyValueStore. That way you can autowire this StreamsBuilderFactoryBean and call its getKafkaStreams() from your start() implementation.
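A minimal sketch of that suggestion, assuming the StateStore.BALANCE constant from the question; the BalanceService class, its balanceFor() method, and the phase value are illustrative, not part of the original code:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Service;

// Hypothetical service: fetches the store only after the application's
// lifecycle start phase, when the factory bean has started KafkaStreams.
@Service
public class BalanceService implements SmartLifecycle {

    private final StreamsBuilderFactoryBean factoryBean;
    private volatile ReadOnlyKeyValueStore<String, Double> balanceStore;
    private volatile boolean running;

    public BalanceService(StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    @Override
    public void start() {
        // By this point getKafkaStreams() returns the managed instance
        // instead of null, because the factory bean has been started.
        KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
        this.balanceStore = kafkaStreams.store(StateStore.BALANCE,
                QueryableStoreTypes.keyValueStore());
        this.running = true;
    }

    @Override
    public void stop() {
        this.running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public int getPhase() {
        // Assumption: a late phase so this starts after the
        // StreamsBuilderFactoryBean; adjust to your setup.
        return Integer.MAX_VALUE;
    }

    public Double balanceFor(String account) {
        return balanceStore.get(account);
    }
}
```

Note that even after start, KafkaStreams may still be rebalancing, in which case store() can throw InvalidStateStoreException; callers may want to retry until the streams reach the RUNNING state.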
https://stackoverflow.com/questions/67272617