Camel route from Kafka to JMS not working (JMS->Kafka->JMS)

Stack Overflow user
Asked 2020-09-18 14:55:42
Answers: 1  Views: 687  Followers: 0  Votes: 1

In my Camel Router.java, I have the following routes:

from("jms:topic:test.source.topic?asyncConsumer=true")
            .log("Message: ${body}")
            .to("kafka:testing?brokers=192.168.0.100:9092");

from("kafka:testing?brokers=192.168.0.100:9092")
            .log("Message received from Kafka : ${body}")
            .log("    on the topic ${headers[kafka.TOPIC]}")
            .log("    on the partition ${headers[kafka.PARTITION]}")
            .log("    with the offset ${headers[kafka.OFFSET]}")
            .log("    with the key ${headers[kafka.KEY]}")
            // manually set JMSDeliveryMode (1 - NON_PERSISTENT, 2 - PERSISTENT)
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    exchange.getIn().setHeader("JMSDeliveryMode", "1");
               }
            })
            .to("jms:topic:test.sink.topic");

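There is a problem with the Camel routes above. If I send a message to the testing topic from a Kafka producer started with bin/kafka-console-producer.sh --topic testing --bootstrap-server localhost:9092, the route from Kafka to JMS works fine. So the problem seems to lie in chaining the Camel routes.

The Camel project's pom.xml has the Spring Boot, camel-kafka-starter and camel-jms-starter dependencies.

When I start the Spring Boot application with Maven and send a message from the Kafka producer to the testing topic on the Kafka broker, I can see that the broker receives the message and that the log lines above are printed.

The error occurs on the line .to("jms:topic:test.sink.topic"); and I don't know what it means.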

ERROR 28642 --- [aConsumer[testing]] o.a.c.p.e.DefaultErrorHandler: Failed delivery for (MessageId: ID-PCID on ExchangeId: ID-PCID). Exhausted after delivery attempt: 1 caught: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:525) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:438) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:392) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:155) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40) ~[camel-support-3.4.0.jar:3.4.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:346) ~[camel-kafka-3.4.0.jar:3.4.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222) ~[camel-kafka-3.4.0.jar:3.4.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
    at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setJMSDeliveryMode(ActiveMQMessage.java:450) ~[artemis-jms-client-2.12.0.jar:2.12.0]
    at org.apache.camel.component.jms.JmsMessageHelper.setJMSDeliveryMode(JmsMessageHelper.java:435) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsBinding.appendJmsProperty(JmsBinding.java:393) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsBinding.appendJmsProperties(JmsBinding.java:371) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:346) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:325) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:561) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.lambda$send$0(JmsConfiguration.java:527) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:504) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    ... 19 common frames omitted

When I send messages from JMS to Kafka, the route works fine.

1 Answer

Stack Overflow user

Accepted answer

Answered 2020-09-18 15:12:32

Here is the relevant error message:

AMQ139015: Illegal deliveryMode value: 0

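This error message means that something called the JMS method setJMSDeliveryMode with a value of 0, which is invalid. The valid delivery mode values are defined by javax.jms.DeliveryMode.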
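
To make the valid range concrete: javax.jms.DeliveryMode defines two constants, NON_PERSISTENT = 1 and PERSISTENT = 2, so 0 matches neither. Below is a minimal stand-alone sketch of that check. The constants are copied locally so it runs without a JMS jar, and the class name DeliveryModeCheck is made up for illustration; it is not part of JMS or Camel.

```java
final class DeliveryModeCheck {
    // Same values as javax.jms.DeliveryMode, copied here to avoid a JMS dependency.
    static final int NON_PERSISTENT = 1;
    static final int PERSISTENT = 2;

    // True only for the two delivery modes that JMS defines.
    static boolean isValid(int mode) {
        return mode == NON_PERSISTENT || mode == PERSISTENT;
    }

    public static void main(String[] args) {
        for (int mode = 0; mode <= 2; mode++) {
            System.out.println("deliveryMode " + mode + " valid? " + isValid(mode));
        }
    }
}
```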

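The stack trace indicates that Camel set this invalid value based on the incoming message (see the JmsBinding.appendJmsProperty and JmsMessageHelper.setJMSDeliveryMode frames). To resolve this, you need to determine where the JMSDeliveryMode header is being set on the org.apache.camel.Message.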
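
One possible way to act on this is to inspect the headers just before the JMS endpoint and drop a JMSDeliveryMode value that is not 1 or 2. The sketch below works on a plain Map<String, Object>, which is the type returned by Camel's Message.getHeaders(), so the same logic could be called from a Processor placed before .to("jms:topic:test.sink.topic"). The class and method names are hypothetical. As a simplification it accepts only the numeric values 1 and 2, and it does not tell you where the 0 originates in this application, which the question does not reveal.

```java
import java.util.HashMap;
import java.util.Map;

final class JmsDeliveryModeHeader {
    static final String HEADER = "JMSDeliveryMode";

    // Removes the header unless it holds 1 or 2.
    // Returns the rejected value, or null if nothing was removed.
    static Object dropIfInvalid(Map<String, Object> headers) {
        Object value = headers.get(HEADER);
        if (value == null || isValid(value)) {
            return null;
        }
        headers.remove(HEADER);
        return value;
    }

    // Accepts Integer 1/2 as well as the strings "1"/"2"; everything else is invalid.
    static boolean isValid(Object value) {
        try {
            int mode = Integer.parseInt(String.valueOf(value).trim());
            return mode == 1 || mode == 2;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(HEADER, 0);
        headers.put("kafka.TOPIC", "testing");

        Object rejected = dropIfInvalid(headers);
        System.out.println("rejected value: " + rejected);
        System.out.println("header still present: " + headers.containsKey(HEADER));
    }
}
```

Logging the rejected value together with the other headers at that point can help narrow down which step of the JMS->Kafka->JMS chain introduces it.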

Votes: 0
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/63958139
