我有两个不同的目录与ORC文件在他们下面。这两个文件有不同的模式。当将两个目录读取到同一个DataFrame中时,最终模式取决于路径的顺序。
请考虑以下代码来复制此代码:
data = [
(1, "player1", "google.com", True),
(2, "player1", "youtube.com", True),
(3, "player2", "facebook.com", True),
(4, "player2", "record.pt", True),
(5, "player2", "yahoo.com", True),
(6, "player3", "facebook.com", False),
(7, "player3", "record.pt", True),
(8, "player3", "yahoo.com", True),
(9, "player4", "", True),
(10, "player4", "record.pt", True),
(11, "player4", "abola.pt", True),
(12, "player4", None, True)
]
data2 = [
(13, "player1", True),
(14, "player2", True),
(15, "player3", True),
(16, "player4", True),
(17, "player3", True),
(18, "player3", True),
]
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(data, ["id", "splayer", "website", "bool"])
df2 = spark.createDataFrame(data2, ["id", "splayer", "bool"])
df1.coalesce(1).write.orc('temporary/bla=1', mode='overwrite')
df2.coalesce(1).write.orc('temporary/bla=2', mode='overwrite')
df = spark.read.option("mergeSchema", "true").option("basePath", "temporary").orc(['temporary/bla=2', 'temporary/bla=1'])
df.show()这将产生输出:
+---+-------+-----+---+
| id|splayer| bool|bla|
+---+-------+-----+---+
| 1|player1| true| 1|
| 2|player1| true| 1|
| 3|player2| true| 1|
| 4|player2| true| 1|
| 5|player2| true| 1|
| 6|player3|false| 1|
| 7|player3| true| 1|
| 8|player3| true| 1|
| 9|player4| true| 1|
| 10|player4| true| 1|
| 11|player4| true| 1|
| 12|player4| true| 1|
| 13|player1| true| 2|
| 14|player2| true| 2|
| 15|player3| true| 2|
| 16|player4| true| 2|
| 17|player3| true| 2|
| 18|player3| true| 2|
+---+-------+-----+---+如果更改目录的顺序,将生成以下输出:
+---+-------+------------+-----+---+
| id|splayer| website| bool|bla|
+---+-------+------------+-----+---+
| 1|player1| google.com| true| 1|
| 2|player1| youtube.com| true| 1|
| 3|player2|facebook.com| true| 1|
| 4|player2| record.pt| true| 1|
| 5|player2| yahoo.com| true| 1|
| 6|player3|facebook.com|false| 1|
| 7|player3| record.pt| true| 1|
| 8|player3| yahoo.com| true| 1|
| 9|player4| | true| 1|
| 10|player4| record.pt| true| 1|
| 11|player4| abola.pt| true| 1|
| 12|player4| null| true| 1|
| 13|player1| null| true| 2|
| 14|player2| null| true| 2|
| 15|player3| null| true| 2|
| 16|player4| null| true| 2|
| 17|player3| null| true| 2|
| 18|player3| null| true| 2|
+---+-------+------------+-----+---+当我研究这个问题时,我发现有几个帖子说option("mergeSchema", "true")将是一个解决方案。事实上,这里有一个拉请求。
这是否有解决办法,还是仍是一个悬而未决的问题?
我正在使用(Py)Spark2.4.3和Python3.6.8。
提前谢谢你!
更新
上述PR只适用于星火3.0.0。谢谢你的信息@Shaido。
发布于 2020-06-11 14:16:23
由于某些供应商数据的模式演变,我遇到了同样的问题。我一直在尝试一些不同的想法,因为ORC mergeSchema选项在Spark3.0之前是不可用的,我们正在运行2.3,我的第一个想法是用我的完整模式创建一个空的数据格式,包括任何新的列,并将其保存为一个按字母顺序排列的目录。例如,如果我的数据是由load_date分区的,那么我将有文件夹,如load_date=00000000、load_date=20200501、load_date=20200601等。然后,我会将包含完整模式的空数据have放在00000000分区中。这是可行的,但它并不是那么干净,而且我也不确定是否存在这样一种边缘情况,即ORC阅读器不会以某种方式选择不同的ORC文件作为模式的基础。因此,我想,只要提供一个模式,我需要的所有科尔,我需要的ORC阅读器,这是可行的。
schema = StructType([StructField('state', StringType(), True), StructField('new_col_middle', StringType(), True), StructField('abbr', StringType(), False), StructField('population', IntegerType(), False), StructField('new_col2', StringType(), False)])
df = spark.read.schema(schema).orc('/data/sandbox/orc_schema_evolution/')在HDFS的orc_schema_evolution文件夹中,我们有分区的load_date文件夹,其中一些load_date文件具有模式('state‘、'population'),而其他文件的模式为('state’、'population‘、'abbr')。注意,我甚至能够用此方法重新排列现有列的顺序。
https://stackoverflow.com/questions/57960517
复制相似问题