首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache与多个服务的双重锁定问题

Apache与多个服务的双重锁定问题
EN

Stack Overflow用户
提问于 2019-07-12 21:36:13
回答 1查看 961关注 0票数 3

我目前正在使用Apache将共享资源(数据库中的一行)的锁定外部化。总之,我正在运行两个服务实例(使用Spring ),让我们调用这个服务A,然后调用部署在不同区域的实例A1和A2。我锁定表示文件的共享数据库上表的id (主键)。

在服务A的代码中,我创建了一个单例(BaseLockService),它处理项目中的所有锁定。这也意味着对于2个正在运行的实例,每个实例都包含一个用于处理锁定的单例。我使用的配方是共享重入锁,它使用的是InterProcessMutex类,但是从来没有重入锁的情况。它的描述最接近我的需要。

运行的主要进程是@Scheduled进程,在执行时间之间有30秒的延迟。此外,我还为ThreadPoolTaskScheduler创建了一个bean,它将一个UUID附加到线程名中,池大小为1。这个UUID的原因是,如果没有它,当A1和A2同时运行时,它们都包含一个名为“任务-调度程序-1”的线程。这最初导致了锁定问题,因为A1可能拥有锁,同时在处理文件时,A2请求锁,并且由于它们共享相同的名称,Curator在lock.acquire()上返回true,因此有两个拥有相同锁的实例。

当运行一个实例时,这不是一个问题。我看到在ZooKeeper中正在创建ZNodes,我看到了管理器为临时锁生成的UUID。当运行两个或多个实例时,进程有时会进入A1拥有锁的争用状态,然后运行一个冗长的进程。然后A2以某种方式获得锁,快速完成进程并释放锁。然后,当A1完成并尝试解锁时,我会得到以下异常:

代码语言:javascript
复制
[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
    at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:274)
    at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:268)
    at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
    at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
    at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
    at com.myApp.lock.BaseLockService.lambda$unlockAllIDs$0(BaseLockService.java:143)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)

下面是我的单元测试,以复制这种情况:

代码语言:javascript
复制
@Test
public void baseLockTest() {
    List<Lockable> filesToProcess = new ArrayList<>();

    //For now only 1 to limit complexity
    Lockable fileToLock = FileSource.builder()
            .id(1)
            .build();

    filesToProcess.add(fileToLock);

    Runnable task = () -> {
        log.info("ATTEMPT LOCK");
        Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);

        if (!lockedBatch.isEmpty()) {

            try {
                log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            log.info("ATTEMPT UNLOCK");
                lockService.unlockAll(lockedBatch);
        }
    };

    System.out.println("**********************************************************");

    //Simulate two Service instances of 1 thread
    int totalThreads = 2;
    ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);

    List<Future> locksProcessed = new ArrayList<>(totalThreads);
    for (int i = 0; i < 1000; i++) {
        locksProcessed.add(executorService.submit(task));
    }

    Future f;
    while(!locksProcessed.isEmpty()){
        Iterator<Future> iterator = locksProcessed.iterator();
        while(iterator.hasNext()){
            f = iterator.next();
            if(f.isDone()){
                iterator.remove();
            }
        }

    }

    System.out.println("ALL DONE!!!");
}

下面是BaseLockService中的锁和解锁方法:

代码语言:javascript
复制
    public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
    Set<LockableHandle> effectivelyLocked = new HashSet<>();
    Iterator<Lockable> desiredLockIterator = desiredLock.iterator();

    while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
        Lockable toLock = desiredLockIterator.next();
        String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
        InterProcessMutex lock = createMutex(lockPath);

        try {
            if (lock.acquire(0, TimeUnit.SECONDS)) {
                LockableHandle handle = new LockableHandle(toLock, lock);
                effectivelyLocked.add(handle);
                locks.put(handle.getId(), handle);
            } else {
                log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
                        toLock.getId(),
                        lockPath));
            }
        } catch (Exception e) {
            log.error("Cannot lock path " + lockPath, e);
        }
    }

    log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
            desiredLock.size(),
            effectivelyLocked.size()));

    return effectivelyLocked;
}

    public void unlock(final LockableHandle lockHandle) {
    boolean success = false;

    try {
        InterProcessMutex lock = lockHandle.getMutex();
        if (lock != null) {
            lock.release();
            client.delete()
                    .deletingChildrenIfNeeded()
                    .forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
            success = true;
        }
    } catch (Exception e) {
        log.error("Can't unlock lock #" + lockHandle, e);
    } finally {
        locks.remove(lockHandle.getId());
    }

    log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
            lockHandle.getId(),
            success));
}

这是在实例化服务之后调用的init()方法:

代码语言:javascript
复制
    public void init() {
    log.info("Stating initialization of the Lock Service");
    locks = new HashMap<>();
    client = createClient();
    client.start();

    try {
        client.blockUntilConnected();
        if (client.isZk34CompatibilityMode()) {
            log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
        }
    } catch (InterruptedException ie) {
        log.error("Cannot connect to ZooKeeper.", ie);
    }

    log.info("Completed initialization of the Lock Service");
}
  • 我检查了连接问题,这不是问题。
  • 日志中没有重新连接、丢失、挂起的消息。
  • 锁的超时不是问题,因为除非会话/连接死了,否则ZooKeeper不会过期任何锁。
  • 我试过其他的策展人食谱,但它们并不能满足我的需要。而且,无论如何,它们也会抛出类似的异常。
  • Apache的版本是4.2.0,ZooKeeper是3.4.X

我不知道缺了什么,但都没有选择了。谢谢您的评论/建议

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-16 04:01:26

我在您发送的锁定问题示例中发现了一些问题。它可能是这些特殊的例子,但如果这些也在您的代码中,它将解释您正在看到的问题。

  1. Maven POM被错误地指定。策展人需要知道它是在ZK3.4.x兼容模式下--这样做的方法是在此描述。TL;DR将动物园管理员排除在馆长依赖项之外,并将直接依赖添加到ZooKeep3.4.x中。
  2. locks字段在BaseLockService中应该是ConcurrentHashMap
  3. BaseLockService#unlock试图通过调用client.delete()...来清理锁路径。这行不通。在这类代码中有一个固有的竞争,这就是为什么Curator有"Reaper“类,以及为什么我将容器节点推入ZooKeep3.5.x中。注意,生成NoNode异常的是这一行代码,而不是策展人锁代码。我建议你摆脱这段代码,不要担心,也不要去动物园管理员3.5.x。
  4. 我不认为BaseLockService应该继续重新创建InterProcessMutex。它应该保存一张他们的地图什么的。

当我应用上面的1-3次测试时,测试成功地通过了(我多次尝试)。我打开了一个包含3个更改的关于您的测试项目的PR

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57014270

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档