Skip to content

Commit 49c8f3e

Browse files
committed
refactor some funcs
1 parent b9a6246 commit 49c8f3e

File tree

7 files changed

+194
-93
lines changed

7 files changed

+194
-93
lines changed

models/actions/run.go

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"code.gitea.io/gitea/modules/util"
2121
webhook_module "code.gitea.io/gitea/modules/webhook"
2222

23-
"github.com/nektos/act/pkg/jobparser"
2423
"xorm.io/builder"
2524
)
2625

@@ -278,7 +277,7 @@ func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
278277

279278
// InsertRun inserts a run
280279
// The title will be cut off at 255 characters if it's longer than 255 characters.
281-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
280+
func InsertRun(ctx context.Context, run *ActionRun, runJobs []*ActionRunJob) error {
282281
ctx, committer, err := db.TxContext(ctx)
283282
if err != nil {
284283
return err
@@ -334,40 +333,27 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
334333
return err
335334
}
336335

337-
runJobs := make([]*ActionRunJob, 0, len(jobs))
338336
var hasWaiting bool
339-
for _, v := range jobs {
340-
id, job := v.Job()
341-
needs := job.Needs()
342-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
343-
return err
337+
for _, job := range runJobs {
338+
if job.Status != StatusBlocked {
339+
if blockedByWorkflowConcurrency {
340+
// the job should also be blocked when the run is blocked
341+
job.Status = StatusBlocked
342+
} else if len(job.ConcurrencyGroup) > 0 {
343+
// check if the job should be blocked by job concurrency
344+
shouldBlock, err := ShouldJobBeBlockedByConcurrentJobs(ctx, job)
345+
if err != nil {
346+
return err
347+
}
348+
if shouldBlock {
349+
job.Status = StatusBlocked
350+
}
351+
}
344352
}
345-
payload, _ := v.Marshal()
346-
status := StatusWaiting
347-
if len(needs) > 0 || run.NeedApproval || blockedByWorkflowConcurrency {
348-
status = StatusBlocked
349-
} else {
353+
354+
if job.Status == StatusWaiting {
350355
hasWaiting = true
351356
}
352-
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
353-
runJob := &ActionRunJob{
354-
RunID: run.ID,
355-
RepoID: run.RepoID,
356-
OwnerID: run.OwnerID,
357-
CommitSHA: run.CommitSHA,
358-
IsForkPullRequest: run.IsForkPullRequest,
359-
Name: job.Name,
360-
WorkflowPayload: payload,
361-
JobID: id,
362-
Needs: needs,
363-
RunsOn: job.RunsOn(),
364-
Status: status,
365-
}
366-
if job.RawConcurrency != nil {
367-
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
368-
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
369-
}
370-
runJobs = append(runJobs, runJob)
371357
}
372358
if err := db.Insert(ctx, runJobs); err != nil {
373359
return err
@@ -481,3 +467,34 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
481467
}
482468

483469
type ActionRunIndex db.ResourceIndex
470+
471+
func CancelConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) error {
472+
// cancel previous jobs in the same concurrency group
473+
previousJobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
474+
RepoID: actionRunJob.RepoID,
475+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
476+
Statuses: []Status{
477+
StatusRunning,
478+
StatusWaiting,
479+
StatusBlocked,
480+
},
481+
})
482+
if err != nil {
483+
return fmt.Errorf("find previous jobs: %w", err)
484+
}
485+
486+
return CancelJobs(ctx, previousJobs)
487+
}
488+
489+
func ShouldJobBeBlockedByConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) (bool, error) {
490+
waitingConcurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
491+
RepoID: actionRunJob.RepoID,
492+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
493+
Statuses: []Status{StatusWaiting},
494+
})
495+
if err != nil {
496+
return false, fmt.Errorf("count waiting jobs: %w", err)
497+
}
498+
499+
return waitingConcurrentJobsNum > 0, nil
500+
}

