首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多处理日志记录-如何使用loguru与joblib并行

多处理日志记录-如何使用loguru与joblib并行
EN

Stack Overflow用户
提问于 2019-12-21 02:53:15
回答 2查看 2.5K关注 0票数 4

我有一堆Python脚本来运行一些数据科学模型。这需要相当长的时间,而加快速度的唯一方法是使用多重处理。为了实现这一点,我使用了joblib库,它运行得非常好。然而,不幸的是,这会导致日志记录混乱,控制台输出也会被混淆(不过,也是如此),因为所有进程都同时转储各自的输出。

我刚开始使用logging库,并遵循了其他一些答案,试图让它发挥作用。我正在使用8个核心进行处理。使用SO上的答案,我编写了日志文件,并期望每次迭代都会有8个新文件。但是,它在第一次迭代时创建了8个文件,并且每个循环只编写/追加到这8个文件。这有点不方便,所以我进行了更多的探索,找到了logurulogzero。虽然它们都介绍了使用multiprocessing的示例,但它们都没有说明如何将其与joblib一起使用。以下是我到目前为止所拥有的:

run_models.py

代码语言:javascript
复制
import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger

import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id, logger):

    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))

    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)  
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)

    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)

    return True


def main():

    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()

    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)

    logger.add("test_loguru.log", format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logger) for s in stock_list)


if __name__ == "__main__":
    main()

train_model.py

代码语言:javascript
复制
import math
from datetime import datetime
from itertools import product
from math import sqrt

import pandas as pd
from keras import backend
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from numpy import array
from numpy import mean
from pandas import DataFrame
from pandas import concat
from sklearn.metrics import mean_squared_error

import helper
import stock_subscriber_data

# bunch of functions here that don't need logging...

# walk-forward validation for univariate data
def walk_forward_validation(logger, data, n_test, cfg):
    #... do stuff here ...
    #... and here ...
    logger.info('{0:.3f}'.format(error))
    return error, model


# score a model, return None on failure
def repeat_evaluate(logger, data, config, n_test, n_repeats=10):
    #... do stuff here ...
    #... and here ...
    logger.info('> Model{0} {1:.3f}'.format(key, result))
    return key, result, best_model



def read_train_write(data_df, stock_id, series, last_date, logger):
    #... do stuff here ...
    #... and here ...
    logger.info('done')

    #... do stuff here ...
    #... and here ...

    # bunch of logger.info() statements here... 
    #
    #
    #
    #

    #... do stuff here ...
    #... and here ...

    return test_y, prd

当一次只有一个进程时,这是很好的。但是,在多进程模式下运行时,我会得到一个_pickle.PicklingError: Could not pickle the task to send it to the workers.错误。我做错了什么?我该怎么补救呢?我不介意切换到logurulogzero以外的其他东西,只要我能够用连贯的日志创建一个文件,甚至创建n文件,每个文件都包含joblib每次迭代的日志。

EN

回答 2

Stack Overflow用户

发布于 2019-12-22 00:42:06

我通过修改我的run_models.py让它工作起来。现在,我每个循环有一个日志文件。这会创建大量的日志文件,但它们都与每个循环相关,而不是乱七八糟的。我想一步一步。我所做的是:

run_models.py

代码语言:javascript
复制
import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger

import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id):

    log_file_name = "log_file_{}".format(stock_id)

    logger.add(log_file_name, format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))

    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)  
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)

    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)

    return True


def main():

    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()

    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)

    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s) for s in stock_list)


if __name__ == "__main__":
    main()
票数 0
EN

Stack Overflow用户

发布于 2021-08-09 15:48:48

因此,将loguru与joblib一起使用的正确方法是将后端更改为多处理。

代码语言:javascript
复制
from loguru import logger
from joblib import Parallel, delayed
from tqdm.autonotebook import tqdm 

logger.remove()
logger.add(sys.stdout, level = 'INFO', enqueue=True)

logger.info('test')
logger.debug('should not appear')

def do_thing(i):
    logger.info('item %i' %i)
    logger.debug('should not appaear')
    return None


Parallel(n_jobs=4, backend='multiprocessing')(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)



Parallel(n_jobs=4)(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)

第一个并行调用工作。第二个问题是你前面提到的老问题。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59433146

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档