首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Spark将数据写入表?

如何使用Spark将数据写入表?
EN

Stack Overflow用户
提问于 2020-12-09 18:21:20
回答 2查看 7.3K关注 0票数 1

我正在尝试熟悉Apache,我在理解如何使用Spark将一些外部数据写入表时遇到了一些困难。

  • I有一个文件,one.csv,位于一个目录中,/data
  • 我的Iceberg目录被配置为指向这个目录,/warehouse
  • 我想将这个one.csv写到Apache表(最好使用Spark )

甚至可以使用Spark读取外部数据吗?然后写到冰山的桌子上?我是否必须使用scala或python来做到这一点?我已经看过冰山和星火3.0.1文档,但是我可能遗漏了什么。

代码更新

下面是一些我希望能有所帮助的代码

代码语言:javascript
复制
spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
spark.conf.set("spark.sql.catalog.spark_catalog.type", "hive")
spark.conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.local.type", "hadoop")
spark.conf.set("spark.sql.catalog.local.warehouse", "data/warehouse")

我拥有坐在/one/one.csv目录中所需的数据。

我怎么用星火把它放进冰柜里呢?所有这些都能完全使用SparkSQL吗?

代码语言:javascript
复制
spark.sql(
"""
CREATE or REPLACE TABLE local.db.one
USING iceberg
AS SELECT * FROM `/one/one.csv`
"""
)

然后我的目标是我可以直接处理这个冰山表,例如:

代码语言:javascript
复制
select * from local.db.one

这将给我/one/one.csv文件中的所有内容。

EN

回答 2

Stack Overflow用户

发布于 2021-08-31 13:09:30

若要使用SparkSQL,请将文件读入数据格式,然后将其注册为临时视图。这个临时视图现在可以在SQL中引用为:

代码语言:javascript
复制
var df = spark.read.format("csv").load("/data/one.csv")
df.createOrReplaceTempView("tempview");

spark.sql("CREATE or REPLACE TABLE local.db.one USING iceberg AS SELECT * FROM tempview");

要回答另一个问题,不需要Scala或Python;上面的示例是Java中的。

票数 2
EN

Stack Overflow用户

发布于 2021-11-16 03:45:59

代码语言:javascript
复制
val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
sparkConf.set("spark.sql.catalog.spark_catalog.type", "hive")
sparkConf.set("spark.sql.catalog.hive_catalog", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.hive_catalog.type", "hadoop")
sparkConf.set("spark.sql.catalog.hive_catalog.warehouse", "hdfs://host:port/user/hive/warehouse")
sparkConf.set("hive.metastore.uris", "thrift://host:19083")
sparkConf.set("spark.sql.catalog.hive_prod", " org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.hive_prod.type", "hive")
sparkConf.set("spark.sql.catalog.hive_prod.uri", "thrift://host:19083")
sparkConf.set("hive.metastore.warehouse.dir", "hdfs://host:port/user/hive/warehouse")
val spark: SparkSession = SparkSession.builder()
  .enableHiveSupport()
  .config(sparkConf)
  .master("yarn")
  .appName("kafkaTableTest")
  .getOrCreate()

spark.sql(
  """
    |
    |create table if not exists hive_catalog.icebergdb.kafkatest1(
    |    company_id int,
    |    event string,
    |    event_time timestamp,
    |    position_id int,
    |    user_id int
    |)using iceberg
    |PARTITIONED BY (days(event_time))
    |""".stripMargin)

import spark.implicits._



val df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka_server")
  .option("subscribe", "topic")
  .option("startingOffsets", "latest")
  .load()
//.selectExpr("cast (value as string)")

val value: DataFrame = df.selectExpr("CAST(value AS STRING)")
  .as[String]
  .map(data => {
    val json_str: JSONObject = JSON.parseObject(data)
    val company_id: Integer = json_str.getInteger("company_id")
    val event: String = json_str.getString("event")
    val event_time: String = json_str.getString("event_time")
    val position_id: Integer = json_str.getInteger("position_id")
    val user_id: Integer = json_str.getInteger("user_id")
    (company_id, event, event_time, position_id, user_id)
  })
  .toDF("company_id", "event", "event_time", "position_id", "user_id")



value.createOrReplaceTempView("table")

spark.sql(
  """
    |select
    | company_id,
    | event,
    | to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time,
    | position_id,
    | user_id
    |from table
    |""".stripMargin)
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .option("path","hive_catalog.icebergdb.kafkatest1") // tablePath: catalog.db.tableName
  .option("checkpointLocation","hdfspath")
  .start()
  .awaitTermination()

这个例子是从Kafka读取数据并将数据写入Iceberg表。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65222658

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档