I'm trying to work out the fastest way to perform a lookup in Spark, as part of rolling my own association-rules module for practice. Note that I know the metric below, confidence, is supported in PySpark; this is just an example. Another metric, lift, is not supported, but I intend to use the results of this discussion to develop it.
As part of calculating the confidence of a rule, I need to look at how often the antecedent and consequent appear together, as well as how often the antecedent appears across the whole transaction set (an RDD in this case).
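As a minimal pure-Python illustration (no Spark; the rule {a} -> {b} is my own hypothetical example, using the toy transaction set from the code below): confidence is the co-occurrence count divided by the antecedent count.

```python
# Confidence of the hypothetical rule {a} -> {b} on the toy transaction set
transactions = [
    ('a',), ('a', 'b'), ('a', 'b'), ('b', 'c'), ('a', 'c'),
    ('a', 'b'), ('b', 'c'), ('c',), ('b',),
]
both = sum(1 for t in transactions if 'a' in t and 'b' in t)  # antecedent and consequent together
antecedent = sum(1 for t in transactions if 'a' in t)         # antecedent anywhere
confidence_ab = both / antecedent                             # 3 / 5 = 0.6
```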
from itertools import combinations, chain

def powerset(iterable, no_empty=True):
    ''' Produce the powerset for a given iterable '''
    s = list(iterable)
    combos = (combinations(s, r) for r in range(len(s) + 1))
    powerset = chain.from_iterable(combos)
    return (el for el in powerset if el) if no_empty else powerset
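For reference, a quick self-contained check of what powerset yields (the definition is repeated here so the snippet runs on its own; the empty set is excluded by default):

```python
from itertools import combinations, chain

def powerset(iterable, no_empty=True):
    ''' Produce the powerset for a given iterable '''
    s = list(iterable)
    combos = (combinations(s, r) for r in range(len(s) + 1))
    ps = chain.from_iterable(combos)
    return (el for el in ps if el) if no_empty else ps

result = list(powerset(('a', 'b')))  # [('a',), ('b',), ('a', 'b')]
```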
# Set up the transaction set
rdd = sc.parallelize(
    [
        ('a',),
        ('a', 'b'),
        ('a', 'b'),
        ('b', 'c'),
        ('a', 'c'),
        ('a', 'b'),
        ('b', 'c'),
        ('c',),
        ('b',),  # note the trailing comma: ('b') would be the string 'b', not a tuple
    ]
)
# Create an RDD with the counts of each
# possible itemset
counts = (
    rdd
    .flatMap(lambda x: powerset(x))
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .map(lambda x: (frozenset(x[0]), x[1]))
)
# Function to calculate confidence of a rule
confidence = lambda x: counts.lookup(frozenset(x)) / counts.lookup((frozenset(x[1]),))

confidence_result = (
    rdd
    # Must be applied to length-two and greater itemsets
    .filter(lambda x: len(x) > 1)
    .map(confidence)
)

For those familiar with this type of lookup problem, you'll know that it raises the following kind of exception:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

One way around this exception is to convert counts to a dictionary:
counts = dict(counts.collect())

confidence = lambda x: (x, counts[frozenset(x)] / counts[frozenset(x[1])])

confidence_result = (
    rdd
    # Must be applied to length-two and greater itemsets
    .filter(lambda x: len(x) > 1)
    .map(confidence)
)

That gives me my result, but the counts.collect step is very expensive, because in reality I have a dataset with 50M+ records. Is there a better option for performing this kind of lookup?
Posted on 2018-08-20 18:07:31
If your target metric can be computed independently on each RDD partition and then combined to reach the final result, you can use mapPartitions instead of map when calculating it.
The generic flow should look something like this:
from functools import reduce

metric_result = reduce(
    # reduce the list to combine the metrics calculated on each partition
    confidence_combine,
    rdd
    # apply your metric calculation independently on each partition
    .mapPartitions(confidence_partial)
    # collect results from the partitions into a single list of results
    .collect()
)

Here confidence_partial and confidence_combine are regular Python functions that take an iterator/list as input. (Note that collect returns a plain Python list, which has no reduce method, hence functools.reduce.)
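As a concrete sketch of what those two placeholders might look like for the confidence example (my assumption, not part of the answer: the per-partition work is itemset counting, and confidence is computed once on the driver at the end):

```python
from collections import Counter
from functools import reduce
from itertools import chain, combinations

def itemsets(transaction):
    # every non-empty sub-itemset of a transaction
    s = list(transaction)
    return chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1))

def confidence_partial(partition):
    # count itemset occurrences within this partition only
    local = Counter()
    for transaction in partition:
        for itemset in itemsets(transaction):
            local[frozenset(itemset)] += 1
    yield local  # emit one Counter per partition

def confidence_combine(left, right):
    # merge two per-partition Counters into one
    left.update(right)
    return left

# Driver side would then be something like:
# partials = rdd.mapPartitions(confidence_partial).collect()
# counts = reduce(confidence_combine, partials)
# conf_ab = counts[frozenset(('a', 'b'))] / counts[frozenset(('a',))]
```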
As an aside, you would probably see a huge performance boost by using the DataFrame API and native expression functions to calculate your metric.
https://stackoverflow.com/questions/51923025