当我将900万行批次写入一个12个节点cassandra (2.1.2)集群时,我遇到了火花卡桑德拉连接器(1.0.4,1.1.0)的问题。我是用一致的方式写的,读的是一致的,但是每次读的行数都不同于900万行(8.865.753,8.753.213等等)。
我检查了连接器的代码,没有发现任何问题。然后,我决定独立于spark和连接器编写自己的应用程序来调查问题(唯一的依赖项是datastax驱动程序代码版本2.1.3)。
完整的代码、启动脚本和配置文件现在可以是 在github上发现。
在伪代码中,我编写了两个不同版本的应用程序,同步版本:
try (Session session = cluster.connect()) {
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
session.execute(bound);
}
}而异步的是:
try (Session session = cluster.connect()) {
List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
while(futures.size()>=10 /* Max 10 concurrent writes */) {
// Wait for the first issued write to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
futures.add(session.executeAsync(bound));
}
while(futures.size()>0) {
// Wait for the other write requests to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
}最后一个类似于连接器在无批处理配置情况下所使用的配置。
应用程序的两个版本在所有情况下都是相同的,除非负载很高。
例如,当在9台机器(45个线程)上运行同步版本,将900万行写入集群时,我会在随后的read中找到所有行(使用火花-cassandra-连接器)。
如果我以每台机器一个线程的方式运行异步版本(9个线程),则执行速度要快得多,但我无法在随后的读取中找到所有行(与使用火花-cassandra-连接器的问题相同)。
在执行过程中,代码没有抛出任何异常。
造成这个问题的原因是什么?
我还要补充一些其他的结果(谢谢你的评论):
异步写入和多个并发编写器> 45和<=90似乎开始出现问题,因此我做了其他测试以确保发现是正确的:
最后的发现表明,并发编写器(90)的高数量并不像第一次测试中所预期的那样是一个问题。问题是使用同一个会话的异步写入的数量很高。
对于同一会话上的5个并发异步写入,则不存在此问题。如果我将并发写的次数增加到10个,一些操作就会在没有通知的情况下丢失。
如果您在同一会话中并发发出多次(>5)写入,那么在Cassandra2.1.2(或Cassandra驱动程序)中,异步写入似乎会中断。
发布于 2015-01-12 04:10:23
这个周末,尼古拉和我通过电子邮件进行了交流,并认为我会在这里提供最新的理论。我看了一下github项目 Nicola共享并在EC2上试验了一个8节点集群。
我能够用2.1.2重现这个问题,但我确实观察到,过了一段时间,我可以重新执行星火作业,并返回所有900万行。
我似乎注意到的是,当节点处于压缩状态时,我并没有得到900万行数据。一时兴起,我看了一下更改2.1日志,并观察到了一个可能解释这个问题的CASSANDRA-8429 -“一些键在压缩过程中不可读”问题。
看到这个问题已经被修正为2.1.3,我重新运行了针对cassandra-2.1分支的测试,并在压缩活动发生时运行了count作业,返回了900万行。
我想尝试更多,因为我在cassandra-2.1分支的测试是相当有限的,压缩活动可能纯粹是巧合,但我希望这可以解释这些问题。
发布于 2014-12-27 16:02:42
有几种可能性:
future.get(),但建议使用getUninterruptibly(),如文档中所述:
等待查询返回并返回其结果。这个方法通常比Future.get()更方便,因为它:不间断地等待结果,因此不会抛出InterruptedException。返回有意义的异常,而不必处理ExecutionException。因此,这是获得未来结果的首选方法。因此,您可能没有看到异步示例中出现的写异常。
增编: 12/30/14
我试着用Cassandra 2.1.2和driver 2.1.3的示例代码再现症状。我使用了一个带有递增数字键的表,这样我就可以看到数据中的空白。我做了很多异步插入(在10个线程中,每个线程每次30次,全部使用一个全局会话)。然后,我对表进行了"select计数(*)“,实际上它报告的表中的行数比预期的少。然后我做了一个"select *“,并将行转储到一个文件中,并检查是否丢失了键。它们似乎是随机分布的,但当我询问那些丢失的个别行时,发现它们实际上存在于表中。然后,我注意到每次执行"select计数(*)“时,它都会返回一个不同的数字,因此它似乎是在近似表中的行数,而不是实际的数字。
因此,我修改了测试程序,使其在写完所有的操作后进行一个读回阶段,因为我知道所有的键值。当我这样做时,所有的异步写入都出现在表中。
因此,我的问题是,在完成写作之后,您如何检查表中的行数?您是在查询每个单独的键值,还是使用"select *“之类的操作?如果是后者,这似乎给出了大多数行,但不是所有行,所以也许您的数据实际上是存在的。由于没有异常被抛出,这似乎表明写都是成功的。另一个问题是,您是否确定您的键值对于所有900万行都是唯一的。
https://stackoverflow.com/questions/27667228
复制相似问题