我和从Facebook的Ads Insights中提取的“动作分解”数据一起工作
Facebook没有将action (购买的#)和action_value (购买的金额)放在同一栏中,所以我需要根据动作的标识符(在我的例子中是id#+设备类型)加入到我的终端。
如果每个操作只是它自己的行,那么使用SQL将它们连接起来当然很简单。但是在这种情况下,我需要在每一行中加入这两个结构。我想要做的事情相当于两个结构的LEFT JOIN,匹配在两列上。理想情况下,我可以单独使用SQL (而不是PySpark/Scala/etc)来完成这个任务。
到目前为止,我已经尝试过:
inline(),但一次只能使用一个“生成器”函数。map编写PySpark函数。但是,映射函数似乎只按索引而不是名称来标识列,如果以后这些列应该更改(可能是在使用第三方API时),这就显得很脆弱。SELECT on anonymous function)。如果这真的是最好的选择,我会努力争取这个许可。为了更好地说明:我的数据集中的每一行都有一个actions和action_values列,其数据如下所示。
actions = [
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.123",
"value": "1"
},
{
"action_device": "desktop", /* Same conversion ID; different device. */
"action_type": "offsite_conversion.custom.321",
"value": "1"
},
{
"action_device": "iphone", /* Same conversion ID; different device. */
"action_type": "offsite_conversion.custom.321",
"value": "2"
}
{
"action_device": "iphone", /* has "actions" but not "actions_values" */
"action_type": "offsite_conversion.custom.789",
"value": "1"
},
]
action_values = [
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.123",
"value": "49.99"
},
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.321",
"value": "19.99"
},
{
"action_device": "iphone",
"action_type": "offsite_conversion.custom.321",
"value": "99.99"
}
]我希望每一行在一个结构中都有两个数据点,如下所示:
my_desired_result = [
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.123",
"count": "1", /* This comes from the "action" struct */
"value": "49.99" /* This comes from the "action_values" struct */
},
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.321",
"count": "1",
"value": "19.99"
},
{
"action_device": "iphone",
"action_type": "offsite_conversion.custom.321",
"count": "2",
"value": "99.99"
},
{
"action_device": "iphone",
"action_type": "offsite_conversion.custom.789",
"count": "1",
"value": null /* NULL because there is no value for conversion#789 AND iphone */
}
]发布于 2020-05-07 17:32:27
IIUC,您可以尝试转换,然后使用过滤器通过匹配action_device和action_type从action_values查找第一个匹配项:
df.printSchema()
root
|-- action_values: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- action_device: string (nullable = true)
| | |-- action_type: string (nullable = true)
| | |-- value: string (nullable = true)
|-- actions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- action_device: string (nullable = true)
| | |-- action_type: string (nullable = true)
| | |-- value: string (nullable = true)
df.createOrReplaceTempView("df_table")
spark.sql("""
SELECT
transform(actions, x -> named_struct(
'action_device', x.action_device,
'action_type', x.action_type,
'count', x.value,
'value', filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type)[0].value
)) as result
FROM df_table
""").show(truncate=False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|result |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[desktop, offsite_conversion.custom.123, 1, 49.99], [desktop, offsite_conversion.custom.321, 1, 19.99], [iphone, offsite_conversion.custom.321, 2, 99.99], [iphone, offsite_conversion.custom.789, 1,]]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+更新:在完全连接的情况下,可以尝试以下:
spark.sql("""
SELECT
concat(
/* actions left join action_values with potentially multiple matched values */
flatten(
transform(actions, x ->
transform(
filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type),
z -> named_struct(
'action_device', x.action_device,
'action_type', x.action_type,
'count', x.value,
'value', z.value
)
)
)
),
/* action_values missing from actions */
transform(
filter(action_values, x -> !exists(actions, y -> x.action_device = y.action_device AND x.action_type = y.action_type)),
z -> named_struct(
'action_device', z.action_device,
'action_type', z.action_type,
'count', NULL,
'value', z.value
)
)
) as result
FROM df_table
""").show(truncate=False)https://stackoverflow.com/questions/61662553
复制相似问题