我有一个名为“使用python的multiprocess库”的函数( mulitprocessing库在朱庇特笔记本中不能正常工作)。代码的第一次迭代工作得很好:
usa_places_folder_id = 'hidden'
def download_cities_geopackage_test(usa_places_folder_id=usa_places_folder_id):
# Make filepath of gpkg to download
city_geopackage_filepath = os.path.join(
'geo_files', 'city_boundaries', 'usa_places_2019.gpkg')
# Get all of the items inside the drive folder
folder_objects = drive.ListFile(
{'q': f"'{usa_places_folder_id}' in parents and trashed=false"}).GetList()
# If the city package is found in the folder, download it
for folder_object in folder_objects:
if folder_object['title'] == 'usa_places_2019.gpkg':
folder_object.GetContentFile(city_geopackage_filepath)
print("Downloaded usa_places_2019.gpkg")
# Set variable to loop if download times out
timeout = True
while timeout:
print("Working on downloading usa_places_2019.gpkg...")
# Create Process with function defined above
p1 = Process(target=download_cities_geopackage_test,
name='Process_download_cities_geopackage')
# Start Process with a time limit
p1.start()
p1.join(timeout=10)
p1.terminate()
if p1.exitcode is None:
# Let the while-loop restart
print(f'{p1} timed out, restarting.\n')
else:
print("usa_places_2019.gpkg has been downloaded!")
timeout = False输出:
Working on downloading usa_places_2019.gpkg...
Downloaded usa_places_2019.gpkg
usa_places_2019.gpkg has been downloaded!但是由于我在整个朱庇特笔记本中多次使用multiprocess代码,所以我想将它封装在一个函数中来调用。下面的代码是包装器函数:
# Wrapper function to restart pydrive get-requests
def timeout_wrapper_function(
function_to_wrap,
num_elements_to_return=1,
timeout=60,
use_queue=False
):
"""
Use this function to restart another passed
function if it hangs for more than the allotted
timeout period.
Parameters:
function_to_wrap (function): The name of the
function you are wrapping with this
timeout function.
num_elements_to_return (int): The number of
elements you'll be returning from the queue.
timeout (int): How many seconds you'd like
this function to wait before timing out.
use_queue (Boolean): True/False on whether
there will be items put into a queue
for retrieval later. Only use this if
your function_to_wrap is trying to return
something.
Returns:
result_tuple (tuple): A tuple containing all
elements that you'd like your function_to_wrap
to return.
"""
# Set variable to loop if download times out
timeout = True
while timeout:
print("Working on fetching data...")
# Instantiate Queue and start process if you are
# using a queue.
if use_queue:
queue = Queue()
# Create Process with function defined above,
# with a queue
p1 = Process(target=function_to_wrap,
name=f'Process_{function_to_wrap}',
args=(queue,))
else:
# Create Process with function defined above,
# without a queue
p1 = Process(target=function_to_wrap,
name=f'Process_{function_to_wrap}')
# Start process with a time limit
p1.start()
p1.join(timeout=timeout)
p1.terminate()
if p1.exitcode is None:
# Let the while-loop restart
print(f'{p1} timed out, restarting.\n')
else:
print("Data has been fetched!")
# Start empty list that will eventually
# be returned as the tuple with data
result_list = []
# If there is a queue, get the items from it
if use_queue:
for i in range(num_elements_to_return):
result_list.append(queue.get())
# Turn result_list into a tuple
result_tuple = tuple(result_list)
# End the while loop
timeout = False
return result_tuple最后,下面是使用包装器函数的代码。但是,它使立即超时
usa_places_folder_id = 'hidden'
def download_cities_geopackage_test(usa_places_folder_id=usa_places_folder_id):
# Make filepath of gpkg to download
city_geopackage_filepath = os.path.join(
'geo_files', 'city_boundaries', 'usa_places_2019.gpkg')
# Get all of the items inside the drive folder
folder_objects = drive.ListFile(
{'q': f"'{usa_places_folder_id}' in parents and trashed=false"}).GetList()
# If the city package is found in the folder, download it
for folder_object in folder_objects:
if folder_object['title'] == 'usa_places_2019.gpkg':
folder_object.GetContentFile(city_geopackage_filepath)
print("Downloaded usa_places_2019.gpkg")
timeout_wrapper_function(
function_to_wrap=download_cities_geopackage_test,
num_elements_to_return=0,
timeout=60,
use_queue=False
)输出:
Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93723 parent=9165 started> timed out, restarting.
Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93724 parent=9165 started> timed out, restarting.
Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93727 parent=9165 started> timed out, restarting.
Working on fetching data...如果p1.exitcode是None,则多进程代码会自动重复。为什么在第二个例子中,当使用包装函数时,这种情况会一直发生,而如果不是,则会很好地工作。
发布于 2022-09-15 00:13:37
用户(我)错误我的包装器函数使用“超时值”作为参数,在等待超时之前等待秒数:
# Wrapper function to restart pydrive get-requests
def timeout_wrapper_function(
function_to_wrap,
num_elements_to_return=1,
timeout=60,
use_queue=False
):然后几乎就在函数中,timeout被设置为True。
因此,要解决这个问题,我所需要的只是将函数参数更新为timeout_seconds
# Wrapper function to restart pydrive get-requests
def timeout_wrapper_function(
function_to_wrap,
num_elements_to_return=1,
timeout_seconds=60,
use_queue=False
):并调整使用该参数的函数中的一些代码。
https://stackoverflow.com/questions/73724274
复制相似问题