Skip to content

Commit 28a5fe7

Browse files
committed
fix bugs
1 parent dbd996d commit 28a5fe7

File tree

6 files changed

+121
-53
lines changed

6 files changed

+121
-53
lines changed

models/actions/run.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,14 +394,18 @@ func CancelConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) error
394394
}
395395

396396
func ShouldJobBeBlockedByConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) (bool, error) {
397-
waitingConcurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
397+
if actionRunJob.ConcurrencyCancel {
398+
return false, CancelConcurrentJobs(ctx, actionRunJob)
399+
}
400+
401+
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
398402
RepoID: actionRunJob.RepoID,
399403
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
400-
Statuses: []Status{StatusWaiting},
404+
Statuses: []Status{StatusRunning, StatusWaiting},
401405
})
402406
if err != nil {
403407
return false, fmt.Errorf("count waiting jobs: %w", err)
404408
}
405409

406-
return waitingConcurrentJobsNum > 0, nil
410+
return concurrentJobsNum > 0, nil
407411
}

models/actions/run_job_list.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
4646
return jobs.LoadRuns(ctx, withRepo)
4747
}
4848

49+
func GetRunsByIDs(ctx context.Context, runIDs []int64) (RunList, error) {
50+
runList := make(RunList, 0, len(runIDs))
51+
err := db.GetEngine(ctx).In("id", runIDs).Find(&runList)
52+
return runList, err
53+
}
54+
4955
type FindRunJobOptions struct {
5056
db.ListOptions
5157
RunID int64

modules/actions/workflows.go

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@ import (
2121
)
2222

2323
type DetectedWorkflow struct {
24-
EntryName string
25-
TriggerEvent *jobparser.Event
26-
Content []byte
27-
RawConcurrency *model.RawConcurrency
24+
EntryName string
25+
TriggerEvent *jobparser.Event
26+
Content []byte
2827
}
2928

3029
func init() {
@@ -96,14 +95,6 @@ func GetEventsFromContent(content []byte) ([]*jobparser.Event, error) {
9695
return events, nil
9796
}
9897

99-
func GetConcurrencyFromContent(content []byte) (*model.RawConcurrency, error) {
100-
workflow, err := model.ReadWorkflow(bytes.NewReader(content))
101-
if err != nil {
102-
return nil, err
103-
}
104-
return workflow.RawConcurrency, nil
105-
}
106-
10798
func DetectWorkflows(
10899
gitRepo *git.Repository,
109100
commit *git.Commit,
@@ -130,11 +121,6 @@ func DetectWorkflows(
130121
log.Warn("ignore invalid workflow %q: %v", entry.Name(), err)
131122
continue
132123
}
133-
concurrency, err := GetConcurrencyFromContent(content)
134-
if err != nil {
135-
log.Warn("ignore workflow with invalid concurrency %q: %v", entry.Name(), err)
136-
continue
137-
}
138124
for _, evt := range events {
139125
log.Trace("detect workflow %q for event %#v matching %q", entry.Name(), evt, triggedEvent)
140126
if evt.IsSchedule() {
@@ -148,10 +134,9 @@ func DetectWorkflows(
148134
}
149135
} else if detectMatched(gitRepo, commit, triggedEvent, payload, evt) {
150136
dwf := &DetectedWorkflow{
151-
EntryName: entry.Name(),
152-
TriggerEvent: evt,
153-
Content: content,
154-
RawConcurrency: concurrency,
137+
EntryName: entry.Name(),
138+
TriggerEvent: evt,
139+
Content: content,
155140
}
156141
workflows = append(workflows, dwf)
157142
}

services/actions/job_emitter.go

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
actions_model "code.gitea.io/gitea/models/actions"
1212
"code.gitea.io/gitea/models/db"
13+
"code.gitea.io/gitea/modules/container"
1314
"code.gitea.io/gitea/modules/graceful"
1415
"code.gitea.io/gitea/modules/log"
1516
"code.gitea.io/gitea/modules/queue"
@@ -46,44 +47,87 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
4647
}
4748

4849
func checkJobsByRunID(ctx context.Context, runID int64) error {
49-
run, exist, err := db.GetByID[actions_model.ActionRun](ctx, runID)
50+
run, err := actions_model.GetRunByID(ctx, runID)
5051
if err != nil {
5152
return fmt.Errorf("get action run: %w", err)
5253
}
53-
if !exist {
54-
return fmt.Errorf("action run %d does not exist", runID)
55-
}
5654

5755
return db.WithTx(ctx, func(ctx context.Context) error {
5856
// check jobs of the current run
5957
if err := checkJobsOfRun(ctx, run); err != nil {
6058
return err
6159
}
6260

63-
// check jobs by the concurrency group of the run
64-
if len(run.ConcurrencyGroup) == 0 {
65-
return nil
61+
// check run (workflow-level) concurrency
62+
concurrentRunIDs := make(container.Set[int64])
63+
if len(run.ConcurrencyGroup) > 0 && !run.ConcurrencyCancel {
64+
concurrentRuns, err := db.Find[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
65+
RepoID: run.RepoID,
66+
ConcurrencyGroup: run.ConcurrencyGroup,
67+
Status: []actions_model.Status{actions_model.StatusBlocked},
68+
})
69+
if err != nil {
70+
return err
71+
}
72+
for _, cRun := range concurrentRuns {
73+
concurrentRunIDs.Add(cRun.ID)
74+
if cRun.NeedApproval {
75+
continue
76+
}
77+
if err := checkJobsOfRun(ctx, cRun); err != nil {
78+
return err
79+
}
80+
updatedRun, err := actions_model.GetRunByID(ctx, cRun.ID)
81+
if err != nil {
82+
return err
83+
}
84+
if updatedRun.Status == actions_model.StatusWaiting {
85+
// only run one blocked action run in the same concurrency group
86+
break
87+
}
88+
}
6689
}
67-
concurrentActionRuns, err := db.Find[actions_model.ActionRun](ctx, &actions_model.FindRunOptions{
68-
RepoID: run.RepoID,
69-
ConcurrencyGroup: run.ConcurrencyGroup,
70-
Status: []actions_model.Status{
71-
actions_model.StatusBlocked,
72-
},
73-
SortType: "oldest",
74-
})
90+
91+
// check job concurrency
92+
concurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
7593
if err != nil {
76-
return fmt.Errorf("find action run with concurrency group %s: %w", run.ConcurrencyGroup, err)
94+
return err
7795
}
78-
for _, cRun := range concurrentActionRuns {
79-
if cRun.NeedApproval {
80-
continue
81-
}
82-
if err := checkJobsOfRun(ctx, cRun); err != nil {
83-
return err
96+
for _, job := range concurrentJobs {
97+
if job.Status.IsDone() && len(job.ConcurrencyGroup) > 0 && !job.ConcurrencyCancel {
98+
concurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
99+
RepoID: job.RepoID,
100+
ConcurrencyGroup: job.ConcurrencyGroup,
101+
Statuses: []actions_model.Status{actions_model.StatusBlocked},
102+
})
103+
if err != nil {
104+
return err
105+
}
106+
for _, cJob := range concurrentJobs {
107+
if concurrentRunIDs.Contains(cJob.RunID) {
108+
continue
109+
}
110+
cRun, err := actions_model.GetRunByID(ctx, cJob.RunID)
111+
if err != nil {
112+
return err
113+
}
114+
if cRun.NeedApproval {
115+
continue
116+
}
117+
if err := checkJobsOfRun(ctx, cRun); err != nil {
118+
return err
119+
}
120+
updatedJob, err := actions_model.GetRunJobByID(ctx, cJob.ID)
121+
if err != nil {
122+
return err
123+
}
124+
if updatedJob.Status == actions_model.StatusWaiting {
125+
break
126+
}
127+
}
84128
}
85-
break // only run one blocked action run with the same concurrency group
86129
}
130+
87131
return nil
88132
})
89133
}
@@ -189,7 +233,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
189233
}
190234
if allDone {
191235
// check concurrency
192-
blockedByJobConcurrency, err := checkJobConcurrency(ctx, r.jobMap[id], r.vars)
236+
blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars)
193237
if err != nil {
194238
log.Error("Check run %d job %d concurrency: %v. This job will stay blocked.")
195239
continue
@@ -223,7 +267,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
223267
return ret
224268
}
225269

226-
func checkJobConcurrency(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
270+
func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
227271
if len(actionRunJob.RawConcurrencyGroup) == 0 {
228272
return false, nil
229273
}

services/actions/notifier_helper.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,20 @@ func handleWorkflows(
332332
continue
333333
}
334334

335+
wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(dwf.Content)
336+
if err != nil {
337+
log.Error("ReadWorkflowRawConcurrency: %v", err)
338+
continue
339+
}
340+
if wfRawConcurrency != nil {
341+
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
342+
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.EvaluateWorkflowConcurrency(wfRawConcurrency, wfGitCtx, vars)
343+
if len(wfConcurrencyGroup) > 0 {
344+
run.ConcurrencyGroup = wfConcurrencyGroup
345+
run.ConcurrencyCancel = wfConcurrencyCancel
346+
}
347+
}
348+
335349
jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars))
336350
if err != nil {
337351
log.Error("jobparser.Parse: %v", err)

services/actions/run.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
4646
return err
4747
}
4848
} else {
49-
runningConcurrentRunsNum, err := db.Count[actions_model.ActionRun](ctx, &actions_model.FindRunOptions{
49+
concurrentRunsNum, err := db.Count[actions_model.ActionRun](ctx, &actions_model.FindRunOptions{
5050
RepoID: run.RepoID,
5151
ConcurrencyGroup: run.ConcurrencyGroup,
52-
Status: []actions_model.Status{actions_model.StatusRunning},
52+
Status: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusRunning},
5353
})
5454
if err != nil {
5555
return err
5656
}
57-
if runningConcurrentRunsNum > 0 {
57+
if concurrentRunsNum > 0 {
5858
run.Status = actions_model.StatusBlocked
5959
}
6060
}
@@ -118,12 +118,22 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
118118
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
119119
// we do not need to evaluate job concurrency if the job is blocked
120120
// because it will be checked by job emitter
121-
if runJob.Status != actions_model.StatusBlocked && len(needs) == 0 {
121+
if runJob.Status != actions_model.StatusBlocked {
122122
var err error
123123
runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = evaluateJobConcurrency(run, runJob, vars, map[string]*jobparser.JobResult{})
124124
if err != nil {
125125
return fmt.Errorf("evaluate job concurrency: %w", err)
126126
}
127+
if len(runJob.ConcurrencyGroup) > 0 {
128+
// check if the job should be blocked by job concurrency
129+
shouldBlock, err := actions_model.ShouldJobBeBlockedByConcurrentJobs(ctx, runJob)
130+
if err != nil {
131+
return err
132+
}
133+
if shouldBlock {
134+
runJob.Status = actions_model.StatusBlocked
135+
}
136+
}
127137
}
128138
}
129139

@@ -134,6 +144,11 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
134144
return err
135145
}
136146

147+
run.Status = actions_model.AggregateJobStatus(runJobs)
148+
if err := actions_model.UpdateRun(ctx, run, "status"); err != nil {
149+
return err
150+
}
151+
137152
// if there is a job in the waiting status, increase tasks version.
138153
if hasWaiting {
139154
if err := actions_model.IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {

0 commit comments

Comments
 (0)