首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark RDD:从其他RDD查找

Spark RDD:从其他RDD查找
EN

Stack Overflow用户
提问于 2018-08-20 09:04:37
回答 1查看 465关注 0票数 2

我正在尝试在Spark中执行最快的查找,作为一些练习滚动我自己的关联规则模块的一部分。请注意,我知道下面的指标,信心,在PySpark中是受支持的。这只是一个例子--另一个指标lift不受支持,但我打算使用这次讨论的结果来开发它。

作为计算规则置信度的一部分,我需要查看先行项和后果项一起出现的频率,以及先行项在整个事务集(在本例中为rdd)中出现的频率。

代码语言:javascript
复制
from itertools import combinations, chain

def powerset(iterable, no_empty=True):
    ''' Produce the powerset for a given iterable '''
    s = list(iterable)
    combos = (combinations(s, r) for r in range(len(s)+1))
    powerset = chain.from_iterable(combos)
    return (el for el in powerset if el) if no_empty else powerset

# Set-up transaction set
rdd = sc.parallelize(
    [
        ('a',),
        ('a', 'b'),
        ('a', 'b'),
        ('b', 'c'),
        ('a', 'c'),
        ('a', 'b'),
        ('b', 'c'),
        ('c',),
        ('b'),
    ]
)

# Create an RDD with the counts of each
# possible itemset
counts = (
    rdd
    .flatMap(lambda x: powerset(x))
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .map(lambda x: (frozenset(x[0]), x[1]))
)

# Function to calculate confidence of a rule
confidence = lambda x: counts.lookup(frozenset(x)) / counts.lookup((frozenset(x[1]),))

confidence_result = (
    rdd
    # Must be applied to length-two and greater itemsets
    .filter(lambda x: len(x) > 1)
    .map(confidence)
)

对于那些熟悉这种类型的查找问题的人,您将知道会引发这种类型的异常:

代码语言:javascript
复制
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

解决此异常的一种方法是将counts转换为字典:

代码语言:javascript
复制
counts = dict(counts.collect())

confidence = lambda x: (x, counts[frozenset(x)] / counts[frozenset(x[1])])

confidence_result = (
    rdd
    # Must be applied to length-two and greater itemsets
    .filter(lambda x: len(x) > 1)
    .map(confidence)
)

这给了我我的结果。但是运行counts.collect的过程非常昂贵,因为实际上我有一个包含50m+记录的数据集。有没有更好的选择来执行这种类型的查找?

EN

回答 1

Stack Overflow用户

发布于 2018-08-20 18:07:31

如果您的目标指标可以在每个RDD分区上独立计算,然后组合起来达到目标结果,那么您可以在计算指标时使用mapPartitions而不是map

泛型流程应该类似于:

代码语言:javascript
复制
metric_result = (
    rdd
    # apply your metric calculation independently on each partition       
    .mapPartitions(confidence_partial) 
    # collect results from the partitions into a single list of results
    .collect()
    # reduce the list to combine the metrics calculated on each partition
    .reduce(confidence_combine)
)

confidence_partialconfidence_combine都是接受迭代器/列表输入的常规python函数。

另外,通过使用dataframe API和本机表达式函数来计算指标,您可能会获得巨大的性能提升。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51923025

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档