首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >vertx.io java + twitter4j -事件循环集成

vertx.io java + twitter4j -事件循环集成
EN

Stack Overflow用户
提问于 2017-03-31 11:44:42
回答 1查看 149关注 0票数 1

我正在尝试将twitter4j与vertx事件循环集成在一起,我不知道我这样做是否正确。虽然我是一个Node.js开发人员,但我对vertx还是新手,所以我熟悉事件循环/单线程概念。

在我的测试中,我想订阅twitter流并在vertx事件总线上发布该流。

我已经创建了一个不可知论的TwitterAPI类,它将把twitter流转换为一个可观察的流(稍后将链接到vertx ):

代码语言:javascript
复制
public class TwitterAPI {
    public Observable<Status> getTwitterObservable() {
        return Observable.create(emitter -> {
            final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
            twitterStream.addListener(new StatusListener(){
                public void onStatus(Status status) {
                    emitter.onNext(status);
                }
                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
                public void onException(Exception ex) {
                    emitter.onError(ex);
                }
                @Override
                public void onScrubGeo(long userId, long upToStatusId) {}

                @Override
                public void onStallWarning(StallWarning warning) {}
            });

            //twitterStream.filter("some keyword");
            twitterStream.sample();
        });
    }
}

然后,我创建了一个TwitterVerticle,它将收听上述可观察到的内容,并在事件总线上发布流,以便其他一些眩晕人可以订阅它并处理它:

代码语言:javascript
复制
public class TwitterVerticle extends AbstractVerticle {
    public void start() {
        EventBus eb = this.vertx.eventBus();
        TwitterAPI twitterAPI = new TwitterAPI();
        twitterAPI.getTwitterObservable()
            .map(Status::getText)
            .filter(text -> text.startsWith("my keyword"))
            .subscribe(text -> {
                eb.publish("tweet-feed", text);
            });
    }
}

例如,我创建了另一个垂直线,它将在事件总线上侦听"twitter- feed“并将其发布到WebSocket上,这样您就可以在浏览器中看到提要。

第一个look...but...my的主要问题是:我不确定twitter4j是否会很好地处理事件循环,也许我的集成技术是错误的。也许我应该让TwitterVerticle成为一个工人?

谁能看一看,让我知道这是否是完成这项任务的最佳方式?

非常感谢!

编辑

在这种情况下,在事件总线上直接发布是一个更好的模式吗?以下是修改后的代码:

代码语言:javascript
复制
public class TwitterAPI {

    public void publishStreamOnEventBus(Vertx vertx) {
        EventBus eb = vertx.eventBus();
        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();

        twitterStream.addListener(new StatusListener(){
            public void onStatus(Status status) {
                eb.publish("tweet-feed", status.getText());
            }
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onException(Exception ex) {
                //emitter.onError(ex);
            }
            @Override
            public void onScrubGeo(long userId, long upToStatusId) {}

            @Override
            public void onStallWarning(StallWarning warning) {}
        });

        twitterStream.sample();
    }
}



public class Main {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        new TwitterAPI().publishStreamOnEventBus(vertx);

        //vertx.deployVerticle(new TwitterVerticle()/*, new DeploymentOptions().setWorker(true)*/);
        vertx.deployVerticle(new WebServerVerticle());
    }

}
EN

回答 1

Stack Overflow用户

发布于 2017-03-31 12:31:58

首先,提出几项一般性建议。

1/如果订阅Observable涉及调用阻塞API,请使用blockingScheduler

代码语言:javascript
复制
Scheduler blockingScheduler = io.vertx.rx.java.RxHelper.blockingScheduler(vertx);
Observable<String> obs = twitterObservable.subscribeOn(blockingScheduler);

2.然后我假设twitter4j使用自己的线程来调用StatusListener,因此将对其中一个线程执行EventBus#publish调用。要返回到垂直上下文,请使用#observeOn操作符:

代码语言:javascript
复制
Scheduler contextScheduler = io.vertx.rx.java.RxHelper.scheduler(context);
Observable<String> obs = twitterObservable.observeOn(contextScheduler);

将这两项变化结合起来:

代码语言:javascript
复制
public class TwitterVerticle extends AbstractVerticle {
    public void start() {
        EventBus eb = this.vertx.eventBus();
        Scheduler contextScheduler = io.vertx.rx.java.RxHelper.scheduler(context);
        Scheduler blockingScheduler = io.vertx.rx.java.RxHelper.blockingScheduler(vertx);
        TwitterAPI twitterAPI = new TwitterAPI();
        twitterAPI.getTwitterObservable()
            .map(Status::getText)
            .filter(text -> text.startsWith("my keyword"))
            .observeOn(contextScheduler)
            .subscribeOn(blockingScheduler)
            .subscribe(text -> {
                eb.publish("tweet-feed", text);
            });
    }
}

所有这一切,如果这是唯一的工作,我建议删除它,并简单地发布到事件总线。事件总线实例是线程安全的,从外部(非Vert.x)调用publish是非常好的。实际上,这是将Vert.x代码与遗留代码结合在一起的好模式。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43139884

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档