Skip to content

Commit 661dcf0

Browse files
committed
support concurrency
1 parent 780e92e commit 661dcf0

18 files changed

+1619
-188
lines changed

models/actions/run.go

Lines changed: 57 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"code.gitea.io/gitea/modules/util"
2222
webhook_module "code.gitea.io/gitea/modules/webhook"
2323

24-
"github.com/nektos/act/pkg/jobparser"
2524
"xorm.io/builder"
2625
)
2726

@@ -48,6 +47,8 @@ type ActionRun struct {
4847
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4948
Status Status `xorm:"index"`
5049
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
50+
ConcurrencyGroup string `xorm:"index"`
51+
ConcurrencyCancel bool
5152
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5253
Started timeutil.TimeStamp
5354
Stopped timeutil.TimeStamp
@@ -169,7 +170,7 @@ func (run *ActionRun) IsSchedule() bool {
169170
return run.ScheduleID > 0
170171
}
171172

172-
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
173+
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
173174
_, err := db.GetEngine(ctx).ID(repo.ID).
174175
NoAutoTime().
175176
SetExpr("num_action_runs",
@@ -226,121 +227,57 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
226227
return cancelledJobs, err
227228
}
228229

229-
// Iterate over each job and attempt to cancel it.
230-
for _, job := range jobs {
231-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
232-
status := job.Status
233-
if status.IsDone() {
234-
continue
235-
}
236-
237-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
238-
if job.TaskID == 0 {
239-
job.Status = StatusCancelled
240-
job.Stopped = timeutil.TimeStampNow()
241-
242-
// Update the job's status and stopped time in the database.
243-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
244-
if err != nil {
245-
return cancelledJobs, err
246-
}
247-
248-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
249-
if n == 0 {
250-
return cancelledJobs, errors.New("job has changed, try again")
251-
}
252-
253-
cancelledJobs = append(cancelledJobs, job)
254-
// Continue with the next job.
255-
continue
256-
}
257-
258-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
259-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
260-
return cancelledJobs, err
261-
}
262-
cancelledJobs = append(cancelledJobs, job)
230+
cjs, err := CancelJobs(ctx, jobs)
231+
if err != nil {
232+
return cancelledJobs, err
263233
}
234+
cancelledJobs = append(cancelledJobs, cjs...)
264235
}
265236

266237
// Return nil to indicate successful cancellation of all running and waiting jobs.
267238
return cancelledJobs, nil
268239
}
269240

270-
// InsertRun inserts a run
271-
// The title will be cut off at 255 characters if it's longer than 255 characters.
272-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
273-
ctx, committer, err := db.TxContext(ctx)
274-
if err != nil {
275-
return err
276-
}
277-
defer committer.Close()
278-
279-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
280-
if err != nil {
281-
return err
282-
}
283-
run.Index = index
284-
run.Title = util.EllipsisDisplayString(run.Title, 255)
241+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, error) {
242+
cancelledJobs := make([]*ActionRunJob, 0, len(jobs))
243+
// Iterate over each job and attempt to cancel it.
244+
for _, job := range jobs {
245+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
246+
status := job.Status
247+
if status.IsDone() {
248+
continue
249+
}
285250

286-
if err := db.Insert(ctx, run); err != nil {
287-
return err
288-
}
251+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
252+
if job.TaskID == 0 {
253+
job.Status = StatusCancelled
254+
job.Stopped = timeutil.TimeStampNow()
289255

290-
if run.Repo == nil {
291-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
292-
if err != nil {
293-
return err
294-
}
295-
run.Repo = repo
296-
}
256+
// Update the job's status and stopped time in the database.
257+
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
258+
if err != nil {
259+
return cancelledJobs, err
260+
}
297261

298-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
299-
return err
300-
}
262+
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
263+
if n == 0 {
264+
return cancelledJobs, errors.New("job has changed, try again")
265+
}
301266

302-
runJobs := make([]*ActionRunJob, 0, len(jobs))
303-
var hasWaiting bool
304-
for _, v := range jobs {
305-
id, job := v.Job()
306-
needs := job.Needs()
307-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
308-
return err
309-
}
310-
payload, _ := v.Marshal()
311-
status := StatusWaiting
312-
if len(needs) > 0 || run.NeedApproval {
313-
status = StatusBlocked
314-
} else {
315-
hasWaiting = true
267+
cancelledJobs = append(cancelledJobs, job)
268+
// Continue with the next job.
269+
continue
316270
}
317-
job.Name = util.EllipsisDisplayString(job.Name, 255)
318-
runJobs = append(runJobs, &ActionRunJob{
319-
RunID: run.ID,
320-
RepoID: run.RepoID,
321-
OwnerID: run.OwnerID,
322-
CommitSHA: run.CommitSHA,
323-
IsForkPullRequest: run.IsForkPullRequest,
324-
Name: job.Name,
325-
WorkflowPayload: payload,
326-
JobID: id,
327-
Needs: needs,
328-
RunsOn: job.RunsOn(),
329-
Status: status,
330-
})
331-
}
332-
if err := db.Insert(ctx, runJobs); err != nil {
333-
return err
334-
}
335271

336-
// if there is a job in the waiting status, increase tasks version.
337-
if hasWaiting {
338-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
339-
return err
272+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
273+
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
274+
return cancelledJobs, err
340275
}
276+
cancelledJobs = append(cancelledJobs, job)
341277
}
342278

