Skip to content

Commit 3b77f00

Browse files
committed
Adds to small extension points to JobProcessor
1 parent 27afca9 commit 3b77f00

File tree

1 file changed

+23
-21
lines changed

1 file changed

+23
-21
lines changed

pkg/job-queue/JobProcessor.php

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public function findOrCreateRootJob($ownerId, $jobName, $unique = false)
6363
$job->setUnique((bool) $unique);
6464

6565
try {
66-
$this->jobStorage->saveJob($job);
66+
$this->saveJob($job);
6767

6868
return $job;
6969
} catch (DuplicateJobException $e) {
@@ -97,11 +97,9 @@ public function findOrCreateChildJob($jobName, Job $rootJob)
9797
$job->setCreatedAt(new \DateTime());
9898
$job->setRootJob($rootJob);
9999

100-
$this->jobStorage->saveJob($job);
100+
$this->saveJob($job);
101101

102-
$this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [
103-
'jobId' => $job->getId(),
104-
]);
102+
$this->sendCalculateRootJobStatusEvent($job);
105103

106104
return $job;
107105
}
@@ -128,11 +126,9 @@ public function startChildJob(Job $job)
128126
$job->setStatus(Job::STATUS_RUNNING);
129127
$job->setStartedAt(new \DateTime());
130128

131-
$this->jobStorage->saveJob($job);
129+
$this->saveJob($job);
132130

133-
$this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [
134-
'jobId' => $job->getId(),
135-
]);
131+
$this->sendCalculateRootJobStatusEvent($job);
136132
}
137133

138134
/**
@@ -157,11 +153,9 @@ public function successChildJob(Job $job)
157153
$job->setStatus(Job::STATUS_SUCCESS);
158154
$job->setStoppedAt(new \DateTime());
159155

160-
$this->jobStorage->saveJob($job);
156+
$this->saveJob($job);
161157

162-
$this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [
163-
'jobId' => $job->getId(),
164-
]);
158+
$this->sendCalculateRootJobStatusEvent($job);
165159
}
166160

167161
/**
@@ -186,11 +180,9 @@ public function failChildJob(Job $job)
186180
$job->setStatus(Job::STATUS_FAILED);
187181
$job->setStoppedAt(new \DateTime());
188182

189-
$this->jobStorage->saveJob($job);
183+
$this->saveJob($job);
190184

191-
$this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [
192-
'jobId' => $job->getId(),
193-
]);
185+
$this->sendCalculateRootJobStatusEvent($job);
194186
}
195187

196188
/**
@@ -219,11 +211,9 @@ public function cancelChildJob(Job $job)
219211
$job->setStartedAt($stoppedAt);
220212
}
221213

222-
$this->jobStorage->saveJob($job);
214+
$this->saveJob($job);
223215

224-
$this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [
225-
'jobId' => $job->getId(),
226-
]);
216+
$this->sendCalculateRootJobStatusEvent($job);
227217
}
228218

229219
/**
@@ -252,4 +242,16 @@ public function interruptRootJob(Job $job, $force = false)
252242
}
253243
});
254244
}
245+
246+
protected function saveJob(Job $job)
247+
{
248+
$this->jobStorage->saveJob($job);
249+
}
250+
251+
protected function sendCalculateRootJobStatusEvent(Job $job)
252+
{
253+
$this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [
254+
'jobId' => $job->getId(),
255+
]);
256+
}
255257
}

0 commit comments

Comments
 (0)