首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有Struct列类型的读/写部分

具有Struct列类型的读/写部分
EN

Stack Overflow用户
提问于 2020-02-14 13:17:42
回答 1查看 11.7K关注 0票数 4

我想给Parquet写一个这样的Dataframe:

代码语言:javascript
复制
| foo | bar               |
|-----|-------------------|
|  1  | {"a": 1, "b": 10} |
|  2  | {"a": 2, "b": 20} |
|  3  | {"a": 3, "b": 30} |

我是用潘达斯和拼花做的:

代码语言:javascript
复制
df = pd.DataFrame({
    "foo": [1, 2, 3],
    "bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})

import fastparquet
fastparquet.write('/my/parquet/location/toy-fastparquet.parq', df)

我希望在(py)Spark中加载Parquet,并使用Spark查询数据,例如:

代码语言:javascript
复制
df = spark.read.parquet("/my/parquet/location/")
df.registerTempTable('my_toy_table')
result = spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 15")

我的问题是,尽管fastparquet可以正确读取其Parquet文件( bar字段被正确地反序列化为结构),但中被读取为String类型的列,该列仅包含原始结构的JSON表示形式

代码语言:javascript
复制
In [2]: df.head()                                                                                                                                                                                           
Out[2]: Row(foo=1, bar='{"a": 1, "b": 10}')

我试着用PyArrow写Parquet,但是没有运气:ArrowNotImplementedError: Level generation for Struct not supported yet。我也尝试过把file_scheme='hive'传给Fastparquet,但我也得到了同样的结果。将Fastparquet序列化更改为BSON (object_encoding='bson')会产生一个不可读的二进制字段。

编辑--我看到以下方法:

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-02-14 18:12:50

在这里你至少有三个选择:

选项1:

您不需要使用像fastparquet这样的额外库,因为Spark已经提供了这样的功能:

代码语言:javascript
复制
pdf = pd.DataFrame({
    "foo": [1, 2, 3],
    "bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})

df = spark.createDataFrame(pdf)
df.write.mode("overwrite").parquet("/tmp/parquet1")

如果尝试用df = spark.read.parquet("/tmp/parquet1")加载数据,模式如下:

代码语言:javascript
复制
StructType([ 
            StructField("foo", LongType(), True),
            StructField("bar",MapType(StringType(), LongType(), True), True)])

正如您在本例中所看到的,Spark将保留正确的架构。

备选方案2:

如果出于任何原因仍然需要使用fastparquet,那么bar将被视为string,因此您可以将bar作为字符串加载,然后使用json函数将其转换为JSON。在您的例子中,我们将把json处理为Map的字典(string,int)。这是为了我们自己的方便,因为数据似乎是一个键/值序列,可以由字典完美地表示:

代码语言:javascript
复制
from pyspark.sql.types import StringType, MapType,LongType
from pyspark.sql.functions import from_json

df = spark.read.parquet("/tmp/parquet1")

# schema should be a Map(string, string) 
df.withColumn("bar", from_json("bar", MapType(StringType(), LongType()))).show()

# +---+-----------------+
# |foo|              bar|
# +---+-----------------+
# |  1|[a -> 1, b -> 10]|
# |  2|[a -> 2, b -> 20]|
# |  3|[a -> 3, b -> 30]|
# +---+-----------------+

选项3:

如果您的模式没有改变,并且您知道bar的每个值都有相同的字段组合(a,b),那么您也可以将bar转换为一个结构:

代码语言:javascript
复制
schema = StructType([ 
                    StructField("a", LongType(), True),
                    StructField("b", LongType(), True)
            ])

df = df.withColumn("bar", from_json("bar", schema))

df.printSchema()

# root
#  |-- foo: long (nullable = true)
#  |-- bar: struct (nullable = true)
#  |    |-- a: long (nullable = true)
#  |    |-- b: long (nullable = true)

示例:

然后,您可以使用以下方法运行代码:

代码语言:javascript
复制
df.registerTempTable('my_toy_table')

spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 20").show()
# or spark.sql("SELECT * FROM my_toy_table WHERE bar['b'] > 20")

# +---+-----------------+
# |foo|              bar|
# +---+-----------------+
# |  3|[a -> 3, b -> 30]|
# +---+-----------------+
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60227123

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档