Skip to content

Commit 865d0c3

Browse files
committed
support concurrency
1 parent bde014e commit 865d0c3

17 files changed

+1577
-144
lines changed

models/actions/run.go

Lines changed: 48 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"code.gitea.io/gitea/modules/util"
2222
webhook_module "code.gitea.io/gitea/modules/webhook"
2323

24-
"github.com/nektos/act/pkg/jobparser"
2524
"xorm.io/builder"
2625
)
2726

@@ -48,6 +47,8 @@ type ActionRun struct {
4847
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4948
Status Status `xorm:"index"`
5049
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
50+
ConcurrencyGroup string
51+
ConcurrencyCancel bool
5152
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5253
Started timeutil.TimeStamp
5354
Stopped timeutil.TimeStamp
@@ -169,7 +170,7 @@ func (run *ActionRun) IsSchedule() bool {
169170
return run.ScheduleID > 0
170171
}
171172

172-
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
173+
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
173174
_, err := db.GetEngine(ctx).ID(repo.ID).
174175
SetExpr("num_action_runs",
175176
builder.Select("count(*)").From("action_run").
@@ -266,80 +267,41 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
266267
return cancelledJobs, nil
267268
}
268269

269-
// InsertRun inserts a run
270-
// The title will be cut off at 255 characters if it's longer than 255 characters.
271-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
272-
ctx, committer, err := db.TxContext(ctx)
273-
if err != nil {
274-
return err
275-
}
276-
defer committer.Close()
270+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
271+
// Iterate over each job and attempt to cancel it.
272+
for _, job := range jobs {
273+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
274+
status := job.Status
275+
if status.IsDone() {
276+
continue
277+
}
277278

278-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
279-
if err != nil {
280-
return err
281-
}
282-
run.Index = index
283-
run.Title = util.EllipsisDisplayString(run.Title, 255)
279+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
280+
if job.TaskID == 0 {
281+
job.Status = StatusCancelled
282+
job.Stopped = timeutil.TimeStampNow()
284283

285-
if err := db.Insert(ctx, run); err != nil {
286-
return err
287-
}
288-
289-
if run.Repo == nil {
290-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
291-
if err != nil {
292-
return err
293-
}
294-
run.Repo = repo
295-
}
284+
// Update the job's status and stopped time in the database.
285+
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
286+
if err != nil {
287+
return err
288+
}
296289

297-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
298-
return err
299-
}
290+
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
291+
if n == 0 {
292+
return fmt.Errorf("job has changed, try again")
293+
}
300294

301-
runJobs := make([]*ActionRunJob, 0, len(jobs))
302-
var hasWaiting bool
303-
for _, v := range jobs {
304-
id, job := v.Job()
305-
needs := job.Needs()
306-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
307-
return err
308-
}
309-
payload, _ := v.Marshal()
310-
status := StatusWaiting
311-
if len(needs) > 0 || run.NeedApproval {
312-
status = StatusBlocked
313-
} else {
314-
hasWaiting = true
295+
// Continue with the next job.
296+
continue
315297
}
316-
job.Name = util.EllipsisDisplayString(job.Name, 255)
317-
runJobs = append(runJobs, &ActionRunJob{
318-
RunID: run.ID,
319-
RepoID: run.RepoID,
320-
OwnerID: run.OwnerID,
321-
CommitSHA: run.CommitSHA,
322-
IsForkPullRequest: run.IsForkPullRequest,
323-
Name: job.Name,
324-
WorkflowPayload: payload,
325-
JobID: id,
326-
Needs: needs,
327-
RunsOn: job.RunsOn(),
328-
Status: status,
329-
})
330-
}
331-
if err := db.Insert(ctx, runJobs); err != nil {
332-
return err
333-
}
334298

335-
// if there is a job in the waiting status, increase tasks version.
336-
if hasWaiting {
337-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
299+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
300+
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
338301
return err
339302
}
340303
}
341-
342-
return committer.Commit()
304+
return nil
343305
}
344306

345307
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
@@ -431,7 +393,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
431393
}
432394
run.Repo = repo
433395
}
434-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
396+
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
435397
return err
436398
}
437399
}
@@ -440,3 +402,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
440402
}
441403

