Skip to content

Support Actions concurrency syntax #32751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 101 additions & 100 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook"

"github.com/nektos/act/pkg/jobparser"
"xorm.io/builder"
)

Expand All @@ -49,6 +48,8 @@ type ActionRun struct {
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
ConcurrencyGroup string `xorm:"index"`
ConcurrencyCancel bool
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Expand Down Expand Up @@ -170,7 +171,7 @@ func (run *ActionRun) IsSchedule() bool {
return run.ScheduleID > 0
}

func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
_, err := db.GetEngine(ctx).ID(repo.ID).
NoAutoTime().
SetExpr("num_action_runs",
Expand Down Expand Up @@ -227,121 +228,69 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
return cancelledJobs, err
}

// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}
cjs, err := CancelJobs(ctx, jobs)
if err != nil {
return cancelledJobs, err
}
cancelledJobs = append(cancelledJobs, cjs...)
}

// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()
// Return nil to indicate successful cancellation of all running and waiting jobs.
return cancelledJobs, nil
}

// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return cancelledJobs, err
}
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, error) {
cancelledJobs := make([]*ActionRunJob, 0, len(jobs))
// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}

// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return cancelledJobs, errors.New("job has changed, try again")
}
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

cancelledJobs = append(cancelledJobs, job)
// Continue with the next job.
continue
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return cancelledJobs, err
}

// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return cancelledJobs, err
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return cancelledJobs, errors.New("job has changed, try again")
}

cancelledJobs = append(cancelledJobs, job)
// Continue with the next job.
continue
}

// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return cancelledJobs, err
}
cancelledJobs = append(cancelledJobs, job)
}

// Return nil to indicate successful cancellation of all running and waiting jobs.
return cancelledJobs, nil
}

// InsertRun inserts a run
// The title will be cut off at 255 characters if it's longer than 255 characters.
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
}
defer committer.Close()

index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
var run ActionRun
has, err := db.GetEngine(ctx).Where("id=?", id).Get(&run)
if err != nil {
return err
}
run.Index = index
run.Title = util.EllipsisDisplayString(run.Title, 255)

if err := db.Insert(ctx, run); err != nil {
return err
}

if run.Repo == nil {
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
if err != nil {
return err
}
run.Repo = repo
}

if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}

runJobs := make([]*ActionRunJob, 0, len(jobs))
var hasWaiting bool
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
return err
}
payload, _ := v.Marshal()
status := StatusWaiting
if len(needs) > 0 || run.NeedApproval {
status = StatusBlocked
} else {
hasWaiting = true
}
job.Name = util.EllipsisDisplayString(job.Name, 255)
runJobs = append(runJobs, &ActionRunJob{
RunID: run.ID,
RepoID: run.RepoID,
OwnerID: run.OwnerID,
CommitSHA: run.CommitSHA,
IsForkPullRequest: run.IsForkPullRequest,
Name: job.Name,
WorkflowPayload: payload,
JobID: id,
Needs: needs,
RunsOn: job.RunsOn(),
Status: status,
})
}
if err := db.Insert(ctx, runJobs); err != nil {
return err
}

// if there is a job in the waiting status, increase tasks version.
if hasWaiting {
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
return err
}
return nil, err
} else if !has {
return nil, fmt.Errorf("run with id %d: %w", id, util.ErrNotExist)
}

return committer.Commit()
return &run, nil
}

func GetRunByRepoAndID(ctx context.Context, repoID, runID int64) (*ActionRun, error) {
Expand Down Expand Up @@ -426,7 +375,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
if err = run.LoadRepo(ctx); err != nil {
return err
}
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
}
Expand All @@ -435,3 +384,55 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
}

type ActionRunIndex db.ResourceIndex

func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) {
if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel {
return false, nil
}

concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{
RepoID: actionRun.RepoID,
ConcurrencyGroup: actionRun.ConcurrencyGroup,
Status: []Status{StatusWaiting, StatusRunning},
})
if err != nil {
return false, fmt.Errorf("find running and waiting runs: %w", err)
}
previousRuns := slices.DeleteFunc(concurrentRuns, func(r *ActionRun) bool { return r.ID == actionRun.ID })

return len(previousRuns) > 0, nil
}

func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
var cancelledJobs []*ActionRunJob

