首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >TCP入站端点- Mule ESB -多线程

TCP入站端点- Mule ESB -多线程
EN

Stack Overflow用户
提问于 2015-07-01 22:52:15
回答 1查看 1.1K关注 0票数 0

我有一个TCP入站端点,它引用TCP连接器。这是一个请求响应端点.TCP客户端是一个第三方应用程序,它只在一个套接字上发送请求。我就是这样设置TCP端点的。端点:

代码语言:javascript
复制
<tcp:inbound-endpoint exchange-pattern="request-response"
            responseTimeout="10000" doc:name="TCP" address="${Endpoint}" encoding="ISO-8859-1" connector-ref="TCP"/>

连接器:

代码语言:javascript
复制
<tcp:connector name="TCP" doc:name="TCP connector"
    clientSoTimeout="${Client_SO_Timeout}" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="${Server_SO_Timeout}" socketSoLinger="0"
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true">
            <receiver-threading-profile maxThreadsActive="${TCP_MaxThreadsActive}" maxThreadsIdle = "${TCP_MaxThreadsIdle}" />
    <reconnect-forever />
    <service-overrides messageReceiver="CustomMessageReceiver" />
    <tcp:custom-protocol ref="CustomLengthProtocol" />
</tcp:connector>

水流运转良好。但是,当必须处理并发请求时,最后几个请求将超时。我从这里了解到,消息正在接收方等待处理(因为只使用一个TCP会话),直到前面的请求被mule流完成为止。

为了对此进行调优,我正在寻找一种方法来更改mule流,如下所示:在收到来自客户端的请求之后,我需要将它发送到可能异步处理它并将响应推送到同一个套接字的mule流。在端点接收到请求后,不需要等待上一个请求的流在处理下一个请求之前完成。不需要将请求/响应顺序从骡子流中保存下来。是否有一种方法可以通过扩展mule端点功能来实现这一点?这与排队异步流处理策略类似,只是响应必须被发送回原始TCP套接字。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-07-14 23:18:31

我就是这样解决这个问题的:(i)不是使用tcp请求响应入站端点,而是使用TCP入站(单向)和TCP出站(单向) (ii) TCP入站接收请求,自定义长度协议拆分消息并将其提供给流。(iii)在VM端点(入站和出站) (iv)中使用请求应答作用域(Iv) vm端点将消息定向到一个单独的流,该流的processingStrategy为“排队-异步”。我计划在这个流级别上为线程池设置maxThreads。(v)第二个流执行业务逻辑,并将响应发送回要发送到套接字的主流。(vi)为了从入站端点访问套接字,我在TCPMessageReceiver类中重写了TCPMessageReceiver方法,并向mulemessage添加了一个名为"ClientSocket“的出站属性。然后,我将此属性传播到主流的末尾到出站端点。在TCPOutbound端点上,我创建了自己的TCPMessageDispatcher并扩展了doDispatch方法。我没有使用出站端点线程池中的套接字,而是使用作为mulemessage一部分的共享的套接字对象。样本流:

代码语言:javascript
复制
<tcp:connector name="TCP" doc:name="TCP connector"
    clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0"
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true" keepSendSocketOpen="true">
    <receiver-threading-profile
        maxThreadsActive="1" maxThreadsIdle="1" />
    <reconnect-forever />
    <service-overrides messageReceiver="CustomMessageReceiver" />

    <tcp:custom-protocol ref="CustomLengthProtocol" />
</tcp:connector>

<tcp:connector name="TCP2" doc:name="TCP connector"
    clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0"
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true"
    keepSendSocketOpen="true">
    <receiver-threading-profile
        maxThreadsActive="1" maxThreadsIdle="1" />
    <reconnect-forever />
    <service-overrides dispatcherFactory="CustomMessageDispatcherFactory"/>
    <tcp:custom-protocol ref="CustomLengthProtocol" />

</tcp:connector>


<spring:beans>
    <spring:bean id="CustomLengthProtocol" name="CustomLengthProtocol"
        class="CustomLengthProtocol" />
</spring:beans>
<flow name="tcptestFlow" doc:name="tcptestFlow">
    <tcp:inbound-endpoint address="tcp://localhost:4444" 
        responseTimeout="100000"  doc:name="TCP" connector-ref="TCP" />
    <byte-array-to-string-transformer
        doc:name="Byte Array to String" />
    <logger level="INFO" category="Expression" doc:name="Logger" />

    <set-session-variable variableName="Socket"
        value="#[message.outboundProperties['ClientSocket']]"
        doc:name="Session Variable" />
    <logger message="#[payload] - #[Socket]" level="INFO" category="Request"
        doc:name="Logger" />

    <request-reply doc:name="Request-Reply">
        <vm:outbound-endpoint exchange-pattern="one-way"
            path="/qin" doc:name="VM" >
            <message-properties-transformer scope="outbound"> 
                <delete-message-property key="MULE_REPLYTO"></delete-message-property>  
            </message-properties-transformer> 
          </vm:outbound-endpoint>
        <vm:inbound-endpoint exchange-pattern="one-way"
            path="/qout" doc:name="VM" />
    </request-reply>
    <logger message="#[payload]" level="INFO" doc:name="Logger"
        category="Response" />
    <string-to-byte-array-transformer
        doc:name="String to Byte Array" />
    <tcp:outbound-endpoint address="tcp://localhost:4444" 
        responseTimeout="10000" doc:name="TCP" connector-ref="TCP2" />

</flow>
<flow name="tcptestFlow1" processingStrategy="queued-asynchronous"
    doc:name="tcptestFlow1">

    <vm:inbound-endpoint exchange-pattern="one-way"
        path="/qin" doc:name="VM" />
    <logger message="Inside VM Flow" level="INFO" doc:name="Logger" />
    <set-payload value="Appended Response - #[payload]"
        doc:name="Set Payload" />
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="/qout" doc:name="VM" />

</flow>
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31172983

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档