442404
type ActionRunIndex db.ResourceIndex
405+
406+
func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) {
407+
if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel {
408+
return false, nil
409+
}
410+
411+
concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{
412+
RepoID: actionRun.RepoID,
413+
ConcurrencyGroup: actionRun.ConcurrencyGroup,
414+
Status: []Status{StatusWaiting, StatusRunning},
415+
})
416+
if err != nil {
417+
return false, fmt.Errorf("find running and waiting runs: %w", err)
418+
}
419+
previousRuns := slices.DeleteFunc(concurrentRuns, func(r *ActionRun) bool { return r.ID == actionRun.ID })
420+
421+
return len(previousRuns) > 0, nil
422+
}

models/actions/run_job.go

Lines changed: 115 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,17 @@ type ActionRunJob struct {
3535
RunsOn []string `xorm:"JSON TEXT"`
3636
TaskID int64 // the latest task of the job
3737
Status Status `xorm:"index"`
38-
Started timeutil.TimeStamp
39-
Stopped timeutil.TimeStamp
40-
Created timeutil.TimeStamp `xorm:"created"`
41-
Updated timeutil.TimeStamp `xorm:"updated index"`
38+
39+
RawConcurrencyGroup string // raw concurrency.group
40+
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
41+
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
42+
ConcurrencyGroup string // evaluated concurrency.group
43+
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
44+
45+
Started timeutil.TimeStamp
46+
Stopped timeutil.TimeStamp
47+
Created timeutil.TimeStamp `xorm:"created"`
48+
Updated timeutil.TimeStamp `xorm:"updated index"`
4249
}
4350

4451
func init() {
@@ -197,3 +204,107 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
197204
return StatusUnknown // it shouldn't happen
198205
}
199206
}
207+
208+
func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
209+
if job.RawConcurrencyGroup == "" {
210+
return false, nil
211+
}
212+
if !job.IsConcurrencyEvaluated {
213+
return false, ErrUnevaluatedConcurrency{
214+
Group: job.RawConcurrencyGroup,
215+
CancelInProgress: job.RawConcurrencyCancel,
216+
}
217+
}
218+
if job.ConcurrencyGroup == "" || job.ConcurrencyCancel {
219+
return false, nil
220+
}
221+
222+
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
223+
RepoID: job.RepoID,
224+
ConcurrencyGroup: job.ConcurrencyGroup,
225+
Statuses: []Status{StatusRunning, StatusWaiting},
226+
})
227+
if err != nil {
228+
return false, fmt.Errorf("count running and waiting jobs: %w", err)
229+
}
230+
if concurrentJobsNum > 0 {
231+
return true, nil
232+
}
233+
234+
if err := job.LoadRun(ctx); err != nil {
235+
return false, fmt.Errorf("load run: %w", err)
236+
}
237+
238+
return ShouldBlockRunByConcurrency(ctx, job.Run)
239+
}
240+
241+
func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) error {
242+
if job.RawConcurrencyGroup != "" {
243+
if !job.IsConcurrencyEvaluated {
244+
return ErrUnevaluatedConcurrency{
245+
Group: job.RawConcurrencyGroup,
246+
CancelInProgress: job.RawConcurrencyCancel,
247+
}
248+
}
249+
if job.ConcurrencyGroup != "" && job.ConcurrencyCancel {
250+
// cancel previous jobs in the same concurrency group
251+
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
252+
RepoID: job.RepoID,
253+
ConcurrencyGroup: job.ConcurrencyGroup,
254+
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
255+
})
256+
if err != nil {
257+
return fmt.Errorf("find previous jobs: %w", err)
258+
}
259+
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
260+
if err := CancelJobs(ctx, previousJobs); err != nil {
261+
return fmt.Errorf("cancel previous jobs: %w", err)
262+
}
263+
}
264+
}
265+
266+
if err := job.LoadRun(ctx); err != nil {
267+
return fmt.Errorf("load run: %w", err)
268+
}
269+
if job.Run.ConcurrencyGroup != "" && job.Run.ConcurrencyCancel {
270+
// cancel previous runs in the same concurrency group
271+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
272+
RepoID: job.RepoID,
273+
ConcurrencyGroup: job.Run.ConcurrencyGroup,
274+
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
275+
})
276+
if err != nil {
277+
return fmt.Errorf("find runs: %w", err)
278+
}
279+
for _, run := range runs {
280+
if run.ID == job.Run.ID {
281+
continue
282+
}
283+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
284+
RunID: run.ID,
285+
})
286+
if err != nil {
287+
return fmt.Errorf("find run %d jobs: %w", run.ID, err)
288+
}
289+
if err := CancelJobs(ctx, jobs); err != nil {
290+
return fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
291+
}
292+
}
293+
}
294+
295+
return nil
296+
}
297+
298+
type ErrUnevaluatedConcurrency struct {
299+
Group string
300+
CancelInProgress string
301+
}
302+
303+
func IsErrUnevaluatedConcurrency(err error) bool {
304+
_, ok := err.(ErrUnevaluatedConcurrency)
305+
return ok
306+
}
307+
308+
func (err ErrUnevaluatedConcurrency) Error() string {
309+
return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress)
310+
}

