Spring Integration Idempotent Receiver & Zookeeper Metadata Store

Stack Overflow user
Asked 2017-03-16 00:19:23
1 answer · 401 views · 0 followers · 0 votes

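I added a Zookeeper metadata store with a filter to my batch job code so that data stays synchronized in a clustered environment and the job can scale horizontally across multiple JVMs. But when the job (the JPA adapter) runs, all of my messages go to the discard channel, and the following error is logged once.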

Hibernate: 
    select
        componenti0_.id as id1_4_,
        componenti0_.component_info_file_id as componen2_4_,
        componenti0_.component_serial_no as componen3_4_,
        componenti0_.date_received as date_rec4_4_,
        componenti0_.date_recorded as date_rec5_4_,
        componenti0_.engine_hours as engine_h6_4_,
        componenti0_.is_processed as is_proce7_4_,
        componenti0_.product_serial_no as product_8_4_ 
    from
        cs_component_info componenti0_
2017-03-15 21:28:55.290  INFO [componentdatafiles,,,]       39712 --- [54.26.101:2181)] org.apache.zookeeper.ClientCnxn          :  Session establishment complete on server **************, sessionid = 0x35a82a934c40127, negotiated timeout = 40000
2017-03-15 21:28:55.294  INFO [componentdatafiles,,,]       39712 --- [54.26.100:2181)] org.apache.zookeeper.ClientCnxn          :  Session establishment complete on server *************, sessionid = 0x25a82aab90a0122, negotiated timeout = 40000
2017-03-15 21:28:55.313  INFO [componentdatafiles,,,]       39712 --- [p-1-EventThread] o.a.c.f.state.ConnectionStateManager     :  State change: CONNECTED
2017-03-15 21:28:55.313  INFO [componentdatafiles,,,]       39712 --- [p-1-EventThread] o.a.c.f.state.ConnectionStateManager     :  State change: CONNECTED
2017-03-15 21:28:55.332  INFO [componentdatafiles,,,]       39712 --- [54.26.101:2181)] org.apache.zookeeper.ClientCnxn          :  Session establishment complete on server ****************, sessionid = 0x35a82a934c40128, negotiated timeout = 40000
2017-03-15 21:28:55.333  INFO [componentdatafiles,,,]       39712 --- [p-1-EventThread] o.a.c.f.state.ConnectionStateManager     :  State change: CONNECTED
2017-03-15 21:28:55.507  INFO [componentdatafiles,053728794a9688b3,374707c3669809db,false]       39712 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   :  [com.deere.componentdatafiles.domain.ComponentInfo@4f31c4d6, com.deere.componentdatafiles.domain.ComponentInfo@54405aa7, com.deere.componentdatafiles.domain.ComponentInfo@584f0396, com.deere.componentdatafiles.domain.ComponentInfo@25710ebd, com.deere.componentdatafiles.domain.ComponentInfo@3b490019, com.deere.componentdatafiles.domain.ComponentInfo@6d0ae0b5, com.deere.componentdatafiles.domain.ComponentInfo@4df90d31, com.deere.componentdatafiles.domain.ComponentInfo@6050f6d5, com.deere.componentdatafiles.domain.ComponentInfo@13ea7b0e, com.deere.componentdatafiles.domain.ComponentInfo@29df2e8e, com.deere.componentdatafiles.domain.ComponentInfo@2c4a2e01, com.deere.componentdatafiles.domain.ComponentInfo@4b4b8303, com.deere.componentdatafiles.domain.ComponentInfo@bb9b36f, com.deere.componentdatafiles.domain.ComponentInfo@5b4257c8, com.deere.componentdatafiles.domain.ComponentInfo@77a57353, com.deere.componentdatafiles.domain.ComponentInfo@3e909678, com.deere.componentdatafiles.domain.ComponentInfo@53e4f875]
2017-03-15 21:28:55.511  INFO [componentdatafiles,053728794a9688b3,83a7350bbbdb4b79,false]       39712 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   :  com.deere.componentdatafiles.domain.ComponentInfo@4f31c4d6
2017-03-15 21:28:55.519  INFO [componentdatafiles,9f9c46137110bacc,a02fd11fe137a83c,false]       39712 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   :  org.springframework.messaging.MessageHandlingException: Expression evaluation failed: @componentInfoMetadataStore.get(payload.id) == null; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=com.deere.componentdatafiles.domain.ComponentInfo@4f31c4d6, headers={sequenceNumber=1, sequenceSize=17, X-Message-Sent=true, messageSent=true, spanTraceId=053728794a9688b3, spanId=91816fafd9b447d0, X-B3-SpanId=91816fafd9b447d0, currentSpan=[Trace: 053728794a9688b3, Span: 91816fafd9b447d0, Parent: 053728794a9688b3, exportable:false], X-B3-Sampled=0, X-B3-TraceId=053728794a9688b3, correlationId=3bac32df-115c-4ff1-8270-2bfca59a4e83, id=4ad7414c-7b63-cfa8-fb03-ce2e20bf16b8, X-Current-Span=[Trace: 053728794a9688b3, Span: 91816fafd9b447d0, Parent: 053728794a9688b3, exportable:false], spanSampled=0}]
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:143)
    at org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor.processMessage(ExpressionEvaluatingMessageProcessor.java:72)
    at org.springframework.integration.filter.AbstractMessageProcessingSelector.accept(AbstractMessageProcessingSelector.java:62)
    at org.springframework.integration.filter.MessageFilter.doHandleRequestMessage(MessageFilter.java:161)
    at org.springframework.integration.handler.AbstractReplyProducingPostProcessingMessageHandler.handleRequestMessage(AbstractReplyProducingPostProcessingMessageHandler.java:46)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:159)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
    at org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore.get(ZookeeperMetadataStore.java:205)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:113)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:129)
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:347)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
    at org.springframework.expression.spel.ast.OpEQ.getValueInternal(OpEQ.java:42)
    at org.springframework.expression.spel.ast.OpEQ.getValueInternal(OpEQ.java:32)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:128)
    ... 49 more

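I would like to know whether I missed some configuration detail when setting up the metadata store, or whether the expression is used incorrectly. My configuration details are below.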

    <int-jpa:inbound-channel-adapter
        channel="inboundChannelAdapterOne" entity-manager-factory="entityManagerFactory"
        auto-startup="true" jpa-query="select cmpntinfo from ComponentInfo cmpntinfo"
        expect-single-result="false" delete-after-poll="false">
        <int:poller fixed-rate="${componentInfoPollarInterval}">
            <!-- <int:transactional propagation="REQUIRES_NEW" transaction-manager="transactionManager"/> -->
        </int:poller>
    </int-jpa:inbound-channel-adapter>

    <splitter id="splitter" input-channel="inboundChannelAdapterOne"
        output-channel="splitteroutputChannel" />


    <int:publish-subscribe-channel id="idempotentServiceChannel"/>

      <integration:channel id="discardChannel" />

    <int:filter input-channel="splitteroutputChannel"
            output-channel="idempotentServiceChannel"
            discard-channel="discardChannel"
            expression="@componentInfoMetadataStore.get(payload.id) == null"/>


    <int:outbound-channel-adapter channel="idempotentServiceChannel"
                              expression="@componentInfoMetadataStore.put(payload.id, 'payload.componentSerialNo')"/>

   <int:service-activator id="componentInfoPollarActivator"
        input-channel="idempotentServiceChannel" ref="componentInfoPollarConsumer"
        method="componentInfoListen" /> 

      <int:service-activator id="discardChannelActivator"
        input-channel="discardChannel" ref="componentInfoPollarConsumer"
        method="discard" /> 

    <beans:bean id="componentInfoMetadataStore"
        class="org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore">
        <beans:constructor-arg ref="componentInfoZookeeperClient" />
        <beans:property name="root" value="/componentInfoMetaDataStore" />
        <beans:property name="phase" value="-2147483648" />
    </beans:bean>
        <beans:bean id="componentInfoZookeeperClient"
        class="org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean">
        <beans:constructor-arg value="${zookeeper.server.uri}" />
    </beans:bean>

1 Answer

Stack Overflow user

Accepted answer

Answered 2017-03-16 01:03:29

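That error means the metadata store has not been started yet.

It needs to be started before the adapter.

Try setting its `phase` property to a large negative number, such as `Integer.MIN_VALUE`, so that it is started early.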
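
To illustrate why a very low phase helps, here is a minimal sketch in plain Java of the ordering idea: components with a lower phase value are started before components with a higher one. This is a simplified model, not Spring's actual lifecycle code. The `Component` class, the `startAll` method, and the phase chosen for the adapter are hypothetical and exist only for this illustration; only the bean name `componentInfoMetadataStore` and the value `Integer.MIN_VALUE` come from the question and answer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PhaseOrderSketch {

    /** Hypothetical stand-in for a bean that has a lifecycle phase. */
    static final class Component {
        final String name;
        final int phase;
        boolean running;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    /** Starts components in ascending phase order and returns their names in start order. */
    static List<String> startAll(List<Component> components) {
        List<Component> ordered = new ArrayList<>(components);
        // Lowest phase first, so Integer.MIN_VALUE always sorts to the front.
        ordered.sort(Comparator.comparingInt((Component c) -> c.phase));
        List<String> startOrder = new ArrayList<>();
        for (Component c : ordered) {
            c.running = true;
            startOrder.add(c.name);
        }
        return startOrder;
    }

    public static void main(String[] args) {
        List<Component> components = new ArrayList<>();
        // The adapter's phase here is an assumed value, chosen only for illustration.
        components.add(new Component("jpaInboundAdapter", Integer.MAX_VALUE / 2));
        components.add(new Component("componentInfoMetadataStore", Integer.MIN_VALUE));
        // prints [componentInfoMetadataStore, jpaInboundAdapter]
        System.out.println(startAll(components));
    }
}
```

In this model the store is running before the adapter issues its first poll, so a call such as `get(payload.id)` from the filter expression would no longer reach a store that has not been started.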

Votes: 0
The original content of this page is provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/42815332
