首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >反应性编程-在集群中运行作业

反应性编程-在集群中运行作业
EN

Stack Overflow用户
提问于 2019-01-26 09:25:47
回答 1查看 194关注 0票数 0

我需要在集群中运行一些作业,一次只能运行一个作业。因为我的团队使用Hazelcast,所以我最终得到了一个基于Hazelcast ILock实现的解决方案。为了这个问题的目的,我将对此作一个概括。让我们假设我们有以下接口(这些接口可以很容易地实现,例如,由Hazelcast或Reddison (Redis)实现):

代码语言:javascript
复制
public interface MyDistributedLock {

    boolean lock();

    void unlock();

    boolean isLockedByCurrentThread();
}

public interface MyLockDistributedFactory {

    MyDistributedLock getLock(String name);

}

如果无法获得锁,则方法等待:

代码语言:javascript
复制
private Mono<Void> lock(String name, Publisher<?> publisher, MyLockDistributedFactory myLockFactory) {
    // important to release lock on the same thread as
    // it was aquired    
    Scheduler scheduler = Schedulers.newSingle(name.toLowerCase());

    return Mono.defer(() -> Mono.just(myLockFactory.getLock(name)))
        publishOn(scheduler)
         .doOnNext(MyDistributedLock::lock)
         .doOnNext(lock -> LOGGER.info("Process acquired lock for resource {}", name))
         .flatMapMany(lock -> Flux.from(publisher))
         .publishOn(scheduler) 
         .doFinally(signalType -> {
              MyDistributedLock lock = myLockFactory.getLock(name);
              if (signalType == SignalType.CANCEL) {
                 // cancel ignores publishOn 
                 scheduler.schedule(() -> { 
                    lock.unlock();
                    LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
                 });
              } else if (lock.isLockedByCurrentThread()) {
                 lock.unlock();
                 LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
              }
          })
          .then();
}

和一些工作的例子

代码语言:javascript
复制
private Mono<Void> someJobRunEveryOneHourOnEveryNodeInCluster() {
    MyLockDistributedFactory hazelcast = ...;
    return lock("some-job", Flux.just(1,2), hazelcast)
                .repeatWhen(afterOneHour());
}

我不知道这是一种使用项目反应堆的好方法(以及正确的实现),还是应该以另一种方式进行。请指点。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-01-28 15:54:54

当使用反应堆时,这是一种正确的方法,因为您负责将阻塞部分抵消到专用的Scheduler/Thread中。

但我要说的是,这样的互斥代码一般不太适合于反应式编程:您失去了使用较少的线程进行更多操作的关键好处之一,如果忘记使用专用线程publishOn,则可能会阻塞应用程序的其他部分,等等……

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54377145

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档