models/actions/run_job_list.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,13 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
6969

7070
type FindRunJobOptions struct {
7171
db.ListOptions
72-
RunID int64
73-
RepoID int64
74-
OwnerID int64
75-
CommitSHA string
76-
Statuses []Status
77-
UpdatedBefore timeutil.TimeStamp
72+
RunID int64
73+
RepoID int64
74+
OwnerID int64
75+
CommitSHA string
76+
Statuses []Status
77+
UpdatedBefore timeutil.TimeStamp
78+
ConcurrencyGroup string
7879
}
7980

8081
func (opts FindRunJobOptions) ToConds() builder.Cond {
@@ -97,5 +98,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
9798
if opts.UpdatedBefore > 0 {
9899
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
99100
}
101+
if opts.ConcurrencyGroup != "" {
102+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
103+
}
100104
return cond
101105
}

models/actions/run_list.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,15 @@ func (runs RunList) LoadRepos(ctx context.Context) error {
6464

6565
type FindRunOptions struct {
6666
db.ListOptions
67-
RepoID int64
68-
OwnerID int64
69-
WorkflowID string
70-
Ref string // the commit/tag/… that caused this workflow
71-
TriggerUserID int64
72-
TriggerEvent webhook_module.HookEventType
73-
Approved bool // not util.OptionalBool, it works only when it's true
74-
Status []Status
67+
RepoID int64
68+
OwnerID int64
69+
WorkflowID string
70+
Ref string // the commit/tag/… that caused this workflow
71+
TriggerUserID int64
72+
TriggerEvent webhook_module.HookEventType
73+
Approved bool // not util.OptionalBool, it works only when it's true
74+
Status []Status
75+
ConcurrencyGroup string
7576
}
7677

7778
func (opts FindRunOptions) ToConds() builder.Cond {
@@ -100,6 +101,9 @@ func (opts FindRunOptions) ToConds() builder.Cond {
100101
if opts.TriggerEvent != "" {
101102
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
102103
}
104+
if len(opts.ConcurrencyGroup) > 0 {
105+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
106+
}
103107
return cond
104108
}
105109

models/actions/task.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,10 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
321321
return nil, false, nil
322322
}
323323

324+
if err := CancelPreviousJobsByConcurrency(ctx, job); err != nil {
325+
return nil, false, err
326+
}
327+
324328
task.Job = job
325329

326330
if err := committer.Commit(); err != nil {

models/migrations/migrations.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ func prepareMigrationTasks() []*migration {
381381
newMigration(317, "Add new index for action for heatmap", v1_24.AddNewIndexForUserDashboard),
382382
newMigration(318, "Add anonymous_access_mode for repo_unit", v1_24.AddRepoUnitAnonymousAccessMode),
383383
newMigration(319, "Add ExclusiveOrder to Label table", v1_24.AddExclusiveOrderColumnToLabelTable),
384+
newMigration(320, "Add support for actions concurrency", v1_24.AddActionsConcurrency),
384385
}
385386
return preparedMigrations
386387
}

0 commit comments

Comments
 (0)