首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在一行中将两个结构数组连接起来?

如何在一行中将两个结构数组连接起来?
EN

Stack Overflow用户
提问于 2020-05-07 16:10:20
回答 1查看 280关注 0票数 0

我和从Facebook的Ads Insights中提取的“动作分解”数据一起工作

Facebook没有将action (购买的#)和action_value (购买的金额)放在同一栏中,所以我需要根据动作的标识符(在我的例子中是id#+设备类型)加入到我的终端。

如果每个操作只是它自己的行,那么使用SQL将它们连接起来当然很简单。但是在这种情况下,我需要在每一行中加入这两个结构。我想要做的事情相当于两个结构的LEFT JOIN,匹配在两列上。理想情况下,我可以单独使用SQL (而不是PySpark/Scala/etc)来完成这个任务。

到目前为止,我已经尝试过:

  • 生成器。这给了我自己行上的每个操作,但是由于原始dataset中的父行没有唯一的标识符,所以没有一种方法可以在每一行的基础上连接这些结构。也尝试在两列上使用inline(),但一次只能使用一个“生成器”函数。
  • 使用函数组合它们。但这不起作用,因为顺序不总是一样的,有时他们没有相同的钥匙。
  • 我考虑用map编写PySpark函数。但是,映射函数似乎只按索引而不是名称来标识列,如果以后这些列应该更改(可能是在使用第三方API时),这就显得很脆弱。
  • 我考虑过编写一个PySpark UDF,这似乎是最好的选择,但需要一个我没有的权限(SELECT on anonymous function)。如果这真的是最好的选择,我会努力争取这个许可。

为了更好地说明:我的数据集中的每一行都有一个actionsaction_values列,其数据如下所示。

代码语言:javascript
复制
actions = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "value": "1"
  },
  {
    "action_device": "desktop", /* Same conversion ID; different device. */
    "action_type": "offsite_conversion.custom.321",
    "value": "1"
  },
  {
    "action_device": "iphone", /* Same conversion ID; different device. */
    "action_type": "offsite_conversion.custom.321",
    "value": "2"
  }
  {
    "action_device": "iphone", /* has "actions" but not "actions_values" */
    "action_type": "offsite_conversion.custom.789",
    "value": "1"
  },
]
action_values = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "value": "49.99"
  },
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.321",
    "value": "19.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.321",
    "value": "99.99"
  }
]

我希望每一行在一个结构中都有两个数据点,如下所示:

代码语言:javascript
复制
my_desired_result = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "count": "1", /* This comes from the "action" struct */
    "value": "49.99" /* This comes from the "action_values" struct */
  },
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.321",
    "count": "1",
    "value": "19.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.321",
    "count": "2",
    "value": "99.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.789",
    "count": "1",
    "value": null /* NULL because there is no value for conversion#789 AND iphone */
  }
]
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-07 17:32:27

IIUC,您可以尝试转换,然后使用过滤器通过匹配action_device和action_type从action_values查找第一个匹配项:

代码语言:javascript
复制
df.printSchema()
root
 |-- action_values: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- action_device: string (nullable = true)
 |    |    |-- action_type: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- action_device: string (nullable = true)
 |    |    |-- action_type: string (nullable = true)
 |    |    |-- value: string (nullable = true)

df.createOrReplaceTempView("df_table")

spark.sql("""

  SELECT       
    transform(actions, x -> named_struct(
      'action_device', x.action_device,
      'action_type', x.action_type,
      'count', x.value,
      'value', filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type)[0].value
    )) as result
  FROM df_table

""").show(truncate=False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|result                                                                                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[desktop, offsite_conversion.custom.123, 1, 49.99], [desktop, offsite_conversion.custom.321, 1, 19.99], [iphone, offsite_conversion.custom.321, 2, 99.99], [iphone, offsite_conversion.custom.789, 1,]]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

更新:在完全连接的情况下,可以尝试以下

代码语言:javascript
复制
spark.sql("""

  SELECT

  concat(
    /* actions left join action_values with potentially multiple matched values */
    flatten(
      transform(actions, x ->
        transform(
          filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type),
          z -> named_struct(
            'action_device', x.action_device,
            'action_type', x.action_type,
            'count', x.value,
            'value', z.value
          )
        )
      )
    ),
    /* action_values missing from actions */
    transform(
      filter(action_values, x -> !exists(actions, y -> x.action_device = y.action_device AND x.action_type = y.action_type)),
      z -> named_struct(
        'action_device', z.action_device,
        'action_type', z.action_type,
        'count', NULL,
        'value', z.value
      )
    )
  ) as result

  FROM df_table

""").show(truncate=False)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61662553

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档