首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark-jobserver的Cassandra连接问题

Spark-jobserver的Cassandra连接问题
EN

Stack Overflow用户
提问于 2015-04-24 15:22:39
回答 2查看 548关注 0票数 2

我是 Spark 和 Spark Job Server 领域的新手。

我的代码:

代码语言:javascript
复制
package spark.jobserver 

import java.nio.ByteBuffer 

import scala.collection.JavaConversions._ 
import scala.collection.mutable.ListBuffer 
import scala.collection.immutable.Map 

import org.apache.cassandra.hadoop.ConfigHelper 
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper 
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat 
import org.apache.cassandra.utils.ByteBufferUtil 
import org.apache.hadoop.mapreduce.Job 

import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import scala.util.Try 

/**
 * Spark-jobserver job that reads an order column family from Cassandra via the
 * CQL Hadoop input format, aggregates sale counts per product, prints the
 * aggregates, and writes them back to Cassandra via the CQL output format.
 *
 * Returns (from `runJob`) the row count of the input RDD.
 */
object CassandraCQLTest extends SparkJob{ 

  /**
   * Standalone entry point for running the job outside the job server,
   * against a local 4-thread Spark context.
   */
  def main(args: Array[String]) {   
    val sc = new SparkContext("local[4]", "CassandraCQLTest") 
    // Ship the connector assembly to executors so Cassandra classes resolve at runtime.
    sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar")
    val config = ConfigFactory.parseString("") 
    val results = runJob(sc, config) 
    // Print the actual job result (was a hard-coded "test" placeholder that
    // left `results` unused).
    println("Result is " + results) 
    // Release the local context's resources before exiting.
    sc.stop() 
  } 

  /**
   * Job-server validation hook: the job is considered runnable only when the
   * `input.string` config key is present.
   */
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = { 
    Try(config.getString("input.string")) 
      .map(x => SparkJobValid) 
      .getOrElse(SparkJobInvalid("No input.string config param")) 
  } 

  /**
   * Reads `retail.ordercf` from Cassandra, sums `quantity` per `prod_id`,
   * prints each aggregate, writes the totals to `retail.salecount`, and
   * returns the number of input rows.
   *
   * NOTE(review): host/port/keyspace are hard-coded; consider reading them
   * from `config` so the job is reusable across environments.
   */
  override def runJob(sc: SparkContext, config: Config): Any = { 
    val cHost: String = "localhost" 
    val cPort: String = "9160" // Cassandra Thrift RPC port
    val KeySpace = "retail" 
    val InputColumnFamily = "ordercf" 
    val OutputColumnFamily = "salecount" 

    // The Hadoop Job is used only as a carrier for input/output configuration.
    val job = new Job() 
    job.setInputFormatClass(classOf[CqlPagingInputFormat]) 
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) 
    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort) 
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily) 
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") 
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3") 

    /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */ 

    /** An UPDATE writes one or more columns to a record in a Cassandra column family */ 
    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? " 
    CqlConfigHelper.setOutputCql(job.getConfiguration(), query) 

    job.setOutputFormatClass(classOf[CqlOutputFormat]) 
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily) 
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost) 
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort) 
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") 

    // Each record is (partition-key columns, value columns), both as
    // column-name -> ByteBuffer maps.
    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), 
      classOf[CqlPagingInputFormat], 
      classOf[java.util.Map[String,ByteBuffer]], 
      classOf[java.util.Map[String,ByteBuffer]]) 

    // Decode the columns we need: (prod_id, quantity).
    val productSaleRDD = casRdd.map { 
      case (key, value) => { 
        (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity"))) 
      } 
    } 
    // Total quantity sold per product.
    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _) 
    aggregatedRDD.collect().foreach { 
      case (productId, saleCount) => println(productId + ":" + saleCount) 
    } 

    // Re-encode each aggregate into the (key map, bound-variable list) shape
    // expected by CqlOutputFormat: the key identifies the row, the list binds
    // the `?` placeholders of the UPDATE statement in order.
    val casoutputCF  = aggregatedRDD.map { 
      case (productId, saleCount) => { 
        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId)) 
        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey 
        // `val` suffices: the ListBuffer itself is mutable, the reference never changes.
        val outColFamVal = new ListBuffer[ByteBuffer] 
        outColFamVal += ByteBufferUtil.bytes(saleCount) 
        val outVal: java.util.List[ByteBuffer] = outColFamVal 
       (outKey, outVal) 
      } 
    } 

    // The "path" argument is unused by CqlOutputFormat; the keyspace is passed
    // by convention.
    casoutputCF.saveAsNewAPIHadoopFile( 
        KeySpace, 
        classOf[java.util.Map[String, ByteBuffer]], 
        classOf[java.util.List[ByteBuffer]], 
        classOf[CqlOutputFormat], 
        job.getConfiguration() 
      ) 
    casRdd.count 
  } 
} 

当我使用spark-jobserver上传Jar并执行它时,我在spark-jobserver终端上看到以下错误信息:

代码语言:javascript
复制
job-server[ERROR] Exception in thread "pool-1-thread-1" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat 
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46) 
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21) 
job-server[ERROR] at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235) 
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
job-server[ERROR] at java.lang.Thread.run(Thread.java:745) 
job-server[ERROR] Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
job-server[ERROR] at java.security.AccessController.doPrivileged(Native Method) 
job-server[ERROR] at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
job-server[ERROR] ... 8 more 

我已经将我的cassandra-spark-connector-assembly这个jar添加到了$EXTRA_JAR变量中。

EN

回答 2

Stack Overflow用户

发布于 2015-04-25 09:25:39

当您将程序提交到Spark中时,需要包含所有依赖的jar文件(逗号分隔列表)。假设您的项目结构如下所示。

代码语言:javascript
复制
 simpleapp
  - src/main/java
    - org.apache.spark.examples
      - SimpleApp.java
  - lib
    - dependent.jars (you can put all dependent jars inside lib   directory)
  - target
    - simpleapp.jar (after compiling your source)

所以你可以使用下面的命令。

代码语言:javascript
复制
spark-submit --jars $(echo lib/*.jar | tr ' ' ',' ) --class org.apache.spark.examples.SimpleApp --master local[2]  target/simpleapp.jar

此外,您可以使用spark web控制台查看jar发行版,转到您的程序->环境,找出spark抱怨的jar文件是否已经存在。

票数 0
EN

Stack Overflow用户

发布于 2016-04-26 22:14:16

CqlPagingInputFormat在cassandra-all版本2.0.4中,在更高版本中找不到。在运行时,您的应用程序采用的是高于2.0.4的cassandra版本。您必须将此依赖项添加到您的pom:

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.cassandra</groupId>
  <artifactId>cassandra-all</artifactId>
  <version>2.0.4</version>
</dependency>
才能得到这个类。

但是我不能保证其他的东西都会工作的很好。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29841158

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档