首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark马尔可夫模型的算法/编码帮助

PySpark马尔可夫模型的算法/编码帮助
EN

Stack Overflow用户
提问于 2015-09-17 23:39:09
回答 1查看 1.6K关注 0票数 5

我需要一些帮助,让我的大脑围绕设计一个(有效的)马尔可夫链在火花(通过python)。我已经尽我所能写好了,但是我想出的代码不适合..基本上,对于不同的地图阶段,我编写了自定义函数,它们可以很好地工作在几千个序列上,但是当我们进入20,000+ (我有一些高达800 K的东西)时,爬行速度很慢。

对于那些不熟悉马尔可夫模型的人来说,这就是它的要点。

这是我的数据。此时,我已经在RDD中获得了实际数据(没有标头)。

代码语言:javascript
复制
ID, SEQ
500, HNL, LNH, MLH, HML

我们用元组来研究序列,所以

代码语言:javascript
复制
(HNL, LNH), (LNH,MLH), etc..

我要讲到这一点..。在这里,我返回一个字典(针对每一行数据),然后序列化并存储在内存数据库中。

代码语言:javascript
复制
{500:
    {HNLLNH : 0.333},
    {LNHMLH : 0.333},
    {MLHHML : 0.333},
    {LNHHNL : 0.000},
    etc..
}

因此,从本质上说,每个序列与下一个序列(HNL,LNH变成'HNLLNH')结合在一起,然后对所有可能的跃迁(序列组合)进行计数,然后除以总跃迁数(在这种情况下为3),得到它们的出现频率。

上面有3个跃迁,其中一个是HNLLNH。所以对于HNLLNH,1/3 = 0.333

我不确定它是否相关,但是序列中每个位置的值都是有限的。第1位(H/M/ L),第2位(M/L),第3位(H,M,L)。

我的代码之前所做的就是收集() rdd,并使用我编写的函数映射它几次。这些函数首先将字符串转换为列表,然后将list1与list2合并,然后将list2与list3合并,然后将list3与list4合并,等等。所以我最终得到了这样的..。

代码语言:javascript
复制
[HNLLNH],[LNHMLH],[MHLHML], etc..

然后,下一个函数从该列表中创建一个字典,使用列表项作为键,然后在完整列表中计算该键的总占用率,除以len( list )以获得频率。然后,我将该字典与它的ID号一起包装在另一个字典中(结果是第二个代码块,上面的代码块)。

就像我说的,这对于小的ish序列很有效,但是对于长度为100k+的列表就不是那么好了。

另外,请记住,这只是一行数据。我必须对10-20k行的数据执行此操作,每行的数据行长度为500-80万个。

任何关于我如何编写吡咯烷酮代码的建议(使用API映射/还原/agg/等等)。(功能)有效地完成这个任务?

编辑代码如下。从底部开始也许是有意义的。请记住,我正在学习这个(Python和Spark),我不是靠这个谋生的,所以我的编码标准不是很好。

代码语言:javascript
复制
def f(x):
    # Custom RDD map function
    # Combines two separate transactions
    # into a single transition state

    cust_id = x[0]
    trans = ','.join(x[1])
    y = trans.split(",")
    s = ''
    for i in range(len(y)-1):
        s= s + str(y[i] + str(y[i+1]))+","
    return str(cust_id+','+s[:-1])

def g(x):
    # Custom RDD map function
    # Calculates the transition state probabilities
    # by adding up state-transition occurrences
    # and dividing by total transitions
    cust_id=str(x.split(",")[0])
    trans = x.split(",")[1:]
    temp_list=[]
    middle = int((len(trans[0])+1)/2)
    for i in trans:
        temp_list.append( (''.join(i)[:middle], ''.join(i)[middle:]) )

    state_trans = {}
    for i in temp_list:
            state_trans[i] = temp_list.count(i)/(len(temp_list))

    my_dict = {}
    my_dict[cust_id]=state_trans
    return my_dict


def gen_tsm_dict_spark(lines):
    # Takes RDD/string input with format CUST_ID(or)PROFILE_ID,SEQ,SEQ,SEQ....
    # Returns RDD of dict with CUST_ID and tsm per customer
    #  i.e.  {cust_id : { ('NLN', 'LNN') : 0.33, ('HPN', 'NPN') : 0.66}

    # creates a tuple ([cust/profile_id], [SEQ,SEQ,SEQ])
    cust_trans = lines.map(lambda s: (s.split(",")[0],s.split(",")[1:]))

    with_seq = cust_trans.map(f)

    full_tsm_dict = with_seq.map(g)

    return full_tsm_dict


def main():
result = gen_tsm_spark(my_rdd)

# Insert into DB
for x in result.collect():
    for k,v in x.iteritems():
         db_insert(k,v)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-09-18 09:08:40

你可以试试下面这样的方法。它在很大程度上依赖于tooolz,但是如果您希望避免外部依赖,您可以轻松地用一些标准的Python库来替换它。

代码语言:javascript
复制
from __future__ import division
from collections import Counter
from itertools import product
from toolz.curried import sliding_window, map, pipe, concat
from toolz.dicttoolz import merge

# Generate all possible transitions 
defaults = sc.broadcast(dict(map(
    lambda x: ("".join(concat(x)), 0.0), 
    product(product("HNL", "NL", "HNL"), repeat=2))))

rdd = sc.parallelize(["500, HNL, LNH, NLH, HNL", "600, HNN, NNN, NNN, HNN, LNH"])

def process(line):
    """
    >>> process("000, HHH, LLL, NNN")
    ('000', {'LLLNNN': 0.5, 'HHHLLL': 0.5})
    """
    bits = line.split(", ")
    transactions = bits[1:]
    n = len(transactions) - 1
    frequencies = pipe(
        sliding_window(2, transactions), # Get all transitions
        map(lambda p: "".join(p)), # Joins strings
        Counter, # Count 
        lambda cnt: {k: v / n for (k, v) in cnt.items()} # Get frequencies
    )
    return bits[0], frequencies

def store_partition(iter):
    for (k, v) in iter:
        db_insert(k, merge([defaults.value, v]))

rdd.map(process).foreachPartition(store_partition)

由于您知道所有可能的转换,我建议使用稀疏表示,忽略零。此外,您可以用稀疏向量替换字典,以减少内存占用。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32641643

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档