我正在使用pymongo在MongoDB上做一些分析。
在MongoDB中,有480000个json对象代表在2020年3月至4月之间发布的关于新冠肺炎病毒的推文。
特别是,这些对象包含两个字段:
1)" created_at ",表示date类型的推文的创建时间戳(例如created_at: 2020-03-20T10:57:57.000+00:00 );
2)"retweet_count",表示该条推文被转发的次数(如"retweet_count:30");
我将创建一个聚合管道,该管道每天接收具有最高retweet_count值的前5000个json对象。
问题是,如果我必须使用group子句、match子句或project子句(我是新手),我也不理解。
下面是我做过的一个尝试:
import pymongo
from datetime import datetime, tzinfo, timezone
from pymongo import MongoClient
client['Covid19']['tweets'].aggregate([
{
'$match' : {
"created_at": { '$gte': datetime(2020, 3, 20), '$lt': datetime(2020, 3, 21) }
}
},
{
'$merge': {
'into': 'tweets_filtered'
}
}
])
print(client['Covid19']['tweets_filtered'].count_documents({}))因此,此管道提供了从3月20日到3月21日的tweet,但我将对该过程进行概括,并采用具有最高retweet_count值的每一天的前5000条tweet。
发布于 2020-08-13 03:37:40
您可以通过编程方式生成所需的边界并使用$bucket。
使用时间、计数和消息字段的Ruby示例:
require 'mongo'
Mongo::Logger.logger.level = Logger::WARN
client = Mongo::Client.new(['localhost:14420'])
c = client['foo']
c.delete_many
10.times do |i|
day_time = Time.now - i*86400
100.times do |j|
time = day_time + j*100
count = rand*1000
message = "message #{count}"
c.insert_one(time: time, count: count, message: message)
end
end
days = (-1..10).map { |i| Time.now - i*86400 }.reverse
pp c.aggregate([
{'$sort' => {count: -1}},
{'$bucket' => {groupBy: '$time', boundaries: days,
output: {messages: {'$push' => '$$ROOT'}},
}},
{'$project' => {top_messages: {'$slice' => ['$messages', 5]}}},
]).to_a发布于 2020-08-13 06:32:04
Pymongo使用pandas回答:
from pymongo import MongoClient
from datetime import datetime
import pandas as pd
TOP_N_PER_DAY = 5000
# Perform the find with a filter; strip out the _id
tweets = db.tweets.find({ 'created_at': {'$gte': datetime(2020, 3, 20), '$lt': datetime(2020, 3, 22) }}, {'_id': 0})
# Create a dataframe from the find
df = pd.DataFrame(list(tweets))
# Convert the datetime to a date only timeseries
df['date'] = df['created_at'].dt.date
# Group by date and sort by retweet count
df = df.groupby('date').apply(lambda x: x.sort_values('retweet_count', ascending = False)).reset_index(drop=True)
# Take the top n per day
df = df.groupby('date').head(TOP_N_PER_DAY)
# Convert the pandas timeseries back to a datetime
df['date'] = pd.to_datetime(df['date'])
# Convert the dataframe into a list of dicts
records = df.to_dict('records')
# Insert the filtered tweets into a new collection
db.tweets_filtered.insert_many(records)https://stackoverflow.com/questions/63382378
复制相似问题