我正在尝试使用spring批处理远程分区来扩展作业和Apache作为中间件。下面是masterStep的简要配置:
@Bean
public Step managerStep() {
return managerStepBuilderFactory.get("managerStep")
.partitioner("workerStep", filePartitioner)
.outputChannel(requestForWorkers())
.inputChannel(repliesFromWorkers())
.build();
}因此,我既利用渠道向工人发送请求,也收到他们的答复。我知道另一种选择是轮询JobRepository (在我的情况下这很好),但我不想使用它。
这里还有一些卡夫卡的秘密:
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer= org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.properties.spring.json.add.type.headers=true
spring.kafka.consumer.properties.spring.json.trusted.packages = org.springframework.batch.integration.partition,org.springframework.batch.core主人和工人被配置,主人可以通过卡夫卡将请求发送给工人。工人们开始处理,一切都很好,直到工人们试图通过卡夫卡发送回复。
如您所见,我使用JsonSerializer和JsonDeserializer发送/接收消息。问题是,当杰克逊试图序列化StepExecution时,它会陷入一个无限循环,因为StepExetion中有一个JobExecution,而JobExecution也有一个List of StepExetion:
Caused by: org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=3001, version=6, name=workerStep:61127a319d6caf656442ff53, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=4, rollbackCount=0, exitDescription=] for topic [repliesFromWorkers]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]-....所以我想也许我可以定制StepExecution的序列化,这样它就忽略了StepExecutions在第一个StepExecution的JobExecution中的列表!但是即使在这种情况下,当反序列化这个StepExecution时,它也会在主端失败。
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `org.springframework.batch.core.StepExecution` (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator)有办法让这件事成功吗?我使用SpringBoot2.4.2及其相应版本的spring-boot-starter-batch、spring-batch-integration、spring-integration-kafka和spring-kafka
发布于 2022-06-15 10:16:17
您可以创建自定义(反)序列化程序并手动处理它。像这样的东西会有帮助:
public class KafkaStringOrByteSerializer<T> extends JsonSerializer<T> {
private final Serializer<Object> byteSerializer = new DefaultSerializer();
private final org.apache.kafka.common.serialization.Serializer<String> stringSerializer = new StringSerializer();
@Override
public byte[] serialize(String topic, T data) {
if (needsBinarySerializer(data)) {
return this.serializeBinary(data);
} else {
return stringSerializer.serialize(topic, (String) data);
}
}
private boolean needsBinarySerializer(Object data) {
if (data instanceof byte[] || data instanceof Byte[] || data instanceof Byte)
return true;
if (data != null && data.getClass() != null) {
return (data.getClass().getName()).startsWith("org.springframework.batch");
}
return false;
}
private byte[] serializeBinary(Object data) {
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
byteSerializer.serialize(data, output);
return output.toByteArray();
} catch (IOException e) {
throw new MessageConversionException("Cannot convert object to bytes", e);
}
}
}反序列化器也可以采用类似的方法。
https://stackoverflow.com/questions/68727787
复制相似问题