I store the QueryText in a pandas DataFrame. Once all the queries are loaded, I want to run an analysis against each of them. I currently have about 50K queries to evaluate, so doing them one at a time will take a very long time.
I would therefore like to use concurrent.futures. How can I pass each individual QueryText stored in fullAnalysis as an argument to concurrent.futures and get the output back as a variable?
Here is my full code:
import pandas as pd
import time
import gensim
import sys
import warnings
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
fullAnalysis = pd.DataFrame()
def fetch_data(jFile = 'ProcessingDetails.json'):
    print("Fetching data...please wait")
    # read JSON file for latest dictionary file name
    baselineDictionaryFileName = 'Dictionary/Dictionary_05-03-2020.json'
    # copy data to pandas dataframe
    labelled_data = pd.read_json(baselineDictionaryFileName)
    # add two more columns to hold the most similar text and its score
    labelled_data['SimilarText'] = ''
    labelled_data['SimilarityScore'] = float()
    print("Data fetched from " + baselineDictionaryFileName + " and there are " + str(labelled_data.shape[0]) + " rows to be evaluated")
    return labelled_data

def calculateScore(inputFunc):
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    model = gensim.models.Word2Vec.load('w2v_model_bigdata')
    inp = inputFunc
    print(inp)
    out = dict()
    strEvaluation = inp.split("most_similar ", 1)[1]
    #while inp != 'quit':
    split_inp = inp.split()
    try:
        if split_inp[0] == 'help':
            pass
        elif split_inp[0] == 'similarity' and len(split_inp) >= 3:
            pass
        elif split_inp[0] == 'most_similar' and len(split_inp) >= 2:
            for pair in model.most_similar(positive=[split_inp[1]]):
                out.update({pair[0]: pair[1]})
    except KeyError as ke:
        #print(str(ke) + "\n")
        inp = input()
    return out

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar' + ' ' + text
            #for item in executor.map(calculateScore, arg):
            output = executor.map(calculateScore, arg)
    return output

if __name__ == "__main__":
    fullAnalysis = fetch_data()
    results = main()
    print(f'results: {results}')

Posted on 2020-05-05 18:43:56
The Python Global Interpreter Lock, or GIL, allows only one thread at a time to control the Python interpreter. Since your function calculateScore is probably CPU-bound and needs the interpreter to execute its bytecode, you will likely gain little from using threads. If, on the other hand, it were doing mostly I/O, it would release the GIL for most of its running time, allowing other threads to run. But that does not appear to be the case here. You should probably be using ProcessPoolExecutor from concurrent.futures (try it both ways and see):
def main():
    with ProcessPoolExecutor(max_workers=None) as executor:
        the_futures = {}
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar' + ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures[future] = i  # map future to request
        for future in as_completed(the_futures):  # results as they become available, not necessarily in submission order
            i = the_futures[future]  # the original index
            result = future.result()  # the result

If you omit the max_workers argument to the ProcessPoolExecutor constructor (or pass None), the default is the number of processors on your machine (which is a sensible default). There is no point in specifying a value greater than the number of processors you have.
If you do not need to tie each future back to its original request, the_futures can be just a list, and the simplest approach is then not to use the as_completed method at all:
def main():
    with ProcessPoolExecutor(max_workers=5) as executor:
        the_futures = []
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar' + ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures.append(future)
        # wait for all the results to complete and return them all:
        results = [f.result() for f in the_futures]  # results in creation order
        return results

It is worth mentioning that code that launches a ProcessPoolExecutor should be inside a block governed by if __name__ == '__main__':. If it is not, you end up in a recursive loop where every subprocess launches its own ProcessPoolExecutor. But it looks like that is already the case here. Perhaps you meant to be using ProcessPoolExecutor all along?
Also:
I have no idea what this line...

model = gensim.models.Word2Vec.load('w2v_model_bigdata')

...is doing inside the function calculateScore. It is probably an I/O-bound statement, but it does not appear to vary from call to call. If that is the case, and model is not modified within the function, shouldn't this statement be moved out of the function and computed just once? The function would then clearly run faster (and be distinctly CPU-bound).
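As a sketch of that suggestion, with a hypothetical load_model() standing in for gensim.models.Word2Vec.load('w2v_model_bigdata') (loading the real model needs the model file):

```python
# Hoist the one-time, expensive load to module level so it runs once per
# process instead of once per call.
def load_model():
    # stand-in for gensim.models.Word2Vec.load('w2v_model_bigdata'):
    # a toy most_similar lookup table
    return {"alpha": [("beta", 0.9), ("gamma", 0.8)]}

model = load_model()  # executed once, at import time

def calculateScore(word):
    # per-call work only; no repeated disk I/O
    return dict(model.get(word, []))
```

With ProcessPoolExecutor, each worker process still loads the model once when it imports the module, but no longer on every submitted task.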
Also:
The exception block...

except KeyError as ke:
    #print(str(ke) + "\n")
    inp = input()

...is puzzling. You are inputting a value that will never be used before the function returns. And if the point was to pause execution, no error message is printed first.
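If the intent was to handle out-of-vocabulary words, a sketch like the following (with a hypothetical StubModel in place of the real Word2Vec model) reports the error and returns an empty result instead of blocking on input():

```python
class StubModel:
    # hypothetical stand-in for the gensim Word2Vec model
    def most_similar(self, positive):
        table = {"alpha": [("beta", 0.9)]}
        word = positive[0]
        if word not in table:
            raise KeyError(word)
        return table[word]

def most_similar_safe(model, word):
    try:
        return dict(model.most_similar(positive=[word]))
    except KeyError as ke:
        # report and skip instead of blocking the worker on input()
        print(f"skipping {word!r}: {ke}")
        return {}
```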
Posted on 2020-05-07 04:46:08
With Booboo's help, I was able to update the code to include ProcessPoolExecutor. Here is my updated code. Overall, processing is now more than 60% faster.
I did run into one processing problem and found this thread, BrokenProcessPool, to resolve it.
output = {}
thePool = {}

def main(labelled_data, dictionaryRevised):
    args = sys.argv[1:]
    with ProcessPoolExecutor(max_workers=None) as executor:
        for i in range(len(labelled_data)):
            text = labelled_data['QueryText'][i]
            arg = 'most_similar' + ' ' + text
            # winprocess: helper from the BrokenProcessPool workaround linked above
            output = winprocess.submit(
                executor, calculateScore, arg
            )
            thePool[output] = i  # original index for future to request
        for output in as_completed(thePool):  # results as they become available, not necessarily in submission order
            i = thePool[output]  # the original index
            text = labelled_data['QueryText'][i]
            result = output.result()  # the result
            maximumKey = max(result.items(), key=operator.itemgetter(1))[0]
            maximumValue = result.get(maximumKey)
            labelled_data['SimilarText'][i] = maximumKey
            labelled_data['SimilarityScore'][i] = maximumValue
    return labelled_data, dictionaryRevised

if __name__ == "__main__":
    start = time.perf_counter()
    print("Starting to evaluate Query Text for labelling...")
    output_Labelled_Data, output_dictionary_revised = preProcessor()
    output, dictionary = main(output_Labelled_Data, output_dictionary_revised)
    finish = time.perf_counter()
    print(f'Finished in {round(finish-start, 2)} second(s)')

https://stackoverflow.com/questions/61610713
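One note on the per-row writes above: labelled_data['SimilarText'][i] = ... is chained assignment, which pandas may flag with SettingWithCopyWarning; .at (or .loc) is the reliable form. A small self-contained sketch (the df and result values are made up for illustration):

```python
import pandas as pd

df = pd.DataFrame({"QueryText": ["alpha", "beta"]})
df["SimilarText"] = ""
df["SimilarityScore"] = 0.0

result = {"beta": 0.9, "gamma": 0.8}
# max(result, key=result.get) is equivalent to the operator.itemgetter form
maximumKey = max(result, key=result.get)
df.at[0, "SimilarText"] = maximumKey        # scalar write by label: no warning
df.at[0, "SimilarityScore"] = result[maximumKey]
```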