我有一个数据框架看起来
+-------+-------+
| Code1 | Code2 |
+-------+-------+
| A | 1 |
| B | 1 |
| A | 2 |
| B | 2 |
| C | 2 |
| D | 2 |
| D | 3 |
| F | 3 |
| G | 3 |
+-------+-------+然后,我想应用一组独特的过滤器,如下所示:
应用筛选器的结果应该是如下所示的数据框架:
+-------+-------+----------+
| Code1 | Code2 | Scenario |
+-------+-------+----------+
| A | 1 | 1 |
| B | 1 | 1 |
| A | 2 | 1 |
| B | 2 | 1 |
| A | 2 | 2 |
| D | 2 | 2 |
| D | 3 | 2 |
| A | 2 | 3 |
| B | 2 | 3 |
| C | 2 | 3 |
| D | 2 | 3 |
+-------+-------+----------+问题:通过python使用火花最有效的方法是什么?
我是新来的火花,所以我真的要求从一个概念层面,不需要一个明确的解决方案。我的目标是实现尽可能多的并行操作。我的实际示例涉及使用一个初始数据框架,其中38列的大小为100 My,以几个GB作为csv文件,而且我通常最多有100-150个场景。
该解决方案的最初设计是按顺序处理每个场景筛选器,并将结果过滤后的数据帧合并在一起,但我觉得这否定了使用spark的全部意义。
编辑__:是吗?对于每一种场景,我都会过滤和合并,这两者都是转换(懒散eval)。最终的执行计划是否足够聪明,能够自动并行化多个唯一过滤器?
有没有一种方法可以并行地应用过滤器,例如,在应用过滤器2和3的同时应用场景过滤器1?我们是否需要“炸毁”初始数据帧N次,其中N=#场景过滤器,在新的数据框架中追加一个场景#列,并应用一个类似于以下内容的大型过滤器:
WHERE (Scenario = 1 AND Code1 IN (A,B)) OR
(Scenario = 2 AND Code1 IN (A,D) AND Code2 IN (2,3)) OR
(Scenario = 3 AND Code2 = 2)如果这最终是最有效的方式,它不也取决于“炸毁”的数据帧需要多少内存吗?如果“炸毁”的数据帧占用的内存比我的集群占用的内存更多,那么我是否必须处理内存中所能容纳的尽可能多的场景呢?
发布于 2019-09-20 17:01:26
您可以同时应用所有过滤器:
data.withColumn("scenario",
when('code1.isin("A", "B"), 1).otherwise(
when('code1.isin("A", "D") && 'code2.isin("2","3"), 2).otherwise(
when('code2==="2",3)
)
)
).show()但是还有一个问题,例如,值(A,2)可能出现在所有场景1、2、3中。在这种情况下,您可以尝试这样的方法:
data.withColumn("s1", when('code1.isin("A", "B"), 1).otherwise(0))
.withColumn("s2",when('code1.isin("A", "D") && 'code2.isin("2","3"), 1).otherwise(0))
.withColumn("s3",when('code2==="2",1).otherwise(0))
.show()产出:
+-----+-----+---+---+---+
|code1|code2| s1| s2| s3|
+-----+-----+---+---+---+
| A| 1| 1| 0| 0|
| B| 1| 1| 0| 0|
| A| 2| 1| 1| 1|
| B| 2| 1| 0| 1|
| A| 2| 1| 1| 1|
| D| 2| 0| 1| 1|
| D| 3| 0| 1| 0|
| A| 2| 1| 1| 1|
| B| 2| 1| 0| 1|
| C| 2| 0| 0| 1|
| D| 2| 0| 1| 1|
+-----+-----+---+---+---+发布于 2019-09-23 13:45:00
在编辑我的问题时,我质疑懒惰的评估是否是解决问题的关键。在对Spark进行了一些研究之后,我得出的结论是,即使我最初的解决方案看起来是对每个场景依次应用转换(过滤器然后合并),但一旦调用了一个操作(例如,dataframe.count()),它实际上是同时应用所有转换。截图表示来自dataframe.count()作业的转换阶段的事件时间线。
该作业包括96个场景,每个场景在原始数据帧上都有一个唯一的过滤器。您可以看到,我的本地机器同时运行8个任务,其中每个任务表示来自其中一个场景的筛选器。
总之,一旦对生成的dataframe调用了一个操作,Spark就会对过滤器进行优化,以并行运行。
https://stackoverflow.com/questions/58032226
复制相似问题