我正在编写一个小型多线程http文件下载程序,希望能够在代码遇到错误时收缩可用线程。
这些错误将特定于返回的http错误,其中web服务器不允许任何其他连接。
例如:如果我设置了一个由5个线程组成的池,那么每个线程都试图打开自己的连接并下载文件块。服务器可能只允许两个连接,我相信会返回503个错误,我希望检测到这个错误并关闭一个线程,最终将池的大小限制在服务器所允许的2。
我能让线停下来吗?
self.Thread_stop()是否足够?
我还需要加入()吗?
下面是我的worker类,它执行下载,从队列抓取到要处理的进程,下载后将结果转储到resultQ中,由主线程保存到文件中
在这里,我希望检测到一个http 503,并从可用池中停止/杀死/删除一个线程,当然,还可以将失败的块重新添加到队列中,这样剩下的线程就会处理它。
class Downloader(threading.Thread):
def __init__(self, queue, resultQ, file_name):
threading.Thread.__init__(self)
self.workQ = queue
self.resultQ = resultQ
self.file_name = file_name
def run(self):
while True:
block_num, url, start, length = self.workQ.get()
print 'Starting Queue #: %s' % block_num
print start
print length
#Download the file
self.download_file(url, start, length)
#Tell queue that this task is done
print 'Queue #: %s finished' % block_num
self.workQ.task_done()
def download_file(self, url, start, length):
request = urllib2.Request(url, None, headers)
if length == 0:
return None
request.add_header('Range', 'bytes=%d-%d' % (start, start + length))
while 1:
try:
data = urllib2.urlopen(request)
except urllib2.URLError, u:
print "Connection did not start with", u
else:
break
chunk = ''
block_size = 1024
remaining_blocks = length
while remaining_blocks > 0:
if remaining_blocks >= block_size:
fetch_size = block_size
else:
fetch_size = int(remaining_blocks)
try:
data_block = data.read(fetch_size)
if len(data_block) == 0:
print "Connection: [TESTING]: 0 sized block" + \
" fetched."
if len(data_block) != fetch_size:
print "Connection: len(data_block) != length" + \
", but continuing anyway."
self.run()
return
except socket.timeout, s:
print "Connection timed out with", s
self.run()
return
remaining_blocks -= fetch_size
chunk += data_block
resultQ.put([start, chunk])下面是我插入线程池的位置,然后我将项放到队列中。
# create a thread pool and give them a queue
for i in range(num_threads):
t = Downloader(workQ, resultQ, file_name)
t.setDaemon(True)
t.start()发布于 2013-05-05 18:39:18
我能让线停下来吗?
不要使用self._Thread__stop()。退出线程的run()方法就足够了(您可以检查一个标志或从队列中读取一个哨兵值以知道何时退出)。
在这里,我希望检测到一个http 503,并从可用池中停止/杀死/删除一个线程,当然,还可以将失败的块重新添加到队列中,这样剩下的线程就会处理它。
您可以通过分离职责来简化代码:
download_file()不应该尝试在无限循环中重新连接。如果有错误,让我们调用download_file()的代码在必要时重新提交它Semaphore对象中。在这种情况下,线程数可能与并发连接的数目不同。import concurrent.futures # on Python 2.x: pip install futures
from threading import BoundedSemaphore
def download_file(args):
nconcurrent.acquire(timeout=args['timeout']) # block if too many connections
# ...
nconcurrent.release() #NOTE: don't release it on exception,
# allow the caller to handle it
# you can put it into a dictionary: server -> semaphore instead of the global
nconcurrent = BoundedSemaphore(5) # start with at most 5 concurrent connections
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
future_to_args = dict((executor.submit(download_file, args), args)
for args in generate_initial_download_tasks())
while future_to_args:
for future in concurrent.futures.as_completed(dict(**future_to_args)):
args = future_to_args.pop(future)
try:
result = future.result()
except Exception as e:
print('%r generated an exception: %s' % (args, e))
if getattr(e, 'code') != 503:
# don't decrease number of concurrent connections
nconcurrent.release()
# resubmit
args['timeout'] *= 2
future_to_args[executor.submit(download_file, args)] = args
else: # successfully downloaded `args`
print('f%r returned %r' % (args, result))见 example。
发布于 2013-05-05 16:16:55
您应该使用线程池来控制线程的生命周期:
然后,当线程存在时,您可以向主线程发送一条消息(即处理线程池),然后更改线程池的大小,并在堆栈中延迟新请求或失败请求,然后将其清空。
tedelanay对于给线程的守护进程状态是完全正确的。没有必要将它们设置为守护进程。
基本上,您可以简化代码,您可以这样做:
import threadpool
def process_tasks():
pool = threadpool.ThreadPool(4)
requests = threadpool.makeRequests(download_file, arguments)
for req in requests:
pool.putRequest(req)
#wait for them to finish (or you could go and do something else)
pool.wait()
if __name__ == '__main__':
process_tasks()arguments在哪里取决于您的策略。要么给线程一个队列作为参数,然后清空队列。或者您可以在process_tasks中获得队列,在池已满时阻塞,并在线程完成时打开一个新线程,但队列不是空的。这都取决于您的需要和您的下载环境。
资源:
发布于 2013-05-05 15:52:29
Thread对象仅通过从run方法返回来终止线程--它不调用stop。如果您将线程设置为守护进程模式,则不需要连接,但是主线程需要这样做。线程通常使用结果q报告其退出,而主线程则使用该信息来进行连接。这有助于有序地终止您的过程。如果python仍然在处理多个线程,那么在系统退出过程中可能会出现奇怪的错误。
https://stackoverflow.com/questions/16385998
复制相似问题