Warning: Possible queue duplication with multiple processes #743

Closed
@Bodom78

@jenssegers I previously implemented queues for a project using an older version of this library. Looking at MongoQueue.php in the master branch, I noticed that it may suffer from the same issue I ran into.

Running the queue as a daemon with multiple processes causes a race condition in which more than one process can select the same job before any of them has marked it as reserved. This in turn produces randomly duplicated jobs.

The fix is to get the next job and mark it as reserved in a single atomic operation using Mongo's findAndModify, so that only one process can claim the document.
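To make the race concrete, here is a minimal single-process sketch that replays the interleaving by hand. FakeJobStore and its methods are hypothetical stand-ins for the jobs collection, not part of this library or of MongoDB; findAndReserve only imitates the "find and update in one step" behaviour that findAndModify provides on the server.

```php
<?php

// Hypothetical in-memory stand-in for the jobs collection (illustration only).
final class FakeJobStore
{
    /** @var array<int, array{reserved: int}> */
    private array $jobs;

    public function __construct(array $jobs)
    {
        $this->jobs = $jobs;
    }

    // Naive step 1: read the id of the first unreserved job.
    public function findAvailable(): ?int
    {
        foreach ($this->jobs as $id => $job) {
            if ($job['reserved'] === 0) {
                return $id;
            }
        }

        return null;
    }

    // Naive step 2: mark the job as reserved without checking its current state.
    public function markReserved(int $id): void
    {
        $this->jobs[$id]['reserved'] = 1;
    }

    // Atomic variant: select and reserve as one indivisible step.
    public function findAndReserve(): ?int
    {
        $id = $this->findAvailable();

        if ($id !== null) {
            $this->markReserved($id);
        }

        return $id;
    }
}

$jobs = [1 => ['reserved' => 0], 2 => ['reserved' => 0]];

// Naive: both workers read before either one writes, so both pick job 1.
$store = new FakeJobStore($jobs);
$workerA = $store->findAvailable();
$workerB = $store->findAvailable();
$store->markReserved($workerA);
$store->markReserved($workerB);
$naive = [$workerA, $workerB];

// Atomic: each claim finishes before the next starts, so the jobs differ.
$store = new FakeJobStore($jobs);
$atomic = [$store->findAndReserve(), $store->findAndReserve()];

printf("naive:  %s\n", implode(',', $naive));  // naive:  1,1
printf("atomic: %s\n", implode(',', $atomic)); // atomic: 1,2
```

In the naive run job 1 is handed out twice and job 2 is left waiting, which matches the random duplicates described above. With real worker processes the interleaving depends on timing, which is why the duplicates only show up occasionally.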

I created a "getNextAvailableJobAndReserve" method and updated the "pop" method to use it.

For reference, here is what my MongodbQueue class looks like. Take note of the modified pop method and how it uses getNextAvailableJobAndReserve.

Hope this can be of some help.

<?php

namespace App\Mongodb\Queue;

use Carbon\Carbon;
use Illuminate\Queue\DatabaseQueue;
use DB;

class MongodbQueue extends DatabaseQueue
{
    /**
     * Pop the next job off of the queue.
     *
     * @param  string $queue
     *
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        if (!is_null($this->expire))
        {
            $this->releaseJobsThatHaveBeenReservedTooLong($queue);
        }

        if ($job = $this->getNextAvailableJobAndReserve($queue))
        {
            return new MongodbJob(
                $this->container, $this, $job, $queue
            );
        }
    }

    /**
     * Release the jobs that have been reserved for too long.
     *
     * @param  string $queue
     *
     * @return void
     */
    protected function releaseJobsThatHaveBeenReservedTooLong($queue)
    {
        $expired  = Carbon::now()->subSeconds($this->expire)->getTimestamp();
        $reserved = $this->database->collection($this->table)
                                   ->where('queue', $this->getQueue($queue))
                                   ->where('reserved', 1)
                                   ->where('reserved_at', '<=', $expired)->get();

        foreach ($reserved as $job)
        {
            $attempts = $job['attempts'] + 1;
            $this->releaseJob($job['_id'], $attempts);
        }
    }

    /**
     * Get the next available job for the queue and mark it as reserved.
     *
     * @param  string|null $queue
     *
     * @return \StdClass|null
     */
    protected function getNextAvailableJobAndReserve($queue)
    {
        $job = DB::getCollection($this->table)->findAndModify(
            [
                'queue'        => $this->getQueue($queue),
                'reserved'     => 0,
                'available_at' => ['$lte' => $this->getTime()],

            ],
            [
                '$set' => [
                    'reserved'    => 1,
                    'reserved_at' => $this->getTime(),
                ],
            ],
            null,
            [
                'new' => true,
            ]
        );

        return $job ? (object)$job : null;
    }

    /**
     * Release the given job ID from reservation.
     *
     * @param  string $id
     * @param  int    $attempts
     *
     * @return void
     */
    protected function releaseJob($id, $attempts)
    {
        $this->database->table($this->table)->where('_id', $id)->update([
            'reserved'    => 0,
            'reserved_at' => null,
            'attempts'    => $attempts,
        ]);
    }

    /**
     * Delete a reserved job from the queue.
     *
     * @param  string $queue
     * @param  string $id
     *
     * @return void
     */
    public function deleteReserved($queue, $id)
    {
        $this->database->table($this->table)->where('_id', $id)->delete();
    }
}
