Description
@jenssegers I previously implemented queues for a project using an older version of this library and looking at the MongoQueue.php in the master branch I noticed that it may suffer from the same issue as I did.
Running the queue as a daemon with multiple processes will cause a race condition where more then 1 process can select the same job before another has marked it as reserved. This in turn will produce randomly duplicated jobs.
The fix is to to get the next job and mark it as reserved using Mongo's findAndReplace to lock the document.
I created a "getNextAvailableJobAndReserve" method and updated the "pop" method to use it.
For reference here is what my MongobdQueue class method looks like, Take note of the modified pop and how it uses the getNextAvailableJobAndReserve method.
Hope this can be of some help.
<?php
namespace App\Mongodb\Queue;
use Carbon\Carbon;
use Illuminate\Queue\DatabaseQueue;
use DB;
class MongodbQueue extends DatabaseQueue
{
/**
* Pop the next job off of the queue.
*
* @param string $queue
*
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
if (!is_null($this->expire))
{
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
}
if ($job = $this->getNextAvailableJobAndReserve($queue))
{
return new MongodbJob(
$this->container, $this, $job, $queue
);
}
}
/**
* Release the jobs that have been reserved for too long.
*
* @param string $queue
*
* @return void
*/
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
{
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
$reserved = $this->database->collection($this->table)
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', $expired)->get();
foreach ($reserved as $job)
{
$attempts = $job['attempts'] + 1;
$this->releaseJob($job['_id'], $attempts);
}
}
/**
* Get the next available job for the queue and mark it as reserved.
*
* @param string|null $queue
*
* @return \StdClass|null
*/
protected function getNextAvailableJobAndReserve($queue)
{
$job = DB::getCollection($this->table)->findAndModify(
[
'queue' => $this->getQueue($queue),
'reserved' => 0,
'available_at' => ['$lte' => $this->getTime()],
],
[
'$set' => [
'reserved' => 1,
'reserved_at' => $this->getTime(),
],
],
null,
[
'new' => true,
]
);
return $job ? (object)$job : null;
}
/**
* Release the given job ID from reservation.
*
* @param string $id
*
* @return void
*/
protected function releaseJob($id, $attempts)
{
$this->database->table($this->table)->where('_id', $id)->update([
'reserved' => 0,
'reserved_at' => null,
'attempts' => $attempts,
]);
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param string $id
*
* @return void
*/
public function deleteReserved($queue, $id)
{
$this->database->table($this->table)->where('_id', $id)->delete();
}
}