首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >异步书写在Cassandra中似乎被中断了。

异步书写在Cassandra中似乎被中断了。
EN

Stack Overflow用户
提问于 2014-12-27 12:28:40
回答 2查看 5K关注 0票数 11

当我将900万行批次写入一个12个节点cassandra (2.1.2)集群时,我遇到了火花卡桑德拉连接器(1.0.4,1.1.0)的问题。我是用一致的方式写的,读的是一致的,但是每次读的行数都不同于900万行(8.865.753,8.753.213等等)。

我检查了连接器的代码,没有发现任何问题。然后,我决定独立于spark和连接器编写自己的应用程序来调查问题(唯一的依赖项是datastax驱动程序代码版本2.1.3)。

完整的代码、启动脚本和配置文件现在可以是 在github上发现

在伪代码中,我编写了两个不同版本的应用程序,同步版本:

代码语言:javascript
复制
try (Session session = cluster.connect()) {

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        session.execute(bound);
    }

}

而异步的是:

代码语言:javascript
复制
try (Session session = cluster.connect()) {

    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        while(futures.size()>=10 /* Max 10 concurrent writes */) {
            // Wait for the first issued write to terminate
            ResultSetFuture future = futures.get(0);
            future.get();
            futures.remove(0);
        }

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        futures.add(session.executeAsync(bound));
    }

    while(futures.size()>0) {
        // Wait for the other write requests to terminate
        ResultSetFuture future = futures.get(0);
        future.get();
        futures.remove(0);
    }
}

最后一个类似于连接器在无批处理配置情况下所使用的配置。

应用程序的两个版本在所有情况下都是相同的,除非负载很高。

例如,当在9台机器(45个线程)上运行同步版本,将900万行写入集群时,我会在随后的read中找到所有行(使用火花-cassandra-连接器)。

如果我以每台机器一个线程的方式运行异步版本(9个线程),则执行速度要快得多,但我无法在随后的读取中找到所有行(与使用火花-cassandra-连接器的问题相同)。

在执行过程中,代码没有抛出任何异常。

造成这个问题的原因是什么?

我还要补充一些其他的结果(谢谢你的评论):

  • 异步版本,9台机器上有9个线程,每个线程有5个并发写入器(45个并发写入器):没有问题
  • 与9台机器上的90个线程同步版本(每个JVM实例10个线程):没有问题

异步写入和多个并发编写器> 45和<=90似乎开始出现问题,因此我做了其他测试以确保发现是正确的:

  • 将ResultSetFuture的"get“方法替换为"getUninterruptibly":相同的问题。
  • 异步版本,在9台机器上有18个线程,每个线程有5个并发写入器(90个并发写入器):no

最后的发现表明,并发编写器(90)的高数量并不像第一次测试中所预期的那样是一个问题。问题是使用同一个会话的异步写入的数量很高。

对于同一会话上的5个并发异步写入,则不存在此问题。如果我将并发写的次数增加到10个,一些操作就会在没有通知的情况下丢失。

如果您在同一会话中并发发出多次(>5)写入,那么在Cassandra2.1.2(或Cassandra驱动程序)中,异步写入似乎会中断。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-01-12 04:10:23

这个周末,尼古拉和我通过电子邮件进行了交流,并认为我会在这里提供最新的理论。我看了一下github项目 Nicola共享并在EC2上试验了一个8节点集群。

我能够用2.1.2重现这个问题,但我确实观察到,过了一段时间,我可以重新执行星火作业,并返回所有900万行。

我似乎注意到的是,当节点处于压缩状态时,我并没有得到900万行数据。一时兴起,我看了一下更改2.1日志,并观察到了一个可能解释这个问题的CASSANDRA-8429 -“一些键在压缩过程中不可读”问题。

看到这个问题已经被修正为2.1.3,我重新运行了针对cassandra-2.1分支的测试,并在压缩活动发生时运行了count作业,返回了900万行。

我想尝试更多,因为我在cassandra-2.1分支的测试是相当有限的,压缩活动可能纯粹是巧合,但我希望这可以解释这些问题。

票数 7
EN

Stack Overflow用户

发布于 2014-12-27 16:02:42

有几种可能性:

  • 异步示例是用9个线程同时发出10次写,所以每次90次,而同步示例一次只写45次,所以我会尝试将异步降低到相同的速率,因此这是苹果与苹果的比较。 您没有说明如何使用异步方法检查异常。我看到您正在使用future.get(),但建议使用getUninterruptibly(),如文档中所述: 等待查询返回并返回其结果。这个方法通常比Future.get()更方便,因为它:不间断地等待结果,因此不会抛出InterruptedException。返回有意义的异常,而不必处理ExecutionException。因此,这是获得未来结果的首选方法。

因此,您可能没有看到异步示例中出现的写异常。

  • 另一个不太可能的可能性是,您的keySource由于某种原因返回重复的分区键,所以当您执行写操作时,它们中的一些最终会覆盖先前插入的行,而不会增加行数。但这也会影响同步版本,所以我说这不太可能。 我会尝试写比900万小的集合,并且速度慢,看看这个问题是否只在一定数量的插入或一定的插入速率下才会发生。如果插入的数量有影响,那么我怀疑数据中的行键有问题。如果插入速度有影响,那么我怀疑导致写入超时错误的热点。
  • 另一件要检查的事情是Cassandra日志文件,看看那里是否报告了任何异常。

增编: 12/30/14

我试着用Cassandra 2.1.2和driver 2.1.3的示例代码再现症状。我使用了一个带有递增数字键的表,这样我就可以看到数据中的空白。我做了很多异步插入(在10个线程中,每个线程每次30次,全部使用一个全局会话)。然后,我对表进行了"select计数(*)“,实际上它报告的表中的行数比预期的少。然后我做了一个"select *“,并将行转储到一个文件中,并检查是否丢失了键。它们似乎是随机分布的,但当我询问那些丢失的个别行时,发现它们实际上存在于表中。然后,我注意到每次执行"select计数(*)“时,它都会返回一个不同的数字,因此它似乎是在近似表中的行数,而不是实际的数字。

因此,我修改了测试程序,使其在写完所有的操作后进行一个读回阶段,因为我知道所有的键值。当我这样做时,所有的异步写入都出现在表中。

因此,我的问题是,在完成写作之后,您如何检查表中的行数?您是在查询每个单独的键值,还是使用"select *“之类的操作?如果是后者,这似乎给出了大多数行,但不是所有行,所以也许您的数据实际上是存在的。由于没有异常被抛出,这似乎表明写都是成功的。另一个问题是,您是否确定您的键值对于所有900万行都是唯一的。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27667228

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档