diff --git a/pkg/job-queue/JobProcessor.php b/pkg/job-queue/JobProcessor.php index 2ab77a72d..b779d911b 100644 --- a/pkg/job-queue/JobProcessor.php +++ b/pkg/job-queue/JobProcessor.php @@ -63,7 +63,7 @@ public function findOrCreateRootJob($ownerId, $jobName, $unique = false) $job->setUnique((bool) $unique); try { - $this->jobStorage->saveJob($job); + $this->saveJob($job); return $job; } catch (DuplicateJobException $e) { @@ -97,11 +97,9 @@ public function findOrCreateChildJob($jobName, Job $rootJob) $job->setCreatedAt(new \DateTime()); $job->setRootJob($rootJob); - $this->jobStorage->saveJob($job); + $this->saveJob($job); - $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ - 'jobId' => $job->getId(), - ]); + $this->sendCalculateRootJobStatusEvent($job); return $job; } @@ -128,11 +126,9 @@ public function startChildJob(Job $job) $job->setStatus(Job::STATUS_RUNNING); $job->setStartedAt(new \DateTime()); - $this->jobStorage->saveJob($job); + $this->saveJob($job); - $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ - 'jobId' => $job->getId(), - ]); + $this->sendCalculateRootJobStatusEvent($job); } /** @@ -157,11 +153,9 @@ public function successChildJob(Job $job) $job->setStatus(Job::STATUS_SUCCESS); $job->setStoppedAt(new \DateTime()); - $this->jobStorage->saveJob($job); + $this->saveJob($job); - $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ - 'jobId' => $job->getId(), - ]); + $this->sendCalculateRootJobStatusEvent($job); } /** @@ -186,11 +180,9 @@ public function failChildJob(Job $job) $job->setStatus(Job::STATUS_FAILED); $job->setStoppedAt(new \DateTime()); - $this->jobStorage->saveJob($job); + $this->saveJob($job); - $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ - 'jobId' => $job->getId(), - ]); + $this->sendCalculateRootJobStatusEvent($job); } /** @@ -219,11 +211,9 @@ public function cancelChildJob(Job $job) $job->setStartedAt($stoppedAt); } - $this->jobStorage->saveJob($job); + $this->saveJob($job); - $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ - 'jobId' => $job->getId(), - ]); + $this->sendCalculateRootJobStatusEvent($job); } /** @@ -252,4 +242,26 @@ public function interruptRootJob(Job $job, $force = false) } }); } + + /** + * @link https://github.com/php-enqueue/enqueue-dev/pull/222#issuecomment-336102749 See for rationale + * + * @param Job $job + */ + protected function saveJob(Job $job) + { + $this->jobStorage->saveJob($job); + } + + /** + * @link https://github.com/php-enqueue/enqueue-dev/pull/222#issuecomment-336102749 See for rationale + * + * @param Job $job + */ + protected function sendCalculateRootJobStatusEvent(Job $job) + { + $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ + 'jobId' => $job->getId(), + ]); + } }