首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >rx-java中的Socket watchdog

rx-java中的Socket watchdog
EN

Stack Overflow用户
提问于 2014-03-07 19:57:47
回答 1查看 2.8K关注 0票数 4

我目前正在努力尝试使用rx实现一个tcp watchdog/retry系统,您的帮助将非常感谢。

有了一个Observable,我想让一个Observable通过定期检查我们是否仍然可以写入套接字而得到一个Observable。很简单,我可以这样做:

代码语言:javascript
复制
class SocketSubscribeFunc implements Observable.OnSubscribeFunc<Socket> {
  private final String hostname;
  private final int port;
  private Socket socket;

  SocketSubscribeFunc(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
  }

  public Subscription onSubscribe(final Observer<? super Socket> observer) {
    try {
      log.debug("Trying to connect...");
      socket = new Socket(hostname, port);
      observer.onNext(socket);
    } catch (IOException e) {
      observer.onError(e);
    }
    return new Subscription() {
      public void unsubscribe() {
        try {
          socket.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    };
  }
}

Observable<Socket> socketObservable = Observable.create(new SocketSubscribeFunc(hostname,port));
Observable<Boolean> watchdog = Observable.combineLatest(socketObservable, Observable.interval(1, TimeUnit.SECONDS), new Func2<Socket, Long, Boolean>() {

  public Boolean call(final Socket socket, final Long aLong) {
    try {
      socket.getOutputStream().write("ping\n".getBytes());
      return true;
    } catch (IOException e) {
     return false;
    }
  }
});

现在,如果套接字可以被获取(服务器/链接在创建时关闭)或变得不可写(成功连接后无法访问服务器/链接),我想重试连接。理想情况下,通过重新订阅套接字可观察对象,该套接字对象的OnSubscribeFunc使用重试操作符创建连接。正如您所看到的,这将在套接字和看门狗观察值之间引入循环依赖。我玩弄了一段时间的switchMap/materialize...以便将最终的错误传播到无用的地方。

我几乎要放弃这个想法,并使用副作用代码中的主题。但在全球的思想中,应该有更好的方法:)

提前感谢!

EN

回答 1

Stack Overflow用户

发布于 2015-01-26 07:28:42

首先,我会在大多数情况下避免使用Observable.create,因为它通常是不必要的,并且引入了不必要的复杂性。在本例中,Rx有一个名为using的运算符,它允许您创建一个在可观察对象的生命周期中存在的资源对象。它自动捕获运行时错误,并且还提供了一个dispose操作,因此这对于此用例中的套接字来说将是完美的。我使用Java8 lambdas,因为它们的伪代码非常容易。

代码语言:javascript
复制
Observable.using(
    // Resource (socket) factory
    () -> {
      try {
        return new Socket(hostname, port);
      } catch (IOException e) {
        // Rx will propagate this as an onError event.
        throw new RuntimeException(e);
      }
    },
    // Observable factory
    (socket) -> {
      return Observable.interval(1, TimeUnit.SECONDS)
          .map((unusedTick) {
            try {
              socket.getOutputStream().write("ping\n".getBytes());
              return true;
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          })
          // Retry the inner job up to 3 times before propagating.
          .retry(3);
    },
    // Dispose action for socket.
    // In real life the close probably needs a try/catch.
    (socket) -> socket.close())
    // Retry the outer job up to 3 times.
    .retry(3)
    // If we propagate all errors, emit a 'false', signaling service is not available.
    .onErrorResumeNext(Observable.just(false));

请注意,如果内部作业传播(在3次失败后),这将重试外部作业。要解决这个问题,您应该使用谓词和retryWhen在重试时签出文档。您可以抛出一个特殊的RuntimeException,并且只在外部作业不是由内部作业传播的类型时才重试外部作业。

using文档:http://reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,%20rx.functions.Action1)

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/22249480

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档