I have the following code (Python 2.7):
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count

from pandas import DataFrame

# bro, engine, sql_table_stage and sql_schema are defined elsewhere in the script.
NUM_WORKERS = cpu_count()
c = 0
while True:
    results = []
    # One batch covers NUM_WORKERS consecutive pages.
    pages = list(range(c, c + NUM_WORKERS))
    with ThreadPoolExecutor(NUM_WORKERS) as executor:
        futures = [executor.submit(bro.get_content, page) for page in pages]
        for future in as_completed(futures):
            results.extend(future.result())
    # Stop once a whole batch comes back empty.
    if len(results) < 1:
        break
    print("Get batch {0} with {1} results".format(c, len(results)))
    df = DataFrame(results)
    df.to_sql(sql_table_stage, engine, sql_schema, if_exists='append', index=False)
    print("Pages {0} to {1} was insert".format(c, c + NUM_WORKERS))
    c += NUM_WORKERS

The code runs and does what it is expected to do (although it is incredibly slow!). The problem is that when I look at the logs, I see:
[2018-08-21 01:06:54,513] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,513] {bash_operator.py:70} INFO - Tmp dir root location:
[2018-08-21 01:06:54,513] {base_task_runner.py:98} INFO - Subtask: /tmp
[2018-08-21 01:06:54,514] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,514] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpGyRCX2//tmp/airflowtmpGyRCX2/importwQaRgB
[2018-08-21 01:06:54,514] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,514] {bash_operator.py:88} INFO - Running command: python /home/ubuntu/airflow/scripts/import.py
[2018-08-21 01:06:54,519] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 01:06:54,518] {bash_operator.py:97} INFO - Output:
[2018-08-21 05:45:48,758] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 0 with 20000 results
[2018-08-21 05:45:48,759] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Pages 0 to 4 was insert
[2018-08-21 05:45:48,759] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 4 with 19996 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Pages 4 to 8 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,758] {bash_operator.py:101} INFO - Get batch 8 with 20000 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 8 to 12 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 12 with 20000 results
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 12 to 16 was insert
[2018-08-21 05:45:48,760] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 16 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 16 to 20 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 20 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 20 to 24 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 24 with 20000 results
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 24 to 28 was insert
[2018-08-21 05:45:48,761] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 28 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 28 to 32 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Get batch 32 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,759] {bash_operator.py:101} INFO - Pages 32 to 36 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 36 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 36 to 40 was insert
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 40 with 20000 results
[2018-08-21 05:45:48,762] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 40 to 44 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 44 with 20000 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 44 to 48 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 48 with 19997 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 48 to 52 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 52 with 20000 results
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 52 to 56 was insert
[2018-08-21 05:45:48,763] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Get batch 56 with 20000 results
[2018-08-21 05:45:48,764] {base_task_runner.py:98} INFO - Subtask: [2018-08-21 05:45:48,760] {bash_operator.py:101} INFO - Pages 56 to 60 was insert

NUM_WORKERS comes from cpu_count(), which is 4. This is code I inherited, and I am trying to understand what it does before I convert the script to Python 3.
Answered 2018-08-21 08:14:34
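When you create a thread pool in Python, the Global Interpreter Lock (GIL) lets only one thread control the Python interpreter at a time. The threads therefore take turns instead of running in parallel on separate processors, so (Python) threads give no real parallelism for data-intensive, CPU-bound tasks.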
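A minimal sketch of the point above, written for Python 3 and assuming a standard CPython build with the GIL enabled. The names `busy_sum`, `run_serial`, `run_threaded` and `timed` are made up for illustration and are not part of the original script. On such a build the threaded run usually takes about as long as the serial one, but exact timings depend on the machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def busy_sum(n):
    """Pure-Python CPU-bound loop; it holds the GIL while it runs."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def run_serial(n, tasks):
    """Run the same job `tasks` times, one after another."""
    return [busy_sum(n) for _ in range(tasks)]


def run_threaded(n, tasks):
    """Run the same job `tasks` times in a thread pool."""
    with ThreadPoolExecutor(max_workers=tasks) as executor:
        return list(executor.map(busy_sum, [n] * tasks))


def timed(label, fn):
    """Call fn(), print its wall-clock time, and return its result."""
    start = time.perf_counter()
    result = fn()
    print("{0}: {1:.2f}s".format(label, time.perf_counter() - start))
    return result


if __name__ == "__main__":
    N, TASKS = 2_000_000, 4
    serial = timed("serial", lambda: run_serial(N, TASKS))
    threaded = timed("threaded", lambda: run_threaded(N, TASKS))
    # Both approaches compute the same values; only the timing is of interest.
    assert serial == threaded
```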
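How do you solve this? Simple: spawn multiple Python processes that run on different processors, each with its own interpreter. This is where the multiprocessing (mp) module comes in: it starts multiple processes from the parent Python process that calls it.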
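As a rough sketch of that approach in Python 3, the batch loop from the question could be driven by a `multiprocessing.Pool` instead of a thread pool. Everything here is an assumption for illustration: `get_content` is a hypothetical stand-in for `bro.get_content` that pretends the source has 10 pages, and `batch_pages`, `flatten` and `fetch_all` are invented helper names. The database insert is left out.

```python
from multiprocessing import Pool, cpu_count


def get_content(page):
    """Hypothetical stand-in for bro.get_content: returns the rows of one page."""
    if page >= 10:  # pretend the source only has pages 0..9
        return []
    return [{"page": page, "row": i} for i in range(3)]


def batch_pages(start, size):
    """Page numbers of one batch, mirroring range(c, c + NUM_WORKERS)."""
    return list(range(start, start + size))


def flatten(per_page_results):
    """Merge the per-page row lists into one list of rows."""
    return [row for rows in per_page_results for row in rows]


def fetch_all(num_workers):
    """Fetch batches in worker processes until a whole batch comes back empty."""
    all_rows = []
    start = 0
    # One pool for the whole run, so processes are not re-created per batch.
    with Pool(processes=num_workers) as pool:
        while True:
            pages = batch_pages(start, num_workers)
            rows = flatten(pool.map(get_content, pages))
            if not rows:
                break
            all_rows.extend(rows)
            start += num_workers
    return all_rows


if __name__ == "__main__":
    rows = fetch_all(cpu_count())
    print("Fetched {0} rows".format(len(rows)))
```

The function passed to `pool.map` has to be defined at module level so it can be pickled and sent to the workers, and the `if __name__ == "__main__":` guard keeps worker processes from re-running the driver code on platforms that start them by re-importing the script.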
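You can verify this by running htop (on Linux or macOS) and counting the Python processes. With the mp module, they all have the same name as the parent script that called pool.map.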
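The same check can also be sketched from inside Python by asking each worker for its process ID. The helper names `worker_pid` and `distinct_worker_pids` are hypothetical, and the worker and task counts are arbitrary. With very short tasks, fewer distinct PIDs than workers may show up, because one worker can finish several tasks before the others pick any up.

```python
import os
from multiprocessing import Pool


def worker_pid(_):
    """Return the PID of whichever process runs this call."""
    return os.getpid()


def distinct_worker_pids(num_workers, num_tasks):
    """Run trivial jobs in a pool and collect the PIDs that handled them."""
    with Pool(processes=num_workers) as pool:
        return set(pool.map(worker_pid, range(num_tasks)))


if __name__ == "__main__":
    pids = distinct_worker_pids(num_workers=4, num_tasks=40)
    print("parent PID:", os.getpid())
    print("worker PIDs:", sorted(pids))
```

The worker PIDs printed here are the same ones htop lists under the parent script's name.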
https://stackoverflow.com/questions/51944179