有办法从DataFrame的tempDir转储中创建RedShift吗?
我的用例是当作业失败时,我想重试,但是继续从转储到S3的临时数据转储,而不是再次从RedShift中重新获取数据集,这是非常大的!
加载代码执行以下操作:
val df1 = spark.read
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", spmeTable)
.option("tempdir", tempDir)
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.option("forward_spark_s3_credentials", true)
.load();该作业稍后会失败,但我希望重新创建df1,而不需要再次从RedShift中获取任何内容。
有办法这样做吗?
在SparkSession下找到一个名为SparkSession的方法,不确定这是否是一种可能的解决方案.https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html
更新#1
temp看起来类似于这里的目录结构,examples.html
我从S3打开了一个临时文件,它是管道分隔的。
edd66540-fa17-599b-9b22-7df29a5f9229|kNOCugU4wuKAUw7m2UXS7MfX|2018-11-27 19:48:44|POST|f|@NULL@|@NULL@|@NULL@|@NULL@|https://www.example.com/r/conversations/0grt6540-更新#2
根据这个https://github.com/databricks/spark-redshift/tree/master/tutorial
一旦将文件写入S3,就会使用自定义InputFormat InputFormat并行地使用这些文件。这个类类似于Hadoop的标准TextInputFormat类,其中键是文件中每一行开头的字节偏移量。然而,值类的类型是Array字符串。这些值是通过使用默认分隔符分隔行来创建的。RedshiftInputFormat逐行处理S3文件以生成RDD。然后将前面获得的模式应用到这个RDD上,以将字符串转换为正确的数据类型,并生成一个DataFrame。
除了跳过卸载之外,还知道怎么做吗?
发布于 2019-08-16 04:35:36
连接器默认情况下以avro格式(也可以用CSV,CSV GZIP格式倾倒。)转储数据。
还请注意,连接器不会自动清除临时位置(检查注意事项)。
我们可以通过指向tempdir来读取数据
val df = spark.read.format("avro").load(tempdir)https://stackoverflow.com/questions/57518372
复制相似问题