我想使用Netty或Ratpack来使用json流数据。我的用例是请求体将包含大型json数据(MBs中的json数组)。处理数据的一种方法是阻塞,直到接收到完整的数据,然后启动processing.But,我希望异步处理意味着一旦收到json对象的一个块就会处理它。
我在Netty中偶然发现了JsonObjectDecoder,但我没有运气使用它。这是我的ChannelInitializer课程:
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new JsonObjectDecoder(true));
// HttpServerCodec is a combination of HttpRequestDecoder and HttpResponseEncoder
p.addLast(new HttpServerCodec());
//
// add gizp compressor for http response content
p.addLast(new HttpContentCompressor());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new ChunkedWriteHandler());
p.addLast(new ServerHandler());
}
} 我要发送这些数据:
[
{
"timestamp": "2016-11-14 11:08:09+0100",
"message": "message 120",
"hostname": "myhost.com",
"device_product": "product123",
"device_vendor": "vendor123",
"device_version": "1",
"severity": "High"
},
.....
{
"timestamp": "2016-11-14 11:08:09+0100",
"message": "message 121",
"hostname": "myhost.com",
"device_product": "product123",
"device_vendor": "vendor123",
"device_version": "1",
"severity": "High"
}
]但是我发现了一个错误:
io.netty.handler.codec.CorruptedFrameException: invalid JSON received at byte position 0: 504f5354202f6c6f677320485454502f312e310d0a486f73743a206c6f63616c686f73743a383038300d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a436f6e74656e742d4c656e6774683a203230380d0a4163636570743a206170706c69636174696f6e2f6a736f6e0d0a506f73746d616e2d546f6b656e3a2062383064306264352d663234302d346563622d353631322d3863376139396434633934360d0a43616368652d436f6e74726f6c3a206e6f2d63616368650d0a4f726967696e3a206368726f6d652d657874656e73696f6e3a2f2f6668626a676269666c696e6a62646767656863646463626e636464646f6d6f700d0a557365722d4167656e743a204d6f7a696c6c612f352e30202857696e646f7773204e5420362e313b2057696e36343b2078363429204170706c655765624b69742f3533372e333620284b48544d4c2c206c696b65204765636b6f29204368726f6d652f35352e302e323838332e3837205361666172692f3533372e33360d0a436f6e74656e742d547970653a206170706c69636174696f6e2f6a736f6e0d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174652c2062720d0a4163636570742d4c616e67756167653a20656e2d55532c656e3b713d302e382c6a613b713d302e362c66722d46523b713d302e342c66723b713d302e322c66722d43413b713d302e320d0a0d0a7b2274696d657374616d70223a2022323031362d31312d31342031313a30383a30392b30313030222c226d657373616765223a20226d65737361676520313230222c22686f73746e616d65223a20226d79686f73742e636f6d222c200a09226465766963655f70726f64756374223a202270726f64756374313233222c200a09226465766963655f76656e646f72223a202276656e646f72313233222c200a09226465766963655f76657273696f6e223a202231222c200a09227365766572697479223a202248696768220a090a097d
at io.netty.handler.codec.json.JsonObjectDecoder.decode(JsonObjectDecoder.java:163)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)我不知道我错过了什么。如果有人知道使用Ratpack实现这一目标的方法,请帮助我。提前谢谢。
发布于 2017-02-01 15:35:03
问题是JSON解码器是管道中的第一个处理程序,它试图解码HTTP。如果我从您发布的错误消息中获取无效数据流,将其解析为字节并从中创建一个字符串(在groovy中).
import javax.xml.bind.DatatypeConverter;
v = "504f5354202f6c6f677320485...<snip>";
byte[] bytes = DatatypeConverter.parseHexBinary(v);
println new String(bytes)结果是:
POST /logs HTTP/1.1主机:本地主机:8080连接:保持活动内容长度:208个接受:应用程序/json令牌:缓存-控制:无缓存来源:Chrome-扩展://User: Mozilla/5.0 (WindowsNT6.1;Win64;x64) AppleWebKit/537.36 (X64) Chrome/55.0.2883.87 Safari/537.36内容-类型:应用程序/json接受-编码:ja,平减,br接受-语言: en-US,en;q=0.8,ja;q=0.6,fr-FR;q=0.4,fr;q=0.2,fr-CA;q=0.2 {“时间戳”:"2016-11-14 11:08:09+0100“、”消息“:”消息120“、”主机名“:"myhost.com”、"device_product":"product123“、"device_vendor":"vendor123”、"device_version":"1“、”严重性“:"High”}
因此,您需要在JSON解码器之前将它们添加到管道中:
然后,JSON解码器将获得一部分JSON字节,并开始向上游发送解析的消息。
发布于 2017-02-02 19:38:52
要在HTTP POST中这样做,您需要确保请求是块的。这近似于您需要对管道所做的事情:
在某个时候,您将得到一个HttpContent,它也是LastHttpContent的一个实例,它将是最后一个块。
棘手的部分是,在某个时候,其中一个HttpContents将有一个不完整的JSON序列,这将在JSON解码器中触发一个错误,此时您需要将ByteBuf回滚到最后一个已知的“好”位置,然后等待下一个块出现并完成它,因为我不认为这是自动处理的。
https://stackoverflow.com/questions/41980892
复制相似问题