Skip to content

Commit ed133ff

Browse files
committed
checkJobConcurrency
1 parent b3ddfce commit ed133ff

File tree

5 files changed

+209
-46
lines changed

5 files changed

+209
-46
lines changed

models/actions/run.go

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -230,42 +230,49 @@ func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error
230230
return err
231231
}
232232

233-
// Iterate over each job and attempt to cancel it.
234-
for _, job := range jobs {
235-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
236-
status := job.Status
237-
if status.IsDone() {
238-
continue
239-
}
233+
if err := CancelJobs(ctx, jobs); err != nil {
234+
return err
235+
}
236+
}
240237

241-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
242-
if job.TaskID == 0 {
243-
job.Status = StatusCancelled
244-
job.Stopped = timeutil.TimeStampNow()
238+
// Return nil to indicate successful cancellation of all running and waiting jobs.
239+
return nil
240+
}
245241

246-
// Update the job's status and stopped time in the database.
247-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
248-
if err != nil {
249-
return err
250-
}
242+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
243+
// Iterate over each job and attempt to cancel it.
244+
for _, job := range jobs {
245+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
246+
status := job.Status
247+
if status.IsDone() {
248+
continue
249+
}
251250

252-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
253-
if n == 0 {
254-
return fmt.Errorf("job has changed, try again")
255-
}
251+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
252+
if job.TaskID == 0 {
253+
job.Status = StatusCancelled
254+
job.Stopped = timeutil.TimeStampNow()
256255

257-
// Continue with the next job.
258-
continue
256+
// Update the job's status and stopped time in the database.
257+
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
258+
if err != nil {
259+
return err
259260
}
260261

261-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
262-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
263-
return err
262+
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
263+
if n == 0 {
264+
return fmt.Errorf("job has changed, try again")
264265
}
266+
267+
// Continue with the next job.
268+
continue
265269
}
266-
}
267270

268-
// Return nil to indicate successful cancellation of all running and waiting jobs.
271+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
272+
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
273+
return err
274+
}
275+
}
269276
return nil
270277
}
271278

models/actions/run_job_list.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
4848

4949
type FindRunJobOptions struct {
5050
db.ListOptions
51-
RunID int64
52-
RepoID int64
53-
OwnerID int64
54-
CommitSHA string
55-
Statuses []Status
56-
UpdatedBefore timeutil.TimeStamp
51+
RunID int64
52+
RepoID int64
53+
OwnerID int64
54+
CommitSHA string
55+
Statuses []Status
56+
UpdatedBefore timeutil.TimeStamp
57+
ConcurrencyGroup string
5758
}
5859

5960
func (opts FindRunJobOptions) ToConds() builder.Cond {
@@ -76,5 +77,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
7677
if opts.UpdatedBefore > 0 {
7778
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
7879
}
80+
if opts.ConcurrencyGroup != "" {
81+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
82+
}
7983
return cond
8084
}

services/actions/job_emitter.go

Lines changed: 114 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,19 @@
44
package actions
55

66
import (
7+
"bytes"
78
"context"
89
"errors"
910
"fmt"
1011

1112
actions_model "code.gitea.io/gitea/models/actions"
1213
"code.gitea.io/gitea/models/db"
1314
"code.gitea.io/gitea/modules/graceful"
15+
"code.gitea.io/gitea/modules/log"
1416
"code.gitea.io/gitea/modules/queue"
1517

1618
"github.com/nektos/act/pkg/jobparser"
19+
act_model "github.com/nektos/act/pkg/model"
1720
"xorm.io/builder"
1821
)
1922

@@ -55,7 +58,7 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
5558

