基于Netty的应用程序
目前,所有这些都在线程上运行,就像在一个TCP连接上一样。我想知道如何并行化2,困难在于消息不能被并行地处理,因为消息是有部分顺序的。您可以认为这是一个key(message)函数,并且该函数返回相同结果的所有消息都需要按顺序处理,但是如果结果不同,它们可以并行运行。因此,我正在考虑从消息映射到像hash(key(message)) % threadCount这样的线程。
想象一下这条管道:
pipeline.addLast(deframer);
pipeline.addLast(new IdleStateHandler(...));
pipeline.addLast(decoder);
pipeline.addLast(bizLogicHandler1);
pipeline.addLast(bizLogicHandler2);在解码器中,我能够计算key(message)的结果,所以我想并行处理解码器下游的所有内容。是记录在案,为了使用多个线程,我可以做
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
pipeline.addLast(group, "bizLogicHandler1", bizLogicHandler1);
pipeline.addLast("bizLogicHandler2", bizLogicHandler2);我猜它的意思是bizLogicHandler1,它下面的所有东西(在上面的例子中是bizLogicHandler2)都可以并行运行?(或者我也必须为group指定bizLogicHandler2?)
然而,正如文档所解释的那样,上面的代码仍然是完全连续运行的,它提供UnorderedThreadPoolEventExecutor作为最大化并行性的替代方法,而代价是完全取消订单,而在我的情况下,这是行不通的。
查看EventExecutorGroup和EventExecutor接口,我看不出如何能够传递哪些消息可以并行处理,哪些消息必须按顺序处理。
有什么想法吗?
发布于 2022-05-19 04:39:12
事实证明,使用一个LocalServerChannel和尽可能多的LocalChannel来实现并行性是非常容易的。
服务器通道将接收消息并将它们分派给其中一个客户端通道。另一个方向(从客户端通道到服务器通道)也可以工作。我成功地以这种方式并行化了一个应用程序,以允许更高的吞吐量,扩展到更多的核心。
下面是一个基本版本,删除了大部分错误处理、日志记录和业务逻辑:
public class Parallelizer extends SimpleChannelInboundHandler<Message> {
private static final AtomicInteger EPHEMERAL_PORT = new AtomicInteger(0);
private final Channel[] internalChannels;
private final AtomicReference<ChannelHandlerContext> upstreamCtx = new AtomicReference<>(null);
public Parallelizer(EventLoopGroup eventLoopGroup, List<ChannelInitializer<Channel>> channelInitializers) throws InterruptedException {
internalChannels = (Channel[]) Array.newInstance(Channel.class, channelInitializers.size());
int port = EPHEMERAL_PORT.getAndIncrement();
final LocalAddress addr = new LocalAddress("PARALLELIZER-" + port);
createServerChannel(eventLoopGroup, addr);
channelInitializers.forEach(channelChannelInitializer -> createClientChannel(eventLoopGroup, addr, channelChannelInitializer));
waitForInternalClientsToConnect();
}
private void waitForInternalClientsToConnect() throws InterruptedException {
synchronized (internalChannels) {
while (internalChannels[internalChannels.length - 1] == null) {
internalChannels.wait();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
private void dispatch(Message msg, int clientChannelIdx) {
Channel channel = internalChannels[clientChannelIdx];
channel.writeAndFlush(msg, channel.voidPromise());
}
private void createClientChannel(EventLoopGroup eventLoopGroup, LocalAddress addr, ChannelInitializer<Channel> channelInitializer) {
Bootstrap cb = new Bootstrap();
cb.group(eventLoopGroup)
.channel(LocalChannel.class)
.handler(channelInitializer);
cb.connect(addr);
}
private void createServerChannel(EventLoopGroup eventLoopGroup, LocalAddress addr) throws InterruptedException {
ServerBootstrap sb = new ServerBootstrap();
sb.group(eventLoopGroup)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) {
ch.pipeline().addLast(new InternalHandler());
}
});
sb.bind(addr).sync();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
final int hash = msg.getDistributionKey().hashCode();
int clientChannelIdx = Integer.remainderUnsigned(hash, internalChannels.length);
dispatch(msg, clientChannelIdx);
}
private class InternalHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
synchronized (internalChannels) {
final int firstNull = Arrays.asList(internalChannels).indexOf(null);
internalChannels[firstNull] = ctx.channel();
internalChannels.notify();
}
super.channelActive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
final ChannelHandlerContext upstreamCtx = Parallelizer.this.upstreamCtx.get();
msg.retain();
if (upstreamCtx != null)
upstreamCtx.writeAndFlush(msg, upstreamCtx.channel().voidPromise());
}
}
}https://stackoverflow.com/questions/70177554
复制相似问题