使用java OrbitzWorld领事客户端,我试图通过acquireLock方法同步java应用程序的多个实例。
到目前为止我的代码:
注册应用为领事服务:
private void registerService(Config config) {
String serviceId = config.getService().getId();
String serviceName = config.getService().getName();
long ttl = config.getService().getTtl();
AgentClient agentClient = client.agentClient();
Registration service = ImmutableRegistration.builder()
.id(serviceId)
.name(serviceName)
.check(Registration.RegCheck.ttl(ttl))
.build();
agentClient.register(service);
new HeartBeater(agentClient, serviceId, ttl).start();
}HeartBeater:
@Override
public void run() {
while(true) {
try {
client.pass(serviceId);
Thread.sleep((Math.max(ttl / 2, 1)));
} catch (NotRegisteredException | InterruptedException e) {}
}
}上述代码在领事处成功刷新。
现在我想知道锁定的实现。
到目前为止,我写的是:
public boolean amILeader() {
// return if current java app is leader
}
private String createSession() {
final Session session = ImmutableSession.builder().name(config.getService().getName()).build();
return client.sessionClient().createSession(session).getId();
}
private void watchLeaderLockStateChange() {
KeyValueClient keyValueClient = client.keyValueClient();
KVCache kvCache = KVCache.newCache(keyValueClient, Constants.LEADER_LOCK_KEY, config.getService().getWatchKey());
kvCache.addListener(map -> {
Value value = map.get(Constants.LEADER_LOCK_KEY);
if(!value.getSession().isPresent()) {
keyValueClient.acquireLock(Constants.LEADER_LOCK_KEY, ???); //create new session here ???
}
});
kvCache.start();
}我被困在这里,因为我不理解这个理论,也没有在文档中找到任何有用的东西。
我的问题:
acquireLock方法进行同步所必需的会话吗?如果是的话,应该何时以及如何通过acquireLock方法创建/同步会话?会话失效是常见的事情吗?根据文档,如果其中一个服务无法发送ttl,这通常会发生。。
您能提供一些代码示例或填写我的实现吗?谢谢您的答复:]
发布于 2020-02-21 16:58:24
我想我现在明白了。
理论是这样的:
public class SessionFacade {
private String leaderLock;
private String sessionId;
private Consul client;
private Config config;
public SessionFacade(Consul client, Config config) {
this.client = client;
this.config = config;
this.leaderLock = "service/" + config.getService().getName() + "/leader";
this.sessionId = createSession();
new SessionHeartBeater(client, sessionId, config.getService().getSessionTtl()).start();
watchLeaderLockStateChange(sessionId);
client.keyValueClient().acquireLock(leaderLock, sessionId);
}
public boolean doIPossesLeaderLock() {
Optional<Value> leaderValue = client.keyValueClient().getValue(leaderLock);
if(leaderValue.isPresent()) {
Optional<String> session = leaderValue.get().getSession();
return session.isPresent() && session.get().equals(sessionId);
}
return false;
}
private String createSession() {
int sessionTtl = config.getService().getSessionTtl();
final Session session = ImmutableSession.builder()
.name(config.getService().getName())
.ttl(sessionTtl + "s")
.build();
return client.sessionClient().createSession(session).getId();
}
private void watchLeaderLockStateChange(String sessionId) {
KeyValueClient keyValueClient = client.keyValueClient();
KVCache kvCache = KVCache.newCache(keyValueClient, leaderLock, config.getService().getWatchLockEach());
kvCache.addListener(map -> {
Value value = map.get(leaderLock);
if(!value.getSession().isPresent()) {
keyValueClient.acquireLock(leaderLock, sessionId);
}
});
kvCache.start();
}
}请注意,由于我还没有对代码进行全面测试,所以代码可能是错误的。
发布于 2020-02-23 07:35:17
你读过https://learn.hashicorp.com/consul/developer-configuration/elections了吗?它在一个应用程序的层次上,使用领事进行领导人选举,完成了这个场景。
https://stackoverflow.com/questions/60340310
复制相似问题