Python multiprocessing creating incorrect pids

Stack Overflow user
Asked on 2012-08-09 21:45:30
Answers: 1, Views: 575, Followers: 0, Votes: 2

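I have been working on this for a while and can't seem to figure it out. I have narrowed it down to a situation where my code and the OS (Python 2.7.3 on Linux) disagree about which processes should be running. When this happens, my code hangs forever without throwing an exception. Sometimes the code runs fine for hours, sometimes for only a few minutes, and I have no idea why. The problem is demonstrated below. Thanks for looking, I'm really stumped (pun intended).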

Code output:

Creating discrete character matrix

running PoolWorker_82 (72 triplets), pid 25777, ppid 24892
running PoolWorker_83 (72 triplets), pid 25778, ppid 24892
running PoolWorker_84 (72 triplets), pid 25779, ppid 24892
running PoolWorker_85 (72 triplets), pid 25780, ppid 24892
running PoolWorker_86 (72 triplets), pid 25781, ppid 24892
running PoolWorker_87 (72 triplets), pid 25782, ppid 24892
running PoolWorker_88 (72 triplets), pid 25783, ppid 24892
running PoolWorker_89 (90 triplets), pid 25784, ppid 24892

Output of ps aux...

1000     24892  2.0  0.9 559948 151088 pts/0   Sl+  09:14   0:16 p runsimulation.py
1000     25776  0.0  0.8 559932 138320 pts/0   S+   09:19   0:00 p runsimulation.py
1000     26015  0.0  0.8 559948 138140 pts/0   S+   09:22   0:00 p runsimulation.py
1000     26021  0.0  0.8 559948 138140 pts/0   S+   09:22   0:00 p runsimulation.py
1000     26023  0.0  0.8 559948 138140 pts/0   S+   09:22   0:00 p runsimulation.py
1000     26025  0.0  0.8 559948 138140 pts/0   S+   09:22   0:00 p runsimulation.py
1000     26027  0.0  0.8 559948 138140 pts/0   S+   09:22   0:00 p runsimulation.py
1000     26029  0.0  0.8 559948 138140 pts/0   S+   09:22   0:00 p runsimulation.py
1000     26031  0.0  0.8 559948 138140 pts/0   S+   09:22   0:00 p runsimulation.py
1000     26036  0.0  0.8 559948 138140 pts/0   S+   09:22   0:00 p runsimulation.py

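You can see that the parent process 24892 is there, but the worker pids are not. Normally these match up: I can see worker CPU usage hit 100% while the workers are busy, and they all disappear once the iteration completes. When it fails, I get mismatched pids and processes using 0.0% CPU (column 3).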
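One way to check the mismatch described above programmatically is to probe each logged pid with signal 0. The following is a minimal sketch and not part of the original code: `pid_is_alive` and `missing_pids` are hypothetical helper names, it assumes a POSIX system, and it is written to run on Python 3 as well as 2.7. Note that a zombie process still counts as alive, and that a pid can be reused by an unrelated process.

```python
import errno
import os


def pid_is_alive(pid):
    """Return True if a process with this pid currently exists (POSIX only)."""
    try:
        os.kill(pid, 0)  # signal 0 only performs the existence/permission check
    except OSError as exc:
        # EPERM means the process exists but belongs to another user;
        # any other error (typically ESRCH) means there is no such process.
        return exc.errno == errno.EPERM
    return True


def missing_pids(logged_pids):
    """Return the logged pids that no longer correspond to a live process."""
    return [pid for pid in logged_pids if not pid_is_alive(pid)]


if __name__ == "__main__":
    # Worker pids copied from the log output above.
    logged = [25777, 25778, 25779, 25780, 25781, 25782, 25783, 25784]
    print(missing_pids(logged))
```

Running this while the program is hung would show which of the logged workers have already exited, which is the same information read off `ps aux` by hand.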

Here are the relevant parts of my code (in reverse order of how they are called):

Setup of the R functions that are called through rpy2:

def create_R(dir):
    """
    creates the r environment
    @param dir: the directory for the output files
    """
    r = robjects.r
    importr("phangorn")
    importr("picante")
    importr("MASS")
    importr("vegan")
    r("options(expressions=500000)")
    robjects.globalenv['outfile'] = os.path.abspath(os.path.join(dir, "trees.pdf"))
    r('pdf(file=outfile, onefile=T)')
    r("par(mfrow=c(2,3))")

    r("""
        generate_triplet = function(bits) {
        triplet = replicate(bits, rTraitDisc(tree, model="ER", k=2,states=0:1))
        triplet = t(apply(triplet, 1, as.numeric))
        sums = rowSums(triplet)
        if (length(which(sums==0)) > 0 && length(which(sums==3)) == 1) {
            return(triplet)
        }
        return(generate_triplet(bits))
        }
    """)

    r("""
        get_valid_triplets = function(numsamples, needed, bits) {
            tryCatch({
                m = generate_triplet(bits)
                while (ncol(m) < needed) {
                    m = cbind(m, generate_triplet(bits))
                }
            return(m)
            }, error = function(e){print(message(e))}, warning = function(e){print(message(e))})
        }
    """)

The function called in the worker processes:

def __get_valid_triplets(num_samples, num_triplets, bits, q):
    r = robjects.r
    name = current_process().name.replace("-", "_")
    timer = stopwatch.Timer()
    log("\trunning %s (%d triplets), pid %d, ppid %d" % (name, num_triplets, current_process().pid, os.getppid()),
        log_file)
    r('%s = get_valid_triplets(%d, %d, %d)' % (name, num_samples, num_triplets, bits))
    q.put((name, r[name]))
    timer.stop()
    log("\t%s complete (%s)" % (name, str(timer)), log_file)

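The function that sets up the pool and dispatches the workers using apply_async. The workers write to a managed queue, which is processed after the pool is joined: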

def __generate_candidate_discrete_matrix(num_cols, num_samples, sample_tree, bits, usable_cols):
    assert isinstance(sample_tree, dendropy.Tree)
    print "Creating discrete character matrix"
    r = robjects.r
    newick = sample_tree.as_newick_string()
    num_samples = len(sample_tree.leaf_nodes())
    robjects.globalenv['numcols'] = usable_cols
    robjects.globalenv['newick'] = newick + ";"
    r("tree = read.tree(text=newick)")
    r('m = matrix(nrow=length(tree$tip.label))') #create empty matrix
    r('m = m[,-1]') #drop the first NA column
    num_procs = mp.cpu_count()
    div, mod = divmod(usable_cols, num_procs)
    args = [div] * num_procs  # one equal share of columns per process
    args[-1] += mod
    for i, elem in enumerate(args):
        div, mod = divmod(elem, bits)
        args[-1] += mod
        args[i] -= mod
    manager = Manager()
    pool = Pool(processes=num_procs, maxtasksperchild=1)
    q = manager.Queue(maxsize=num_procs)
    for arg in args:
        pool.apply_async(__get_valid_triplets, (num_samples, arg, bits, q))
    pool.close()
    pool.join()

    while not q.empty():
        name, data = q.get()
        robjects.globalenv[name] = data
        r('m = cbind(m, %s)' % name)

    r('m = m[,1:%d]' % usable_cols)
    r('m = m[order(rownames(m)),]') # consistently order the rows 
    r('m = t(apply(m, 1, as.numeric))') # convert all factors given by rTraitDisc to numeric
    a = r['m']
    n = r('rownames(m)')
    return a, n
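
Because the `AsyncResult` objects returned by `apply_async` are discarded in the function above, an exception raised inside a worker would go unnoticed. A minimal sketch of one way to surface such failures and to bound the wait follows. `work` and `collect` are hypothetical names, the timeout value is an arbitrary assumption, and nothing here is claimed to be the cause of the hang described in this question.

```python
import multiprocessing as mp


def work(n):
    """Stand-in for the real worker; fails on negative input to show error propagation."""
    if n < 0:
        raise ValueError("negative input: %d" % n)
    return n * n


def collect(pool, func, arg_list, timeout):
    """Submit one task per argument and wait for each with a timeout.

    Returns (results, failures). failures holds (arg, exception) pairs,
    including mp.TimeoutError for tasks that did not finish in time.
    """
    pending = [(arg, pool.apply_async(func, (arg,))) for arg in arg_list]
    results, failures = [], []
    for arg, async_result in pending:
        try:
            # get() re-raises any exception that was raised inside the worker.
            results.append(async_result.get(timeout=timeout))
        except Exception as exc:
            failures.append((arg, exc))
    return results, failures


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    try:
        results, failures = collect(pool, work, [1, 2, -3], timeout=30)
    finally:
        pool.terminate()
        pool.join()
    print(results)
    print(failures)
```

A task that times out keeps running in its worker; it is `pool.terminate()` that actually stops it, so the timeout only guarantees that the parent stops waiting.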

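Finally, the first function called generates a candidate matrix and checks that it is valid; if it is not, it retries with a new matrix. If it is valid, it stores a few things in the R session and returns the data: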

def create_discrete_matrix(num_cols, num_samples, sample_tree, bits):
    """
    Creates a discrete char matrix from a tree
    @param num_cols: number of columns to create
    @param sample_tree: the tree
    @return: a r object of the matrix, and a list of the row names
    @rtype: tuple(robjects.Matrix, list)
    """
    r = robjects.r
    usable_cols = find_usable_length(num_cols, bits)
    a, n = __generate_candidate_discrete_matrix(num_cols, num_samples, sample_tree, bits, usable_cols)
    assert isinstance(a, robjects.Matrix)
    assert a.ncol == usable_cols

    paralin_matrix, valid = __create_paralin_matrix(a)
    if valid is False:
        sample_tree = create_tree(num_samples, type = "S")
        return create_discrete_matrix(num_cols, num_samples, sample_tree, bits)
    else:
        robjects.globalenv['paralin_matrix'] = paralin_matrix
        r('rownames(paralin_matrix) = rownames(m)')
        r('paralin_dist = as.dist(paralin_matrix, diag=T, upper=T)')
        r("paralinear_cluster = hclust(paralin_dist, method='average')")
    return sample_tree, a, n
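
The generate, validate and retry flow used by `create_discrete_matrix` can also be written as a bounded loop instead of recursion, which avoids growing the call stack when many candidates are rejected. This is only a sketch under assumptions: `retry_until_valid`, `generate`, `is_valid` and the `max_attempts` limit are hypothetical and do not appear in the original code.

```python
import random


def retry_until_valid(generate, is_valid, max_attempts=100):
    """Call generate() until is_valid(candidate) is true, up to max_attempts times.

    Returns (candidate, attempts_used); raises RuntimeError if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        candidate = generate()
        if is_valid(candidate):
            return candidate, attempt
    raise RuntimeError("no valid candidate after %d attempts" % max_attempts)


if __name__ == "__main__":
    rng = random.Random(0)
    candidate, attempts = retry_until_valid(
        generate=lambda: rng.randint(0, 9),  # stand-in for building a matrix
        is_valid=lambda value: value >= 7,   # stand-in for the validity check
    )
    print(candidate, attempts)
```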

1 Answer

Stack Overflow user

Answered on 2012-08-10 04:18:03

This seems to have been fixed by a server reboot (FML). However, some useful information was gained along the way.

def __get_valid_triplets(num_samples, num_triplets, bits, q):
    try:
        r = robjects.r
        name = current_process().name.replace("-", "_")
        timer = stopwatch.Timer()
        log("\trunning %s (%d triplets), pid %d, ppid %d" % (name, num_triplets, current_process().pid, os.getppid()),
            log_file)
        r('%s = get_valid_triplets(%d, %d, %d)' % (name, num_samples, num_triplets, bits))
        q.put((name, r[name]))
        timer.stop()
        log("\t%s complete (%s)" % (name, str(timer)), log_file)
    except Exception, e:
        q.put("DEATH")
        traceback.print_exc()
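
With this change the consumer also has to recognise the `"DEATH"` marker, because unpacking that string as `name, data` would fail. A minimal sketch of such a consumer follows. `drain_results` and `FAILURE_SENTINEL` are hypothetical names that are not in the original code, and the demo uses a plain in-process queue in place of the managed one.

```python
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

FAILURE_SENTINEL = "DEATH"  # the marker the worker puts on the queue on error


def drain_results(q):
    """Empty the queue, separating successful (name, data) pairs from failures.

    Returns (results, failure_count).
    """
    results, failure_count = [], 0
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        if item == FAILURE_SENTINEL:
            failure_count += 1
        else:
            results.append(item)  # a (name, data) tuple
    return results, failure_count


if __name__ == "__main__":
    q = queue.Queue()
    q.put(("PoolWorker_1", [0, 1, 1]))
    q.put(FAILURE_SENTINEL)
    print(drain_results(q))
```

Counting failures rather than raising on the first one lets the caller decide whether to retry the missing work or abort the run.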
Votes: 0
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/11884864
