首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Dask distributed不运行python脚本。

Dask distributed不运行python脚本。
EN

Stack Overflow用户
提问于 2021-03-16 09:47:15
回答 2查看 376关注 0票数 0

我正在构建一个简单的示例,以了解dask分布式如何在HPC集群上分发python脚本。分发方法是一种基本操作,它在磁盘上写入文件。当从命令行($python simple-function.py)运行时,此脚本运行良好。

代码语言:javascript
复制
import os
import argparse
import time

def inc(x):
    time.sleep(1)
    return x + 1

def get_args():
    """
    Get args
    """ 
    parser = argparse.ArgumentParser()
    parser.add_argument("x", help="Value")
    args = parser.parse_args()
    x = args.x
    return int(x)

if __name__ == "__main__":
    x = get_args()       
    print("{0} + 1 = {1}".format(x, inc(x)))
    with open("results.txt", 'w') as file:
        file.write(str(x) + '\n')

现在,我已经创建了另一个python脚本,它将分发这段代码。其思想是使用子进程和client.map或client.submit来启动上述脚本的多个实例。我遇到的问题是,当使用以下任何方法(client.map、client.submit然后收集或计算或.result())时,输出client.submit文件不会被写入。也许我没有用正确的方法?

代码语言:javascript
复制
import os
import time
import subprocess
import yaml
import argparse

import dask
from dask.distributed import Client
from dask_jobqueue import PBSCluster
  
def create_cluster():
    cluster = PBSCluster(
        cores=4,
        memory="20GB",
        interface="ib0",
        queue="qdev",
        processes=4,
        nanny=True,
        walltime="12:00:00",
        shebang="#!/bin/bash",
        local_directory="$TMPDIR"
        )
    cluster.scale(4)
    time.sleep(10) # Wait for workers
    return cluster

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("script", help="Script to distribute")
    parser.add_argument("nodes", type=int, help="Number of nodes")
    args = parser.parse_args()    
    script = args.script
    n_nodes = args.nodes
    return script, n_nodes

def close_all(client, cluster):
    client.close()
    cluster.close()

def methode(script, x):
    subprocess.run(["python", 
                script,
                x])
    return None

if __name__ == "__main__":
    
    cluster = create_cluster()
    client = Client(cluster)
    time.sleep(1)
    script, n_nodes = get_args() #Get arguments
    #With client.submit
    futures = []    
    for n, o in enumerate(range(10)):
        futures.append(client.submit(methode, *[script, str(o)], priority=-n))

    [f.result() for f in futures]
    #Or client.map
    L = client.map(methode, *[script, str(range(10))])  
    client.compute(L)
    client.gather(L)
    time.sleep(20)  
    close_all(client, cluster)

在附带说明中,如果我执行以下代码: dask.compute(methode(args))

然后,编写.txt输出文件。

似乎只有不同的客户端方法不能工作。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-03-16 14:18:38

最后,我发现要执行的脚本的路径不正确。

如果从命令行运行以下命令:

$python distribute.py simple-function.py 6

不知何故,路径不正确,文件也找不到。

我试着把这个写在笔记本上。如果我执行:

代码语言:javascript
复制
methode(*[script, "8"]

该文件被写入当前目录。但是,如果我执行这个单元格:

代码语言:javascript
复制
futures = []
for n, o in enumerate(range(12)):
    print(o)
    futures.append(client.submit(methode, *[script, str(o)], priority=-n))
wait(futures)
print("Done !")

结果文件不是写入当前目录,而是写入我的主目录。

我还不太清楚为什么会发生这种情况,但似乎在使用客户端时,路径发生了一些变化.

票数 0
EN

Stack Overflow用户

发布于 2021-03-17 13:56:33

所以你尝试了一些小的改变,这对我来说是很有效的。我拿出了HPC的东西,因为我认为问题是在达斯克,而不是达斯克-工作队列,并集中精力,使只是未来的工作。然后我注意到你的简单功能. so破坏了所有的结果,所以我只是改变了它。输出是我当前目录中的结果0.txt-结果9.txt(不是脚本所在的主目录)。

distribute.py:

代码语言:javascript
复制
import os
import time
import subprocess
import yaml
import argparse

import dask
from dask.distributed import Client

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("script", help="Script to distribute")
    parser.add_argument("nodes", type=int, help="Number of nodes")
    args = parser.parse_args()    
    script = args.script
    n_nodes = args.nodes
    return script, n_nodes

def methode(script, x):
    subprocess.run(["python", 
                script,
                x])
    return None

if __name__ == "__main__":
    client = Client()
    script, n_nodes = get_args() #Get arguments
    futures = []    
    for n, o in enumerate(range(10)):
        futures.append(client.submit(methode, *[script, str(o)], priority=-n))

    results = client.gather(futures)
    client.close()

简单-函数.:

代码语言:javascript
复制
import os
import argparse
import time

def inc(x):
    time.sleep(1)
    return x + 1

def get_args():
    """
    Get args
    """ 
    parser = argparse.ArgumentParser()
    parser.add_argument("x", help="Value")
    args = parser.parse_args()
    x = args.x
    return int(x)

if __name__ == "__main__":
    x = get_args()       
    print("{0} + 1 = {1}".format(x, inc(x)))
    with open(f"results{x}.txt", 'w') as file:
        file.write(str(x + 1) + '\n')

编辑:哦,我刚读了你的答案,如果你从一个jupyter单元格执行这段代码,它是从笔记本的目录中执行的,这可能是你的家。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66652682

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档