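I'm currently using Apache Curator to externalize the locking of a shared resource (a row in a database). To summarize, I'm running two instances of a service (using Spring). Let's call the service A and its instances, deployed in different regions, A1 and A2. I lock on the id (primary key) of a table in the shared database that represents files.
In service A's code I created a singleton (BaseLockService) that handles all locking for the project. This also means that with 2 running instances, each instance contains its own singleton for handling locks. The recipe I use is the Shared Reentrant Lock, which uses the InterProcessMutex class, although a lock is never actually re-entered. Its description was the closest match to what I need.
The main process that runs is a @Scheduled process with a 30-second delay between executions. I also created a bean for the ThreadPoolTaskScheduler that prepends a UUID to the thread name, with a pool size of 1. The reason for the UUID is that without it, when A1 and A2 run at the same time, both contain a thread named "task-scheduler-1". This initially caused locking problems: A1 could own the lock and, while it was processing the file, A2 would request the lock, and because the threads shared the same name, Curator returned true from lock.acquire(), so two instances owned the same lock.
With a single instance running, this is not a problem. I can see the ZNodes being created in ZooKeeper, and I can see the UUIDs that Curator generates for the ephemeral locks. With two or more instances running, the process sometimes gets into a race condition in which A1 owns the lock and then runs a lengthy process. A2 then somehow acquires the lock, finishes its process quickly, and releases the lock. Then, when A1 finishes and tries to unlock, I get the following exception: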
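The thread naming described above can be sketched with the JDK alone. This is only an illustration, not the bean from the project: UuidThreadFactory is a hypothetical name, and a plain ScheduledExecutorService stands in for Spring's ThreadPoolTaskScheduler. The name format mirrors the one visible in the log output.

```java
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class UuidThreadNameDemo {
    public static void main(String[] args) throws Exception {
        // Pool size of 1, as in the setup described above.
        ScheduledExecutorService scheduler =
                Executors.newScheduledThreadPool(1, new UuidThreadFactory());
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        // Prints something like "<random-uuid>_task-scheduler-1".
        System.out.println(scheduler.submit(whoAmI).get(5, TimeUnit.SECONDS));
        scheduler.shutdown();
    }
}

// Hypothetical helper: gives every thread a name that starts with a
// UUID generated once per factory (and therefore once per service instance).
final class UuidThreadFactory implements ThreadFactory {
    private final String prefix = UUID.randomUUID() + "_task-scheduler-";
    private final AtomicInteger counter = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + counter.getAndIncrement());
    }
}
```

With Spring, a comparable effect could presumably be reached through ThreadPoolTaskScheduler's setThreadNamePrefix and setPoolSize; that wiring is an assumption here and is not shown in the original post.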
[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:274)
at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:268)
at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
at com.myApp.lock.BaseLockService.lambda$unlockAllIDs$0(BaseLockService.java:143)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)
Here is my unit test that reproduces the situation:
    @Test
    public void baseLockTest() {
        List<Lockable> filesToProcess = new ArrayList<>();
        //For now only 1 to limit complexity
        Lockable fileToLock = FileSource.builder()
                .id(1)
                .build();
        filesToProcess.add(fileToLock);
        Runnable task = () -> {
            log.info("ATTEMPT LOCK");
            Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);
            if (!lockedBatch.isEmpty()) {
                try {
                    log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("ATTEMPT UNLOCK");
                lockService.unlockAll(lockedBatch);
            }
        };
        System.out.println("**********************************************************");
        //Simulate two Service instances of 1 thread
        int totalThreads = 2;
        ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);
        List<Future> locksProcessed = new ArrayList<>(totalThreads);
        for (int i = 0; i < 1000; i++) {
            locksProcessed.add(executorService.submit(task));
        }
        Future f;
        while(!locksProcessed.isEmpty()){
            Iterator<Future> iterator = locksProcessed.iterator();
            while(iterator.hasNext()){
                f = iterator.next();
                if(f.isDone()){
                    iterator.remove();
                }
            }
        }
        System.out.println("ALL DONE!!!");
    }
Here are the lock and unlock methods in BaseLockService:
    public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
        Set<LockableHandle> effectivelyLocked = new HashSet<>();
        Iterator<Lockable> desiredLockIterator = desiredLock.iterator();
        while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
            Lockable toLock = desiredLockIterator.next();
            String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
            InterProcessMutex lock = createMutex(lockPath);
            try {
                if (lock.acquire(0, TimeUnit.SECONDS)) {
                    LockableHandle handle = new LockableHandle(toLock, lock);
                    effectivelyLocked.add(handle);
                    locks.put(handle.getId(), handle);
                } else {
                    log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
                            toLock.getId(),
                            lockPath));
                }
            } catch (Exception e) {
                log.error("Cannot lock path " + lockPath, e);
            }
        }
        log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
                desiredLock.size(),
                effectivelyLocked.size()));
        return effectivelyLocked;
    }

    public void unlock(final LockableHandle lockHandle) {
        boolean success = false;
        try {
            InterProcessMutex lock = lockHandle.getMutex();
            if (lock != null) {
                lock.release();
                client.delete()
                        .deletingChildrenIfNeeded()
                        .forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
                success = true;
            }
        } catch (Exception e) {
            log.error("Can't unlock lock #" + lockHandle, e);
        } finally {
            locks.remove(lockHandle.getId());
        }
        log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
                lockHandle.getId(),
                success));
    }
This is the init() method, which is called after the service is instantiated:
    public void init() {
        log.info("Stating initialization of the Lock Service");
        locks = new HashMap<>();
        client = createClient();
        client.start();
        try {
            client.blockUntilConnected();
            if (client.isZk34CompatibilityMode()) {
                log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
            }
        } catch (InterruptedException ie) {
            log.error("Cannot connect to ZooKeeper.", ie);
        }
        log.info("Completed initialization of the Lock Service");
    }
I don't know what I'm missing, and I've run out of options. Thank you for any comments/suggestions.
Answered on 2019-07-16 04:01:26
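I found a few problems in the locking example you sent. They may be specific to that example, but if they are also in your real code, they would explain the issues you are seeing.
1. The locks field in BaseLockService should be a ConcurrentHashMap.
2. BaseLockService#unlock tries to clean up the lock path by calling client.delete().... This won't work. There is an inherent race in this kind of code, which is why Curator has the "Reaper" classes and why I pushed for Container Nodes to be added to ZooKeeper 3.5.x. Note that it is this line of code that generates the NoNode exception, not the Curator lock code. I suggest you either get rid of this code and not worry about it, or move to ZooKeeper 3.5.x.
3. BaseLockService should not keep re-creating InterProcessMutex instances. It should keep a map of them or something similar.
When I applied changes 1-3 above, the test passed (I tried it several times). I opened a PR against your test project with the 3 changes.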
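Points 1 and 3 can be illustrated together with a minimal sketch. This is not the code from the PR: MutexRegistry is a hypothetical helper, and ReentrantLock is used only as a local stand-in so that the example runs without a ZooKeeper server. The idea is that a ConcurrentHashMap hands out one mutex object per lock path and creates it at most once, even when several threads ask for the same path at the same time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

class MutexRegistryDemo {
    public static void main(String[] args) {
        // ReentrantLock is only a stand-in for InterProcessMutex here.
        MutexRegistry<ReentrantLock> registry =
                new MutexRegistry<>(path -> new ReentrantLock());
        ReentrantLock first = registry.forPath("/myapp/lock/files/1376112");
        ReentrantLock second = registry.forPath("/myapp/lock/files/1376112");
        System.out.println(first == second); // prints "true": the same object is reused
        System.out.println(registry.size()); // prints "1"
    }
}

// Hypothetical helper, not part of Curator or of the original project.
final class MutexRegistry<M> {
    private final Map<String, M> mutexes = new ConcurrentHashMap<>();
    private final Function<String, M> factory;

    MutexRegistry(Function<String, M> factory) {
        this.factory = factory;
    }

    // Returns the cached mutex for lockPath; computeIfAbsent runs the
    // factory at most once per key, even under concurrent access.
    M forPath(String lockPath) {
        return mutexes.computeIfAbsent(lockPath, factory);
    }

    int size() {
        return mutexes.size();
    }
}
```

With Curator, the factory could presumably be `path -> new InterProcessMutex(client, path)`, with lockBatch calling `forPath(lockPath)` instead of building a new mutex on every attempt. The sketch never removes entries, so a long-running service with many ids would need some eviction policy on top of it.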
https://stackoverflow.com/questions/57014270