首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJava --终止无限流

RxJava --终止无限流
EN

Stack Overflow用户
提问于 2013-12-18 16:22:49
回答 1查看 3.8K关注 0票数 11

我正在探索反应性编程和RxJava。这很有趣,但我被困在一个我找不到答案的问题上。我的基本问题是:什么是一种反应恰当的方式来终止另一种无穷无尽的可观察的运行?我还欢迎关于我的代码的批判性和反应性最佳实践。

作为练习,我正在编写日志文件尾实用程序。日志文件中的行流由Observable<String>表示。为了让BufferedReader继续读取添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为我的线程应该休眠,等待更多的记录器文本。

但是,虽然我可以使用takeUntil终止观察者,但我需要找到一种干净的方法来终止否则无限运行的文件监视程序。我可以编写我自己的terminateWatcher方法/字段,但这打破了可观察到的/观察者封装--我希望尽可能严格地对待反应性范例。

以下是Observable<String>代码:

代码语言:javascript
复制
public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

下面是在新行出现时打印新行的观察者代码:

代码语言:javascript
复制
public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

我的两个问题是:

  1. 什么是响应一致的方式来终止否则无限运行的流?
  2. 我的代码中还有哪些其他错误让你哭泣?:)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2013-12-18 18:48:39

因为当请求订阅时,您正在启动文件监视,所以在订阅结束时(即当Subscription关闭时)终止它是有意义的。这解决了代码注释中一个松散的结尾:返回一个Subscription,它告诉FileWatcher停止监视文件。然后替换您的循环条件,以便它检查订阅是否已被取消,而不是检查当前线程是否已被中断。

问题是,如果您的onSubscribe()方法永远不返回,这就没有多大帮助。也许您的FileWatcher应该需要指定一个调度程序,并在该调度程序上进行读取。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/20663255

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档