首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxNetty重用连接

RxNetty重用连接
EN

Stack Overflow用户
提问于 2018-08-15 18:20:23
回答 1查看 355关注 0票数 3

我想使用Netflix-Ribbon作为没有Spring Cloud的TCP客户端负载均衡器,我写了测试代码。

代码语言:javascript
复制
public class App  implements Runnable
{
    public static String msg = "hello world";
    public BaseLoadBalancer lb;
    public RxClient<ByteBuf, ByteBuf >  client;
    public Server echo;

    App(){
        lb = new BaseLoadBalancer();
        echo = new Server("localhost", 8000);
        lb.setServersList(Lists.newArrayList(echo));
        DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
        client = RibbonTransport.newTcpClient(lb, impl);
    }
    public static void main( String[] args ) throws Exception
    {
        for( int i = 40; i > 0; i--)
        {
            Thread t = new Thread(new App());
            t.start();
            t.join();
        }
        System.out.println("Main thread is finished");
    }
    public String sendAndRecvByRibbon(final String data) 
    {
        String response = "";
        try {
            response = client.connect().flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>,
                    Observable<ByteBuf>>() {
                public Observable<ByteBuf> call(ObservableConnection<ByteBuf, ByteBuf> connection) {

                    connection.writeStringAndFlush(data);
                    return connection.getInput();
                }
            }).timeout(1, TimeUnit.SECONDS).retry(1).take(1)
                    .map(new Func1<ByteBuf, String>() {
                        public String call(ByteBuf ByteBuf) {
                            return ByteBuf.toString(Charset.defaultCharset());
                        }
                    })
                    .toBlocking()
                    .first();
        }
        catch (Exception e) {
            System.out.println(((LoadBalancingRxClientWithPoolOptions) client).getMaxConcurrentRequests());
            System.out.println(lb.getLoadBalancerStats());
        }
        return response;
    }
    public void run() {
        for (int i = 0; i < 200; i++) {
            sendAndRecvByRibbon(msg);
        }
    }

}

我发现它会在每次我调用sendAndRecvByRibbon时创建一个新的套接字,即使poolEnabled设置为true。所以,这让我很困惑,我错过了什么?并且没有配置池大小的选项,但有一个PoolMaxThreads和一个MaxConnectionsPerHost

我的问题是如何在我的简单代码中使用连接池,以及我的sendAndRecvByRibbon有什么问题,它打开一个套接字,然后只使用一次,我如何重用这个连接?谢谢你的时间。

服务器只是一个用pyhton3编写的简单回显服务器,我注释掉了conn.close(),因为我想使用长连接。

代码语言:javascript
复制
import socket
import threading
import time
import socketserver
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        conn = self.request
        while True:
            client_data = conn.recv(1024)
            if not client_data:
                time.sleep(5)
            conn.sendall(client_data)
#            conn.close()

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

if __name__ == "__main__":
    HOST, PORT = "localhost", 8000
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    ip, port = server.server_address
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()
    server.serve_forever()

而mevan的POM,我只是在IED自动生成的pom中添加了两个依赖项。

代码语言:javascript
复制
<dependency>
    <groupId>commons-configuration</groupId>
    <artifactId>commons-configuration</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>com.netflix.ribbon</groupId>
    <artifactId>ribbon</artifactId>
    <version>2.2.2</version>
</dependency>

打印src_port的代码

代码语言:javascript
复制
@Sharable
public class InHandle extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().localAddress());
        super.channelRead(ctx, msg);
    }
}

public class Pipeline implements PipelineConfigurator<ByteBuf, ByteBuf> {
    public InHandle handler;
    Pipeline() {
        handler = new InHandle();
    }
    public void configureNewPipeline(ChannelPipeline pipeline) {
        pipeline.addFirst(handler);
    }
}

并将client = RibbonTransport.newTcpClient(lb, impl);更改为Pipeline pipe = new Pipeline();client = RibbonTransport.newTcpClient(lb, pipe, impl, new DefaultLoadBalancerRetryHandler(impl));

EN

回答 1

Stack Overflow用户

发布于 2018-08-22 03:57:06

因此,您的客户端构造函数初始化lb/ App() /等。

然后,通过在第一个for循环中调用new App(),您将使用40个不同的RxClient实例启动40个不同的线程(默认情况下,每个实例都有自己的池)。为了说明这一点-您在这里生成多个RxClient实例的方式不允许它们共享任何公共池。请尝试使用一个RxClient实例。

如果你像下面这样改变你的main方法,它会停止创建额外的socket吗?

代码语言:javascript
复制
    public static void main( String[] args ) throws Exception
    {
        App app = new App() // Create things just once
        for( int i = 40; i > 0; i--)
        {
            Thread t = new Thread(()->app.run()); // pass the run()
            t.start();
            t.join();
        }
        System.out.println("Main thread is finished");
    }

如果上面不能完全帮助你(至少它会减少40倍的创建套接字数量)-你能澄清一下你是如何确定的:

每次我调用sendAndRecvByRibbon时,我发现它会创建一个新的

在使用下面这行代码更新构造函数后,您的测量结果是什么:

代码语言:javascript
复制
    DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
    impl.set(CommonClientConfigKey.PoolMaxThreads,1); //Add this one and test

更新

是的,看看sendAndRecvByRibbon,它似乎缺少将PooledConnection标记为no longer acquired,一旦您不希望从它获得任何进一步的读取,就调用close

只要您期望的是唯一的单一读取事件,只需更改此行

代码语言:javascript
复制
connection.getInput()

发送到

代码语言:javascript
复制
return connection.getInput().zipWith(Observable.just(connection), new Func2<ByteBuf, ObservableConnection<ByteBuf, ByteBuf>, ByteBuf>() {
                        @Override
                        public ByteBuf call(ByteBuf byteBuf, ObservableConnection<ByteBuf, ByteBuf> conn) {
                            conn.close();
                            return byteBuf; 
                        }
                    });

请注意,如果您在TCP上设计了更复杂的协议,那么可以分析输入的bytebuf,以确定您特定的“通信结束”标志,该标志指示连接可以返回到池中。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51856938

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档