首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用MapReduce的MRjob生成前10个值的MRjob作业

使用MapReduce的MRjob生成前10个值的MRjob作业
EN

Stack Overflow用户
提问于 2016-11-29 16:26:55
回答 2查看 6.7K关注 0票数 3

我希望这个地图减少作业(下面的代码)输出前10名最受欢迎的产品。它一直给我以下错误消息:

它= izip (迭代,count(0,-1)) #修饰TypeError: izip参数#1必须支持迭代。

我想这和我想要申请的最大的函数有关。

有什么指示吗?

谢谢!

代码语言:javascript
复制
from mrjob.job import MRJob
from mrjob.step import MRStep
from heapq import nlargest


class MostRatedProduct(MRJob):

def steps(self):
    return [
        MRStep(mapper = self.mapper_get_ratings,
               reducer = self.reducer_count_ratings),
        MRStep(reducer = self.reducer_find_top10)
    ]


def mapper_get_ratings(self, _, line):
    (userID, itemID, rating, timestamp) = line.split(',')
    yield itemID, 1

def reducer_count_ratings(self, itemID, ratingCount):
    yield None, (sum(ratingCount), itemID)

def top_10(self, ratingPair):
    for ratingTotal, itemID in ratingPair:
        top_rated = nlargest(10, ratingTotal)
    for top_rated in ratingTotal:
        return (ratingTotal, itemID)

def reducer_find_top10(self, key, ratingPair):
    ratingTotal, itemID = self.top_10(ratingPair)
    yield ratingTotal, itemID


if __name__ == '__main__':
    MostRatedProduct.run()
EN

回答 2

Stack Overflow用户

发布于 2019-08-08 14:47:11

使用mrjob库,您可以在python中执行相同的操作:-

代码语言:javascript
复制
#Write a Code to print the top 5 word - occurences

#Import Dependencies
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRWordCount(MRJob):

  def steps(self):
    return [MRStep(mapper=self.mapper,reducer=self.reducer),MRStep(reducer = self.secondreducer)]

  def mapper(self,_,lines):
    words = lines.split()
    for word in words:
      yield word.lower(),1

  def reducer(self,key,values):
    yield None,('%04d'%int(sum(values)),key)

  def secondreducer(self,key,values):
    self.alist = []
    for value in values:
      self.alist.append(value)
    self.blist = []
    for i in range(5):
      self.blist.append(max(self.alist))
      self.alist.remove(max(self.alist))
    for i in range(5):
      yield self.blist[i]

if __name__ == '__main__':
    MRWordCount.run()
票数 2
EN

Stack Overflow用户

发布于 2016-11-29 16:52:03

我没有使用mrjob,但我以前在AWS集群上使用了MapReduce来查找顶级值。这是我的代码,它不使用heapq。希望您能够将相同的概念应用到代码中。这是映射函数

代码语言:javascript
复制
import sys, time

def Parser():
    for line in sys.stdin:
        line = line.strip('\n')
        yield line.split()


def mapper():
    counts = list(Parser())
    z = sorted(counts, key = lambda x: int(x[1]))[-10:]
    print '\n'.join(map(lambda x: '\t'.join(x), z))


if __name__=='__main__':
    mapper()

这是减速机的代码

代码语言:javascript
复制
import sys, operator, itertools

def Parser():
    for line in sys.stdin:
        yield tuple(line.strip('\n').split('\t'))

def reducer():
    for key, pairs in itertools.groupby(Parser(), operator.itemgetter(0)):
        counts = list(Parser())
        z = sorted(counts, key = lambda x: int(x[1]))[-10:]
        print '\n'.join(map(lambda x: '\t'.join(x), z))

if __name__=='__main__':
    reducer()

我把它改为输出前10个单词。请记住,这是一个单词计数示例,其中我分析了一个文本文档。我希望这在某种程度上有帮助!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40870854

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档