
Spark SQL two-stage request NPE

Stack Overflow user
Asked on 2020-04-11 00:47:56
1 answer · 100 views · 0 followers · 0 votes

I am new to Spark (using 2.4.0). I am running into a NullPointerException that is strange (to me). The code below throws the NPE.

Code (Scala):
val ds = "2020-04-01"
spark.sql("select ds, db_name, table_name, type FROM datainfra.hive_tables " +
  "where ds = '%s' and db_name = 'db_exports' limit 1".format(ds)).map(table =>
  spark.sql("select col_name FROM datainfra.hive_columns " +
    "where ds = '%s' and db_name = '%s' and table_name = '%s' and table_type = '%s' and col_type = 'string'"
      .format(table.getAs[String]("ds"),
        table.getAs[String]("db_name"),
        table.getAs[String]("table_name"),
        table.getAs[String]("type")))
    .map(columnNameRow => columnNameRow.getAs[String](0)).collect().mkString("||")
)

But each DF works fine on its own:

Code (Scala):
spark.sql("select ds, db_name, table_name, type FROM datainfra.hive_tables " +
  "where ds = '%s' and db_name = 'db_exports' limit 1".format(ds)).show // returns results

spark.sql("select col_name FROM datainfra.hive_columns " +
  ("where ds = '%s' and db_name = '%s' and table_name = '%s' and table_type = '%s' and col_type = 'string' " +
    "and col_name != 'ds'")
    .format(ds,
      "hardcode_db_name",
      "hardcode_table_name",
      "hardcode_type")).map(columnNameRow => columnNameRow.getAs[String](0)).collect().mkString("||")

How can this be?


1 Answer

Stack Overflow user

Answered on 2020-04-12 05:25:11

Q: I am new to Spark (using 2.4.0). I am running into a strange (to me) NPE. The code below throws the NPE. What is going on?

The spark.sql("sql").map(row => spark.sql("some sql")) pattern is the problem, and it is what causes the NullPointerException in your case: the function passed to map is serialized and run on the executors, where the SparkSession is not usable (its session state exists only on the driver), so the inner spark.sql call fails.

Code (Scala):
    val ds = "2020-04-01"

       val test1: Dataset[String] =  spark.sql("select ds, db_name, table_name, type FROM datainfra.hive_tables " +
          "where ds = '%s' and db_name = 'db_exports' limit 1".format(ds))
         .map(table =>
          spark.sql("select col_name FROM datainfra.hive_columns " +
            "where ds = '%s' and db_name = '%s' and table_name = '%s' and table_type = '%s' and col_type = 'string'"
              .format(table.getAs[String]("ds"),
                table.getAs[String]("db_name"),
                table.getAs[String]("table_name"),
                table.getAs[String]("type")))
            .map(columnNameRow => columnNameRow.getAs[String](0)).collect().mkString("||")
        )

To prove this, I prepared a similar example; see below. I reproduced the same NullPointerException, so it looks like this pattern is simply not supported.

Code (Scala):
package com.examples

import org.apache.log4j.Level
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Created by Ram Ghadiyaram
  */
object RDDOfTupleExample {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]) {

    val spark = SparkSession.builder.
      master("local")
      .appName(this.getClass.getName)
      .getOrCreate()
    import spark.implicits._

    val donuts: DataFrame = Seq(("plain donut", 1.50), ("plain donut", 1.50)
      , ("vanilla donut", 2.0), ("vanilla donut", 2.0)
      , ("glazed donut", 2.50))
      .toDF("Donut_Name", "Price")

    // let's suppose this is your Hive table; since I don't have Hive here, I simulated it with a temp view
    donuts.createOrReplaceTempView("mydonuts")
    val test: Dataset[String] = spark.sql("select \"NCA-15\" as mylabel, count(Donut_Name) as mydonutcount from mydonuts")
      .map(x => spark.sql(s"select ${x.get(0)}, ${x.get(1)} ").collect().mkString(",")) // this is the problem: spark.sql called on an executor
    test.show

  }
}

Result:

[2020-04-11 16:27:45,687] ERROR Exception in task 0.0 in stage 1.0 (TID 1) (org.apache.spark.executor.Executor:91)
java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-04-11 16:27:45,710] ERROR Task 0 in stage 1.0 failed 1 times; aborting job (org.apache.spark.scheduler.TaskSetManager:70)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
    at com.examples.RDDOfTupleExample$.main(RDDOfTupleExample.scala:30)
    at com.examples.RDDOfTupleExample.main(RDDOfTupleExample.scala)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Conclusion: the nested spark.sql pattern above does not work. You have to execute the queries separately, or use another approach.
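
For illustration, here is a minimal sketch of the "execute separately" route, applied to the tables from the question. The collect-then-loop structure is my own assumption about what that means, not code from the original answer: collect the outer query's rows to the driver first, then issue the inner queries from the driver, where the SparkSession is valid.

Code (Scala):
// Minimal sketch (an assumption, not the original answer's code): run the
// outer query, collect its rows to the driver, then call spark.sql from
// the driver for each row instead of inside map on the executors.
val ds = "2020-04-01"

val tables = spark.sql(
  ("select ds, db_name, table_name, type FROM datainfra.hive_tables " +
    "where ds = '%s' and db_name = 'db_exports' limit 1").format(ds))
  .collect() // Array[Row], now on the driver

val columnLists: Array[String] = tables.map { table =>
  spark.sql(
    ("select col_name FROM datainfra.hive_columns " +
      "where ds = '%s' and db_name = '%s' and table_name = '%s' " +
      "and table_type = '%s' and col_type = 'string'")
      .format(table.getAs[String]("ds"),
        table.getAs[String]("db_name"),
        table.getAs[String]("table_name"),
        table.getAs[String]("type")))
    .collect()               // safe: this runs on the driver
    .map(_.getString(0))     // extract col_name values
    .mkString("||")
}

A join between datainfra.hive_tables and datainfra.hive_columns on ds, db_name and table_name would be another way to avoid calling spark.sql on the executors, at the cost of expressing the per-table aggregation in SQL.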

Votes: 1
Original content from Stack Overflow: https://stackoverflow.com/questions/61144828
