我有超过75万条推特。这些tweet存储在本地MongoDB实例中。我想用下面的代码为每条推特创建时间戳。这些时间戳稍后会在其他脚本中使用。
import pymongo
import datetime
import time
import gc
gc.collect()
client = pymongo.MongoClient("localhost:27017")
db = client.copySB #Database
col = db.original_tweets #Collectiion
start_time_total = time.time()
if col.find({"timestamp":{"$exists":False}}):
for tweet in col.find({"timestamp":{"$exists":False}}):
try:
# loop_start_time = time.time()
tweetId = tweet['id']
date = datetime.datetime.strptime(tweet['created_at'],'%a %b %d %H:%M:%S +0000 %Y')
epoch = date.utcfromtimestamp(0)
timestamp = (date-epoch).total_seconds()
col.update_one( {'id': tweetId},
#set - add item to the existing set, if not exist, will create a new field
{'$set': {'timestamp': timestamp }},
upsert=False
)
# loop_end_time = (time.time()-loop_start_time)/60
# print("----------------------------------------------------")
# print(f'Mongo data update loop took {loop_end_time} seconds')
# print ("Total tweets altered in this loop")
print(len(tweet))
except pymongo.errors.DuplicateKeyError:
pass
else:
pass
end_time_total = (time.time()-start_time_total)/60
print("-----------------------------------")
print(f'Mongo data update took {end_time_total} minutes')
print ("total tweets in the dataset ")
print (col.estimated_document_count())
print ("total valid tweets with timestamp ")
print (col.count_documents({'timestamp' : {'$exists': True}}))
print("-----------------------------------")这段代码可以很好地满足我的需要,但它的迭代速度很慢。我看到我的脚本只在每个查询中收集了大约30条推特。我整个下午都在运行它,它只更新了15万条推特。
为什么我的循环只收集这么少的推特呢?我没有指定任何限制,但是脚本只收集每个循环的少量内容。
我希望看到我的循环遍历批处理大小,但我不确定如何最好地实现这一点。有什么建议吗?
编辑:现在我已经接近300 K修改后的推特,更新过程大大减慢了。据我所知,在更新下一个值之前,脚本是否检查了集合中的每个tweet?
Edt-2:在使用@BellyBuster的解决方案之后,我能够快速更新我的推特。现在,我很好奇在时间戳方面什么是最好的。我的原始代码创建了这个时间戳,1549250444。当新代码创建这个时间戳时,1549248971000。这些值的不同让我好奇为什么新方法要创建一个更短的时间戳。
Final-编辑
我的最后命令:
col.update_many({"timestamp": {"$exists": False}},
[{'$addFields':
{'timestamp':
{'$divide':
[
{'$toLong':
{'$toDate': '$created_at'}
},
1000
]
}
}
}
]
)--我最初的问题--没有得到真正的回答,但是这个解决方案更健壮,操作也很好。
发布于 2019-12-22 12:23:40
假设您拥有Mongo4.2或更高版本,您可以使用$addFields、$toDate和$toLong操作符在一个聚合命令中完成此操作;在我的库存笔记本上,750 K记录所需时间不到一分钟。
我建议将日期/时间存储为ISODates,而不是时间戳,但这是另一回事。
ISODate版本:
col.update_many({"timestamp": {"$exists": False}},
[{'$addFields': {'timestamp': {'$toDate': '$created_at'}}}])时间戳版本:
col.update_many({"timestamp": {"$exists": False}},
[{'$addFields': {'timestamp': {'$toLong': {'$toDate': '$created_at'}}}}])使用数据设置的完整示例:
import pymongo
import datetime
import time
from bson.json_util import dumps
db = pymongo.MongoClient()['mydatabase']
col = db.original_tweets # Collectiion
operations = []
# Data Setup Only
for i in range(750000):
operations.append(pymongo.InsertOne(
{'created_at': datetime.datetime.strftime(datetime.datetime.utcnow(), '%a %b %d %H:%M:%S +0000 %Y')}))
col.bulk_write(operations)
# Update each record without a timestamp from the created_at field
start_time_total = time.time()
col.update_many({"timestamp": {"$exists": False}},
[{'$addFields': {'timestamp': {'$toLong': {'$toDate': '$created_at'}}}}])
# Output
end_time_total = (time.time() - start_time_total) / 60
print("-----------------------------------")
print(f'Mongo data update took {end_time_total} minutes')
print("total tweets in the dataset ")
print(col.estimated_document_count())
print("total valid tweets with timestamp ")
print(col.count_documents({'timestamp': {'$exists': True}}))
print("-----------------------------------")
print(dumps(col.find_one({}, {'_id': 0}), indent=4))给予:
-----------------------------------
Mongo data update took 0.4217496275901794 minutes
total tweets in the dataset
750000
total valid tweets with timestamp
750000
-----------------------------------
{
"created_at": "Sun Dec 22 12:19:48 +0000 2019",
"timestamp": 1577017188000
}https://stackoverflow.com/questions/59441114
复制相似问题