首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有状态Rsocket应用程序

有状态Rsocket应用程序
EN

Stack Overflow用户
提问于 2019-05-03 11:15:31
回答 3查看 917关注 0票数 5

在我的项目中,我希望有多个客户端连接到一个服务。我正在使用java实现。

服务应该为每个客户端维护一个状态。现在,我可以通过某种标识符来管理客户端。我已经实施了这个选项。但是,我不希望使用字符串手动管理会话。

因此,另一个想法是通过Rsocket连接标识客户端。是否有办法使用Rsocket通道来识别特定的客户端?

想象一下一个示例服务和几个客户端。每个客户端都有Rsocket通道,服务已经启动并运行。是否有办法使用Rsocket通道识别服务器端的这些客户端?如果您能给出这样的行为的编程示例,那就太棒了。谢谢!

编辑(描述更详细的情况)

这是我的例子。

我们目前有三个CORBA对象,如图中所示:

  • LoginObject (通过NamingService检索引用)。客户端可以调用login()方法来获取会话
  • 会话对象有各种方法来查询有关当前序列化上下文的详细信息,最重要的是获取事务对象。
  • 事务对象可以通过一个使用commandName和键值对列表作为参数的通用方法来执行各种命令。在客户机执行n个命令之后,他可以提交或回滚事务(也可以通过事务对象上的方法)。

因此,在这里,我们使用session对象来执行服务上的事务。

现在我们决定从CORBA转移到Rsocket。因此,我们需要Rsocket微服务来存储会话的状态,否则我们就无法知道什么将被提交或回滚。这能与每个客户的单个出版商一起完成吗?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2019-05-15 05:47:33

下面是我前几天做的一个示例,它将使用Netifi的代理创建一个有状态的RSocket:https://github.com/netifi/netifi-stateful-socket

不幸的是,您需要在本地构建我们的开发分支(https://github.com/netifi/netifi-java) --如果您不想在本地构建它,那么在周末之前应该会有一个带有代码的版本。

我也在处理一个纯粹的RSocket示例,但是如果您想看看它将如何查看在示例中找到的StatefulSocket。它应该为您提供一个如何使用纯RSocket处理会话的线索。

关于事务管理器的其他问题--您需要将事务绑定到正在发出的反应性流信号--如果收到取消、回滚onError,如果收到onComplete,则提交事务。从Flux/Mono有副作用的方法,这应该使这很容易处理。根据您正在做的事情,您也可以使用BaseSubscriber,因为它有钩子来处理不同的反应流信号。

谢谢,罗伯特

票数 4
EN

Stack Overflow用户

发布于 2019-05-16 06:17:04

恢复连接(即在服务器上维护状态)的一个示例已降落在rsocket-java repo中。

https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5

恢复整个连接,包括与每个单独通道相关联的任何状态等。

有一个rsocket-cli项目可以让您尝试这一点。启动和停止套接字进程,并观察客户端和服务器进程。

代码语言:javascript
复制
$ socat -d TCP-LISTEN:5001,fork,reuseaddr TCP:localhost:5000
代码语言:javascript
复制
$ ./rsocket-cli --debug --resume --server -i cli:time tcp://localhost:5000
代码语言:javascript
复制
$ ./rsocket-cli -i client --stream --resume tcp://localhost:5001
票数 2
EN

Stack Overflow用户

发布于 2019-05-09 13:48:30

从您的描述来看,channel看起来会运行得最好,我以前还没有使用过通道,所以我不能保证(对不起)。但我建议你试试这样的方法:

超越者:

代码语言:javascript
复制
public class TransactionController implements Publisher<Payload> {

    List<Transaction> transcations = new ArrayList<>();

    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {

    }

    public void processPayload(Payload payload) {
        // handle transcations...
    }
}

在您的RSocket实现中,重写requestChannel

代码语言:javascript
复制
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    // Create new controller for each channel
    TranscationController cntrl = new TranscationController();
    Flux.from(payloads)
      .subscribe(cntrl::processPayload);
    return Flux.from(cntrl);
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55968704

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档