首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何并行处理部分有序的消息?

如何并行处理部分有序的消息?
EN

Stack Overflow用户
提问于 2021-11-30 23:55:54
回答 1查看 66关注 0票数 0

基于Netty的应用程序

  1. 每秒从一个TCP连接中摄取数十万条消息。
  2. 在两个入站处理程序中处理这些消息
  3. 将处理结果发送到下游某个地方

目前,所有这些都在线程上运行,就像在一个TCP连接上一样。我想知道如何并行化2,困难在于消息不能被并行地处理,因为消息是有部分顺序的。您可以认为这是一个key(message)函数,并且该函数返回相同结果的所有消息都需要按顺序处理,但是如果结果不同,它们可以并行运行。因此,我正在考虑从消息映射到像hash(key(message)) % threadCount这样的线程。

想象一下这条管道:

代码语言:javascript
复制
pipeline.addLast(deframer);
pipeline.addLast(new IdleStateHandler(...));
pipeline.addLast(decoder);
pipeline.addLast(bizLogicHandler1);
pipeline.addLast(bizLogicHandler2);

在解码器中,我能够计算key(message)的结果,所以我想并行处理解码器下游的所有内容。是记录在案,为了使用多个线程,我可以做

代码语言:javascript
复制
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
pipeline.addLast(group, "bizLogicHandler1", bizLogicHandler1);
pipeline.addLast("bizLogicHandler2", bizLogicHandler2);

我猜它的意思是bizLogicHandler1,它下面的所有东西(在上面的例子中是bizLogicHandler2)都可以并行运行?(或者我也必须为group指定bizLogicHandler2?)

然而,正如文档所解释的那样,上面的代码仍然是完全连续运行的,它提供UnorderedThreadPoolEventExecutor作为最大化并行性的替代方法,而代价是完全取消订单,而在我的情况下,这是行不通的。

查看EventExecutorGroupEventExecutor接口,我看不出如何能够传递哪些消息可以并行处理,哪些消息必须按顺序处理。

有什么想法吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-05-19 04:39:12

事实证明,使用一个LocalServerChannel和尽可能多的LocalChannel来实现并行性是非常容易的。

服务器通道将接收消息并将它们分派给其中一个客户端通道。另一个方向(从客户端通道到服务器通道)也可以工作。我成功地以这种方式并行化了一个应用程序,以允许更高的吞吐量,扩展到更多的核心。

下面是一个基本版本,删除了大部分错误处理、日志记录和业务逻辑:

代码语言:javascript
复制
public class Parallelizer extends SimpleChannelInboundHandler<Message> {

    private static final AtomicInteger EPHEMERAL_PORT = new AtomicInteger(0);
    private final Channel[] internalChannels;
    private final AtomicReference<ChannelHandlerContext> upstreamCtx = new AtomicReference<>(null);

    public Parallelizer(EventLoopGroup eventLoopGroup, List<ChannelInitializer<Channel>> channelInitializers) throws InterruptedException {
        internalChannels = (Channel[]) Array.newInstance(Channel.class, channelInitializers.size());

        int port = EPHEMERAL_PORT.getAndIncrement();
        final LocalAddress addr = new LocalAddress("PARALLELIZER-" + port);
        createServerChannel(eventLoopGroup, addr);

        channelInitializers.forEach(channelChannelInitializer -> createClientChannel(eventLoopGroup, addr, channelChannelInitializer));

        waitForInternalClientsToConnect();
    }

    private void waitForInternalClientsToConnect() throws InterruptedException {
        synchronized (internalChannels) {
            while (internalChannels[internalChannels.length - 1] == null) {
                internalChannels.wait();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    private void dispatch(Message msg, int clientChannelIdx) {
        Channel channel = internalChannels[clientChannelIdx];
        channel.writeAndFlush(msg, channel.voidPromise());
    }

    private void createClientChannel(EventLoopGroup eventLoopGroup, LocalAddress addr, ChannelInitializer<Channel> channelInitializer) {
        Bootstrap cb = new Bootstrap();
        cb.group(eventLoopGroup)
                .channel(LocalChannel.class)
                .handler(channelInitializer);
        cb.connect(addr);
    }

    private void createServerChannel(EventLoopGroup eventLoopGroup, LocalAddress addr) throws InterruptedException {
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(eventLoopGroup)
                .channel(LocalServerChannel.class)
                .childHandler(new ChannelInitializer<LocalChannel>() {
                    @Override
                    public void initChannel(LocalChannel ch) {
                        ch.pipeline().addLast(new InternalHandler());
                    }
                });
        sb.bind(addr).sync();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
        final int hash = msg.getDistributionKey().hashCode();
        int clientChannelIdx = Integer.remainderUnsigned(hash, internalChannels.length);
        dispatch(msg, clientChannelIdx);
    }

    private class InternalHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            synchronized (internalChannels) {
                final int firstNull = Arrays.asList(internalChannels).indexOf(null);
                internalChannels[firstNull] = ctx.channel();
                internalChannels.notify();
            }
            super.channelActive(ctx);
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
            final ChannelHandlerContext upstreamCtx = Parallelizer.this.upstreamCtx.get();
            msg.retain();
            if (upstreamCtx != null)
                upstreamCtx.writeAndFlush(msg, upstreamCtx.channel().voidPromise());
        }
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70177554

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档