Skip to content

Commit 19e2875

Browse files
committed
fix cancelling jobs
1 parent 3ddc132 commit 19e2875

File tree

7 files changed

+63
-30
lines changed

7 files changed

+63
-30
lines changed

models/actions/run.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,3 +402,37 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo
402402

403403
return len(previousRuns) > 0, nil
404404
}
405+
406+
func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
407+
var cancelledJobs []*ActionRunJob
408+
409+
if actionRun.ConcurrencyGroup != "" && actionRun.ConcurrencyCancel {
410+
// cancel previous runs in the same concurrency group
411+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
412+
RepoID: actionRun.RepoID,
413+
ConcurrencyGroup: actionRun.ConcurrencyGroup,
414+
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
415+
})
416+
if err != nil {
417+
return cancelledJobs, fmt.Errorf("find runs: %w", err)
418+
}
419+
for _, run := range runs {
420+
if run.ID == actionRun.ID {
421+
continue
422+
}
423+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
424+
RunID: run.ID,
425+
})
426+
if err != nil {
427+
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
428+
}
429+
cjs, err := CancelJobs(ctx, jobs)
430+
if err != nil {
431+
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
432+
}
433+
cancelledJobs = append(cancelledJobs, cjs...)
434+
}
435+
}
436+
437+
return cancelledJobs, nil
438+
}

models/actions/run_job.go

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool,
238238
return ShouldBlockRunByConcurrency(ctx, job.Run)
239239
}
240240

241-
func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
241+
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
242242
var cancelledJobs []*ActionRunJob
243243

244244
if job.RawConcurrencyGroup != "" {
@@ -270,33 +270,12 @@ func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) ([]
270270
if err := job.LoadRun(ctx); err != nil {
271271
return cancelledJobs, fmt.Errorf("load run: %w", err)
272272
}
273-
if job.Run.ConcurrencyGroup != "" && job.Run.ConcurrencyCancel {
274-
// cancel previous runs in the same concurrency group
275-
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
276-
RepoID: job.RepoID,
277-
ConcurrencyGroup: job.Run.ConcurrencyGroup,
278-
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
279-
})
280-
if err != nil {
281-
return cancelledJobs, fmt.Errorf("find runs: %w", err)
282-
}
283-
for _, run := range runs {
284-
if run.ID == job.Run.ID {
285-
continue
286-
}
287-
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
288-
RunID: run.ID,
289-
})
290-
if err != nil {
291-
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
292-
}
293-
cjs, err := CancelJobs(ctx, jobs)
294-
if err != nil {
295-
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
296-
}
297-
cancelledJobs = append(cancelledJobs, cjs...)
298-
}
273+
274+
cancelledJobsByRun, err := CancelPreviousJobsByRunConcurrency(ctx, job.Run)
275+
if err != nil {
276+
return cancelledJobs, fmt.Errorf("cancel runs: %w", err)
299277
}
278+
cancelledJobs = append(cancelledJobs, cancelledJobsByRun...)
300279

301280
return cancelledJobs, nil
302281
}

routers/web/repo/actions/view.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,8 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou
470470
}
471471
if blockByConcurrency {
472472
job.Status = actions_model.StatusBlocked
473+
} else if err := actions_service.CancelJobsByJobConcurrency(ctx, job); err != nil {
474+
return fmt.Errorf("cancel jobs: %w", err)
473475
}
474476
}
475477

@@ -585,6 +587,9 @@ func Approve(ctx *context_module.Context) {
585587
return err
586588
}
587589
if len(job.Needs) == 0 && job.Status.IsBlocked() && !blockJobByConcurrency {
590+
if err := actions_service.CancelJobsByJobConcurrency(ctx, job); err != nil {
591+
return fmt.Errorf("cancel jobs: %w", err)
592+
}
588593
job.Status = actions_model.StatusWaiting
589594
n, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
590595
if err != nil {

services/actions/clear_tasks.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,14 @@ func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) er
5757
return err
5858
}
5959

60-
func CancelJobs(ctx context.Context, job *actions_model.ActionRunJob) error {
61-
jobs, err := actions_model.CancelPreviousJobsByConcurrency(ctx, job)
60+
func CancelJobsByJobConcurrency(ctx context.Context, job *actions_model.ActionRunJob) error {
61+
jobs, err := actions_model.CancelPreviousJobsByJobConcurrency(ctx, job)
62+
notifyWorkflowJobStatusUpdate(ctx, jobs)
63+
return err
64+
}
65+
66+
func CancelJobsByRunConcurrency(ctx context.Context, run *actions_model.ActionRun) error {
67+
jobs, err := actions_model.CancelPreviousJobsByRunConcurrency(ctx, run)
6268
notifyWorkflowJobStatusUpdate(ctx, jobs)
6369
return err
6470
}

services/actions/job_emitter.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
275275
continue
276276
}
277277

278+
if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil {
279+
log.Error("Cancel previous jobs for job %d: %v. This job will stay blocked.", id, err)
280+
continue
281+
}
282+
278283
if allSucceed {
279284
ret[id] = actions_model.StatusWaiting
280285
} else {

services/actions/run.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
3838
}
3939
if blockRunByConcurrency {
4040
run.Status = actions_model.StatusBlocked
41+
} else if err := CancelJobsByRunConcurrency(ctx, run); err != nil {
42+
return fmt.Errorf("cancel jobs: %w", err)
4143
}
4244

4345
if err := db.Insert(ctx, run); err != nil {
@@ -111,6 +113,8 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
111113
}
112114
if blockByConcurrency {
113115
runJob.Status = actions_model.StatusBlocked
116+
} else if err := CancelJobsByJobConcurrency(ctx, runJob); err != nil {
117+
return fmt.Errorf("cancel jobs: %w", err)
114118
}
115119
}
116120
}

services/actions/task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
5353
return nil
5454
}
5555

56-
if err := CancelJobs(ctx, t.Job); err != nil {
56+
if err := CancelJobsByJobConcurrency(ctx, t.Job); err != nil {
5757
return fmt.Errorf("CancelJobs: %w", err)
5858
}
5959

0 commit comments

Comments
 (0)