我正在尝试使用DataFrame.hint()方法向我的联接中添加一个范围连接提示。
我有两张桌子:minutes和events。
minutes表中有minute_start和minute_end列,它们是自固定时间以来的秒数。当然,它们的价值是60倍。
events表具有类似的event_start和event_end列,仅用于事件。事件可以在任何时刻开始和结束。
对于每一个事件,我需要找到它重叠的所有分钟。
我正在Databricks上尝试这一点(运行时5.1,Python3.5):
# from pyspark.sql.types import StructType, StructField, IntegerType
# minutes = spark.sparkContext\
# .parallelize(((0, 60),
# (60, 120)))\
# .toDF(StructType([
# StructField('minute_start', IntegerType()),
# StructField('minute_end', IntegerType())
# ]))
# events = spark.sparkContext\
# .parallelize(((12, 33),
# (0, 120),
# (33, 72),
# (65, 178)))\
# .toDF(StructType([
# StructField('event_start', IntegerType()),
# StructField('event_end', IntegerType())
# ]))
events.hint("range_join", "60")\
.join(minutes,
on=[events.event_start < minutes.minute_end,
minutes.minute_start < events.event_end])\
.orderBy(events.event_start,
events.event_end,
minutes.minute_start)\
.show()如果没有hint调用,结果将如预期的那样:
+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
| 0| 120| 0| 60|
| 0| 120| 60| 120|
| 12| 33| 0| 60|
| 33| 72| 0| 60|
| 33| 72| 60| 120|
| 65| 178| 60| 120|
+-----------+---------+------------+----------+对于hint,我得到了例外:
AnalysisException: 'Range join hint: invalid arguments Buffer(60);'当我尝试将提示中的60传递为数字而不是字符串时,它抱怨说,提示的参数必须是字符串。
我是不在Azure上,但我希望结果会是一样的。
有没有人遇到过类似的问题,找到了解决方案,或者知道我在哪里犯了错误?
更新1
(目前,我正在DatabricksRuntime6.1、Python3.7.3、Spark2.4.4上试用它)
我想我错过了这些参数应该是可迭代的,所以我再试一次,用events.hint("range_join", [60])。关于论点不是字符串的同样的抱怨:TypeError: all parameters should be str, got 60 of type <class 'int'>。
我想知道Databricks的Spark版本是否已经过时了。
这是在星火GitHub上的源代码
def hint(self, name, *parameters):
... (no checks on `parameters` up to here)
allowed_types = (basestring, list, float, int)
for p in parameters:
if not isinstance(p, allowed_types):
raise TypeError(
"all parameters should be in {0}, got {1} of type {2}".format(
allowed_types, p, type(p)))
... (no checks beyond this point)因此,应该允许一个int的列表。
我得到的是all parameters should be str,但是如果传递错误类型的参数,GitHub版本会说all parameters should be in (basestring, list, float, int)。
更新2
hint("skew", "col_name")似乎起作用了。
发布于 2019-12-13 01:11:08
我在GitHub上查看了Spark源代码。
版本2.4.4有以下内容:
def hint(self, name, *parameters):
... # no checks on `parameters` up to here
for p in parameters:
if not isinstance(p, str):
raise TypeError(
"all parameters should be str, got {0} of type {1}".format(p, type(p)))
... # no checks beyond here但是从版本3.0.0-预览-Rc1开始,消息来源是这样的:
def hint(self, name, *parameters):
... # no checks on `parameters` up to here
allowed_types = (basestring, list, float, int)
for p in parameters:
if not isinstance(p, allowed_types):
raise TypeError(
"all parameters should be in {0}, got {1} of type {2}".format(
allowed_types, p, type(p)))
... # no checks beyond here因此,看起来版本2.4.4有一个错误,在3.0.0-预览-Rc1开始的版本中已经修复了这个错误。
https://stackoverflow.com/questions/56242674
复制相似问题