
DataStax where condition on Spark

Stack Overflow user
Asked on 2016-11-15 19:58:11
1 answer · 499 views · 0 followers · 0 votes

I am new to Spark. I am trying to do a simple calculation using Spark with Cassandra. If I add a where condition, where("id=?", 5), I get the exception below. Without the where condition there is no exception, but the query takes much longer even though there are only 4 records in the database. id is the partition key of the products table.

Cassandra 2.1.11, Spark 2.0.1-hadoop2.7, Java 1.8

How do I add the where condition correctly?

Code:
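    // Note: javaFunctions(...) requires a static import; in connector 1.x this is
    // import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;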
    SparkConf conf = initSparkContext();
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Product> productRDD = javaFunctions(sc).cassandraTable("java_api", "products", Product.class).where("id=?", 5);
    List<Product> productList = productRDD.collect();
    for(Product product: productList) {
        System.out.println(" product = "+product);
    }
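For reference, the row mapping above assumes a JavaBean whose properties line up with the table's columns. Going by the column list in the exception below (id, name, parents), a minimal sketch of such a bean could look like this; the field types are assumptions:

Code:

    import java.io.Serializable;
    import java.util.List;

    // Hypothetical bean for the java_api.products table; the column names come
    // from the SELECT in the exception below, the field types are guesses.
    public class Product implements Serializable {
        private Integer id;
        private String name;
        private List<Integer> parents;

        public Product() { }  // the row mapper requires a no-arg constructor

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public List<Integer> getParents() { return parents; }
        public void setParents(List<Integer> parents) { this.parents = parents; }
    }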

Dependencies

Code:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>


    <!--Spark Cassandra Connector-->
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>

Exception

Code:
16/11/15 13:44:34 ERROR executor.Executor: Exception in task ID 0
java.io.IOException: Exception during preparation of SELECT "id", "name", "parents" FROM "java_api"."products" WHERE token("id") > -1732598212583841281 AND token("id") <= -1668034862038885205 AND id=? ALLOW FILTERING: id cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at com.datastax.spark.connector.util.CountingIterator.foreach(CountingIterator.scala:4)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at com.datastax.spark.connector.util.CountingIterator.to(CountingIterator.scala:4)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at com.datastax.spark.connector.util.CountingIterator.toBuffer(CountingIterator.scala:4)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at com.datastax.spark.connector.util.CountingIterator.toArray(CountingIterator.scala:4)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: id cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy6.prepare(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293)
    ... 26 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: id cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:720)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:859)
    ... 3 more

1 Answer

Stack Overflow user

Answered on 2016-11-16 03:53:33

You have said that you are running Spark 2.0.1-hadoop2.7, but the dependencies in your pom.xml are at version 1.0.0. That old connector always adds its own token-range restriction on the partition key (visible in the generated CQL in your stack trace), which collides with your id=? equality and is what Cassandra rejects with the InvalidQueryException. For Spark 2.0.1 your pom should look like the following (note that the _2.11 suffix is the Scala version and must match across all artifacts):

Code:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.0-M3</version>
    </dependency>

The Spark code that reads from Cassandra should then be:

Code:
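// Connector 2.x reads the Cassandra contact point and port from the
// Spark configuration set on the SparkSession below.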
SparkSession spark = SparkSession
              .builder()
              .appName("SparkCassandraApp")
              .config("spark.sql.warehouse.dir", "/file:C:/temp")
              .config("spark.cassandra.connection.host", "127.0.0.1")
              .config("spark.cassandra.connection.port", "9042")
              .master("local[*]")
              .getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// mapRowTo(...) maps each Cassandra row onto the Product bean; where() appends
// the CQL predicate so the partition-key filter is executed by Cassandra.
JavaRDD<Product> productRDD = javaFunctions(jsc)
        .cassandraTable("java_api", "products", CassandraJavaUtil.mapRowTo(Product.class))
        .where("id=?", 5);
productRDD.foreach(data -> {
    System.out.println(data.getId());
    System.out.println(data.getName());
});
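Since the pom above already includes spark-sql, the same filtered read can also be expressed through the DataFrame API; the connector's data source can push a simple predicate such as this partition-key equality down to Cassandra. A sketch, reusing the SparkSession built above:

Code:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Read java_api.products through the connector's DataFrame source and
    // filter on the partition key.
    Dataset<Row> products = spark.read()
            .format("org.apache.spark.sql.cassandra")
            .option("keyspace", "java_api")
            .option("table", "products")
            .load()
            .filter("id = 5");
    products.show();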
Votes: 0
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/40618607
