首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >链接迪斯科中的作业(MapReduce)

链接迪斯科中的作业(MapReduce)
EN

Stack Overflow用户
提问于 2013-07-09 04:33:31
回答 1查看 515关注 0票数 0

我想修改高级教程的innerjoin-example,这样它就可以使用mapreduce进行稀疏矩阵乘法(由Ullman描述)。因此,我需要第二个map-reduce步骤,对结果矩阵中相同位置的值求和。

不幸的是,我没有设法将类CsvInnerJoin的第一个reduce函数的输出放到SumJob的map函数中。

代码语言:javascript
复制
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys


if __name__ == '__main__':
    input_filename = "input.csv"
    output_filename = "output.csv"
    if len(sys.argv) > 1:
        input_filename = sys.argv[1]
        if len(sys.argv) > 2:
            output_filename = sys.argv[2]

    from CsvInnerJoiner import CsvInnerJoiner
    from SumJob import SumJob

    job = CsvInnerJoiner().run(input=[input_filename])
    job = SumJob().run() (******************)

    with open(output_filename, 'w') as fp:
        writer = csv.writer(fp)
        for url_key, descriptors in result_iterator(job.wait(show=True)):
            writer.writerow([url_key] + descriptors)

CsvInnerJoiner.py是这样的文件:

代码语言:javascript
复制
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys
class CsvInnerJoiner(Job):
    partitions = 2
    sort = True

    def map(self, row, params):
        yield row[0], row[1:]

    @staticmethod
    def map_reader(fd, size, url, params):
        reader = csv.reader(fd, delimiter=',')
        for row in reader:
            yield row

    #@staticmethod
def reduce(self, rows_iter, out, params):
    from disco.util import kvgroup
    from itertools import chain
    #for url_key, descriptors in kvgroup(sorted(rows_iter)):
    for url_key, descriptors in kvgroup(rows_iter):
        merged_descriptors = list(chain.from_iterable(descriptors))
        print url_key,"_______",merged_descriptors
        if len(merged_descriptors) > 3:
            Alist = merged_descriptors[:merged_descriptors.index("B")]
            Blist = merged_descriptors[merged_descriptors.index("B"):]
            Alistlength = len(Alist)/3
            Blistlength = len(Blist)/3
            for i in range(Alistlength):
                for j in range(Blistlength):
                    container = int(Alist[3*i+2])*int(Blist[3*j+2])
                    yield [Alist[3*i+1],Blist[3*j+1]],container
                    #out.add(Alist[3*i+1],[Blist[3*j+1],container])        

SumJob.py是这样的:

代码语言:javascript
复制
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys


class SumJob(Job):
    map_reader = staticmethod(chain_reader)

    @staticmethod
    def map(self,key_value, params):
        print "KEY::::::",str(key_value[0])
        print "VAL::::::",str(key_value[1])
        yield key_value[0], key_value[1]
    @staticmethod    
    def reduce(self,key_value,out, params):
        Summe = sum(key_value[1])
        out.add(key_value[0],Summe)

问题是,我不知道如何更改(**)行,以便将第一个reduce步骤的第二个输出作为第二个map-function的输入。

非常感谢你的帮助!达米安

EN

回答 1

Stack Overflow用户

发布于 2013-09-23 16:24:37

您可以使用一个map/reduce阶段的输出作为另一个阶段的输入(从job.wait()返回)。

代码语言:javascript
复制
job1 = SumJob().run(input=[...])
job2 = SumJob().run(input=[...])

output = SomeOtherJob.run(input=[job1.wait(), job2.wait()]).wait(show=True)
for key, value in result_iterator(output):
    print key, value 

我不是专家,因为代码块对我有效(我实现了pagerank算法,它有很多阶段和几次迭代)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/17535501

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档