5659
return db.WithTx(ctx, func(ctx context.Context) error {
5760
// check jobs of the current run
58-
if err := checkJobsOfRun(ctx, runID); err != nil {
61+
if err := checkJobsOfRun(ctx, run); err != nil {
5962
return err
6063
}
6164

@@ -78,7 +81,7 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
7881
if cRun.NeedApproval {
7982
continue
8083
}
81-
if err := checkJobsOfRun(ctx, cRun.ID); err != nil {
84+
if err := checkJobsOfRun(ctx, cRun); err != nil {
8285
return err
8386
}
8487
break // only run one blocked action run with the same concurrency group
@@ -87,18 +90,23 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
8790
})
8891
}
8992

90-
func checkJobsOfRun(ctx context.Context, runID int64) error {
91-
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID})
93+
func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) error {
94+
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
9295
if err != nil {
9396
return err
9497
}
98+
99+
vars, err := actions_model.GetVariablesOfRun(ctx, run)
100+
if err != nil {
101+
return fmt.Errorf("get run %d variables: %w", run.ID, err)
102+
}
103+
95104
if err := db.WithTx(ctx, func(ctx context.Context) error {
96-
idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
97105
for _, job := range jobs {
98-
idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
106+
job.Run = run
99107
}
100108

101-
updates := newJobStatusResolver(jobs).Resolve()
109+
updates := newJobStatusResolver(jobs, vars).Resolve(ctx)
102110
for _, job := range jobs {
103111
if status, ok := updates[job.ID]; ok {
104112
job.Status = status
@@ -121,9 +129,10 @@ type jobStatusResolver struct {
121129
statuses map[int64]actions_model.Status
122130
needs map[int64][]int64
123131
jobMap map[int64]*actions_model.ActionRunJob
132+
vars map[string]string
124133
}
125134

126-
func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
135+
func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver {
127136
idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
128137
jobMap := make(map[int64]*actions_model.ActionRunJob)
129138
for _, job := range jobs {
@@ -145,13 +154,14 @@ func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
145154
statuses: statuses,
146155
needs: needs,
147156
jobMap: jobMap,
157+
vars: vars,
148158
}
149159
}
150160

151-
func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
161+
func (r *jobStatusResolver) Resolve(ctx context.Context) map[int64]actions_model.Status {
152162
ret := map[int64]actions_model.Status{}
153163
for i := 0; i < len(r.statuses); i++ {
154-
updated := r.resolve()
164+
updated := r.resolve(ctx)
155165
if len(updated) == 0 {
156166
return ret
157167
}
@@ -163,7 +173,7 @@ func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
163173
return ret
164174
}
165175

166-
func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
176+
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
167177
ret := map[int64]actions_model.Status{}
168178
for id, status := range r.statuses {
169179
if status != actions_model.StatusBlocked {
@@ -180,6 +190,17 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
180190
}
181191
}
182192
if allDone {
193+
// check concurrency
194+
blockedByJobConcurrency, err := checkJobConcurrency(ctx, r.jobMap[id], r.vars)
195+
if err != nil {
196+
log.Error("Check run %d job %d concurrency: %v. This job will stay blocked.")
197+
continue
198+
}
199+
200+
if blockedByJobConcurrency {
201+
continue
202+
}
203+
183204
if allSucceed {
184205
ret[id] = actions_model.StatusWaiting
185206
} else {
@@ -203,3 +224,85 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
203224
}
204225
return ret
205226
}
227+
228+
func checkJobConcurrency(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
229+
if len(actionRunJob.RawConcurrencyGroup) == 0 {
230+
return false, nil
231+
}
232+
233+
run := actionRunJob.Run
234+
235+
if len(actionRunJob.ConcurrencyGroup) == 0 {
236+
rawConcurrency := &act_model.RawConcurrency{
237+
Group: actionRunJob.RawConcurrencyGroup,
238+
CancelInProgress: actionRunJob.RawConcurrencyCancel,
239+
}
240+
241+
gitCtx := jobparser.ToGitContext(GenerateGitContext(run, actionRunJob))
242+
243+
actWorkflow, err := act_model.ReadWorkflow(bytes.NewReader(actionRunJob.WorkflowPayload))
244+
if err != nil {
245+
return false, fmt.Errorf("read workflow: %w", err)
246+
}
247+
actJob := actWorkflow.GetJob(actionRunJob.JobID)
248+
249+
task, err := actions_model.GetTaskByID(ctx, actionRunJob.TaskID)
250+
if err != nil {
251+
return false, fmt.Errorf("get task by id: %w", err)
252+
}
253+
taskNeeds, err := FindTaskNeeds(ctx, task)
254+
if err != nil {
255+
return false, fmt.Errorf("find task needs: %w", err)
256+
}
257+
258+
jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
259+
for jobID, taskNeed := range taskNeeds {
260+
jobResult := &jobparser.JobResult{
261+
Result: taskNeed.Result.String(),
262+
Outputs: taskNeed.Outputs,
263+
}
264+
jobResults[jobID] = jobResult
265+
}
266+
267+
actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel = jobparser.InterpolatJobConcurrency(rawConcurrency, actJob, gitCtx, vars, jobResults)
268+
if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
269+
ID: actionRunJob.ID,
270+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
271+
ConcurrencyCancel: actionRunJob.ConcurrencyCancel,
272+
}, nil); err != nil {
273+
return false, fmt.Errorf("update run job: %w", err)
274+
}
275+
}
276+
277+
if actionRunJob.ConcurrencyCancel {
278+
// cancel previous jobs in the same concurrency group
279+
previousJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
280+
RepoID: actionRunJob.RepoID,
281+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
282+
Statuses: []actions_model.Status{
283+
actions_model.StatusRunning,
284+
actions_model.StatusWaiting,
285+
actions_model.StatusBlocked,
286+
},
287+
})
288+
if err != nil {
289+
return false, fmt.Errorf("find previous jobs: %w", err)
290+
}
291+
if err := actions_model.CancelJobs(ctx, previousJobs); err != nil {
292+
return false, fmt.Errorf("cancel previous jobs: %w", err)
293+
}
294+
// we have cancelled all previous jobs, so this job does not need to be blocked
295+
return false, nil
296+
}
297+
298+
waitingConcurrentJobsNum, err := db.Count[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
299+
RepoID: actionRunJob.RepoID,
300+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
301+
Statuses: []actions_model.Status{actions_model.StatusWaiting},
302+
})
303+
if err != nil {
304+
return false, fmt.Errorf("count waiting jobs: %w", err)
305+
}
306+
307+
return waitingConcurrentJobsNum > 0, nil
308+
}

services/actions/job_emitter_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package actions
55

66
import (
7+
"context"
78
"testing"
89

910
actions_model "code.gitea.io/gitea/models/actions"
@@ -129,8 +130,8 @@ jobs:
129130
}
130131
for _, tt := range tests {
131132
t.Run(tt.name, func(t *testing.T) {
132-
r := newJobStatusResolver(tt.jobs)
133-
assert.Equal(t, tt.want, r.Resolve())
133+
r := newJobStatusResolver(tt.jobs, nil)
134+
assert.Equal(t, tt.want, r.Resolve(context.Background()))
134135
})
135136
}
136137
}