343-
return committer.Commit()
279+
// Return nil to indicate successful cancellation of all running and waiting jobs.
280+
return cancelledJobs, nil
344281
}
345282

346283
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
@@ -432,7 +369,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
432369
}
433370
run.Repo = repo
434371
}
435-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
372+
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
436373
return err
437374
}
438375
}
@@ -441,3 +378,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
441378
}
442379

443380
type ActionRunIndex db.ResourceIndex
381+
382+
func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) {
383+
if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel {
384+
return false, nil
385+
}
386+
387+
concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{
388+
RepoID: actionRun.RepoID,
389+
ConcurrencyGroup: actionRun.ConcurrencyGroup,
390+
Status: []Status{StatusWaiting, StatusRunning},
391+
})
392+
if err != nil {
393+
return false, fmt.Errorf("find running and waiting runs: %w", err)
394+
}
395+
previousRuns := slices.DeleteFunc(concurrentRuns, func(r *ActionRun) bool { return r.ID == actionRun.ID })
396+
397+
return len(previousRuns) > 0, nil
398+
}

models/actions/run_job.go

Lines changed: 121 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,17 @@ type ActionRunJob struct {
3535
RunsOn []string `xorm:"JSON TEXT"`
3636
TaskID int64 // the latest task of the job
3737
Status Status `xorm:"index"`
38-
Started timeutil.TimeStamp
39-
Stopped timeutil.TimeStamp
40-
Created timeutil.TimeStamp `xorm:"created"`
41-
Updated timeutil.TimeStamp `xorm:"updated index"`
38+
39+
RawConcurrencyGroup string // raw concurrency.group
40+
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
41+
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
42+
ConcurrencyGroup string `xorm:"index"` // evaluated concurrency.group
43+
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
44+
45+
Started timeutil.TimeStamp
46+
Stopped timeutil.TimeStamp
47+
Created timeutil.TimeStamp `xorm:"created"`
48+
Updated timeutil.TimeStamp `xorm:"updated index"`
4249
}
4350

