首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >以Kafka为中间件的春季批处理远程分区

以Kafka为中间件的春季批处理远程分区
EN

Stack Overflow用户
提问于 2021-08-10 13:33:08
回答 1查看 218关注 0票数 0

我正在尝试使用spring批处理远程分区来扩展作业和Apache作为中间件。下面是masterStep的简要配置:

代码语言:javascript
复制
    @Bean
    public Step managerStep() {
        return managerStepBuilderFactory.get("managerStep")
                .partitioner("workerStep", filePartitioner)
                .outputChannel(requestForWorkers())
                .inputChannel(repliesFromWorkers())
                .build();
    }

因此,我既利用渠道向工人发送请求,也收到他们的答复。我知道另一种选择是轮询JobRepository (在我的情况下这很好),但我不想使用它。

这里还有一些卡夫卡的秘密:

代码语言:javascript
复制
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer= org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.producer.properties.spring.json.add.type.headers=true
spring.kafka.consumer.properties.spring.json.trusted.packages = org.springframework.batch.integration.partition,org.springframework.batch.core

主人和工人被配置,主人可以通过卡夫卡将请求发送给工人。工人们开始处理,一切都很好,直到工人们试图通过卡夫卡发送回复。

如您所见,我使用JsonSerializerJsonDeserializer发送/接收消息。问题是,当杰克逊试图序列化StepExecution时,它会陷入一个无限循环,因为StepExetion中有一个JobExecution,而JobExecution也有一个List of StepExetion

代码语言:javascript
复制
Caused by: org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=3001, version=6, name=workerStep:61127a319d6caf656442ff53, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=4, rollbackCount=0, exitDescription=] for topic [repliesFromWorkers]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]-....

所以我想也许我可以定制StepExecution的序列化,这样它就忽略了StepExecutions在第一个StepExecution的JobExecution中的列表!但是即使在这种情况下,当反序列化这个StepExecution时,它也会在主端失败。

代码语言:javascript
复制
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `org.springframework.batch.core.StepExecution` (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator)

有办法让这件事成功吗?我使用SpringBoot2.4.2及其相应版本的spring-boot-starter-batchspring-batch-integrationspring-integration-kafkaspring-kafka

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-06-15 10:16:17

您可以创建自定义(反)序列化程序并手动处理它。像这样的东西会有帮助:

代码语言:javascript
复制
public class KafkaStringOrByteSerializer<T> extends JsonSerializer<T> {

    private final Serializer<Object> byteSerializer = new DefaultSerializer();
    private final org.apache.kafka.common.serialization.Serializer<String> stringSerializer = new StringSerializer();

    @Override
    public byte[] serialize(String topic, T data) {
        if (needsBinarySerializer(data)) {
            return this.serializeBinary(data);
        } else {
            return stringSerializer.serialize(topic, (String) data);
        }
    }

    private boolean needsBinarySerializer(Object data) {
        if (data instanceof byte[] || data instanceof Byte[] || data instanceof Byte)
            return true;
        if (data != null && data.getClass() != null) {
            return (data.getClass().getName()).startsWith("org.springframework.batch");
        }
        return false;
    }

    private byte[] serializeBinary(Object data) {
        try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            byteSerializer.serialize(data, output);
            return output.toByteArray();
        } catch (IOException e) {
            throw new MessageConversionException("Cannot convert object to bytes", e);
        }
    }

}

反序列化器也可以采用类似的方法。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68727787

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档