我有一个3个beanstalkd队列进程运行在相同的ip,但不同的端口。我有一个独立的服务器,运行的是并行产生的php工作进程(每个beanstalkd端口20个)来处理队列。我的问题是,似乎两个进程可以在同一时间在同一服务器上保留相同的作业id。
以下是我的日志的一些示例输出:
2017-02-23 09:59:56 --> START JOB (port: 11301 | u: 0.45138600 1487861996 | jid:1695074 | pid:30019 | j:leads_to_tags_add | tr:1)
2017-02-23 09:59:57 --> START JOB (port: 11301 | u: 0.55024800 1487861997 | jid:1695074 | pid:30157 | j:leads_to_tags_add | tr:2)
2017-02-23 09:59:58 --> DEL JOB (port: 11301 | u: 0.54731000 1487861998 | jid:1695074 | pid:30019 | j:leads_to_tags_add)
2017-02-23 09:59:58 --> DEL JOB (port: 11301 | u: 0.58927900 1487861998 | jid:1695074 | pid:30157 | j:leads_to_tags_add)似乎这两个保留发生在彼此之后,而第二个保留发生在第一个进程完成并删除作业之前。
我在每个jobid的redis中添加了一个计数器,很明显,当它保留第二次时,计数器上升了一次(tr)。TTRR设置为3600,因此它不可能在第一个进程完成之前过期。
这是在第二个进程保留之后的作业状态:
Pheanstalk\Response\ArrayResponse::__set_state(array(
'id' => '1695074',
'tube' => 'action-medium',
'state' => 'reserved',
'pri' => '0',
'age' => '1',
'delay' => '0',
'ttr' => '3600',
'time-left' => '3599',
'file' => '385',
'reserves' => '2',
'timeouts' => '0',
'releases' => '0',
'buries' => '0',
'kicks' => '0',
))这种行为是非常随机的,有时只有一个进程能够保留,直到作业锁定,有时2个,有时甚至4个或更多(很少)。当然,这会导致执行的重复作业数量不一致。
代码的简短版本:
$this->job = $this->pheanstalk->watch($tube)->reserve($timeout);
set_error_handler(function($errno, $errstr, $errfile, $errline, array $errcontext) {
// error was suppressed with the @-operator
if (0 === error_reporting()) {
return false;
}
throw new ErrorException($errstr, 0, $errno, $errfile, $errline);
});
$this->log_message('info', __METHOD__ . ": START JOB (" . $this->_logDetails() . " | tr:{$tries})");
if ($this->_process_job()) {
$this->log_message('info', __METHOD__ . ": FINISHED JOB (" . $this->_logDetails() . ")");
$this->_delete_job();
} else {
$this->log_message('error', __METHOD__ . ": FAILED JOB (" . $this->_logDetails() . ")");
}
restore_error_handler();和
protected function _delete_job()
{
$this->pheanstalk->delete($this->job);
$this->log_message('info', __METHOD__ . ": DELETED JOB (" . $this->_logDetails() . ")");
}发布于 2017-02-24 23:32:56
问题是在我的代码的process_job位中发生了TCP断开。pheanstalk对象会在执行链的某个位置被覆盖。
我有一个封装pheanstalk的库,它在发送put命令之前检查可用的最空的beanstalkd服务器。然后,它将使用最佳服务器的详细信息创建一个新的Pheanstalk实例,将其保存为当前连接,然后发送命令。
有时,worker可以向队列发送子作业。该库最初将加载到worker中以获取作业,然后再次加载到process_job中以发送到队列。由于Codeigniter中的依赖注入,该库将引用相同的对象,并且在process_job中它将覆盖worker的当前连接并导致TCP断开。
https://stackoverflow.com/questions/42420517
复制相似问题