首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >MapReduce in PyMongo

MapReduce in PyMongo
EN

Stack Overflow用户
提问于 2015-07-16 14:15:30
回答 2查看 7.4K关注 0票数 3

我的Mongo集合:Impressions有以下格式的文档:-

代码语言:javascript
复制
   {
        _uid: 10,
        "impressions": [
            {
                "pos": 6,
                "id": 123,
                "service": "furniture"
            },
            {
                "pos": 0,
                "id": 128,
                "service": "electronics"
            },
            {
                "pos": 2,
                "id": 127,
                "service": "furniture"
            },
            {
                "pos": 2,
                "id": 125,
                "service": "electronics"
            },
            {
                "pos": 10,
                "id": 124,
                "service": "electronics"
            }
        ]
      },
     {
        _uid: 11,
        "impressions": [
            {
                "pos": 1,
                "id": 124,
                "service": "furniture"
            },
            {
                "pos": 10,
                "id": 124,
                "service": "electronics"
            },
            {
                "pos": 1,
                "id": 123,
                "service": "furniture"
            },
            {
                "pos": 21,
                "id": 122,
                "service": "furniture"
            },
            {
                "pos": 3,
                "id": 125,
                "service": "electronics"
            },
            {
                "pos": 10,
                "id": 121,
                "service": "electronics"
            }
            ]
         },
            .
            .
            .
            .
            .

集合中的每个文档都有"impressions"键,这是一个字典数组。在每个字典中,"id"是实体的id,"service"是服务类型,"pos"是搜索页面结果中项的位置。我的目标是找出每个类别中每个"id"的印象数。因此,对于"service" == "furniture"的上述数据,我希望将其作为我的聚合结果:-

代码语言:javascript
复制
[
{"id": 123,"impressions_count":2},
{"id": 127,"impressions_count":1},
{"id": 124,"impressions_count":1},
{"id": 122,"impressions_count":1}
]

我试图通过python脚本中的以下函数使用MAPREDUCE在"id“上进行聚合

代码语言:javascript
复制
def fetch_impressions():
    try:
        imp_collection = get_mongo_connection('Impressions')
        map = Code("""
                function(){
                    for( x in this.impressions){
                        var flat_id = x['id'];
                        var service_type = x['service']
                        emit(parseInt(flat_id),1);
                        }
                    };
                """)

                        """)
        reduce = Code("""
                        function(a,b){
                            return Array.sum(b);
                            };
                        """)

        results = imp_collection.map_reduce(map, reduce, 'aggregation_result')
        return results
    except Exception as e:
        raise Exception(e)

但我得到的结果是零,可能是因为错误的地图功能,我是新的Javascript和蒙戈友好的帮助!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-07-16 22:03:32

您可以使用聚合框架

代码语言:javascript
复制
import pymongo
conn = pymongo.MongoClient()
db = conn.test
col =  db.collection

for doc in col.aggregate([{'$unwind': '$impressions'}, 
    {'$match': {'impressions.service': 'furniture'}}, 
    {'$group': {'_id': '$impressions.id', 'impressions_count': {'$sum': 1}}}, 
    ]):
    print(doc)

或者更有效地使用$map$setDifference运算符。

代码语言:javascript
复制
col.aggregate([
    { "$project": { "impressions": {"$setDifference": [{ "$map": { "input": "$impressions", "as": "imp", "in": { "$cond": { "if": { "$eq": [ "$$imp.service", "furniture" ] }, "then": "$$imp.id", "else": 0 }}}}, [0]]}}}, 
    { "$unwind": "$impressions" }, 
    { "$group": { "_id": "$impressions", "impressions_count": { "$sum": 1 }}}
])

产生的结果:

代码语言:javascript
复制
{'_id': 122.0, 'impressions_count': 1}
{'_id': 124.0, 'impressions_count': 1}
{'_id': 127.0, 'impressions_count': 1}
{'_id': 123.0, 'impressions_count': 2}
票数 3
EN

Stack Overflow用户

发布于 2020-01-14 20:29:56

我开发了一个工具,允许您在Python / MongoDB中运行

https://mreduce.com

代码语言:javascript
复制
import random
import threading

import bson
import pymongo

import mreduce


mongo_client = pymongo.MongoClient("mongodb://your_mongodb_server")

def map_func(document):
    for impression in document["impressions"]:
        yield document["id"], 1

def reduce_func(id, prices):
    return sum(prices)

worker_functions = {
    "exampleMap": map_func,
    "exampleReduce": reduce_func
}

api = mreduce.API(
    api_key = "...",
    mongo_client = mongo_client
)

project_id = "..."

thread = threading.Thread(
    target=api.run,
    args=[project_id, worker_functions]
)
thread.start()

job = api.submit_job(
    projectId=project["_id"],
    mapFunctionName="exampleMap",
    reduceFunctionName="exampleReduce",
    inputDatabase="db",
    inputCollection="impressions",
    outputDatabase="db",
    outputCollection="impressions_results"
)
result = job.wait_for_result()
for key, value in result:
    print("Key: " + key, ", Value: " + str(value))
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31456788

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档