我已经为spark-cassandra-connector编写了一个自定义的LoadBalancerPolicy,现在我想确保它真的能工作!
我有一个包含3个节点和一个复制因子为2的键空间的Cassandra集群,因此当我们想要检索一条记录时,cassandra上将只有两个节点保存数据。
问题是,我希望确保spark-cassandra-connector (使用我的负载均衡器策略)仍然是令牌感知的,并且将选择正确的节点作为每个"SELECT“语句的协调器。
现在,我在想,如果我们可以在每个节点的SELECT语句上编写一个触发器,在该节点不保存数据的情况下,触发器将创建一个日志,我意识到负载均衡器策略不能正常工作。我们如何在Cassandra中编写SELECT触发器?有没有更好的方法来实现这一点?
我已经查看了创建触发器的文档,这些文档太有限了:
Official documentation
发布于 2019-12-08 22:12:47
根据Alex的说法,我们可以这样做:
在创建SparkSession之后,我们应该创建一个连接器:
import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)现在我们可以定义一个preparedStatement,然后完成剩下的工作:
connector.withSessionDo(session => {
val selectQuery = "select * from test where id=?"
val prepareStatement = session.prepare(selectQuery)
val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
// We have to explicitly bind the all of parameters that partition key is based on them, otherwise the routingKey will be null.
val boundStatement = prepareStatement.bind(s"$id")
val routingKey = boundStatement.getRoutingKey(protocolVersion, null)
// We can get tha all of nodes that contains the row
val replicas = session.getCluster.getMetadata.getReplicas("test", routingKey)
val resultSet = session.execute(boundStatement)
// We can get the node which gave us the row
val host = resultSet.getExecutionInfo.getQueriedHost
// Final step is to check whether the replicas contains the host or not!!!
if (replicas.contains(host)) println("It works!")
})重要的是,我们必须显式绑定分区键基于它们的所有参数(即,我们不能在SELECT语句中对它们进行硬编码设置),否则routingKey将为空。
发布于 2019-12-03 21:50:00
您可以在程序端执行此操作,如果对绑定语句执行get routing key (必须使用预准备语句),找到replicas for it via Metadata class,然后比较可以从ResultSet获得的if this host is in the ExecutionInfo。
https://stackoverflow.com/questions/59152964
复制相似问题