首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在for语句中使用concurrent.futures

在for语句中使用concurrent.futures
EN

Stack Overflow用户
提问于 2020-05-05 18:13:30
回答 2查看 324关注 0票数 0

我将QuertyText存储在pandas数据帧中。一旦我将所有查询加载到中,我想再次对每个查询进行分析。目前,我有大约50K需要评估。因此,一个接一个地做,将需要很长时间。

所以,我想实现concurrent.futures。如何将存储在fullAnalysis中的单个QueryText作为参数传递给concurrent.futures并将输出作为变量返回?

下面是我的完整代码:

代码语言:javascript
复制
import pandas as pd
import time
import gensim
import sys
import warnings

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

fullAnalysis = pd.DataFrame()

def fetch_data(jFile = 'ProcessingDetails.json'):
    print("Fetching data...please wait")

    #read JSON file for latest dictionary file name
    baselineDictionaryFileName = 'Dictionary/Dictionary_05-03-2020.json'

    #copy data to pandas dataframe
    labelled_data = pd.read_json(baselineDictionaryFileName)

    #Add two more columns to get the most similar text and score
    labelled_data['SimilarText'] = ''
    labelled_data['SimilarityScore'] = float()

    print("Data fetched from " + baselineDictionaryFileName + " and there are " + str(labelled_data.shape[0]) + " rows to be evalauted")

    return labelled_data


def calculateScore(inputFunc):
    warnings.filterwarnings("ignore", category=DeprecationWarning) 

    model = gensim.models.Word2Vec.load('w2v_model_bigdata')

    inp = inputFunc
    print(inp)
    out = dict()

    strEvaluation = inp.split("most_similar ",1)[1]

    #while inp != 'quit':
    split_inp = inp.split()

    try:
        if split_inp[0] == 'help':
            pass
        elif split_inp[0] == 'similarity' and len(split_inp) >= 3:
            pass
        elif split_inp[0] == 'most_similar' and len(split_inp) >= 2:
            for pair in model.most_similar(positive=[split_inp[1]]):
                out.update({pair[0]: pair[1]})

    except KeyError as ke:
        #print(str(ke) + "\n")
        inp = input()
    return out

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            #for item in executor.map(calculateScore, arg):
            output = executor.map(calculateScore, arg)

    return output

if __name__ == "__main__":
    fullAnalysis = fetch_data()
    results = main()
    print(f'results: {results}')
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-05-05 18:43:56

Python全局解释器锁或GIL只允许一个线程控制Python解释器。由于您的函数calculateScore可能是受cpu限制的,并且需要解释器执行其字节码,因此使用线程可能不会获得什么好处。另一方面,如果它主要执行I/O操作,那么它将在大部分运行时间内放弃GIL,允许其他线程运行。但这里的情况似乎并非如此。您可能应该使用来自concurrent.futuresProcessPoolExecutor (尝试两种方式并查看):

代码语言:javascript
复制
def main():
    with ProcessPoolExecutor(max_workers=None) as executor:
        the_futures = {}
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures[future] = i # map future to request
        for future in as_completed(the_futures): # results as they become available not necessarily the order of submission
            i = the_futures[future] # the original index
            result = future.result() # the result

如果在ProcessPoolExecutor构造函数中省略了max_workers参数(或指定值为None),则缺省值将是您的机器上拥有的处理器数量(这是一个不错的缺省值)。指定一个大于您拥有的处理器数量的值是没有意义的。

如果您不需要将未来与原始请求捆绑在一起,那么the_futures可以只是一个列表,其中最简单的就是不使用as_completed方法:

代码语言:javascript
复制
def main():
    with ProcessPoolExecutor(max_workers=5) as executor:
        the_futures = []
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures.append(future)
        # wait for the completion of all the results and return them all:
        results = [f.result() for f in the_futures()] # results in creation order
        return results 

值得一提的是,启动ProcessPoolExecutor函数的代码应该在由if __name__ = '__main__':管理的块中。如果不是,您将进入一个递归循环,每个子进程都会启动ProcessPoolExecutor。但这里似乎就是这种情况。也许你的意思是一直使用ProcessPoolExecutor

另外:

我不知道这条线..。

代码语言:javascript
复制
model = gensim.models.Word2Vec.load('w2v_model_bigdata')

..。在函数中,calculateStore做到了。它可能是一个I/O限制语句。但这似乎不会因调用而改变。如果是这种情况,并且model没有在函数中修改,那么不应该将此语句移出函数并只计算一次吗?那么这个函数显然会运行得更快(并且明显受cpu的限制)。

另外:

异常块...

代码语言:javascript
复制
except KeyError as ke:
    #print(str(ke) + "\n")
    inp = input()

..。令人费解。您正在输入一个永远不会在返回之前使用的值。如果要暂停执行,则不会输出错误消息。

票数 0
EN

Stack Overflow用户

发布于 2020-05-07 04:46:08

在Booboo的帮助下,我能够更新代码以包含ProcessPoolExecutor。这是我更新的代码。总体而言,处理速度提高了60%以上。

我确实遇到了一个处理问题,并找到了这个主题BrokenPoolProcess来解决这个问题。

代码语言:javascript
复制
output = {}
thePool = {}

def main(labelled_data, dictionaryRevised):

    args = sys.argv[1:]

    with ProcessPoolExecutor(max_workers=None) as executor:
        for i in range(len(labelled_data)):
            text = labelled_data['QueryText'][i]
            arg = 'most_similar'+ ' '+ text

            output = winprocess.submit(
            executor, calculateScore, arg
            )
            thePool[output] = i  #original index for future to request


        for output in as_completed(thePool): # results as they become available not necessarily the order of submission
            i = thePool[output] # the original index
            text = labelled_data['QueryText'][i]
            result = output.result() # the result

            maximumKey = max(result.items(), key=operator.itemgetter(1))[0]
            maximumValue = result.get(maximumKey)

            labelled_data['SimilarText'][i] = maximumKey
            labelled_data['SimilarityScore'][i] = maximumValue


    return labelled_data, dictionaryRevised

if __name__ == "__main__":
    start = time.perf_counter()

    print("Starting to evaluate Query Text for labelling...")

    output_Labelled_Data, output_dictionary_revised = preProcessor()

    output,dictionary = main(output_Labelled_Data, output_dictionary_revised)


    finish = time.perf_counter()
    print(f'Finished in {round(finish-start, 2)} second(s)')
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61610713

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档