我正在尝试序列化订阅,以便通过网络发送。我使用的是Scala,并且做了这样的事情:
observable.materialize.subscribe{ n : Notification => sendToNetwork(n)}然而,我得到了一些错误:
java.io.NotSerializableException: rx.lang.scala.Notification$OnNext(准确地说,我正在使用Akka并尝试将通知发送到远程参与者。但我认为这个问题比那个更普遍)。
它似乎拒绝序列化OnNext类,该类实际上是Notification的子类,后者是rx.lang.scala.Notification伴生对象的内部类:
http://rxscala.github.io/scaladoc/#rx.lang.scala.Notification$$OnNext
..。我想我在java文档中的某个地方看到,我们不能序列化内部的非静态类。
我对此的理解正确吗?如果是这样,这是rx-java类层次结构的限制吗?或者,有没有办法解决这个问题,并序列化Notification?
发布于 2014-04-24 13:15:15
,我对此的理解正确吗?
如果外部类是可序列化的,则可以序列化非静态内部类。但在Java和Scala中,您都需要显式地告诉编译器类是可序列化的(通过扩展Serializable),而rx-java Notification和rx-scala OnNext都不是可序列化的。
或者有什么方法可以解决这个问题,并序列化通知吗?
在Akka中,您可以为任何类编写自己的序列化程序:http://doc.akka.io/docs/akka/snapshot/scala/serialization.html。Java序列化仅在默认情况下使用。
发布于 2015-07-08 03:33:42
Kontraktor- reactive -Streams基于reactive streams提供了一流的远程处理性能。您不会被Kontraktor吸引,但可以将其用作提供快速远程处理(基于快速序列化)的工具。
public static void remotingRxToRx() {
Observable<Integer> range = Observable.range(0, 50_000_000);
Publisher<Integer> pub = RxReactiveStreams.toPublisher(range);
KxReactiveStreams.get().asRxPublisher(pub)
.serve(new TCPNIOPublisher().port(3456));
RateMeasure rm = new RateMeasure("events");
KxPublisher<Integer> remoteStream =
KxReactiveStreams.get()
.connect(Integer.class, new TCPConnectable().host("localhost").port(3456));
RxReactiveStreams.toObservable(remoteStream)
.forEach( i -> rm.count() );
}将很快移动到一个单独的项目中(只需检查我的项目)
https://stackoverflow.com/questions/23250012
复制相似问题