我正在尝试用rsocket-websocket客户端构建一个聊天前端。我能够使用requestChannel(new Flowable(source...))从前端发送消息,并使用requestChannel(new Flowable.just({metatdata}))接收消息。
我试图使用FlowableProcessor将requestChannel的两个调用减少为一个。
无法找到rsocket的FlowableProcessor文档。
以下是我的尝试:
const processor = new FlowableProcessor(
new Flowable(source => {
source.onSubscribe({
cancel: () => {},
request: n => {}
});
source.onNext({
metadata: constructMetadataWithChannelId(channelId),
});
})
);
sock.requestChannel(processor.map(item => item))
.subscribe({
onComplete: () => {
console.log(
`complted subscribe`,
);
},
onError: error1 => {
console.log(
`subscriber err: ${error1}`,
);
},
onSubscribe: subscription => {
console.log(
`onSubscribe`,
);
setConnectStatus('connected');
setChannelIdDone(true);
subscription.request(1000);
},
onNext: (val: any) => {
const value = JSON.parse(val) as Message;
console.log(
`received event from channel: ${JSON.stringify(
value,
)}`,
);
}
})我知道这是类型问题。无法确定processor.map(item => item)在哪里出错。
TS2345: Argument of type 'IPublisher<unknown>' is not assignable to parameter of type 'Flowable<Payload<Buffer, Buffer>>'.
Type 'IPublisher<unknown>' is missing the following properties from type 'Flowable<Payload<Buffer, Buffer>>': lift, take发布于 2021-12-30 13:59:29
这个错误是微不足道的。不能使用FlawableProcessor,因为它没有实现与Flawable相同的接口。
目前,rsocket-js还没有得到很好的抛光,也有一些缺陷。其中一些缺陷是类型使用不一致的。据推测,IPublisher和ISubscriber接口应该在所有其他公共接口中使用。但是,为了作者的简单性(我猜),他们被Flowable和Single类型所取代。
根据源代码,FlowableProcessor没有扩展Flowable,而是实现了IPublisher、ISubscriber和ISubscription接口本身,也没有实现由Flowable实现的lift和take方法。因此,它不能直接替代Flowable,尽管它应该作为IPublisher使用。
在您的示例中,我认为没有理由使用FlowableProcessor。相反,您可以将用作构造Flowable的参数的FlowableProcessor直接传递给requestChannel方法:
const requestSource = new Flowable(source => {
source.onSubscribe({
cancel: () => {},
request: n => {}
});
source.onNext({
metadata: constructMetadataWithChannelId(channelId),
});
});
sock.requestChannel(requestSource.map(item => item))
...如果您真的需要在这段代码中使用FlowableProcessor处理器,那么您可以强制将它强制转换为Flowable,但它可能是未来意外错误的来源:
sock.requestChannel(processor.map(item => item) as any as Flowable)请注意,您使用Flowable不正确。当尚未请求数据时,可以在订阅时发送数据。这违反了RSocket合同。适当的实现应该类似于:
let requestsSink: {
sendRequest(myRequest: unknown): void,
complete(): void
};
const requestsSource = new Flowable((requestsSubscriber) => {
// Number of the requests requested by subscriber.
let requestedRequests = 0;
// Buffer for requests which should be sent but not requested yet.
const pendingRequests: unknown[] = [];
let completed = false;
requestsSink = {
sendRequest(myRequest: unknown) {
if (completed) {
// It's completed, nobody expects this request.
return;
}
if (requestedRequests > 0) {
--requestedRequests;
requestsSubscriber.onNext(myRequest);
} else {
pendingRequests.push(myRequest);
}
},
complete() {
if (!completed) {
completed = true;
requestsSubscriber.onComplete();
}
},
};
requestsSubscriber.onSubscribe({
cancel: () => {
// TODO: Should be handled somehow.
},
request(n: number) {
const toSend = pendingRequests.splice(n);
requestedRequests += n - toSend.length;
for (const pending of toSend) {
requestsSubscriber.onNext(pending);
}
}
});
});
sock.requestChannel(requestsSource.map(item => item))
...
// Somewhere else the data is provided:
if (requestsSink != null) {
requestsSink.sendRequest({});
requestsSink.sendRequest({});
requestsSink.sendRequest({});
requestsSink.sendRequest({});
requestsSink.complete();
}https://stackoverflow.com/questions/70363812
复制相似问题