我有以下Apache Spark SQL连接谓词:
t1.field1 = t2.field1 and t2.start_date <= t1.event_date and t1.event_date < t2.end_date数据:
t1 DataFrame have over 50 millions rows
t2 DataFrame have over 2 millions rowst1 DataFrame中几乎所有的t1.field1字段都有相同的值(null)。
现在,Spark集群在单个任务上挂起超过10分钟,以便执行此连接,并且由于数据不对称。此时,只有一个worker和此worker上的一个任务在工作。所有其他9个工人都是空闲的。如何改进这个连接,以便将负载从这个特定的任务分配到整个Spark集群?
发布于 2019-04-02 02:34:14
我假设你在做内部连接。
可以遵循以下步骤来优化join - 1。在加入之前,我们可以根据最小或最大的start_date、event_date、end_date过滤出t1和t2。它将减少行数。
发布于 2021-08-01 07:52:14
如果t1中的几乎所有行都有t1.field1 = null,并且event_date行是数字(或者将其转换为时间戳),则可以首先使用Apache DataFu进行范围连接,然后过滤掉t1.field1 != t2.field1的行。
范围连接代码将如下所示:
t1.joinWithRange("event_date", t2, "start_date", "end_date", 10)最后一个参数- 10 -是递减因子。正如Raphael Roth在他的回答中所建议的那样,这就是bucketing。
您可以在the blog post introducing DataFu-Spark中看到这样一个范围连接的示例。
完全公开--我是DataFu的一员,并撰写了这篇博文。
发布于 2019-04-02 14:47:51
我假设spark已经在t1.field1上推送了非空过滤器,您可以在explain-plan中验证这一点。
我更愿意尝试创建一个额外的属性,它可以用作等联接条件,例如通过bucketing。例如,您可以创建一个month属性。为此,您需要在t2中枚举months,这通常是使用UDF完成的。参见这个SO-问题的示例:How to improve broadcast Join speed with between condition in Spark
https://stackoverflow.com/questions/55460923
复制相似问题