我在C#代码中有以下消息:
public interface ResourcePerformance
{
public string ResourceId { get; }
public List<TimeSection> TimeSections { get; }
}
public class TimeSection
{
public Instant PeriodStart { get; set; }
public Instant PeriodEnd { get; set; }
public PerformanceStatus PerformanceStatus { get; set; }
public Duration ProcessingTime { get; set; }
public double Quantity { get; set; }
}我想从卡夫卡主题中反序列化这样的信息。但是,当从NodaTime库反序列化类型时会出现错误,例如:
Confluent.Kafka.ConsumeException: Local: Value deserialization error
---> Newtonsoft.Json.JsonSerializationException: Error converting value "2:00:00" to type 'NodaTime.Duration'. Path 'timeSections[0].processingTime', line 1, position 330.我想这一定是NodaTime序列化的问题,因为当我将NodaTime类型转换为对象时,没有报告错误。我已经在配置的NodaTime中为json序列化程序配置了RabbitMQ,但是我不知道如何用Kafka部分来实现它。目前,我有以下配置:
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureJsonSerializer( j => j.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb) );
cfg.ConfigureEndpoints(context);
});
x.AddRider(rider =>
{
rider.AddConsumer<ResourcePerformanceConsumer>();
rider.UsingKafka((context, k) =>
{
k.TopicEndpoint<string, ResourcePerformance>("performances-resource", kafkaConsumerGroup , e =>
{
e.AutoOffsetReset = AutoOffsetReset.Earliest;
e.CreateIfMissing(t =>
{
t.NumPartitions = 4; //number of partitions
t.ReplicationFactor = 1; //number of replicas
});
e.ConfigureConsumer<ResourcePerformanceConsumer>(context);
});
});
});
});
services.AddMassTransitHostedService();我能做些什么来正确地反序列化它呢?
发布于 2021-12-21 08:00:50
我在MassTransit上找到了答案。除了配置序列化程序之外,还必须配置反序列化程序:
cfg.ConfigureJsonDeserializer(j => j.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb) );https://stackoverflow.com/questions/70422055
复制相似问题