Spring Cloud Stream sends all messages to the DLQ

Stack Overflow user
Asked on 2020-08-13 08:29:32
1 answer · 1.4K views · 0 followers · 0 votes

I am experimenting with the following application, which uses Spring Cloud Stream with the Kafka binder:

spring:
  cloud:
    function:
      definition: single;react
    stream:
      bindings:
        single-in-0:
          destination: single-in
          group: single-in
        single-out-0:
          destination: single-out
        react-in-0:
          destination: react-in
          group: react-in
        react-out-0:
          destination: react-out
      kafka:
        default:
          consumer:
            enableDlq: true
            dlqName: dlq
        binder:
          brokers: 192.168.153.133:9092
          autoAddPartitions: true
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    
    @Bean
    public Function<String,String> single() {
        return msg -> {
            if ("EXCP".equalsIgnoreCase(msg)) {
                throw new RuntimeException("Received bad message");
            }
            return "OK: " + msg;
        };
    }
    
    @Bean
    public Function<Flux<String>,Flux<String>> react() {
        return fluxMsg -> {
            return fluxMsg.map(msg -> {
                if ("EXCP".equalsIgnoreCase(msg)) {
                    throw new RuntimeException("Received bad message");
                }
                return "OK: " + msg;
            });
        };
    }
}

As you can see, the application is very simple: if the received message is "EXCP", it throws an exception; otherwise it publishes an "OK" message.

What is not clear to me is why, once a bad message is read from "react-in", every message subsequently read from that topic also ends up in the DLQ topic. For example:

  • Wrote "test 1" to "react-in" -> got "OK: test 1" in "react-out"

  • Wrote "test 2" to "react-in" -> got "OK: test 2" in "react-out"

  • Wrote "EXCP" to "react-in" -> got "EXCP" in "dlq"

  • Wrote "test 3" to "react-in" -> got "test 3" in "dlq"

I expected that last message to be published to the "react-out" topic, not to the "dlq" topic. Here are the logs:

...
ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.153.133:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = react-in
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305550622
2020-08-13 09:59:10.623  INFO 17688 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-react-in-4, groupId=react-in] Subscribed to topic(s): react-in
2020-08-13 09:59:10.624  INFO 17688 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-08-13 09:59:10.631  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-single-in-2, groupId=single-in] Successfully joined group with generation 8
2020-08-13 09:59:10.637  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-single-in-2, groupId=single-in] Adding newly assigned partitions: single-in-0
2020-08-13 09:59:10.648  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-single-in-2, groupId=single-in] Setting offset for partition single-in-0 to the committed offset FetchPosition{offset=18, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.153.133:9092 (id: 0 rack: null)], epoch=0}}
2020-08-13 09:59:10.658  INFO 17688 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@de8039f
2020-08-13 09:59:10.668  INFO 17688 --- [container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-react-in-4, groupId=react-in] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:10.676  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Discovered group coordinator 192.168.153.133:9092 (id: 2147483647 rack: null)
2020-08-13 09:59:10.677  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] (Re-)joining group
2020-08-13 09:59:10.689  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-08-13 09:59:10.689  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] (Re-)joining group
2020-08-13 09:59:10.699  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Finished assignment for group at generation 4: {consumer-react-in-4-7ca5b03b-bc58-4af0-b4e5-c0666fc2f05a=Assignment(partitions=[react-in-0])}
2020-08-13 09:59:10.723  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Successfully joined group with generation 4
2020-08-13 09:59:10.723  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Adding newly assigned partitions: react-in-0
2020-08-13 09:59:10.726  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Setting offset for partition react-in-0 to the committed offset FetchPosition{offset=51, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.153.133:9092 (id: 0 rack: null)], epoch=0}}
2020-08-13 09:59:10.726  INFO 17688 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : react-in: partitions assigned: [react-in-0]
2020-08-13 09:59:10.728  INFO 17688 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 4.473 seconds (JVM running for 5.039)
2020-08-13 09:59:10.753  INFO 17688 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : single-in: partitions assigned: [single-in-0]
2020-08-13 09:59:23.925  INFO 17688 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [192.168.153.133:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-3
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305563932
2020-08-13 09:59:23.944  INFO 17688 --- [ad | producer-3] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:31.919  INFO 17688 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.react-in-0' has 0 subscriber(s).
2020-08-13 09:59:34.922 ERROR 17688 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.react-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[4], headers={kafka_offset=54, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305564033, contentType=application/json, kafka_groupId=react-in}], failedMessage=GenericMessage [payload=byte[4], headers={kafka_offset=54, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305564033, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[4], headers={kafka_offset=54, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305564033, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 27 more

2020-08-13 09:59:34.925  INFO 17688 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [192.168.153.133:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-4
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-13 09:59:34.929  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:34.930  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:34.930  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305574929
2020-08-13 09:59:34.943  INFO 17688 --- [ad | producer-4] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-4] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:39.770 ERROR 17688 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.react-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[6], headers={kafka_offset=55, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305568884, contentType=application/json, kafka_groupId=react-in}], failedMessage=GenericMessage [payload=byte[6], headers={kafka_offset=55, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305568884, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[6], headers={kafka_offset=55, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305568884, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 27 more

Interestingly, the "single" function behaves exactly as I expected:

  • Wrote "test 1" to "single-in" -> got "OK: test 1" in "single-out"

  • Wrote "test 2" to "single-in" -> got "OK: test 2" in "single-out"

  • Wrote "EXCP" to "single-in" -> got "EXCP" in "dlq"

  • Wrote "test 3" to "single-in" -> got "OK: test 3" in "single-out"

Can someone explain why, in the reactive implementation, all messages end up in the dlq, and what the "Dispatcher has no subscribers" error means?

Thanks


1 Answer

Stack Overflow user

Answered on 2021-10-01 10:06:13

It probably fails because the first event that throws an exception terminates the reactive stream. Once the stream has terminated, the events consumed after it can no longer be processed, which is why you receive the "Dispatcher has no subscribers" error.
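The reasoning above can be sketched in plain Java without a Reactor dependency. The class name, the `transform` helper, and the loop-based model of stream termination are illustrative assumptions, not the binder's actual code; the point is only the contrast between a function invoked once per record (the imperative `single` case) and a function wired once to a single stream, where the first unhandled exception kills the whole pipeline (the reactive `react` case):

```java
import java.util.ArrayList;
import java.util.List;

public class StreamTerminationDemo {

    // Same transformation as the question's functions (standalone copy).
    static String transform(String msg) {
        if ("EXCP".equalsIgnoreCase(msg)) {
            throw new RuntimeException("Received bad message");
        }
        return "OK: " + msg;
    }

    public static void main(String[] args) {
        List<String> input = List.of("test 1", "EXCP", "test 2");

        // Imperative model: the function is called once per record, so an
        // exception only sends that one record to the DLQ.
        List<String> perRecord = new ArrayList<>();
        for (String msg : input) {
            try {
                perRecord.add(transform(msg));
            } catch (RuntimeException e) {
                perRecord.add("-> dlq: " + msg);
            }
        }
        System.out.println(perRecord); // [OK: test 1, -> dlq: EXCP, OK: test 2]

        // Reactive model: one stream, one subscriber. After the first error
        // terminates the stream, later records find no subscriber and all
        // fall through to the DLQ (modeled here with a simple flag).
        List<String> singleStream = new ArrayList<>();
        boolean terminated = false;
        for (String msg : input) {
            if (terminated) {
                singleStream.add("-> dlq: " + msg);
                continue;
            }
            try {
                singleStream.add(transform(msg));
            } catch (RuntimeException e) {
                terminated = true;
                singleStream.add("-> dlq: " + msg);
            }
        }
        System.out.println(singleStream); // [OK: test 1, -> dlq: EXCP, -> dlq: test 2]
    }
}
```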

1. Use onErrorContinue:

  @Bean
  public Function<Flux<String>, Flux<String>> react() {
    return fluxMsg -> fluxMsg.map(msg -> {
        if ("EXCP".equalsIgnoreCase(msg)) {
          throw new RuntimeException("Received bad message");
        }
        return "OK: " + msg;
      })
      // Drop the failing element and keep the stream alive
      // (a logger field, e.g. an SLF4J Logger, is assumed to exist).
      .onErrorContinue((throwable, o) -> logger.error(throwable.getMessage(), throwable));
  }

2. Delete all consumer groups and topics and restart the application.
0 votes
The original page content was provided by Stack Overflow; the translation was supplied by Tencent Cloud's translation engine.
Original link:

https://stackoverflow.com/questions/63391127
