我有一个套接字连接,它发出带有标识符的消息。我想为每种类型的消息创建一个单独的观察值。我有一些解决方案,但它们都很笨拙,或者可能存在性能问题。我对RxJS比较陌生,所以我没有意识到我可能会遇到的陷阱。
我的第一个直觉是为每种类型创建一个过滤后的观察值:
const receive_message = Rx.fromEvent(socket, 'data').pipe(share());
const message_type_a = receive_message.pipe(filter(message => message.type === 'a'));
const message_type_b = receive_message.pipe(filter(message => message.type === 'b'));
const message_type_c = receive_message.pipe(filter(message => message.type === 'c'));
const message_type_d = receive_message.pipe(filter(message => message.type === 'd'));我认为这会导致性能问题,因为每当有任何消息传入时,它都会对每种消息类型执行此检查。
我想过像这样做一个多阶段的分区:
const receive_message = Rx.fromEvent(socket, 'data');
const [message_type_a, not_a] = receive_message.pipe(partition(message => message.type === 'a'));
const [message_type_b, not_b] = not_a.pipe(partition(message => message.type === 'b'));
const [message_type_c, message_type_d] = not_b.pipe(partition(message => message.type === 'c'));这是非常笨拙的,我不确定它是否比过滤器解决方案的性能更好。
接下来,我尝试像这样使用主题:
const message_type_a = new Rx.Subject();
const message_type_b = new Rx.Subject();
const message_type_c = new Rx.Subject();
const message_type_d = new Rx.Subject();
Rx.fromEvent(socket, 'data').subscribe(function (message) {
switch (message.type) {
case 'a':
message_type_a.next(message);
break;
case 'b':
message_type_b.next(message);
break;
case 'c':
message_type_c.next(message);
break;
case 'd':
message_type_d.next(message);
break;
default:
console.log('Uh oh');
}
},
console.log,
function () {
message_type_a.complete();
message_type_b.complete();
message_type_c.complete();
message_type_d.complete();
}
);再说一次,这很笨拙,每当我使用主题时,我都会问自己,这是不是"Rx“的做事方式。
理想情况下,我可以这样做:
const [
message_type_a,
message_type_b,
message_type_c,
message_type_d
] = Rx.fromEvent(socket, 'data').pipe(partitionMany(message.type));有没有什么优雅的解决方案,或者我像这样拆分源代码的整体方法是不是有根本上的缺陷?
这是我的第一个问题,所以我希望我做得很好。提前感谢!
发布于 2018-06-19 23:21:13
我把你的开关方案改成了性能更好的方案。
const message_type_a = new Rx.Subject();
const message_type_b = new Rx.Subject();
const message_type_c = new Rx.Subject();
const message_type_d = new Rx.Subject();
subjects = {
'a': message_type_a,
'b': message_type_b,
'c': message_type_c,
'd': message_type_d
}
Rx.fromEvent(socket, 'data').pipe(tap(message =>
subjects[message.type].next(message))).subscribe();发布于 2018-06-25 21:06:43
参见https://www.npmjs.com/package/rx-splice。
我们有完全相同的情况,在我们的情况下,它确实是一个性能问题(使用node --perf测量)。我只是在读了你的问题后创建了这个包,因为分享就是关心。如果对你有效,请让我知道!
请注意,只有在执行过滤器的选择器函数成为问题时,才需要这样做!如拼接自述文件中所述:
如果只使用惯用的RxJS代码,那么在拼接的用例中将使用
。但是,如果您正在编写高性能代码,并且上面的输入$ Observable (或者更有可能是Subject)将被订阅数小时或数千次(X),因此filter(fn)的选择器函数将被调用X次。这可能是我们应用程序中最大的性能瓶颈,因此我们编写了splice,它对每个发出的值只执行一次索引选择器。
https://stackoverflow.com/questions/50931344
复制相似问题