
How to create a table using delta with Spark 2.4.4?

Stack Overflow user
Asked on 2019-12-31 16:33:52
3 answers · 17.3K views · 0 followers · 11 votes

Spark 2.4.4 and Delta Lake 0.5.0 here.

I'm trying to create a table using the delta data source and seem to be missing something. Although the CREATE TABLE USING delta command worked fine, neither was the table directory created nor does insertInto work.

The following CREATE TABLE USING delta worked fine, but insertInto failed.

scala> sql("""
create table t5
USING delta
LOCATION '/tmp/delta'
""").show

scala> spark.catalog.listTables.where('name === "t5").show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|  t5| default|       null| EXTERNAL|      false|
+----+--------+-----------+---------+-----------+

scala> spark.range(5).write.option("mergeSchema", true).insertInto("t5")
org.apache.spark.sql.AnalysisException: `default`.`t5` requires that the data to be inserted have the same number of columns as the target table: target table has 0 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).;
  at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion.org$apache$spark$sql$execution$datasources$PreprocessTableInsertion$$preprocess(rules.scala:341)
  ...

I figured I should create the table with the columns defined, but that did not work either.

scala> sql("""
create table t6
(id LONG, name STRING)
USING delta
LOCATION '/tmp/delta'
""").show
org.apache.spark.sql.AnalysisException: delta does not allow user-specified schemas.;
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
  at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:78)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:194)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
  ... 54 elided

3 Answers

Stack Overflow user

Accepted answer

Posted on 2020-06-23 12:43:36

tl;dr CREATE TABLE USING delta is not supported by Spark before 3.0.0 or by Delta Lake before 0.7.0.

Delta Lake 0.7.0 and Spark 3.0.0 (both just released) do support the CREATE TABLE SQL command.

Make sure to "install" Delta by setting the spark.sql.catalog.spark_catalog configuration property to org.apache.spark.sql.delta.catalog.DeltaCatalog.

$ ./bin/spark-shell \
  --packages io.delta:delta-core_2.12:0.7.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

scala> spark.version
res0: String = 3.0.0

scala> sql("CREATE TABLE delta_101 (id LONG) USING delta").show
++
||
++
++

scala> spark.table("delta_101").show
+---+
| id|
+---+
+---+

scala> sql("DESCRIBE EXTENDED delta_101").show(truncate = false)
+----------------------------+---------------------------------------------------------+-------+
|col_name                    |data_type                                                |comment|
+----------------------------+---------------------------------------------------------+-------+
|id                          |bigint                                                   |       |
|                            |                                                         |       |
|# Partitioning              |                                                         |       |
|Not partitioned             |                                                         |       |
|                            |                                                         |       |
|# Detailed Table Information|                                                         |       |
|Name                        |default.delta_101                                        |       |
|Location                    |file:/Users/jacek/dev/oss/spark/spark-warehouse/delta_101|       |
|Provider                    |delta                                                    |       |
|Table Properties            |[]                                                       |       |
+----------------------------+---------------------------------------------------------+-------+
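As a hedged follow-up sketch (not part of the original answer, assuming the same spark-shell session as above): with DeltaCatalog configured, rows can be appended to the new table and read back.

// Hedged sketch, not from the original answer: with the DeltaCatalog
// configuration above, standard SQL INSERT works against the Delta table.
sql("INSERT INTO delta_101 VALUES (1), (2), (3)")

// Appending through the DataFrame writer also works; spark.range produces
// a single LONG column named `id`, matching the table schema.
spark.range(3, 6).write.format("delta").mode("append").saveAsTable("delta_101")

// Read back to verify both appends.
spark.table("delta_101").orderBy("id").show()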
Votes: 4

Stack Overflow user

Posted on 2019-12-31 16:48:21

The OSS version of Delta does not have this SQL syntax yet. It will be implemented in a future release that works with Spark 3.0.

To create a Delta table you have to write out a DataFrame in Delta format. An example in Python:

df.write.format("delta").save("/some/data/path")

Here is a link to the create table documentation for Python and Java.
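For the versions in the question (Spark 2.4.4 with Delta Lake 0.5.0), here is a hedged Scala sketch of the same workaround, reusing the /tmp/delta path from the question: write the data first so the Delta log defines the schema, then register an external table over that location, and route later appends through the path rather than insertInto.

// Hedged sketch for Spark 2.4.4 + Delta Lake 0.5.0 (the versions in the question).
// Writing a DataFrame in delta format creates the Delta log and the schema.
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta")

// Register an external table over the existing Delta location; on these
// versions the metastore entry is only a pointer, the schema lives in the log.
spark.sql("CREATE TABLE t5 USING delta LOCATION '/tmp/delta'")

// Append by path; insertInto against the metastore entry is what fails here.
spark.range(5, 10).write.format("delta").mode("append").save("/tmp/delta")

// Read back through the path-based data source.
spark.read.format("delta").load("/tmp/delta").show()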

Votes: 7

Stack Overflow user

Posted on 2020-08-28 14:40:29

An example with pyspark 3.0.0 & delta 0.7.0:

print(f"LOCATION '{location}")
spark.sql(f"""
CREATE OR REPLACE TABLE  {TABLE_NAME} (
  CD_DEVICE INT, 
  FC_LOCAL_TIME TIMESTAMP,  
  CD_TYPE_DEVICE STRING,
  CONSUMTION DOUBLE,
  YEAR INT,
  MONTH INT, 
  DAY INT )
USING DELTA
PARTITIONED BY (YEAR , MONTH , DAY, FC_LOCAL_TIME)
LOCATION '{location}'
""")

其中"location“是火花集群模式保存delta表的dir。

Votes: 5
The original content of this page is provided by Stack Overflow; translation support is provided by Tencent Cloud Xiaowei's dedicated IT-domain engine.
Original link:

https://stackoverflow.com/questions/59546516
