
Multiprocess.pool.map() raises ValueError: No objects to concatenate

Stack Overflow user
Asked on 2020-02-18 05:33:55
1 answer · 3.5K views · 0 followers · 0 votes
I have to run a for loop in which every iteration fetches data from a database, does some manipulation, runs Dijkstra's algorithm, and appends the result to a final list. The code looks like this:

```python
import math

import igraph as ig
import pandas as pd
import swifter  # noqa: F401 -- enables DataFrame.swifter

def log_transform(x):
    transformed = math.e ** (-x)
    return transformed

input_region = '1.199'
t1 = '20200101'
t2 = '20200115'
candid_sale_invoices = pd.read_excel('candid_sale_invoices.xlsx')
candid_barcodes = pd.read_excel('candid_barcodes.xlsx')

weights = []
for i in range(int(t1), (int(t2) + 1)):
    input_date = str(i)
    sql_data = """select trim(cast(p.Barcode as nvarchar(20))) Barcode, cast(s.invoiceid as nvarchar(20)) invoiceid
    from sales s inner join Product_981115 p on s.productid = p.productid
    where s.date = """ + input_date + """ and s.qty != 0 and p.sectionid != 1691.199 and s.RegionID = """ + input_region
    data = []
    # conn is an open database connection, defined elsewhere
    for chunk in pd.read_sql(sql_data, conn, chunksize=1000000):
        data.append(chunk)
    data = pd.concat(data, ignore_index=True)
    data = data.merge(candid_sale_invoices)
    data = data.merge(candid_barcodes)
    final_edges_df = data.iloc[:, [2, 3, 4]]
    final_edges_tuples = [tuple(x) for x in final_edges_df.values]

    Gm = ig.Graph.TupleList(final_edges_tuples, directed=True, edge_attrs=['weight'])

    longest_paths = pd.DataFrame(Gm.shortest_paths_dijkstra(None, None, weights='weight'))
    longest_paths = longest_paths.swifter.apply(log_transform)
    longest_paths["Date"] = input_date
    longest_paths["RegionID"] = input_region
    weights.append(longest_paths)

weights = pd.concat(weights, ignore_index=True)
```

The problem is that the whole process takes hours to finish. So, since every iteration is independent of the others, I decided to run the loop in parallel with the help of this link.

```python
import psutil
from multiprocess import Pool

pool = Pool(psutil.cpu_count(logical=False))

def graph_analysis(i):
    input_date = str(i)
    sql_data = """select trim(cast(p.Barcode as nvarchar(20))) Barcode, cast(s.invoiceid as nvarchar(20)) invoiceid
    from sales s inner join Product_981115 p on s.productid = p.productid
    where s.date = """ + input_date + """ and s.qty != 0 and p.sectionid != 1691.199 and s.RegionID = """ + input_region
    data = []
    for chunk in pd.read_sql(sql_data, conn, chunksize=1000000):
        data.append(chunk)
    data = pd.concat(data, ignore_index=True)
    data = data.merge(candid_sale_invoices)
    data = data.merge(candid_barcodes)
    final_edges_df = data.iloc[:, [2, 3, 4]]
    final_edges_tuples = [tuple(x) for x in final_edges_df.values]

    Gm = ig.Graph.TupleList(final_edges_tuples, directed=True, edge_attrs=['weight'])

    longest_paths = pd.DataFrame(Gm.shortest_paths_dijkstra(None, None, weights='weight'))
    longest_paths = longest_paths.swifter.apply(log_transform)
    longest_paths["Date"] = input_date
    longest_paths["RegionID"] = input_region
    return longest_paths

results = pool.map(graph_analysis, range(int(t1), (int(t2) + 1)))
pool.close()
```

When I run the code, it seems to be doing its work and computing in parallel, but after a while it raises this error:

```
Traceback (most recent call last):
  File "", line 78, in <module>
    weights = pool.map(graph_analysis, range(int(t1), (int(t2) + 1)))
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\AppData\Local\Continuum\anaconda3\lib\site-packages\multiprocess\pool.py", line 657, in get
    raise self._value
ValueError: No objects to concatenate
```

Could this error be related to collecting the "longest_paths" data from all the iterations?


1 Answer

Stack Overflow user

Answer accepted

Answered on 2020-02-18 07:18:59

"No objects to concatenate" is the pandas error raised when pd.concat() is called with an empty iterable:

```python
>>> pd.concat([])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pandas/core/reshape/concat.py", line 281, in concat
    sort=sort,
  File "pandas/core/reshape/concat.py", line 329, in __init__
    raise ValueError("No objects to concatenate")
ValueError: No objects to concatenate
>>>
```

I'm assuming you'll just want to bail out early when the SQL query returns nothing:

```python
# ...

    for chunk in pd.read_sql(sql_data, conn, chunksize=1000000):
        data.append(chunk)

    if not data:  # <-- add this bit!
        return None

    data = pd.concat(data, ignore_index=True)

# ...
```

I'd also suggest using pool.imap_unordered(); you also don't need psutil, since Pool defaults to the number of CPUs anyway.

All in all, something like this. Note that I changed the return type to (i, x), so you also get the index that was passed in. Of course, this still doesn't save the actual results anywhere. :)

```python
import math

import igraph as ig
import pandas as pd
import swifter  # noqa: F401 -- enables DataFrame.swifter
from multiprocess import Pool


def log_transform(x):
    transformed = math.e ** (-x)
    return transformed


input_region = "1.199"
t1 = "20200101"
t2 = "20200115"
candid_sale_invoices = pd.read_excel("candid_sale_invoices.xlsx")
candid_barcodes = pd.read_excel("candid_barcodes.xlsx")


def graph_analysis(i):
    input_date = str(i)
    sql_data = (
        """select trim(cast(p.Barcode as nvarchar(20))) Barcode, cast(s.invoiceid as nvarchar(20)) invoiceid
    from sales s inner join Product_981115 p on s.productid = p.productid
    where s.date = """
        + input_date
        + """ and s.qty != 0 and p.sectionid != 1691.199 and s.RegionID = """
        + input_region
    )
    data = []
    # conn is an open database connection, as in the question
    for chunk in pd.read_sql(sql_data, conn, chunksize=1000000):
        data.append(chunk)
    if not data:
        return (i, None)
    data = pd.concat(data, ignore_index=True)
    data = data.merge(candid_sale_invoices)
    data = data.merge(candid_barcodes)
    final_edges_df = data.iloc[:, [2, 3, 4]]
    final_edges_tuples = [tuple(x) for x in final_edges_df.values]

    Gm = ig.Graph.TupleList(final_edges_tuples, directed=True, edge_attrs=["weight"])

    longest_paths = pd.DataFrame(Gm.shortest_paths_dijkstra(None, None, weights="weight"))
    longest_paths = longest_paths.swifter.apply(log_transform)
    longest_paths["Date"] = input_date
    longest_paths["RegionID"] = input_region
    return (i, longest_paths)


if __name__ == "__main__":
    with Pool() as pool:
        for i, result in pool.imap_unordered(graph_analysis, range(int(t1), (int(t2) + 1))):
            print(i, result)
```
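Since graph_analysis can now return (i, None) for a day whose query comes back empty, the caller should filter those out before the final pd.concat, or the same "No objects to concatenate" error reappears when nothing survives. A minimal sketch of collecting the results, with made-up placeholder data standing in for the worker outputs:

```python
import pandas as pd

# Collected worker outputs: each is (date_index, DataFrame or None).
results = [
    (20200101, pd.DataFrame({"w": [0.5]})),
    (20200102, None),  # a day whose SQL query returned no rows
    (20200103, pd.DataFrame({"w": [0.9]})),
]

frames = [df for _, df in results if df is not None]
# Guard the final concat too: if *every* day were empty, pd.concat([])
# would raise "ValueError: No objects to concatenate" again.
weights = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
print(len(weights))
```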
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/60274332