if actionRun.ConcurrencyGroup != "" && actionRun.ConcurrencyCancel {
// cancel previous runs in the same concurrency group
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
RepoID: actionRun.RepoID,
ConcurrencyGroup: actionRun.ConcurrencyGroup,
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
if err != nil {
return cancelledJobs, fmt.Errorf("find runs: %w", err)
}
for _, run := range runs {
if run.ID == actionRun.ID {
continue
}
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
}
cjs, err := CancelJobs(ctx, jobs)
if err != nil {
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
}
cancelledJobs = append(cancelledJobs, cjs...)
}
}

return cancelledJobs, nil
}
104 changes: 100 additions & 4 deletions models/actions/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,17 @@ type ActionRunJob struct {
RunsOn []string `xorm:"JSON TEXT"`
TaskID int64 // the latest task of the job
Status Status `xorm:"index"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`

RawConcurrencyGroup string // raw concurrency.group
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
ConcurrencyGroup string `xorm:"index"` // evaluated concurrency.group
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress

Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`
}

func init() {
Expand Down Expand Up @@ -197,3 +204,92 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
return StatusUnknown // it shouldn't happen
}
}

func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
if job.RawConcurrencyGroup == "" {
return false, nil
}
if !job.IsConcurrencyEvaluated {
return false, ErrUnevaluatedConcurrency{
Group: job.RawConcurrencyGroup,
CancelInProgress: job.RawConcurrencyCancel,
}
}
if job.ConcurrencyGroup == "" || job.ConcurrencyCancel {
return false, nil
}

concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
RepoID: job.RepoID,
ConcurrencyGroup: job.ConcurrencyGroup,
Statuses: []Status{StatusRunning, StatusWaiting},
})
if err != nil {
return false, fmt.Errorf("count running and waiting jobs: %w", err)
}
if concurrentJobsNum > 0 {
return true, nil
}

if err := job.LoadRun(ctx); err != nil {
return false, fmt.Errorf("load run: %w", err)
}

return ShouldBlockRunByConcurrency(ctx, job.Run)
}

func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
var cancelledJobs []*ActionRunJob

if job.RawConcurrencyGroup != "" {
if !job.IsConcurrencyEvaluated {
return cancelledJobs, ErrUnevaluatedConcurrency{
Group: job.RawConcurrencyGroup,
CancelInProgress: job.RawConcurrencyCancel,
}
}
if job.ConcurrencyGroup != "" && job.ConcurrencyCancel {
// cancel previous jobs in the same concurrency group
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
RepoID: job.RepoID,
ConcurrencyGroup: job.ConcurrencyGroup,
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
if err != nil {
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
}
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
cjs, err := CancelJobs(ctx, previousJobs)
if err != nil {
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
}
cancelledJobs = append(cancelledJobs, cjs...)
Comment on lines +252 to +266
Copy link
Contributor

@ChristopherHX ChristopherHX May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect that job concurrency key cancels equal workflow concurrency if defined on job level.

Based on my experience workflow and job concurrency use the same pool of concurrency keys. (deadlock handling required)

and CancelPreviousJobsByRunConcurrency to not be called here (EDIT 2 idk how this PR currently works, might break everything if not replaced)

e.g.

jobs:
  _:
    concurrency: test

cancels workflow

concurrency: test

In response of the unexpected behavior I have seen

EDIT
If we add an optional job / string parameter to CancelPreviousJobsByRunConcurrency , so the concurrency group of the job can be provided to replace the run concurrency key would fulfill my expectation, I guess.

}
}

if err := job.LoadRun(ctx); err != nil {
return cancelledJobs, fmt.Errorf("load run: %w", err)
}

cancelledJobsByRun, err := CancelPreviousJobsByRunConcurrency(ctx, job.Run)
if err != nil {
return cancelledJobs, fmt.Errorf("cancel runs: %w", err)
}
cancelledJobs = append(cancelledJobs, cancelledJobsByRun...)

return cancelledJobs, nil
}

type ErrUnevaluatedConcurrency struct {
Group string
CancelInProgress string
}

func IsErrUnevaluatedConcurrency(err error) bool {
_, ok := err.(ErrUnevaluatedConcurrency)
return ok
}

func (err ErrUnevaluatedConcurrency) Error() string {
return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress)
}
Loading