比方说,一个Veritcle中的SomeOtherService在不同的垂直方向使用UserService,通过事件总线进行通信。为了表示它:
class SomeOtherService {
final UserService userService = new UserService();
// Mutable state
final Map<String, Single<String>> cache = new HashMap(); // Not Synchronized ?
public Single<String> getUserSessionInfo(String id) {
// Seems it is not save ! :
return cache.computeIfAbsent(id, _id -> {
log("could not find " + id + " in cache. Connecting to userService...");
return userService.getUserSessionInfo(id); // uses generated proxy to send msg to the event bus to call it
}
);
}
}//在另一台机器上的另一垂直/微服务中的某处。
class UserService {
public Single<String> getUserSessionInfo(String id) {
return Single.fromCallable( () -> {
waitForOneSecond();
log("getUserSessionInfo for " + id);
if (id.equals("1"))
return "one";
if (id.equals("2"))
return "two";
else throw new Exception("could not"); // is it legal?
}
);
}以及客户端代码,我们在其中订阅并决定调度程序:
final Observable<String> obs2 = Observable.from(new String[] {"1", "1"});
// Emulating sequential call of 'getUserSessionInfo' to fork in separate scheduler A
obs.flatMap(id -> {
log("flatMap"); // on main thread
return someOtherService.getUserSessionInfo(id)
.subscribeOn(schedulerA) // Forking. will thread starvation happen? (since we have only 10 threads in the pool)
.toObservable();
}
).subscribe(
x -> log("next: " + x)
);问题是,通过使用computeIfAbsent方法将HashMap用于缓存(因为它在这里是共享状态)的解决方案有多好?
即使我们使用事件循环和事件总线,它也不会使我们避免共享状态和可能的并发问题,假设日志操作(像getUserSessionInfo(id)发生在单独的调度器/线程中?
我应该使用ReplySubject来实现缓存吗?vert.x + rx-java的最佳实践是什么?
看起来就像在EventLoop上运行cache.computeIfAbsent一样,它是安全的,因为它是顺序的?
对不起..。有很多问题,我想我可以缩小到:在Vert.x和Rx-Java中实现Cash for the Service calls的最佳实践是什么?
整个示例都是here
发布于 2017-08-19 03:56:24
我想我在这里找到了答案:http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/ -
Observable<Data> source = Observable .concat(memory, diskWithCache, networkWithSave) .first(); 当我使用map.put(..)保存它时显式地而不是使用computeIfAbsent
作为日志,我在事件循环中,我可以安全地使用非同步现金地图
https://stackoverflow.com/questions/45762631
复制相似问题