我正在构建一个简单的示例,以了解dask分布式如何在HPC集群上分发python脚本。分发方法是一种基本操作,它在磁盘上写入文件。当从命令行($python simple-function.py)运行时,此脚本运行良好。
import os
import argparse
import time
def inc(x):
time.sleep(1)
return x + 1
def get_args():
"""
Get args
"""
parser = argparse.ArgumentParser()
parser.add_argument("x", help="Value")
args = parser.parse_args()
x = args.x
return int(x)
if __name__ == "__main__":
x = get_args()
print("{0} + 1 = {1}".format(x, inc(x)))
with open("results.txt", 'w') as file:
file.write(str(x) + '\n')现在,我已经创建了另一个python脚本,它将分发这段代码。其思想是使用子进程和client.map或client.submit来启动上述脚本的多个实例。我遇到的问题是,当使用以下任何方法(client.map、client.submit然后收集或计算或.result())时,输出client.submit文件不会被写入。也许我没有用正确的方法?
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
from dask_jobqueue import PBSCluster
def create_cluster():
cluster = PBSCluster(
cores=4,
memory="20GB",
interface="ib0",
queue="qdev",
processes=4,
nanny=True,
walltime="12:00:00",
shebang="#!/bin/bash",
local_directory="$TMPDIR"
)
cluster.scale(4)
time.sleep(10) # Wait for workers
return cluster
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("script", help="Script to distribute")
parser.add_argument("nodes", type=int, help="Number of nodes")
args = parser.parse_args()
script = args.script
n_nodes = args.nodes
return script, n_nodes
def close_all(client, cluster):
client.close()
cluster.close()
def methode(script, x):
subprocess.run(["python",
script,
x])
return None
if __name__ == "__main__":
cluster = create_cluster()
client = Client(cluster)
time.sleep(1)
script, n_nodes = get_args() #Get arguments
#With client.submit
futures = []
for n, o in enumerate(range(10)):
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
[f.result() for f in futures]
#Or client.map
L = client.map(methode, *[script, str(range(10))])
client.compute(L)
client.gather(L)
time.sleep(20)
close_all(client, cluster)在附带说明中,如果我执行以下代码: dask.compute(methode(args))
然后,编写.txt输出文件。
似乎只有不同的客户端方法不能工作。
发布于 2021-03-16 14:18:38
最后,我发现要执行的脚本的路径不正确。
如果从命令行运行以下命令:
$python distribute.py simple-function.py 6
不知何故,路径不正确,文件也找不到。
我试着把这个写在笔记本上。如果我执行:
methode(*[script, "8"]该文件被写入当前目录。但是,如果我执行这个单元格:
futures = []
for n, o in enumerate(range(12)):
print(o)
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
wait(futures)
print("Done !")结果文件不是写入当前目录,而是写入我的主目录。
我还不太清楚为什么会发生这种情况,但似乎在使用客户端时,路径发生了一些变化.
发布于 2021-03-17 13:56:33
所以你尝试了一些小的改变,这对我来说是很有效的。我拿出了HPC的东西,因为我认为问题是在达斯克,而不是达斯克-工作队列,并集中精力,使只是未来的工作。然后我注意到你的简单功能. so破坏了所有的结果,所以我只是改变了它。输出是我当前目录中的结果0.txt-结果9.txt(不是脚本所在的主目录)。
distribute.py:
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("script", help="Script to distribute")
parser.add_argument("nodes", type=int, help="Number of nodes")
args = parser.parse_args()
script = args.script
n_nodes = args.nodes
return script, n_nodes
def methode(script, x):
subprocess.run(["python",
script,
x])
return None
if __name__ == "__main__":
client = Client()
script, n_nodes = get_args() #Get arguments
futures = []
for n, o in enumerate(range(10)):
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
results = client.gather(futures)
client.close()简单-函数.:
import os
import argparse
import time
def inc(x):
time.sleep(1)
return x + 1
def get_args():
"""
Get args
"""
parser = argparse.ArgumentParser()
parser.add_argument("x", help="Value")
args = parser.parse_args()
x = args.x
return int(x)
if __name__ == "__main__":
x = get_args()
print("{0} + 1 = {1}".format(x, inc(x)))
with open(f"results{x}.txt", 'w') as file:
file.write(str(x + 1) + '\n')编辑:哦,我刚读了你的答案,如果你从一个jupyter单元格执行这段代码,它是从笔记本的目录中执行的,这可能是你的家。
https://stackoverflow.com/questions/66652682
复制相似问题