我是Python多处理的新手。我不太明白池和过程之间的区别。有人能建议我应该用哪一个来满足我的需要吗?
我有成千上万的http GET请求要发送。在发送每个文件并获得响应后,我希望存储到响应(一个简单的int)到一个(共享) dict。我的最终目标是将dict中的所有数据写入文件中。
这根本不是CPU密集型的。我的所有目标是加速发送http请求,因为太多了。这些请求都是孤立的,不相互依赖。
在这种情况下,我应该使用池还是进程?
谢谢!
-下面的代码添加在8/28-
我用多处理程序编程。我面临的主要挑战是:
1) GET请求有时可能失败。我必须设置3次重试,以尽量减少重新运行代码/所有请求的需要。我只想重试那些失败的。我可以在不使用Pool的情况下使用异步http请求来实现这一点吗?
2)我希望检查每个请求的响应值,并进行异常处理
下面的代码是根据我的实际代码简化的。它运转得很好,但我不知道这是否是最有效的做事方法。有人能给点建议吗?非常感谢!
def get_data(endpoint, get_params):
response = requests.get(endpoint, params = get_params)
if response.status_code != 200:
raise Exception("bad response for " + str(get_params))
return response.json()
def get_currency_data(endpoint, currency, date):
get_params = {'currency': currency,
'date' : date
}
for attempt in range(3):
try:
output = get_data(endpoint, get_params)
# additional return value check
# ......
return output['value']
except:
time.sleep(1) # I found that sleeping for 1s almost always make the retry successfully
return 'error'
def get_all_data(currencies, dates):
# I have many dates, but not too many currencies
for currency in currencies:
results = []
pool = Pool(processes=20)
for date in dates:
results.append(pool.apply_async(get_currency_data, args=(endpoint, date)))
output = [p.get() for p in results]
pool.close()
pool.join()
time.sleep(10) # Unfortunately I have to give the server some time to rest. I found it helps to reduce failures. I didn't write the server. This is not something that I can control发布于 2017-08-23 16:46:08
都不是。使用异步编程。考虑下面的代码直接从那篇文章中提取出来(信用归于PawełMiech)
#!/usr/local/bin/python3.5
import asyncio
from aiohttp import ClientSession
async def fetch(url, session):
async with session.get(url) as response:
return await response.read()
async def run(r):
url = "http://localhost:8080/{}"
tasks = []
# Fetch all responses within one Client session,
# keep connection alive for all requests.
async with ClientSession() as session:
for i in range(r):
task = asyncio.ensure_future(fetch(url.format(i), session))
tasks.append(task)
responses = await asyncio.gather(*tasks)
# you now have all response bodies in this variable
print(responses)
def print_responses(result):
print(result)
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(4))
loop.run_until_complete(future)只需创建一个URL的数组,而不是给定的代码,对该数组循环并将每个数组发送给fetch。
编辑:使用requests_futures
如下面@roganjosh评论所示,期货是实现这一目标的一种非常简单的方法。
from requests_futures.sessions import FuturesSession
sess = FuturesSession()
urls = ['http://google.com', 'https://stackoverflow.com']
responses = {url: sess.get(url) for url in urls}
contents = {url: future.result().content
for url, future in responses.items()
if future.result().status_code == 200}编辑:使用grequests支持Python2.7
您还可以使用grequest,它支持Python2.7执行异步URL调用。
import grequests
urls = ['http://google.com', 'http://stackoverflow.com']
responses = grequests.map(grequests.get(u) for u in urls)
print([len(r.content) for r in rs])
# [10475, 250785]编辑:使用多处理
如果要使用多重处理来完成此操作,则可以。免责声明:通过这样做,您将有大量的开销,而且它不会像异步编程那样高效.但这是有可能的。
实际上非常简单,您正在通过http函数映射URL:
import requests
urls = ['http://google.com', 'http://stackoverflow.com']
from multiprocessing import Pool
pool = Pool(8)
responses = pool.map(requests.get, urls)池的大小将是同时发出GET请求的数目。对其进行调整会提高您的网络效率,但它会增加本地计算机上的通信和分叉开销。
同样,我不建议这样做,但这当然是可能的,如果您有足够的核心,它可能比同步执行调用更快。
https://stackoverflow.com/questions/45844876
复制相似问题