首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >作为Spring-Cloud-Data-Flow流的一部分部署的Spring-Cloud-Function应用的Kafka主题名称错误

作为Spring-Cloud-Data-Flow流的一部分部署的Spring-Cloud-Function应用的Kafka主题名称错误
EN

Stack Overflow用户
提问于 2020-04-28 23:59:08
回答 1查看 142关注 0票数 0

我有一个简单的SCDF流,如下所示:

http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp

mvmn-transform是一个简单的自定义转换器,如下所示:

代码语言:javascript
复制
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String args[]) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(Message<?> message) {
        Object payload = message.getPayload();
        Map<String, Object> result = new HashMap<>();
        Map<String, String> headersStr = new HashMap<>();

        message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

        result.put("headers", headersStr);
        result.put("payload", payload);
        result.put("configProp", config.getSomeConfigProp());

        return result;
    }

    // See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

这可以很好地工作。

但我读到Spring Cloud Function应该允许我实现这样的应用程序,而不需要指定绑定和转换器注释,所以我将其更改为:

代码语言:javascript
复制
@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String args[]) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    // @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    @Bean
    public Function<Message<?>, Map<String, Object>> transform(
    // Message<?> message
    ) {
        return message -> {
            Object payload = message.getPayload();
            Map<String, Object> result = new HashMap<>();
            Map<String, String> headersStr = new HashMap<>();

            message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

            result.put("headers", headersStr);
            result.put("payload", payload);
            result.put("configProp", "Config prop val: " + config.getSomeConfigProp());

            return result;
        };
    }

    // See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

现在我遇到了一个问题-- Spring-Cloud-Function显然忽略了SCDF源和目标主题的名称,而是创建了主题transform-in-0transform-out-0

SCDF创建名称类似<stream-name>.<app-name>的主题,例如TestStream123.httpTestStream123.mvmn-transform

以前,它们被用于transformer -因为它是SCDF流的一部分,所以它应该是。但现在它们被Spring-Cloud-Function忽略,取而代之的是创建transform-in-0transform-out-0

因此,我的转换器不再接收任何输入,因为它期望输入的是错误的Kafka主题。并且可能也不会输出到流,因为它也会输出到错误的Kafka主题。

另外,为了以防万一,在GitHub上有完整的项目代码:https://github.com/mvmn/scdftest-transformer/tree/scfunc

要运行本地启动Kafka、Skipper、SCDF和SCDF控制台,请在app文件夹中执行mvn clean install,然后在coonsole中执行app register --name mvmn-transform-1 --type processor --uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT --metadata-uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT。然后您可以使用definition http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp部署流

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-04-29 06:21:54

由于您使用的是编写Spring Cloud Stream应用程序的功能模型,因此在部署此应用程序时,您需要在自定义处理器上传递两个属性,以恢复Spring Cloud数据流行为。

spring.cloud.stream.function.bindings.transform-in-0=input spring.cloud.stream.function.bindings.transform-out-0=output

你能不能试一下,看看会不会有什么不同?

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61484547

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档