I am very new to Spark and am looking for the result below.
Table A has a single column that holds lists of employee names:
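#+----------------+
#| emp_names_list |
#+----------------+
#| [Sham,Kishor]  |
#| [Bob,Alex]     |
#+----------------+
Table B is a separate table that holds employee names and emp ids, but the names and ids are not in a single pair of columns; they are spread across multiple columns: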
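#+-----------+----------+-----------+----------+-----------+----------+
#| emp_name1 | emp_id_1 | emp_name2 | emp_id_2 | emp_name3 | emp_id_3 |
#+-----------+----------+-----------+----------+-----------+----------+
#| Sham      | 5        | Alex      | 10       | Kishor    | 11       |
#| Bob       | 7        |           |          |           |          |
#+-----------+----------+-----------+----------+-----------+----------+
Desired output: I am looking for a new column in Table A, as shown below. Each key in name_id_map has a value that must be looked up in Table B; for example, for the key Sham, the value 5 is found in Table B.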
#+----------------+--------------------+
#| emp_names_list | name_id_map        |
#+----------------+--------------------+
#| [Sham,Kishor]  | [Sham:5,Kishor:11] |
#| [Bob,Alex]     | [Bob:7,Alex:10]    |
#+----------------+--------------------+
I tried exploding the emp_names_list column in Table A and joining it against each column of Table B, but without success.
Posted on 2021-07-08 18:38:59
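You can try the approach below; let me know whether it works for you.
The following assumes that you have already created temporary views, or otherwise have access to tables/views named table_a and table_b in your Spark session. If you have the data as DataFrames, the views can be created with:
table_a_df.createOrReplaceTempView('table_a')
table_b_df.createOrReplaceTempView('table_b')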
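You can then run the examples below with Spark SQL on your Spark session. For example:
result_df = sparkSession.sql("<insert sql statement from below here>")
Because I don't have your setup, I use CTEs to create table_b and table_a as a reproducible example. At the end of this answer I have included the SQL without those CTEs.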
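I assume emp_names_list in table_a is an array. I first reduce table_b to plain name and id columns in the emp_data CTE, then join that to table_a to find the matches, and in emp_matches extract emp_names_list together with each matching name and id.
The final select groups by emp_names_list and uses collect_list to aggregate the name:id pairs into an array.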
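Before reading the SQL, the three steps just described (unpivot, match, aggregate) can be sketched in plain Python. This is only an illustration of the logic, not Spark code: the helper names unpivot and build_name_id_map are made up for this sketch, the sample rows are copied from the question, and it assumes each employee name appears only once in table_b.

```python
# Plain-Python sketch of the logic only; this is not Spark code.
# Sample rows copied from the question; None stands for an empty cell.
table_a = [["Sham", "Kishor"], ["Bob", "Alex"]]
table_b = [
    {"emp_name1": "Sham", "emp_id_1": 5, "emp_name2": "Alex", "emp_id_2": 10,
     "emp_name3": "Kishor", "emp_id_3": 11},
    {"emp_name1": "Bob", "emp_id_1": 7, "emp_name2": None, "emp_id_2": None,
     "emp_name3": None, "emp_id_3": None},
]


def unpivot(rows, slots=3):
    """Step 1 (emp_data): turn the wide rows into one name -> id lookup."""
    lookup = {}
    for row in rows:
        for i in range(1, slots + 1):
            name = row[f"emp_name{i}"]
            emp_id = row[f"emp_id_{i}"]
            # Assumption: a slot with no name can never match, so skip it.
            if name is not None:
                lookup[name] = emp_id
    return lookup


def build_name_id_map(names_list, lookup):
    """Steps 2 and 3 (emp_matches + GROUP BY): keep the names that have an id."""
    return {name: lookup[name] for name in names_list if name in lookup}


lookup = unpivot(table_b)
result = [(names, build_name_id_map(names, lookup)) for names in table_a]
for names, name_id_map in result:
    print(names, name_id_map)
```

The Spark SQL that follows performs the same three steps with CTEs, using array_contains for the match and collect_list for the aggregation.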
WITH table_b AS (
    -- Sample data standing in for the real table_b
    SELECT
        'Sham' as emp_name1,
        5 as emp_id_1,
        'Alex' as emp_name2,
        10 as emp_id_2,
        'Kishor' as emp_name3,
        11 as emp_id_3
    UNION ALL
    SELECT
        'Bob' as emp_name1,
        7 as emp_id_1,
        NULL as emp_name2,
        NULL as emp_id_2,
        NULL as emp_name3,
        NULL as emp_id_3
),
table_a AS (
    -- Sample data standing in for the real table_a; SPLIT builds the array column
    SELECT SPLIT(emp_names_list, ',') as emp_names_list FROM (
        SELECT 'Sham,Kishor' as emp_names_list UNION ALL
        SELECT 'Bob,Alex' as emp_names_list
    ) t
),
emp_data AS (
    -- Unpivot the three name/id column pairs into one (id, name) row each
    SELECT e1.id, e1.name FROM (
        SELECT emp_name1 as name, emp_id_1 as id FROM table_b UNION ALL
        SELECT emp_name2 as name, emp_id_2 as id FROM table_b UNION ALL
        SELECT emp_name3 as name, emp_id_3 as id FROM table_b
    ) e1
    WHERE e1.name is not null or e1.id is not null
),
emp_matches AS (
    -- Pair each names list with every employee whose name it contains
    SELECT DISTINCT
        a1.emp_names_list,
        CONCAT(e1.name, ':', e1.id) as name_id
    FROM
        emp_data e1
    INNER JOIN
        table_a a1 ON array_contains(a1.emp_names_list, e1.name)
)
SELECT
    emp_names_list,
    collect_list(name_id) name_id_map
FROM
    emp_matches
GROUP BY
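    emp_names_list
If name_id_map should be of the map data type, try the following instead. The query below is like the one above, but it uses additional functions to return the final name_id_map as a map.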
WITH table_b AS (
    -- Sample data standing in for the real table_b
    SELECT
        'Sham' as emp_name1,
        5 as emp_id_1,
        'Alex' as emp_name2,
        10 as emp_id_2,
        'Kishor' as emp_name3,
        11 as emp_id_3
    UNION ALL
    SELECT
        'Bob' as emp_name1,
        7 as emp_id_1,
        NULL as emp_name2,
        NULL as emp_id_2,
        NULL as emp_name3,
        NULL as emp_id_3
),
table_a AS (
    -- Sample data standing in for the real table_a; SPLIT builds the array column
    SELECT SPLIT(emp_names_list, ',') as emp_names_list FROM (
        SELECT 'Sham,Kishor' as emp_names_list UNION ALL
        SELECT 'Bob,Alex' as emp_names_list
    ) t
),
emp_data AS (
    SELECT e1.id, e1.name FROM (
        SELECT emp_name1 as name, emp_id_1 as id FROM table_b UNION ALL
        SELECT emp_name2 as name, emp_id_2 as id FROM table_b UNION ALL
        SELECT emp_name3 as name, emp_id_3 as id FROM table_b
    ) e1
    WHERE e1.name is not null or e1.id is not null
),
emp_matches AS (
    SELECT DISTINCT
        a1.emp_names_list,
        CONCAT(e1.name, ':', e1.id) as name_id
    FROM
        emp_data e1
    INNER JOIN
        table_a a1 ON array_contains(a1.emp_names_list, e1.name)
)
SELECT
    emp_names_list,
    -- Join the name:id strings with ';', then split pairs on ';' and key/value on ':'
    str_to_map(concat_ws(';', collect_list(name_id)), ';', ':') name_id_map
FROM
    emp_matches
GROUP BY
    emp_names_list
The actual Spark SQL, without the CTEs that stand in for table_b and table_a:
WITH emp_data AS (
    -- Unpivot the three name/id column pairs into one (id, name) row each
    SELECT e1.id, e1.name FROM (
        SELECT emp_name1 as name, emp_id_1 as id FROM table_b UNION ALL
        SELECT emp_name2 as name, emp_id_2 as id FROM table_b UNION ALL
        SELECT emp_name3 as name, emp_id_3 as id FROM table_b
    ) e1
    WHERE e1.name is not null or e1.id is not null
),
emp_matches AS (
    -- Pair each names list with every employee whose name it contains
    SELECT DISTINCT
        a1.emp_names_list,
        CONCAT(e1.name, ':', e1.id) as name_id
    FROM
        emp_data e1
    INNER JOIN
        table_a a1 ON array_contains(a1.emp_names_list, e1.name)
)
SELECT
    emp_names_list,
    -- Join the name:id strings with ';', then split pairs on ';' and key/value on ':'
    str_to_map(concat_ws(';', collect_list(name_id)), ';', ':') name_id_map
FROM
    emp_matches
GROUP BY
    emp_names_list
You may need to enable the option spark.sql.crossJoin.enabled=true in your Spark configuration.
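A note on the map variant: str_to_map takes the text first, then the delimiter between pairs, then the delimiter between key and value. Since name_id is built as name:id and the pairs are joined with ';', the pair delimiter has to be ';' and the key/value delimiter ':'. The plain-Python function below is a rough stand-in written for this illustration, not Spark's implementation; it only shows what each argument order produces.

```python
def str_to_map(text, pair_delim=",", key_value_delim=":"):
    """Rough stand-in for Spark SQL's str_to_map(text, pairDelim, keyValueDelim).

    Splits text into pairs on pair_delim, then each pair into key and value
    on key_value_delim. A pair without the delimiter gets a value of None.
    Unlike Spark, the delimiters here are plain strings, not regex patterns.
    """
    result = {}
    for pair in text.split(pair_delim):
        key, sep, value = pair.partition(key_value_delim)
        result[key] = value if sep else None
    return result


# What concat_ws(';', collect_list(name_id)) yields for one group.
joined = ";".join(["Sham:5", "Kishor:11"])

print(str_to_map(joined, ";", ":"))  # pairs split on ';', key/value on ':'
print(str_to_map(joined, ";", ","))  # wrong key/value delimiter: values are None
```

With the wrong key/value delimiter, every whole name:id string becomes a key with an empty value. Also note that str_to_map returns string values, so the ids need a cast if you want them as integers.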
https://stackoverflow.com/questions/68298968