我想给Parquet写一个这样的Dataframe:
| foo | bar |
|-----|-------------------|
| 1 | {"a": 1, "b": 10} |
| 2 | {"a": 2, "b": 20} |
| 3 | {"a": 3, "b": 30} |我是用潘达斯和拼花做的:
df = pd.DataFrame({
"foo": [1, 2, 3],
"bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})
import fastparquet
fastparquet.write('/my/parquet/location/toy-fastparquet.parq', df)我希望在(py)Spark中加载Parquet,并使用Spark查询数据,例如:
df = spark.read.parquet("/my/parquet/location/")
df.registerTempTable('my_toy_table')
result = spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 15")我的问题是,尽管fastparquet可以正确读取其Parquet文件( bar字段被正确地反序列化为结构),但在中被读取为String类型的列,该列仅包含原始结构的JSON表示形式:
In [2]: df.head()
Out[2]: Row(foo=1, bar='{"a": 1, "b": 10}')我试着用PyArrow写Parquet,但是没有运气:ArrowNotImplementedError: Level generation for Struct not supported yet。我也尝试过把file_scheme='hive'传给Fastparquet,但我也得到了同样的结果。将Fastparquet序列化更改为BSON (object_encoding='bson')会产生一个不可读的二进制字段。
编辑--我看到以下方法:
发布于 2020-02-14 18:12:50
在这里你至少有三个选择:
选项1:
您不需要使用像fastparquet这样的额外库,因为Spark已经提供了这样的功能:
pdf = pd.DataFrame({
"foo": [1, 2, 3],
"bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})
df = spark.createDataFrame(pdf)
df.write.mode("overwrite").parquet("/tmp/parquet1")如果尝试用df = spark.read.parquet("/tmp/parquet1")加载数据,模式如下:
StructType([
StructField("foo", LongType(), True),
StructField("bar",MapType(StringType(), LongType(), True), True)])正如您在本例中所看到的,Spark将保留正确的架构。
备选方案2:
如果出于任何原因仍然需要使用fastparquet,那么bar将被视为string,因此您可以将bar作为字符串加载,然后使用json函数将其转换为JSON。在您的例子中,我们将把json处理为Map的字典(string,int)。这是为了我们自己的方便,因为数据似乎是一个键/值序列,可以由字典完美地表示:
from pyspark.sql.types import StringType, MapType,LongType
from pyspark.sql.functions import from_json
df = spark.read.parquet("/tmp/parquet1")
# schema should be a Map(string, string)
df.withColumn("bar", from_json("bar", MapType(StringType(), LongType()))).show()
# +---+-----------------+
# |foo| bar|
# +---+-----------------+
# | 1|[a -> 1, b -> 10]|
# | 2|[a -> 2, b -> 20]|
# | 3|[a -> 3, b -> 30]|
# +---+-----------------+选项3:
如果您的模式没有改变,并且您知道bar的每个值都有相同的字段组合(a,b),那么您也可以将bar转换为一个结构:
schema = StructType([
StructField("a", LongType(), True),
StructField("b", LongType(), True)
])
df = df.withColumn("bar", from_json("bar", schema))
df.printSchema()
# root
# |-- foo: long (nullable = true)
# |-- bar: struct (nullable = true)
# | |-- a: long (nullable = true)
# | |-- b: long (nullable = true)示例:
然后,您可以使用以下方法运行代码:
df.registerTempTable('my_toy_table')
spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 20").show()
# or spark.sql("SELECT * FROM my_toy_table WHERE bar['b'] > 20")
# +---+-----------------+
# |foo| bar|
# +---+-----------------+
# | 3|[a -> 3, b -> 30]|
# +---+-----------------+https://stackoverflow.com/questions/60227123
复制相似问题