首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在发送下一个内容之前等待变量的更改,websocket by reactor-netty

如何在发送下一个内容之前等待变量的更改,websocket by reactor-netty
EN

Stack Overflow用户
提问于 2019-03-19 22:27:22
回答 1查看 53关注 0票数 0

该怎么办呢?ReadFlushMessage ()将等待这一结果。获取新内容的消息。

不立即退出websocket连接

代码语言:javascript
复制
    private Set<String> message = new HashSet<>();

    private void writeMessage(String message) {
        this.message.add(message);
    }

    private String[] readFlushMessage() {
        String[] _message = (String[])this.message.toArray();
        this.message = new HashSet<>();
        return _message;
    }

    private Publisher<Void> websocketPublisherA(HttpServerRequest request, HttpServerResponse response, WebSocketServerHandle handleObject) {
        return response
            .header("content-type", "text/plain")
            .sendWebsocket((in, out) ->
                out.options(NettyPipeline.SendOptions::flushOnEach)
                    .sendString(
                        Flux.just(readFlushMessage())
                    )
            );
    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-20 00:27:07

尝试使用Reactor Core类型。

这里有一个非常简单的示例来说明您想要实现的目标:

代码语言:javascript
复制
    @Test
    public void test() {
        FluxProcessor<String, String> serverMsg =
                ReplayProcessor.<String>create();

        Flux.range(1, 20)
                .map(Object::toString)
                .subscribe(serverMsg::onNext);

        DisposableServer server =
                HttpServer.create()
                        .port(0)
                        .handle((req, resp) ->
                                resp.header("content-type", "text/plain")
                                    .sendWebsocket((in, out) ->
                                            out.options(NettyPipeline.SendOptions::flushOnEach)
                                                    .sendString(serverMsg)
                                    ))
                        .bindNow();

        HttpClient.create()
                .port(server.port())
                .websocket()
                .receive()
                .asString()
                .doOnNext(System.out::println)
                .blockLast();

        server.disposeNow();
    }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55243321

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档