我正在使用pthreads3.1.6-dev和PHP7.1。我的目标是创建一个小的网络爬虫。
计划的工作流程是:你把一个url放入池中(可能是主页),然后一个爬虫(扩展线程)从这个URL中搜索所有链接。在小范围过滤之后,爬虫应该将所有新链接添加到池中(不应将外部链接添加到池中)。或者,爬虫将新的urls提供给“其他人”,后者将其添加到池中。
该过程应该继续,直到找不到新的URL。
我的问题是我找不到有效的解决方案。我的当前绘图如下:爬虫提取urls并将其放入池中。因此,每个Worker都有一个对池的引用,因此爬虫可以通过worker访问池对象。
这个解决方案的问题是:如果一个“延迟”线程向池中添加了一个新线程,那么这个新任务将不会执行。
一些演示代码:
class WebWorker extends Worker
{
public function __construct(WebPool $pool)
{
$this->pool = $pool;
print_r("Create a new worker\n");
}
public function run()
{
print_r("Worker {$this->getThreadId()}: " . __METHOD__ . "\n");
}
public function getPool()
{
return $this->pool;
}
private $pool;
}
class WebWork extends Threaded
{
public function run()
{
print_r("Webwork from Worker {$this->worker->getThreadId()}\n");
if (rand(0, 10) > 5) {
print_r("Webwork {$this->worker->getThreadId()} add new Webwork\n");
$this->worker->getPool()->submit(new WebWork());
}
}
}
class WebPool extends Pool
{
public function __construct($size, $class)
{
parent::__construct($size, $class, [$this]);
}
}
$pool = new WebPool(2, 'WebWorker');
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
while ($pool->collect(function ($work) {
return $work->isGarbage();
})) continue;
$pool->shutdown();一个示例结果:
Create a new worker
Worker 139878744053504: WebWorker::run
Webwork from Worker 139878744053504
Create a new worker
Worker 139878731872000: WebWorker::run
Webwork from Worker 139878731872000
Webwork from Worker 139878731872000
Webwork 139878731872000 add new Webwork
Webwork from Worker 139878744053504
Create a new worker
Worker 139878719289088: WebWorker::run
Webwork from Worker 139878719289088
Webwork 139878719289088 add new Webwork有人能告诉我解决这个问题的最佳实践吗?
发布于 2017-02-13 20:35:58
问题是你依赖垃圾回收器来阻塞主线程,而实际上你应该使用你自己的条件来阻塞。在pthread v3中覆盖默认的垃圾收集器并不是真正必要的,除非在特殊情况下,您不希望任务在完成执行后立即被收集。
您的问题的一个可能的解决方案是有一个链接计数器变量,它对每个新发现的链接(需要爬行)递增,对每个爬行的链接递减。当此变量达到0时,您可以假定网站已完全爬行,因此您可以安全地关闭您的线程池。
以下是代码中的解决方案:
<?php
class WebsiteCrawler extends Worker
{
public $pool; // expose the pool to our LinkCrawler tasks
public function __construct(Pool $pool)
{
$this->pool = $pool;
}
}
class LinkCrawler extends Threaded
{
private $link;
public static $domain;
public function __construct(string $link)
{
$this->link = $link;
WebCrawlerPool::$links[] = $link;
++WebCrawlerPool::$linksRemaining->count;
var_dump($link); // for debugging, just to show that it is collecting links
}
public function run()
{
$content = file_get_contents($this->link);
$domain = preg_quote(self::$domain);
preg_match_all("~href=\"(.+?{$domain}.+?)\"~", $content, $matches); // naive regex to fetch links
$links = $matches[1];
foreach ($links as $link) {
if (count(WebCrawlerPool::$links) > 9) { // stop at 10 links (for example purposes...)
break;
}
if (!in_array($link, get_object_vars(WebCrawlerPool::$links), true)) {
$this->worker->pool->submit(new LinkCrawler($link));
}
}
--WebCrawlerPool::$linksRemaining->count;
}
}
class WebCrawlerPool extends Pool
{
public static $linksRemaining;
public static $links;
public function __construct(int $size, string $class, array $ctor = [])
{
parent::__construct($size, $class, [$this]);
self::$links = new Threaded();
self::$linksRemaining = new Threaded();
self::$linksRemaining->count = 0;
}
}
LinkCrawler::$domain = 'php.net';
$pool = new WebCrawlerPool(8, 'WebsiteCrawler');
$pool->submit(new LinkCrawler('http://php.net/', $pool)); // kick things off
while (WebCrawlerPool::$linksRemaining->count !== 0);
$pool->shutdown();
print_r(WebCrawlerPool::$links);当然,以上只是示例代码-您可能希望以不同的方式进行操作。但这里有几个值得注意的地方:
$linksRemaining是一个Threaded对象,而不是一个简单的整数。这是因为Threaded对象具有内置的同步。这意味着许多上下文可以安全地操作相同的Threaded对象。通过利用Threaded对象的这个属性,您可以简化在pthreads.WebCrawlerPool::$links)中编写的代码,这些链接保存在一个在主上下文中创建的Threaded对象中。这样做有两个原因。首先,PHP数组不能安全地在多个上下文中传递和操作(与Threaded不同)。其次,Threaded对象与创建它们的上下文绑定在一起。这意味着我们应该在我们希望使用它们的最外层上下文中创建它们-否则,当创建它们的线程被联接时,它们将变得不可访问。https://stackoverflow.com/questions/42034531
复制相似问题