首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spring-kafka kafkaStreamsBuilder.getKafkaStreams()为空

spring-kafka kafkaStreamsBuilder.getKafkaStreams()为空
EN

Stack Overflow用户
提问于 2021-04-27 03:22:17
回答 1查看 68关注 0票数 0

这是我的代码,第一个bean查看Topic.TRANSACTION_RAW上的消息,将一个消息一分为二并发送到Topic.TRANSACTION_INTERNAL,第二个bean进行分组和还原,并将其具体化到状态存储"StateStore.BALANCE“中。最后一个是让ReadOnlyKeyValueStore从"ReadOnlyKeyValueStore“中读取状态。

代码语言:javascript
复制
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<String, BankTransaction> alphaBankKStream(StreamsBuilder streamsBuilder) {
        JsonSerde<BankTransaction> valueSerde = new JsonSerde<>(BankTransaction.class);
        KStream<String, BankTransaction> stream = streamsBuilder.stream(Topic.TRANSACTION_RAW,
                Consumed.with(Serdes.String(), valueSerde));
        stream.flatMap((k, v) -> {
            List<BankTransactionInternal> txInternals = BankTransactionInternal.splitBankTransaction(v);
            List<KeyValue<String, BankTransactionInternal>> result = new LinkedList<>();
            result.add(KeyValue.pair(v.getFromAccount(), txInternals.get(0)));
            result.add(KeyValue.pair(v.getToAccount(), txInternals.get(1)));
            return result;
        }).filter((k, v) -> !Constants.EXTERNAL_ACCOUNT.equalsIgnoreCase(k))
                .to(Topic.TRANSACTION_INTERNAL, Produced.with(Serdes.String(), new JsonSerde<>()));
        return stream;
    }

    @Bean
    public KStream<String, BankTransactionInternal> alphaBankInternalKStream(StreamsBuilder streamsBuilder) {
        JsonSerde<BankTransactionInternal> valueSerde = new JsonSerde<>(BankTransactionInternal.class);
        KStream<String, BankTransactionInternal> stream = streamsBuilder.stream(Topic.TRANSACTION_INTERNAL,
                Consumed.with(Serdes.String(), valueSerde));

        KGroupedStream<String, Double> groupedByAccount = stream
                .map((k,v) -> KeyValue.pair(k, v.getAmount()))
                .groupBy((account, amount) -> account, Grouped.with(Serdes.String(), Serdes.Double()));
        groupedByAccount.reduce(Double::sum,
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(StateStore.BALANCE)
                        .withValueSerde(Serdes.Double()));

        return stream;
    }

    @Bean
    public ReadOnlyKeyValueStore<String, Double> balanceStateStore(StreamsBuilderFactoryBean defaultKafkaStreamsBuilder) {
        if (defaultKafkaStreamsBuilder == null) {
            System.out.println("... defaultKafkaStreamsBuilder is null ...");
        }
        if (defaultKafkaStreamsBuilder.getKafkaStreams() == null) {
            System.out.println("... defaultKafkaStreamsBuilder.getKafkaStreams() is null ...");
            // this one got printed
        }
        ReadOnlyKeyValueStore<String, Double> store = defaultKafkaStreamsBuilder.getKafkaStreams().store(
                StateStore.BALANCE,
                QueryableStoreTypes.keyValueStore());
        return store;
    }

}

我总是在defaultKafkaStreamsBuilder.getKafkaStreams()上得到NullPointException。

你知道这里出了什么问题吗?谢谢!

EN

回答 1

Stack Overflow用户

发布于 2021-04-27 03:31:14

代码语言:javascript
复制
if (defaultKafkaStreamsBuilder.getKafkaStreams() == null) {
        System.out.println("... defaultKafkaStreamsBuilder.getKafkaStreams() is null ...");
        // this one got printed
    }

此操作不适合在bean定义阶段执行。

查看其JavaDocs:

代码语言:javascript
复制
/**
 * Get a managed by this {@link StreamsBuilderFactoryBean} {@link KafkaStreams} instance.
 * @return KafkaStreams managed instance;
 * may be null if this {@link StreamsBuilderFactoryBean} hasn't been started.
 * @since 1.1.4
 */
public synchronized KafkaStreams getKafkaStreams() {

由于您在生命周期start阶段之前过早地调用了此方法,因此最终会出现错误。

您应该重新考虑您的逻辑,以便在您想要使用该ReadOnlyKeyValueStore的目标服务中使用SmartLifecycle.start()。因此,您可以自动绑定这个StreamsBuilderFactoryBean,并从start()实现中调用它的getKafkaStreams()

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67272617

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档