首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >php pthread中的动态任务调度?

php pthread中的动态任务调度?
EN

Stack Overflow用户
提问于 2018-05-16 11:37:23
回答 1查看 274关注 0票数 0

我是个多线程的php pthread新手。我已经成功地实现了我的目标,建立了一个池,设置了许多工作者,使用for循环提交任务,并使用池收集结果。

但我认为当提交到池中时,任务与工作人员绑定在一起。我认为,内部逻辑是让每个工作者获得相等或尽可能多的任务数量。

在实际实现中,这种逻辑可能导致效率问题。对于分配给每个任务的不同数据,可能会导致大量的时间差。或者,如果工作人员需要与服务器通信来发送/获取数据,那么在数据传输甚至服务器超时和重新连接方面也会发生很大变化,这导致在处理不同任务时存在进一步的差异。

举个简单的例子,4个任务被提交给一个有2个工作进程的池。让我们假设每个任务都需要一定的时间来处理,不管它是由哪个worker处理的。

任务1%2%3%4

加工时间1s 7s 2s 4s

我的理解是,在php pthread中,每个worker堆栈2个要收集的任务。当任务提交到池中时,池总是将其堆叠到任务较少的worker。假设没有其他开销,worker #1在3秒内处理任务#1和#3,而worker #2在11秒内处理任务#2和#4。因此,池的总执行时间将为11秒。效率非常低,因为顺序运行所有任务需要14秒。

另一个值得注意的副作用是,worker #1将空闲8秒,等待worker #2完成。在实际应用中可能会导致服务器超时。在我的应用程序中,我迭代地重用相同的池。每次迭代时,我必须关闭空闲工作线程的连接,并在连接移动到下一次迭代时重新建立连接。

如果可以进行动态任务调度,则工作进程#1将接管任务#1,而工作进程#2将接管任务#2。1秒后,工作进程#1将接管任务#3,而工作进程#2仍在处理任务#2。再过2秒,工作进程#1将接管任务#4,而工作进程#2仍在处理任务#2。再过4秒,它们都将完成,且池中没有其他任务,则它们将被关闭。所以池执行时间= worker #1 (1s + 2s + 4s) = worker #2 ( 7s ) =7s。这完全是两倍的速度和100%的线程效率。同时,两个工人都不会闲着造成潜在的问题。

上面的例子是为了演示而虚构的。现实世界会复杂得多。但是我碰巧有一个应用程序,最慢的工作者比最快的工作者运行的时间长5倍,这让我很头疼。

实际上,我更熟悉用于C++的OpenMP。它提供了multiple loop scheduling methods。我认为php线程中的调度方法是静态的。我希望有一种方法可以实现动态方法。

EN

回答 1

Stack Overflow用户

发布于 2019-01-04 08:10:25

我遇到了一个问题,当有其他空闲的工作线程可以使用时,某个工作线程上的单个长时间运行的任务会导致任务堆叠在该工作线程上。

根据Pool::submit()的文档,它的行为是:

将任务提交给池中的下一个Worker

对于我的用例,我扩展了Pool类:

代码语言:javascript
复制
namespace Task\Runner;

class Pool extends \Pool
{
    public function getIdleWorker() : ?int
    {
        if (empty($this->workers)) {
            return null;
        }

        $idleWorkerIndexes = [];

        foreach ($this->workers as $i => $worker) {
            if ($worker->getStacked() == 0)
                $idleWorkerIndexes[] = $i;
        }

        if (empty($idleWorkerIndexes))
            return null;

        return $idleWorkerIndexes[mt_rand(0, count($idleWorkerIndexes)-1)];
    }
}

my Task runner中的用法:

代码语言:javascript
复制
$workerIndex = $this->pool->getIdleWorker();
if (is_null($workerIndex))
    $this->pool->submit($taskWorkUnit);
else
    $this->pool->submitTo($workerIndex, $taskWorkUnit);

这种方法可以很容易地适应您自己的调度需求。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50362130

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档