I have a bunch of Python scripts that run some data science models. They take quite a while, and the only way to speed them up is multiprocessing. To achieve this I used the joblib library, which works really well. Unfortunately, however, it messes up the logging, and the console output gets garbled too (and so is the log file), because all the processes dump their respective outputs simultaneously.
I'm new to the logging library and followed several other answers to try to make this work. I'm using 8 cores for the processing. Following an answer on SO, I wrote log files and expected 8 new files per iteration. Instead, it created 8 files on the first iteration, and every later loop only wrote/appended to those same 8 files. That was somewhat inconvenient, so I explored further and found loguru and logzero. While both of them cover examples using multiprocessing, neither shows how to use it with joblib. The pattern I had been following looks roughly like this (a hypothetical reconstruction; the helper name and file naming below are illustrative, not the original answer's code):
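import logging
import multiprocessing

def get_worker_logger():
    # Key the logger on the worker process name: a pool of 8 workers
    # yields exactly 8 log files, and since joblib reuses the same
    # workers across iterations, later iterations append to those files.
    name = multiprocessing.current_process().name
    logger = logging.getLogger(name)
    if not logger.handlers:  # configure each worker's logger only once
        handler = logging.FileHandler("worker_{}.log".format(name))
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger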
Here is what I have so far:

run_models.py
import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger
import pandas as pd
import psutil
from joblib import Parallel, delayed
import helper
import log
import prep_data
import stock_subscriber_data
import train_model
def get_pred(cust_df, stock_id, logger):
    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)
    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)
    return True
def main():
    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()
    # heuristic memory-based cap on worker count (total RAM in GiB)
    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)
    logger.add("test_loguru.log", format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logger) for s in stock_list)
if __name__ == "__main__":
    main()

train_model.py
import math
from datetime import datetime
from itertools import product
from math import sqrt
import pandas as pd
from keras import backend
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from numpy import array
from numpy import mean
from pandas import DataFrame
from pandas import concat
from sklearn.metrics import mean_squared_error
import helper
import stock_subscriber_data
# bunch of functions here that don't need logging...
# walk-forward validation for univariate data
def walk_forward_validation(logger, data, n_test, cfg):
    # ... do stuff here ...
    # ... and here ...
    logger.info('{0:.3f}'.format(error))
    return error, model
# score a model, return None on failure
def repeat_evaluate(logger, data, config, n_test, n_repeats=10):
    # ... do stuff here ...
    # ... and here ...
    logger.info('> Model{0} {1:.3f}'.format(key, result))
    return key, result, best_model
def read_train_write(data_df, stock_id, series, last_date, logger):
    # ... do stuff here ...
    # ... and here ...
    logger.info('done')
    # ... do stuff here ...
    # ... and here ...
    # bunch of logger.info() statements here...
    #
    #
    #
    #
    # ... do stuff here ...
    # ... and here ...
    return test_y, prd

This works fine when only one process runs at a time. However, when running in multiprocessing mode, I get a _pickle.PicklingError: Could not pickle the task to send it to the workers. error. What am I doing wrong, and how do I remedy it? I don't mind switching to something other than loguru or logzero, as long as I can create one file with coherent logs, or even n files, each containing the logs of a single joblib iteration.
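One way to check whether the logger object itself is the unpicklable part (a diagnostic sketch; the exact exception may vary across loguru and joblib versions, and the file name is illustrative) is to try pickling it directly. The task joblib ships to a worker includes every argument of the delayed call, so a logger whose handler was added with enqueue=True carries an inter-process queue that typically cannot be serialized this way.

import pickle
from loguru import logger

logger.add("pickle_test.log", enqueue=True)

try:
    pickle.dumps(logger)  # often fails once an enqueue=True handler exists
    print("logger pickled fine on this setup")
except Exception as exc:
    print("cannot pickle logger:", type(exc).__name__, exc)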
Posted on 2019-12-22 00:42:06
I got this working by modifying my run_models.py. Now I have one log file per loop. This creates a lot of log files, but each one is coherent for its own loop instead of being jumbled together. One step at a time, I guess. Here's what I did:
run_models.py
import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger
import pandas as pd
import psutil
from joblib import Parallel, delayed
import helper
import log
import prep_data
import stock_subscriber_data
import train_model
def get_pred(cust_df, stock_id):
    # The logger is no longer passed through delayed(); each worker
    # adds its own per-stock file sink instead.
    log_file_name = "log_file_{}".format(stock_id)
    logger.add(log_file_name, format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)
    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)
    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)
    return True
def main():
    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()
    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s) for s in stock_list)
if __name__ == "__main__":
    main()
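If a single consolidated file is still wanted, the per-stock files can be stitched together after the run (a minimal sketch; it assumes the log_file_<stock_id> naming from the code above and an illustrative combined.log output name):

import glob

# Concatenate the per-stock log files into one file, in sorted filename order.
with open("combined.log", "w") as out:
    for path in sorted(glob.glob("log_file_*")):
        with open(path) as src:
            out.write(src.read())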
Posted on 2021-08-09 15:48:48

So the correct way to use loguru with joblib is to change the backend to multiprocessing:
import sys

from loguru import logger
from joblib import Parallel, delayed
from tqdm.autonotebook import tqdm

logger.remove()
logger.add(sys.stdout, level='INFO', enqueue=True)
logger.info('test')
logger.debug('should not appear')
def do_thing(i):
    logger.info('item %i' % i)
    logger.debug('should not appear')
    return None
Parallel(n_jobs=4, backend='multiprocessing')(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)

Parallel(n_jobs=4)(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)

The first Parallel call works. The second one fails with the same old pickling problem mentioned above.
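Putting the two answers together, here is a minimal end-to-end sketch (assumptions: a POSIX fork start method so workers inherit the handler, plus illustrative names such as combined_run.log and work) that yields a single coherent log file without passing the logger through delayed at all:

from joblib import Parallel, delayed
from loguru import logger

logger.remove()  # drop the default stderr handler
logger.add("combined_run.log", level="INFO", enqueue=True)  # one shared sink

def work(i):
    # Under fork, the worker inherits this module's configured logger,
    # so nothing logger-related has to be pickled.
    logger.info("iteration {}", i)
    return i * i

if __name__ == "__main__":
    results = Parallel(n_jobs=4, backend="multiprocessing")(
        delayed(work)(i) for i in range(10)
    )
    logger.info("results: {}", results)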
https://stackoverflow.com/questions/59433146