我正在尝试将twitter4j与vertx事件循环集成在一起,我不知道我这样做是否正确。虽然我是一个Node.js开发人员,但我对vertx还是新手,所以我熟悉事件循环/单线程概念。
在我的测试中,我想订阅twitter流并在vertx事件总线上发布该流。
我已经创建了一个不可知论的TwitterAPI类,它将把twitter流转换为一个可观察的流(稍后将链接到vertx ):
public class TwitterAPI {
public Observable<Status> getTwitterObservable() {
return Observable.create(emitter -> {
final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addListener(new StatusListener(){
public void onStatus(Status status) {
emitter.onNext(status);
}
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
public void onException(Exception ex) {
emitter.onError(ex);
}
@Override
public void onScrubGeo(long userId, long upToStatusId) {}
@Override
public void onStallWarning(StallWarning warning) {}
});
//twitterStream.filter("some keyword");
twitterStream.sample();
});
}
}然后,我创建了一个TwitterVerticle,它将收听上述可观察到的内容,并在事件总线上发布流,以便其他一些眩晕人可以订阅它并处理它:
public class TwitterVerticle extends AbstractVerticle {
public void start() {
EventBus eb = this.vertx.eventBus();
TwitterAPI twitterAPI = new TwitterAPI();
twitterAPI.getTwitterObservable()
.map(Status::getText)
.filter(text -> text.startsWith("my keyword"))
.subscribe(text -> {
eb.publish("tweet-feed", text);
});
}
}例如,我创建了另一个垂直线,它将在事件总线上侦听"twitter- feed“并将其发布到WebSocket上,这样您就可以在浏览器中看到提要。
第一个look...but...my的主要问题是:我不确定twitter4j是否会很好地处理事件循环,也许我的集成技术是错误的。也许我应该让TwitterVerticle成为一个工人?
谁能看一看,让我知道这是否是完成这项任务的最佳方式?
非常感谢!
编辑
在这种情况下,在事件总线上直接发布是一个更好的模式吗?以下是修改后的代码:
public class TwitterAPI {
public void publishStreamOnEventBus(Vertx vertx) {
EventBus eb = vertx.eventBus();
TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addListener(new StatusListener(){
public void onStatus(Status status) {
eb.publish("tweet-feed", status.getText());
}
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
public void onException(Exception ex) {
//emitter.onError(ex);
}
@Override
public void onScrubGeo(long userId, long upToStatusId) {}
@Override
public void onStallWarning(StallWarning warning) {}
});
twitterStream.sample();
}
}
public class Main {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
new TwitterAPI().publishStreamOnEventBus(vertx);
//vertx.deployVerticle(new TwitterVerticle()/*, new DeploymentOptions().setWorker(true)*/);
vertx.deployVerticle(new WebServerVerticle());
}
}发布于 2017-03-31 12:31:58
首先,提出几项一般性建议。
1/如果订阅Observable涉及调用阻塞API,请使用blockingScheduler
Scheduler blockingScheduler = io.vertx.rx.java.RxHelper.blockingScheduler(vertx);
Observable<String> obs = twitterObservable.subscribeOn(blockingScheduler);2.然后我假设twitter4j使用自己的线程来调用StatusListener,因此将对其中一个线程执行EventBus#publish调用。要返回到垂直上下文,请使用#observeOn操作符:
Scheduler contextScheduler = io.vertx.rx.java.RxHelper.scheduler(context);
Observable<String> obs = twitterObservable.observeOn(contextScheduler);将这两项变化结合起来:
public class TwitterVerticle extends AbstractVerticle {
public void start() {
EventBus eb = this.vertx.eventBus();
Scheduler contextScheduler = io.vertx.rx.java.RxHelper.scheduler(context);
Scheduler blockingScheduler = io.vertx.rx.java.RxHelper.blockingScheduler(vertx);
TwitterAPI twitterAPI = new TwitterAPI();
twitterAPI.getTwitterObservable()
.map(Status::getText)
.filter(text -> text.startsWith("my keyword"))
.observeOn(contextScheduler)
.subscribeOn(blockingScheduler)
.subscribe(text -> {
eb.publish("tweet-feed", text);
});
}
}所有这一切,如果这是唯一的工作,我建议删除它,并简单地发布到事件总线。事件总线实例是线程安全的,从外部(非Vert.x)调用publish是非常好的。实际上,这是将Vert.x代码与遗留代码结合在一起的好模式。
https://stackoverflow.com/questions/43139884
复制相似问题