我有一个dataframe,它有两个列a和b,其中b列中的值是a列中值的a子集。例如:
df
+---+---+
| a| b|
+---+---+
| 1| 2|
| 1| 3|
| 2| 1|
| 3| 2|
+---+---+我想生成一个包含a和anti_b列的数据格式,其中anti_b列中的值是来自a列的任何值,这样a!=anti_b和行(a,anti_b)就不会出现在原始的数据格式中。因此,在上面的数据中,结果应该是:
anti df
+---+------+
| a|anti_b|
+---+------+
| 3| 1|
| 2| 3|
+---+------+这可以通过一个crossJoin和对array_contains的调用来实现,但是它非常慢而且效率很低。有没有人知道一个更好的火花成语来完成这个任务,比如anti_join**?**
下面是使用一个小的dataframe的低效示例,这样您就可以看到我想要的是什么:
df = spark.createDataFrame(pandas.DataFrame(numpy.array(
[[1,2],[1,3],[2,1],[3,2]]),columns=['a','b']))
crossed_df = df.select('a').withColumnRenamed('a','_a').distinct().crossJoin(df.select('a').withColumnRenamed('a','anti_b').distinct()).where(pyspark.sql.functions.col('_a')!=pyspark.sql.functions.col('anti_b'))
anti_df = df.groupBy(
'a'
).agg(
pyspark.sql.functions.collect_list('b').alias('bs')
).join(
crossed_df,
on=((pyspark.sql.functions.col('a')==pyspark.sql.functions.col('_a'))&(~pyspark.sql.functions.expr('array_contains(bs,anti_b)'))),
how='inner'
).select(
'a','anti_b'
)
print('df')
df.show()
print('anti df')
anti_df.show()编辑:也能工作,但速度并不快:
df = spark.createDataFrame(pandas.DataFrame(numpy.array(
[[1,2],[1,3],[2,1],[3,2]]),columns=['a','b']))
crossed_df = df.select('a').distinct().crossJoin(df.select('a').withColumnRenamed('a','b').distinct()).where(pyspark.sql.functions.col('a')!=pyspark.sql.functions.col('b'))
anti_df = crossed_df.join(
df,
on=['a','b'],
how='left_anti'
)发布于 2019-11-18 19:00:13
这应该比你拥有的更好:
from pyspark.sql.functions import collect_set, expr
anti_df = df.groupBy("a").agg(collect_set("b").alias("bs")).alias("l")\
.join(df.alias("r"), on=expr("NOT array_contains(l.bs, r.b)"))\
.where("l.a != r.b")\
.selectExpr("l.a", "r.b AS anti_b")\
anti_df.show()
#+---+------+
#| a|anti_b|
#+---+------+
#| 3| 1|
#| 2| 3|
#+---+------+如果您将此执行计划与您的方法进行比较,您会发现这更好(因为您可以将distinct替换为collect_set),但它仍然有一个笛卡尔产品。
anti_df.explain()
#== Physical Plan ==
#*(3) Project [a#0, b#294 AS anti_b#308]
#+- CartesianProduct (NOT (a#0 = b#294) && NOT array_contains(bs#288, b#294))
# :- *(1) Filter isnotnull(a#0)
# : +- ObjectHashAggregate(keys=[a#0], functions=[collect_set(b#1, 0, 0)])
# : +- Exchange hashpartitioning(a#0, 200)
# : +- ObjectHashAggregate(keys=[a#0], functions=[partial_collect_set(b#1, 0, 0)])
# : +- Scan ExistingRDD[a#0,b#1]
# +- *(2) Project [b#294]
# +- *(2) Filter isnotnull(b#294)
# +- Scan ExistingRDD[a#293,b#294]然而,如果没有更多的信息,我不认为有任何方法可以避免笛卡尔积对这个特定的问题。
https://stackoverflow.com/questions/58920754
复制相似问题