首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >AMPHP -将比池中可用工作线程更多的任务排队

AMPHP -将比池中可用工作线程更多的任务排队
EN

Stack Overflow用户
提问于 2020-09-23 15:57:57
回答 1查看 349关注 0票数 1

我有一个项目,在这个项目中,我正在将大量的.tif图像转换为PDF文档。文件数量高达数百万。

为了加快这个过程,我使用了Amphp。由于使用Imagemagick转换图像的过程占用了一些cpu资源,因此我想限制并行运行的转换器进程的最大数量。

我的第一种方法是有效的,但是如果我对文件进行排队,而不是给一定数量的工作人员一个x个文件数组,则可以得到改进。

这是我当前的代码,我试图在其中复制the example

代码语言:javascript
复制
<?php
require dirname(__DIR__) . '/vendor/autoload.php';

$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.', $import_file);
    $ext = strtolower(end($fileParts));
    if($ext === 'xml') {
        $filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
        $tasks[] = new ConvertPdfTask([$filePath], $constants);
    }
    $i++;
}
if(!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $coroutines = [];
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
        foreach ($tasks as $index => $task) {
            $coroutines[] = Amp\call(function() use ($pool, $task) {
                return yield $pool->enqueue($task);
            });
        }
        $results = yield Amp\Promise\all($coroutines);

        return yield $pool->shutdown();
    });
}

我的问题是,一旦我入队的任务超过了THREAD_COUNT的数量,我就会收到以下PHP警告:Warning: Worker in pool exited unexpectedly with code -1,并且没有创建PDF。

只要我保持在最大池大小以下,一切都很好。

我在Windows10上使用的是PHP 7.4.9,而amphp/parallel是1.4.0。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-09-23 20:09:01

经过更多的实验,我找到了一个解决方案,似乎是可行的。这感觉有点“老生常谈”,所以如果有人有更好的想法,请分享。我认为池会自动建立一个队列,然后由最大数量的工作者处理,但似乎不是这样。

我现在将从Amp\call获得的协程保存在两个独立的数组中。一个包含所有协程,另一个包含当前循环的所有协程。

代码语言:javascript
复制
$coroutine = Amp\call(function () use ($pool, $task) {
    return yield $pool->enqueue($task);
});
$loopRoutines[] = $coroutine;
$allCoroutines[] = $coroutine;

将一个项目排入队列后,我会检查是否已经达到了配置的最大线程数。如果池中有最大数量的工作进程,但没有空闲的工作进程,我会在当前循环协程中调用Amp\Promise\first函数来等待新的空闲工作进程。

由于函数会在我下次到达时立即返回(因为完成的协程仍然是我的当前循环数组),所以我清除了该数组。

代码语言:javascript
复制
if ($pool->getWorkerCount() >= (THREAD_COUNT) && $pool->getIdleWorkerCount() === 0) {
    yield Amp\Promise\first($loopRoutines);
    $loopRoutines = [];
}

在foreach之后,我在所有协程数组上调用Amp\Promise\all,所以脚本会一直等到所有工作进程完成。

下面是我修改后的代码:

代码语言:javascript
复制
<?php
require dirname(__DIR__) . '/vendor/autoload.php';

$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.', $import_file);
    $ext = strtolower(end($fileParts));
    if($ext === 'xml') {
        $filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
        $tasks[] = new ConvertPdfTask([$filePath], $constants);
    }
    $i++;
}
if(!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $allCoroutines = [];
        $loopRoutines = [];
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
        foreach ($tasks as $index => $task) {
            $coroutine = Amp\call(function () use ($pool, $task) {
                return yield $pool->enqueue($task);
            });
            $loopRoutines[] = $coroutine;
            $allCoroutines[] = $coroutine;
            if ($pool->getWorkerCount() >= THREAD_COUNT && $pool->getIdleWorkerCount() === 0) {
                yield Amp\Promise\first($loopRoutines);
                $loopRoutines = [];
            }
        }
        yield Amp\Promise\all($allCoroutines);

        return yield $pool->shutdown();
    });
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64023451

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档