4451
func init() {
@@ -197,3 +204,113 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
197204
return StatusUnknown // it shouldn't happen
198205
}
199206
}
207+
208+
func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
209+
if job.RawConcurrencyGroup == "" {
210+
return false, nil
211+
}
212+
if !job.IsConcurrencyEvaluated {
213+
return false, ErrUnevaluatedConcurrency{
214+
Group: job.RawConcurrencyGroup,
215+
CancelInProgress: job.RawConcurrencyCancel,
216+
}
217+
}
218+
if job.ConcurrencyGroup == "" || job.ConcurrencyCancel {
219+
return false, nil
220+
}
221+
222+
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
223+
RepoID: job.RepoID,
224+
ConcurrencyGroup: job.ConcurrencyGroup,
225+
Statuses: []Status{StatusRunning, StatusWaiting},
226+
})
227+
if err != nil {
228+
return false, fmt.Errorf("count running and waiting jobs: %w", err)
229+
}
230+
if concurrentJobsNum > 0 {
231+
return true, nil
232+
}
233+
234+
if err := job.LoadRun(ctx); err != nil {
235+
return false, fmt.Errorf("load run: %w", err)
236+
}
237+
238+
return ShouldBlockRunByConcurrency(ctx, job.Run)
239+
}
240+
241+
func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
242+
var cancelledJobs []*ActionRunJob
243+
244+
if job.RawConcurrencyGroup != "" {
245+
if !job.IsConcurrencyEvaluated {
246+
return cancelledJobs, ErrUnevaluatedConcurrency{
247+
Group: job.RawConcurrencyGroup,
248+
CancelInProgress: job.RawConcurrencyCancel,
249+
}
250+
}
251+
if job.ConcurrencyGroup != "" && job.ConcurrencyCancel {
252+
// cancel previous jobs in the same concurrency group
253+
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
254+
RepoID: job.RepoID,
255+
ConcurrencyGroup: job.ConcurrencyGroup,
256+
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
257+
})
258+
if err != nil {
259+
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
260+
}
261+
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
262+
cjs, err := CancelJobs(ctx, previousJobs)
263+
if err != nil {
264+
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
265+
}
266+
cancelledJobs = append(cancelledJobs, cjs...)
267+
}
268+
}
269+
270+
if err := job.LoadRun(ctx); err != nil {
271+
return cancelledJobs, fmt.Errorf("load run: %w", err)
272+
}
273+
if job.Run.ConcurrencyGroup != "" && job.Run.ConcurrencyCancel {
274+
// cancel previous runs in the same concurrency group
275+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
276+
RepoID: job.RepoID,
277+
ConcurrencyGroup: job.Run.ConcurrencyGroup,
278+
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
279+
})
280+
if err != nil {
281+
return cancelledJobs, fmt.Errorf("find runs: %w", err)
282+
}
283+
for _, run := range runs {
284+
if run.ID == job.Run.ID {
285+
continue
286+
}
287+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
288+
RunID: run.ID,
289+
})
290+
if err != nil {
291+
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
292+
}
293+
cjs, err := CancelJobs(ctx, jobs)
294+
if err != nil {
295+
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
296+
}
297+
cancelledJobs = append(cancelledJobs, cjs...)
298+
}
299+
}
300+
301+
return cancelledJobs, nil
302+
}
303+
304+
type ErrUnevaluatedConcurrency struct {
305+
Group string
306+
CancelInProgress string
307+
}
308+
309+
func IsErrUnevaluatedConcurrency(err error) bool {
310+
_, ok := err.(ErrUnevaluatedConcurrency)
311+
return ok
312+
}
313+
314+
func (err ErrUnevaluatedConcurrency) Error() string {
315+
return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress)
316+
}

models/actions/run_job_list.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,13 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
6969

7070
type FindRunJobOptions struct {
7171
db.ListOptions
72-
RunID int64
73-
RepoID int64
74-
OwnerID int64
75-
CommitSHA string
76-
Statuses []Status
77-
UpdatedBefore timeutil.TimeStamp
72+
RunID int64
73+
RepoID int64
74+
OwnerID int64
75+
CommitSHA string
76+
Statuses []Status
77+
UpdatedBefore timeutil.TimeStamp
78+
ConcurrencyGroup string
7879
}
7980

8081
func (opts FindRunJobOptions) ToConds() builder.Cond {
@@ -97,5 +98,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
9798
if opts.UpdatedBefore > 0 {
9899
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
99100
}
101+
if opts.ConcurrencyGroup != "" {
102+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
103+
}
100104
return cond
101105
}

0 commit comments

Comments
 (0)