我知道spring集成有TcpInboundGateway和ByteArrayStxEtxSerializer来处理通过TCP端口传输的数据。
如果TCP服务器需要读取从客户端发送的所有数据,然后对其进行处理,ByteArrayStxEtxSerializer就会工作得很好。(请求和响应模型)我使用的是single=false,这样就可以在同一个连接中处理多个请求。
例如,如果客户端发送0x02AAPL0x03,那么Server可以发送AAPL价格。
如果客户端发送0x02AAPL0x030x02GOOG0x03,则我的TCP服务器正在工作。它发送AAPL和GOOG的价格。
有时客户端可以发送EOT (0x04)。如果客户端发送EOT,我想关闭套接字连接。
例如:客户机请求可以是0x02AAPL0x030x02GOOG0x03 0x020x040x03。注意,EOT出现在最后一个包中。
我知道可以定制ByteArrayStxEtxSerializer反序列化器来读取客户端发送的字节。
反序列化器是关闭套接字连接的好地方吗?如果没有,应该如何通知spring集成框架关闭套接字连接?
请帮帮忙。
这是我的弹簧配置:
<int-ip:tcp-connection-factory id="crLfServer"
type="server"
port="${availableServerSocket}"
single-use="false"
so-timeout="10000"
using-nio="false"
serializer="connectionSerializeDeserialize"
deserializer="connectionSerializeDeserialize"
so-linger="2000"/>
<bean id="connectionSerializeDeserialize" class="org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer"/>
<int-ip:tcp-inbound-gateway id="gatewayCrLf"
connection-factory="crLfServer"
request-channel="serverBytes2StringChannel"
error-channel="errorChannel"
reply-timeout="10000"/> <!-- reply-timeout works on inbound-gateway -->
<int:channel id="toSA" />
<int:service-activator input-channel="toSA"
ref="myService"
method="prepare"/>
<int:object-to-string-transformer id="serverBytes2String"
input-channel="serverBytes2StringChannel"
output-channel="toSA"/>
<int:transformer id="errorHandler"
input-channel="errorChannel"
expression="payload.failedMessage.payload + ':' + payload.cause.message"/>更新:添加抛出新的SoftEndOfStreamException(“流关闭”)来关闭序列化程序中的流,我可以在EventListener中看到关闭的日志条目。当服务器关闭连接时,我希望在客户机中接收java.io.InputStream.read()作为-1。但是客户端正在接收
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
at sun.nio.cs.StreamDecoder.read0(StreamDecoder.java:107)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:93)
at java.io.InputStreamReader.read(InputStreamReader.java:151)是否还有其他东西可以关闭服务器端的连接并将其传播到客户端?
谢谢你的帮助。
谢谢
发布于 2016-11-07 18:09:04
反序列化器不能访问套接字,只有输入流;关闭它可能会工作,但是您可能会在日志中得到很多噪音。
最好的解决方案是抛出一个SoftEndOfStreamException;这意味着套接字应该关闭,所有的东西都清理好了。
编辑
添加侦听器以检测/记录关闭。
@SpringBootApplication
public class So40471456Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So40471456Application.class, args);
Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
socket.getOutputStream().write("foo\r\n".getBytes());
socket.close();
Thread.sleep(10000);
context.close();
}
@Bean
public EventListener eventListener() {
return new EventListener();
}
@Bean
public TcpNetServerConnectionFactory server() {
return new TcpNetServerConnectionFactory(1234);
}
@Bean
public TcpReceivingChannelAdapter inbound() {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(server());
adapter.setOutputChannelName("foo");
return adapter;
}
@ServiceActivator(inputChannel = "foo")
public void syso(byte[] in) {
System.out.println(new String(in));
}
public static class EventListener implements ApplicationListener<TcpConnectionCloseEvent> {
private final Log logger = LogFactory.getLog(getClass());
@Override
public void onApplicationEvent(TcpConnectionCloseEvent event) {
logger.info(event);
}
}
}使用XML,只需为侦听器类添加一个<bean/>。
结果:
foo
2016-11-07 16:52:04.133 INFO 29536 --- [pool-1-thread-2] c.e.So40471456Application$EventListener : TcpConnectionCloseEvent
[source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@118a7548],
[factory=server, connectionId=localhost:50347:1234:b9fcfaa9-e92c-487f-be59-1ed7ebd9312e]
**CLOSED**EDIT2
正如我所期望的那样..。
@SpringBootApplication
public class So40471456Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So40471456Application.class, args);
Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
socket.getOutputStream().write("foo\r\n".getBytes());
try {
System.out.println("\n\n\n" + socket.getInputStream().read() + "\n\n\n");
context.getBean(EventListener.class).latch.await(10, TimeUnit.SECONDS);
}
finally {
socket.close();
context.close();
}
}
@Bean
public EventListener eventListener() {
return new EventListener();
}
@Bean
public TcpNetServerConnectionFactory server() {
TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(1234);
server.setDeserializer(is -> {
throw new SoftEndOfStreamException();
});
return server;
}
@Bean
public TcpReceivingChannelAdapter inbound() {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(server());
adapter.setOutputChannelName("foo");
return adapter;
}
public static class EventListener implements ApplicationListener<TcpConnectionCloseEvent> {
private final Log logger = LogFactory.getLog(getClass());
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onApplicationEvent(TcpConnectionCloseEvent event) {
logger.info(event);
latch.countDown();
}
}
}结果:
2016-11-08 08:27:25.964 INFO 86147 --- [ main] com.example2.So40471456Application : Started So40471456Application in 1.195 seconds (JVM running for 1.764)
-1
2016-11-08 08:27:25.972 INFO 86147 --- [pool-1-thread-2] c.e.So40471456Application$EventListener : TcpConnectionCloseEvent [source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@fee3774], [factory=server, connectionId=localhost:54984:1234:f79a6826-0336-4823-8844-67054903a094] **CLOSED**https://stackoverflow.com/questions/40471456
复制相似问题