models/actions/run_job.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ type ActionRunJob struct {
3636

3737
RawConcurrencyGroup string // raw concurrency.group
3838
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
39-
ConcurrencyGroup string // interpolated concurrency.group
40-
ConcurrencyCancel bool // interpolated concurrency.cancel-in-progress
39+
ConcurrencyGroup string // evaluated concurrency.group
40+
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
4141

4242
Started timeutil.TimeStamp
4343
Stopped timeutil.TimeStamp

routers/web/repo/actions/view.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -812,9 +812,10 @@ func Run(ctx *context_module.Context) {
812812

813813
// find workflow from commit
814814
var workflows []*jobparser.SingleWorkflow
815+
var content []byte
815816
for _, entry := range entries {
816817
if entry.Name() == workflowID {
817-
content, err := actions.GetContentFromEntry(entry)
818+
content, err = actions.GetContentFromEntry(entry)
818819
if err != nil {
819820
ctx.Error(http.StatusInternalServerError, err.Error())
820821
return
@@ -899,8 +900,14 @@ func Run(ctx *context_module.Context) {
899900
log.Error("CancelRunningJobs: %v", err)
900901
}
901902

903+
runJobs, err := actions_service.PrepareConcurrencyForRunAndJobs(ctx, content, run, workflows)
904+
if err != nil {
905+
ctx.ServerError("PrepareConcurrencyForRunAndJobs", err)
906+
return
907+
}
908+
902909
// Insert the action run and its associated jobs into the database
903-
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
910+
if err := actions_model.InsertRun(ctx, run, runJobs); err != nil {
904911
ctx.ServerError("workflow", err)
905912
return
906913
}

services/actions/concurrency.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"fmt"
10+
11+
actions_model "code.gitea.io/gitea/models/actions"
12+
"code.gitea.io/gitea/modules/util"
13+
14+
"github.com/nektos/act/pkg/jobparser"
15+
act_model "github.com/nektos/act/pkg/model"
16+
)
17+
18+
func PrepareConcurrencyForRunAndJobs(ctx context.Context, wfContent []byte, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow) ([]*actions_model.ActionRunJob, error) {
19+
vars, err := actions_model.GetVariablesOfRun(ctx, run)
20+
if err != nil {
21+
return nil, fmt.Errorf("get run %d variables: %w", run.ID, err)
22+
}
23+
24+
// check workflow concurrency
25+
wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(wfContent)
26+
if err != nil {
27+
return nil, fmt.Errorf("read workflow raw concurrency: %w", err)
28+
}
29+
if wfRawConcurrency != nil {
30+
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
31+
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.EvaluateWorkflowConcurrency(wfRawConcurrency, wfGitCtx, vars)
32+
if len(wfConcurrencyGroup) > 0 {
33+
run.ConcurrencyGroup = wfConcurrencyGroup
34+
run.ConcurrencyCancel = wfConcurrencyCancel
35+
}
36+
}
37+
38+
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
39+
for _, v := range jobs {
40+
id, job := v.Job()
41+
needs := job.Needs()
42+
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
43+
return nil, err
44+
}
45+
payload, _ := v.Marshal()
46+
47+
status := actions_model.StatusWaiting
48+
if len(needs) > 0 || run.NeedApproval {
49+
status = actions_model.StatusBlocked
50+
}
51+
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
52+
runJob := &actions_model.ActionRunJob{
53+
RunID: run.ID,
54+
RepoID: run.RepoID,
55+
OwnerID: run.OwnerID,
56+
CommitSHA: run.CommitSHA,
57+
IsForkPullRequest: run.IsForkPullRequest,
58+
Name: job.Name,
59+
WorkflowPayload: payload,
60+
JobID: id,
61+
Needs: needs,
62+
RunsOn: job.RunsOn(),
63+
Status: status,
64+
}
65+
66+
// check job concurrency
67+
if job.RawConcurrency != nil && len(job.RawConcurrency.Group) > 0 {
68+
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
69+
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
70+
// we do not need to evaluate job concurrency if the job is blocked
71+
// because it will be checked by job emitter
72+
if runJob.Status != actions_model.StatusBlocked && len(needs) == 0 {
73+
var err error
74+
runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = evaluateJobConcurrency(ctx, runJob, vars, map[string]*jobparser.JobResult{})
75+
if err != nil {
76+
return nil, fmt.Errorf("evaluate job concurrency: %w", err)
77+
}
78+
}
79+
}
80+
81+
runJobs = append(runJobs, runJob)
82+
}
83+
84+
return runJobs, nil
85+
}
86+
87+
func evaluateJobConcurrency(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string, jobResults map[string]*jobparser.JobResult) (string, bool, error) {
88+
if err := actionRunJob.LoadRun(ctx); err != nil {
89+
return "", false, err
90+
}
91+
run := actionRunJob.Run
92+
93+
rawConcurrency := &act_model.RawConcurrency{
94+
Group: actionRunJob.RawConcurrencyGroup,
95+
CancelInProgress: actionRunJob.RawConcurrencyCancel,
96+
}
97+
98+
gitCtx := jobparser.ToGitContext(GenerateGitContext(run, actionRunJob))
99+
100+
actWorkflow, err := act_model.ReadWorkflow(bytes.NewReader(actionRunJob.WorkflowPayload))
101+
if err != nil {
102+
return "", false, fmt.Errorf("read workflow: %w", err)
103+
}
104+
actJob := actWorkflow.GetJob(actionRunJob.JobID)
105+
106+
concurrencyGroup, concurrencyCancel := jobparser.EvaluateJobConcurrency(rawConcurrency, actJob, gitCtx, vars, jobResults)
107+
108+
return concurrencyGroup, concurrencyCancel, nil
109+
}

services/actions/job_emitter.go

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package actions
55

66
import (
7-
"bytes"
87
"context"
98
"errors"
109
"fmt"
@@ -16,7 +15,6 @@ import (
1615
"code.gitea.io/gitea/modules/queue"
1716

1817
"github.com/nektos/act/pkg/jobparser"
19-
act_model "github.com/nektos/act/pkg/model"
2018
"xorm.io/builder"
2119
)
2220

@@ -230,22 +228,8 @@ func checkJobConcurrency(ctx context.Context, actionRunJob *actions_model.Action
230228
return false, nil
231229
}
232230

233-
run := actionRunJob.Run
234-
235231
if len(actionRunJob.ConcurrencyGroup) == 0 {
236-
rawConcurrency := &act_model.RawConcurrency{
237-
Group: actionRunJob.RawConcurrencyGroup,
238-
CancelInProgress: actionRunJob.RawConcurrencyCancel,
239-
}
240-
241-
gitCtx := jobparser.ToGitContext(GenerateGitContext(run, actionRunJob))
242-
243-
actWorkflow, err := act_model.ReadWorkflow(bytes.NewReader(actionRunJob.WorkflowPayload))
244-
if err != nil {
245-
return false, fmt.Errorf("read workflow: %w", err)
246-
}
247-
actJob := actWorkflow.GetJob(actionRunJob.JobID)
248-
232+
// empty concurrency group means the raw concurrency has not been evaluated
249233
task, err := actions_model.GetTaskByID(ctx, actionRunJob.TaskID)
250234
if err != nil {
251235
return false, fmt.Errorf("get task by id: %w", err)
@@ -254,7 +238,6 @@ func checkJobConcurrency(ctx context.Context, actionRunJob *actions_model.Action
254238
if err != nil {
255239
return false, fmt.Errorf("find task needs: %w", err)
256240
}
257-
258241
jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
259242
for jobID, taskNeed := range taskNeeds {
260243
jobResult := &jobparser.JobResult{
@@ -264,7 +247,11 @@ func checkJobConcurrency(ctx context.Context, actionRunJob *actions_model.Action
264247
jobResults[jobID] = jobResult
265248
}
266249

267-
actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel = jobparser.InterpolatJobConcurrency(rawConcurrency, actJob, gitCtx, vars, jobResults)
250+
actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = evaluateJobConcurrency(ctx, actionRunJob, vars, jobResults)
251+
if err != nil {
252+
return false, fmt.Errorf("evaluate job concurrency: %w", err)
253+
}
254+
268255
if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
269256
ID: actionRunJob.ID,
270257
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
@@ -274,35 +261,17 @@ func checkJobConcurrency(ctx context.Context, actionRunJob *actions_model.Action
274261
}
275262
}
276263

277-
if actionRunJob.ConcurrencyCancel {
278-
// cancel previous jobs in the same concurrency group
279-
previousJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
280-
RepoID: actionRunJob.RepoID,
281-
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
282-
Statuses: []actions_model.Status{
283-
actions_model.StatusRunning,
284-
actions_model.StatusWaiting,
285-
actions_model.StatusBlocked,
286-
},
287-
})
288-
if err != nil {
289-
return false, fmt.Errorf("find previous jobs: %w", err)
290-
}
291-
if err := actions_model.CancelJobs(ctx, previousJobs); err != nil {
292-
return false, fmt.Errorf("cancel previous jobs: %w", err)
293-
}
294-
// we have cancelled all previous jobs, so this job does not need to be blocked
264+
if len(actionRunJob.ConcurrencyGroup) == 0 {
265+
// the job should not be blocked by concurrency if its concurrency group is empty
295266
return false, nil
296267
}
297268

298-
waitingConcurrentJobsNum, err := db.Count[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
299-
RepoID: actionRunJob.RepoID,
300-
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
301-
Statuses: []actions_model.Status{actions_model.StatusWaiting},
302-
})
303-
if err != nil {
304-
return false, fmt.Errorf("count waiting jobs: %w", err)
269+
if actionRunJob.ConcurrencyCancel {
270+
if err := actions_model.CancelConcurrentJobs(ctx, actionRunJob); err != nil {
271+
return false, fmt.Errorf("cancel concurrent jobs: %w", err)
272+
}
273+
return false, nil
305274
}
306275

307-
return waitingConcurrentJobsNum > 0, nil
276+
return actions_model.ShouldJobBeBlockedByConcurrentJobs(ctx, actionRunJob)
308277
}

services/actions/notifier_helper.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -332,23 +332,18 @@ func handleWorkflows(
332332
continue
333333
}
334334

335-
// check workflow concurrency
336-
if dwf.RawConcurrency != nil {
337-
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
338-
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.RawConcurrency, wfGitCtx, vars)
339-
if len(wfConcurrencyGroup) > 0 {
340-
run.ConcurrencyGroup = wfConcurrencyGroup
341-
run.ConcurrencyCancel = wfConcurrencyCancel
342-
}
343-
}
344-
345335
jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars))
346336
if err != nil {
347337
log.Error("jobparser.Parse: %v", err)
348338
continue
349339
}
340+
runJobs, err := PrepareConcurrencyForRunAndJobs(ctx, dwf.Content, run, jobs)
341+
if err != nil {
342+
log.Error("PrepareConcurrencyForRunAndJobs: %v", err)
343+
continue
344+
}
350345

351-
if err := actions_model.InsertRun(ctx, run, jobs); err != nil {
346+
if err := actions_model.InsertRun(ctx, run, runJobs); err != nil {
352347
log.Error("InsertRun: %v", err)
353348
continue
354349
}

0 commit comments

Comments
 (0)