services/actions/utils.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
package actions
55

66
import (
7+
"context"
78
"fmt"
89

910
actions_model "code.gitea.io/gitea/models/actions"
11+
"code.gitea.io/gitea/models/db"
1012
actions_module "code.gitea.io/gitea/modules/actions"
13+
"code.gitea.io/gitea/modules/container"
1114
"code.gitea.io/gitea/modules/git"
1215
"code.gitea.io/gitea/modules/json"
1316
"code.gitea.io/gitea/modules/setting"
@@ -94,3 +97,48 @@ func GenerateGitContext(run *actions_model.ActionRun, job *actions_model.ActionR
9497

9598
return gitContext
9699
}
100+
101+
type TaskNeed struct {
102+
Result actions_model.Status
103+
Outputs map[string]string
104+
}
105+
106+
func FindTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*TaskNeed, error) {
107+
if err := task.LoadAttributes(ctx); err != nil {
108+
return nil, fmt.Errorf("LoadAttributes: %w", err)
109+
}
110+
if len(task.Job.Needs) == 0 {
111+
return nil, nil
112+
}
113+
needs := container.SetOf(task.Job.Needs...)
114+
115+
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: task.Job.RunID})
116+
if err != nil {
117+
return nil, fmt.Errorf("FindRunJobs: %w", err)
118+
}
119+
120+
ret := make(map[string]*TaskNeed, len(needs))
121+
for _, job := range jobs {
122+
if !needs.Contains(job.JobID) {
123+
continue
124+
}
125+
if job.TaskID == 0 || !job.Status.IsDone() {
126+
// it shouldn't happen, or the job has been rerun
127+
continue
128+
}
129+
outputs := make(map[string]string)
130+
got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
131+
if err != nil {
132+
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
133+
}
134+
for _, v := range got {
135+
outputs[v.OutputKey] = v.OutputValue
136+
}
137+
ret[job.JobID] = &TaskNeed{
138+
Outputs: outputs,
139+
Result: job.Status,
140+
}
141+
}
142+
143+
return ret, nil
144+
}

0 commit comments

Comments
 (0)