首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FlowableProcessor rsocket-js类型记录

FlowableProcessor rsocket-js类型记录
EN

Stack Overflow用户
提问于 2021-12-15 12:36:29
回答 1查看 114关注 0票数 0

我正在尝试用rsocket-websocket客户端构建一个聊天前端。我能够使用requestChannel(new Flowable(source...))从前端发送消息,并使用requestChannel(new Flowable.just({metatdata}))接收消息。

我试图使用FlowableProcessorrequestChannel的两个调用减少为一个。

无法找到rsocket的FlowableProcessor文档。

以下是我的尝试:

代码语言:javascript
复制
const processor = new FlowableProcessor(
    new Flowable(source => {
        source.onSubscribe({
            cancel: () => {},
            request: n => {}
        });
        source.onNext({
            metadata: constructMetadataWithChannelId(channelId),
        });
    })
);
sock.requestChannel(processor.map(item => item))
    .subscribe({
        onComplete: () => {
            console.log(
                `complted subscribe`,
            );
        },
        onError: error1 => {
            console.log(
                `subscriber err: ${error1}`,
            );
        },
        onSubscribe: subscription => {
            console.log(
                `onSubscribe`,
            );
            setConnectStatus('connected');
            setChannelIdDone(true);
            subscription.request(1000);
        },
        onNext: (val: any) => {
            const value = JSON.parse(val) as Message;
            console.log(
                `received event from channel: ${JSON.stringify(
                                            value,
                                        )}`,
            );
        }
    })

我知道这是类型问题。无法确定processor.map(item => item)在哪里出错。

代码语言:javascript
复制
TS2345: Argument of type 'IPublisher<unknown>' is not assignable to parameter of type 'Flowable<Payload<Buffer, Buffer>>'.
Type 'IPublisher<unknown>' is missing the following properties from type 'Flowable<Payload<Buffer, Buffer>>': lift, take
EN

回答 1

Stack Overflow用户

发布于 2021-12-30 13:59:29

这个错误是微不足道的。不能使用FlawableProcessor,因为它没有实现与Flawable相同的接口。

目前,rsocket-js还没有得到很好的抛光,也有一些缺陷。其中一些缺陷是类型使用不一致的。据推测,IPublisherISubscriber接口应该在所有其他公共接口中使用。但是,为了作者的简单性(我猜),他们被FlowableSingle类型所取代。

根据源代码,FlowableProcessor没有扩展Flowable,而是实现了IPublisherISubscriberISubscription接口本身,也没有实现由Flowable实现的lifttake方法。因此,它不能直接替代Flowable,尽管它应该作为IPublisher使用。

在您的示例中,我认为没有理由使用FlowableProcessor。相反,您可以将用作构造Flowable的参数的FlowableProcessor直接传递给requestChannel方法:

代码语言:javascript
复制
const requestSource = new Flowable(source => {
    source.onSubscribe({
        cancel: () => {},
        request: n => {}
    });
    source.onNext({
        metadata: constructMetadataWithChannelId(channelId),
    });
});
sock.requestChannel(requestSource.map(item => item))
    ...

如果您真的需要在这段代码中使用FlowableProcessor处理器,那么您可以强制将它强制转换为Flowable,但它可能是未来意外错误的来源:

代码语言:javascript
复制
sock.requestChannel(processor.map(item => item) as any as Flowable)

请注意,您使用Flowable不正确。当尚未请求数据时,可以在订阅时发送数据。这违反了RSocket合同。适当的实现应该类似于:

代码语言:javascript
复制
    let requestsSink: {
        sendRequest(myRequest: unknown): void,
        complete(): void
    };
    const requestsSource = new Flowable((requestsSubscriber) => {
        // Number of the requests requested by subscriber.
        let requestedRequests = 0;
        // Buffer for requests which should be sent but not requested yet.
        const pendingRequests: unknown[] = [];
        let completed = false;

        requestsSink = {
            sendRequest(myRequest: unknown) {
                if (completed) {
                    // It's completed, nobody expects this request.
                    return;
                }
                if (requestedRequests > 0) {
                    --requestedRequests;
                    requestsSubscriber.onNext(myRequest);
                } else {
                    pendingRequests.push(myRequest);
                }
            },
            complete() {
                if (!completed) {
                    completed = true;
                    requestsSubscriber.onComplete();
                }
            },
        };

        requestsSubscriber.onSubscribe({
            cancel: () => {
                // TODO: Should be handled somehow.
            },
            request(n: number) {
                const toSend = pendingRequests.splice(n);
                requestedRequests += n - toSend.length;
                for (const pending of toSend) {
                    requestsSubscriber.onNext(pending);
                }
            }
        });
    });

    sock.requestChannel(requestsSource.map(item => item))
        ...
    
    // Somewhere else the data is provided:
    if (requestsSink != null) {
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.sendRequest({});
        requestsSink.complete();
    }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70363812

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档