I'm using Amazon Keyspaces (Cassandra 3.11.2) from Apache Flink running on AWS EMR. At some point, the query below throws an exception. The same code running on AWS Lambda also fails with the same NoHost exception. What am I doing wrong?
String query = "INSERT INTO TEST (field1, field2) VALUES(?, ?)";
PreparedStatement prepared = CassandraConnector.prepare(query);
int i = 0;
BoundStatement bound = prepared.bind().setString(i++, "Field1").setString(i++, "Field2")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
ResultSet rs = CassandraConnector.execute(bound);

The stack trace:

at com.datastax.oss.driver.api.core.NoNodeAvailableException.copy(NoNodeAvailableException.java:40)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53)
at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30)
at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:53)
at com.test.manager.connectors.CassandraConnector.execute(CassandraConnector.java:16)
at com.test.repository.impl.BackupRepositoryImpl.insert(BackupRepositoryImpl.java:36)
at com.test.service.impl.BackupServiceImpl.insert(BackupServiceImpl.java:18)
at com.test.flink.function.AsyncBackupFunction.processMessage(AsyncBackupFunction.java:78)
at com.test.flink.function.AsyncBackupFunction.lambda$asyncInvoke$0(AsyncBackupFunction.java:35)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Here is my code:
CassandraConnector.java: because initializing a PreparedStatement is expensive, I cache it.
public class CassandraConnector {
    private static final ConcurrentHashMap<String, PreparedStatement> preparedStatementCache =
            new ConcurrentHashMap<String, PreparedStatement>();

    public static ResultSet execute(BoundStatement bound) {
        CqlSession session = CassandraManager.getSessionInstance();
        return session.execute(bound);
    }

    public static ResultSet execute(String query) {
        CqlSession session = CassandraManager.getSessionInstance();
        return session.execute(query);
    }

    public static PreparedStatement prepare(String query) {
        PreparedStatement result = preparedStatementCache.get(query);
        if (result == null) {
            CqlSession session = CassandraManager.getSessionInstance();
            result = session.prepare(query);
            preparedStatementCache.putIfAbsent(query, result);
        }
        return result;
    }
}

CassandraManager.java: I'm using double-checked locking on the singleton session object.
public class CassandraManager {
    private static final Logger logger = LoggerFactory.getLogger(CassandraManager.class);
    private static final String SSL_CASSANDRA_PASSWORD = "password";
    private static volatile CqlSession session;

    static {
        try {
            initSession();
        } catch (Exception e) {
            logger.error("Error CassandraManager getSessionInstance", e);
        }
    }

    private static void initSession() {
        List<InetSocketAddress> contactPoints = Collections.singletonList(InetSocketAddress.createUnresolved(
                "cassandra.ap-southeast-1.amazonaws.com", 9142));
        DriverConfigLoader loader = DriverConfigLoader.fromClasspath("application.conf");
        Long start = BaseHelper.getTime();
        session = CqlSession.builder().addContactPoints(contactPoints).withConfigLoader(loader)
                .withAuthCredentials(AppUtil.getProperty("cassandra.username"),
                        AppUtil.getProperty("cassandra.password"))
                .withSslContext(getSSLContext()).withLocalDatacenter("ap-southeast-1")
                .withKeyspace(AppUtil.getProperty("cassandra.keyspace")).build();
        logger.info("End connect: " + (new Date().getTime() - start));
    }

    public static CqlSession getSessionInstance() {
        if (session == null || session.isClosed()) {
            synchronized (CassandraManager.class) {
                if (session == null || session.isClosed()) {
                    initSession();
                }
            }
        }
        return session;
    }

    public static SSLContext getSSLContext() {
        InputStream in = null;
        try {
            KeyStore ks = KeyStore.getInstance("JKS");
            in = CassandraManager.class.getClassLoader().getResourceAsStream("cassandra_truststore.jks");
            ks.load(in, SSL_CASSANDRA_PASSWORD.toCharArray());
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(ks);
            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(null, tmf.getTrustManagers(), null);
            return ctx;
        } catch (Exception e) {
            logger.error("Error CassandraConnector getSSLContext", e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.error("", e);
                }
            }
        }
        return null;
    }
}

application.conf:
datastax-java-driver {
  basic.request {
    timeout = 5 seconds
    consistency = LOCAL_ONE
  }
  advanced.connection {
    max-requests-per-connection = 1024
    pool {
      local.size = 1
      remote.size = 1
    }
  }
  advanced.reconnect-on-init = true
  advanced.reconnection-policy {
    class = ExponentialReconnectionPolicy
    base-delay = 1 second
    max-delay = 60 seconds
  }
  advanced.retry-policy {
    class = DefaultRetryPolicy
  }
  advanced.protocol {
    version = V4
  }
  advanced.heartbeat {
    interval = 30 seconds
    timeout = 1 second
  }
  advanced.session-leak.threshold = 8
  advanced.metadata.token-map.enabled = false
}

Posted on 2020-09-22 14:27:09
The driver reports NoNodeAvailableException in either of two situations:

If some inserts are working but you eventually run into NoNodeAvailableException, that indicates to me that the nodes are getting overloaded and eventually become unresponsive, so the driver no longer picks a coordinator because they are all marked as "down".

If none of the requests work at all, it means the contact points are unreachable or unresolvable, so the driver cannot connect to the cluster. Cheers!
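To tell the two cases apart, a quick TCP-level check against the contact point can rule out basic reachability problems. This is a minimal stand-alone sketch that is independent of the driver; the endpoint and port are the ones from the question:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ContactPointCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolvable hosts, refused connections, and timeouts alike
            return false;
        }
    }

    public static void main(String[] args) {
        // The Amazon Keyspaces endpoint and TLS port used in the question
        String host = "cassandra.ap-southeast-1.amazonaws.com";
        System.out.println(host + ":9142 reachable: " + isReachable(host, 9142, 5000));
    }
}
```

If this check fails from the EMR nodes (or from inside the Lambda's VPC), the problem is networking (security groups, routes, VPC endpoints) rather than the driver; if it succeeds but the driver still reports NoNodeAvailableException, look at overload and node state instead.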
Posted on 2021-10-28 05:51:22
NoHostAvailableException is a client-side exception thrown by the open-source driver once it has retried the available hosts. The open-source driver wraps the underlying root cause of the retries, which can be confusing.
I suggest first improving observability by setting up these CloudWatch metrics. You can follow this prebuilt CloudFormation template to get started; it only takes a few seconds.

Here is the setup for Amazon Keyspaces keyspace and table metrics with CloudWatch: https://github.com/aws-samples/amazon-keyspaces-cloudwatch-cloudformation-templates
You can also replace the retry policy with the samples in this helper project. The retry policies in that project either retry or throw the original exception, which removes the NoHostAvailableException occurrences and gives your application better transparency. Here is the GitHub repo: https://github.com/aws-samples/amazon-keyspaces-java-driver-helpers
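For reference, swapping in such a retry policy is just a configuration change in application.conf. The class name and parameter below are assumptions based on the aws-samples helper project; verify the actual fully-qualified name against that repository before using it:

```
datastax-java-driver {
  advanced.retry-policy {
    # Hypothetical class name -- verify against the helper project's README
    class = com.aws.ssa.keyspaces.retry.AmazonKeyspacesRetryPolicy
    max-attempts = 3
  }
}
```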
If you are using private VPC endpoints, you need to add the following permissions to enable more entries in the system.peers table. Amazon Keyspaces just announced a new feature that provides more connection points when establishing a session over private VPC endpoints.

Here is a link about how Keyspaces now automatically optimizes client connections made through AWS PrivateLink to improve availability and read/write throughput: https://aws.amazon.com/about-aws/whats-new/2021/07/amazon-keyspaces-for-apache-cassandra-now-automatically-optimi/

This link describes using Amazon Keyspaces with interface VPC endpoints: https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html. To enable this new functionality, you need to grant the additional DescribeNetworkInterfaces and DescribeVpcEndpoints permissions.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ListVPCEndpoints",
      "Effect": "Allow",
      "Action": [
        "ec2:DescribeNetworkInterfaces",
        "ec2:DescribeVpcEndpoints"
      ],
      "Resource": "*"
    }
  ]
}

Posted on 2020-09-22 21:00:43
I suspect that this:
.withLocalDatacenter(AppUtil.getProperty("cassandra.localdatacenter"))

is pulling back a data center name that doesn't match the name in the keyspace replication definition or the configured data center name:
nodetool status | grep Datacenter

Basically, if your connection defines a local data center that doesn't exist, it will still try to direct reads/writes at replicas in that data center. That fails, because it obviously can't find nodes in a non-existent data center.
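Amazon Keyspaces doesn't give you nodetool access, but (assuming you can connect at all, e.g. from cqlsh) you can run the equivalent check against the system tables; for Keyspaces, the local data center should be the AWS region name, e.g. ap-southeast-1:

```sql
SELECT data_center FROM system.local;
```

Whatever this returns must match the value passed to withLocalDatacenter, otherwise the default load-balancing policy ignores every node and the driver eventually reports that no node is available.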
https://stackoverflow.com/questions/64002599