Skip to content

Commit dbd996d

Browse files
committed
move InsertRun to services layer
1 parent 1aacdeb commit dbd996d

File tree

6 files changed

+150
-185
lines changed

6 files changed

+150
-185
lines changed

models/actions/run.go

Lines changed: 2 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool {
169169
return run.ScheduleID > 0
170170
}
171171

172-
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
172+
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
173173
_, err := db.GetEngine(ctx).ID(repo.ID).
174174
SetExpr("num_action_runs",
175175
builder.Select("count(*)").From("action_run").
@@ -276,101 +276,6 @@ func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
276276
return nil
277277
}
278278

279-
// InsertRun inserts a run
280-
// The title will be cut off at 255 characters if it's longer than 255 characters.
281-
func InsertRun(ctx context.Context, run *ActionRun, runJobs []*ActionRunJob) error {
282-
ctx, committer, err := db.TxContext(ctx)
283-
if err != nil {
284-
return err
285-
}
286-
defer committer.Close()
287-
288-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
289-
if err != nil {
290-
return err
291-
}
292-
run.Index = index
293-
run.Title, _ = util.SplitStringAtByteN(run.Title, 255)
294-
295-
blockedByWorkflowConcurrency := false
296-
if len(run.ConcurrencyGroup) > 0 {
297-
if run.ConcurrencyCancel {
298-
if err := CancelPreviousJobsWithOpts(ctx, &FindRunOptions{
299-
RepoID: run.RepoID,
300-
ConcurrencyGroup: run.ConcurrencyGroup,
301-
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
302-
}); err != nil {
303-
return err
304-
}
305-
} else {
306-
runningConcurrentRunsNum, err := db.Count[ActionRun](ctx, &FindRunOptions{
307-
RepoID: run.RepoID,
308-
ConcurrencyGroup: run.ConcurrencyGroup,
309-
Status: []Status{StatusRunning},
310-
})
311-
if err != nil {
312-
return err
313-
}
314-
blockedByWorkflowConcurrency = runningConcurrentRunsNum > 0
315-
}
316-
}
317-
if blockedByWorkflowConcurrency {
318-
run.Status = StatusBlocked
319-
}
320-
321-
if err := db.Insert(ctx, run); err != nil {
322-
return err
323-
}
324-
325-
if run.Repo == nil {
326-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
327-
if err != nil {
328-
return err
329-
}
330-
run.Repo = repo
331-
}
332-
333-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
334-
return err
335-
}
336-
337-
var hasWaiting bool
338-
for _, job := range runJobs {
339-
job.RunID = run.ID
340-
if job.Status != StatusBlocked {
341-
if blockedByWorkflowConcurrency {
342-
// the job should also be blocked when the run is blocked
343-
job.Status = StatusBlocked
344-
} else if len(job.ConcurrencyGroup) > 0 {
345-
// check if the job should be blocked by job concurrency
346-
shouldBlock, err := ShouldJobBeBlockedByConcurrentJobs(ctx, job)
347-
if err != nil {
348-
return err
349-
}
350-
if shouldBlock {
351-
job.Status = StatusBlocked
352-
}
353-
}
354-
}
355-
356-
if job.Status == StatusWaiting {
357-
hasWaiting = true
358-
}
359-
}
360-
if err := db.Insert(ctx, runJobs); err != nil {
361-
return err
362-
}
363-
364-
// if there is a job in the waiting status, increase tasks version.
365-
if hasWaiting {
366-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
367-
return err
368-
}
369-
}
370-
371-
return committer.Commit()
372-
}
373-
374279
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
375280
var run ActionRun
376281
has, err := db.GetEngine(ctx).Where("id=?", id).Get(&run)
@@ -460,7 +365,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
460365
}
461366
run.Repo = repo
462367
}
463-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
368+
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
464369
return err
465370
}
466371
}

routers/web/repo/actions/view.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -912,14 +912,8 @@ func Run(ctx *context_module.Context) {
912912
log.Error("CancelRunningJobs: %v", err)
913913
}
914914

915-
runJobs, err := actions_service.PrepareConcurrencyForRunAndJobs(ctx, content, run, workflows)
916-
if err != nil {
917-
ctx.ServerError("PrepareConcurrencyForRunAndJobs", err)
918-
return
919-
}
920-
921915
// Insert the action run and its associated jobs into the database
922-
if err := actions_model.InsertRun(ctx, run, runJobs); err != nil {
916+
if err := actions_service.InsertRun(ctx, run, workflows); err != nil {
923917
ctx.ServerError("workflow", err)
924918
return
925919
}

services/actions/concurrency.go

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -5,84 +5,14 @@ package actions
55

66
import (
77
"bytes"
8-
"context"
98
"fmt"
109

1110
actions_model "code.gitea.io/gitea/models/actions"
12-
"code.gitea.io/gitea/modules/util"
1311

1412
"github.com/nektos/act/pkg/jobparser"
1513
act_model "github.com/nektos/act/pkg/model"
1614
)
1715

18-
func PrepareConcurrencyForRunAndJobs(ctx context.Context, wfContent []byte, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow) ([]*actions_model.ActionRunJob, error) {
19-
vars, err := actions_model.GetVariablesOfRun(ctx, run)
20-
if err != nil {
21-
return nil, fmt.Errorf("get run %d variables: %w", run.ID, err)
22-
}
23-
24-
// check workflow concurrency
25-
wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(wfContent)
26-
if err != nil {
27-
return nil, fmt.Errorf("read workflow raw concurrency: %w", err)
28-
}
29-
if wfRawConcurrency != nil {
30-
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
31-
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.EvaluateWorkflowConcurrency(wfRawConcurrency, wfGitCtx, vars)
32-
if len(wfConcurrencyGroup) > 0 {
33-
run.ConcurrencyGroup = wfConcurrencyGroup
34-
run.ConcurrencyCancel = wfConcurrencyCancel
35-
}
36-
}
37-
38-
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
39-
for _, v := range jobs {
40-
id, job := v.Job()
41-
needs := job.Needs()
42-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
43-
return nil, err
44-
}
45-
payload, _ := v.Marshal()
46-
47-
status := actions_model.StatusWaiting
48-
if len(needs) > 0 || run.NeedApproval {
49-
status = actions_model.StatusBlocked
50-
}
51-
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
52-
runJob := &actions_model.ActionRunJob{
53-
RepoID: run.RepoID,
54-
OwnerID: run.OwnerID,
55-
CommitSHA: run.CommitSHA,
56-
IsForkPullRequest: run.IsForkPullRequest,
57-
Name: job.Name,
58-
WorkflowPayload: payload,
59-
JobID: id,
60-
Needs: needs,
61-
RunsOn: job.RunsOn(),
62-
Status: status,
63-
}
64-
65-
// check job concurrency
66-
if job.RawConcurrency != nil && len(job.RawConcurrency.Group) > 0 {
67-
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
68-
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
69-
// we do not need to evaluate job concurrency if the job is blocked
70-
// because it will be checked by job emitter
71-
if runJob.Status != actions_model.StatusBlocked && len(needs) == 0 {
72-
var err error
73-
runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = evaluateJobConcurrency(run, runJob, vars, map[string]*jobparser.JobResult{})
74-
if err != nil {
75-
return nil, fmt.Errorf("evaluate job concurrency: %w", err)
76-
}
77-
}
78-
}
79-
80-
runJobs = append(runJobs, runJob)
81-
}
82-
83-
return runJobs, nil
84-
}
85-
8616
func evaluateJobConcurrency(run *actions_model.ActionRun, actionRunJob *actions_model.ActionRunJob, vars map[string]string, jobResults map[string]*jobparser.JobResult) (string, bool, error) {
8717
rawConcurrency := &act_model.RawConcurrency{
8818
Group: actionRunJob.RawConcurrencyGroup,

services/actions/notifier_helper.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,13 +337,8 @@ func handleWorkflows(
337337
log.Error("jobparser.Parse: %v", err)
338338
continue
339339
}
340-
runJobs, err := PrepareConcurrencyForRunAndJobs(ctx, dwf.Content, run, jobs)
341-
if err != nil {
342-
log.Error("PrepareConcurrencyForRunAndJobs: %v", err)
343-
continue
344-
}
345340

346-
if err := actions_model.InsertRun(ctx, run, runJobs); err != nil {
341+
if err := InsertRun(ctx, run, jobs); err != nil {
347342
log.Error("InsertRun: %v", err)
348343
continue
349344
}

services/actions/run.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
actions_model "code.gitea.io/gitea/models/actions"
11+
"code.gitea.io/gitea/models/db"
12+
repo_model "code.gitea.io/gitea/models/repo"
13+
"code.gitea.io/gitea/modules/util"
14+
15+
"github.com/nektos/act/pkg/jobparser"
16+
)
17+
18+
// InsertRun inserts a run
19+
// The title will be cut off at 255 characters if it's longer than 255 characters.
20+
func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow) error {
21+
ctx, committer, err := db.TxContext(ctx)
22+
if err != nil {
23+
return err
24+
}
25+
defer committer.Close()
26+
27+
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
28+
if err != nil {
29+
return err
30+
}
31+
run.Index = index
32+
run.Title, _ = util.SplitStringAtByteN(run.Title, 255)
33+
34+
// check workflow concurrency
35+
if len(run.ConcurrencyGroup) > 0 {
36+
if run.ConcurrencyCancel {
37+
if err := actions_model.CancelPreviousJobsWithOpts(ctx, &actions_model.FindRunOptions{
38+
RepoID: run.RepoID,
39+
ConcurrencyGroup: run.ConcurrencyGroup,
40+
Status: []actions_model.Status{
41+
actions_model.StatusRunning,
42+
actions_model.StatusWaiting,
43+
actions_model.StatusBlocked,
44+
},
45+
}); err != nil {
46+
return err
47+
}
48+
} else {
49+
runningConcurrentRunsNum, err := db.Count[actions_model.ActionRun](ctx, &actions_model.FindRunOptions{
50+
RepoID: run.RepoID,
51+
ConcurrencyGroup: run.ConcurrencyGroup,
52+
Status: []actions_model.Status{actions_model.StatusRunning},
53+
})
54+
if err != nil {
55+
return err
56+
}
57+
if runningConcurrentRunsNum > 0 {
58+
run.Status = actions_model.StatusBlocked
59+
}
60+
}
61+
}
62+
63+
if err := db.Insert(ctx, run); err != nil {
64+
return err
65+
}
66+
67+
if run.Repo == nil {
68+
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
69+
if err != nil {
70+
return err
71+
}
72+
run.Repo = repo
73+
}
74+
75+
if err := actions_model.UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
76+
return err
77+
}
78+
79+
// query vars for evaluating job concurrency groups
80+
vars, err := actions_model.GetVariablesOfRun(ctx, run)
81+
if err != nil {
82+
return fmt.Errorf("get run %d variables: %w", run.ID, err)
83+
}
84+
85+
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
86+
var hasWaiting bool
87+
for _, v := range jobs {
88+
id, job := v.Job()
89+
needs := job.Needs()
90+
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
91+
return err
92+
}
93+
payload, _ := v.Marshal()
94+
status := actions_model.StatusWaiting
95+
if len(needs) > 0 || run.NeedApproval || run.Status == actions_model.StatusBlocked {
96+
status = actions_model.StatusBlocked
97+
} else {
98+
hasWaiting = true
99+
}
100+
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
101+
runJob := &actions_model.ActionRunJob{
102+
RunID: run.ID,
103+
RepoID: run.RepoID,
104+
OwnerID: run.OwnerID,
105+
CommitSHA: run.CommitSHA,
106+
IsForkPullRequest: run.IsForkPullRequest,
107+
Name: job.Name,
108+
WorkflowPayload: payload,
109+
JobID: id,
110+
Needs: needs,
111+
RunsOn: job.RunsOn(),
112+
Status: status,
113+
}
114+
115+
// check job concurrency
116+
if job.RawConcurrency != nil && len(job.RawConcurrency.Group) > 0 {
117+
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
118+
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
119+
// we do not need to evaluate job concurrency if the job is blocked
120+
// because it will be checked by job emitter
121+
if runJob.Status != actions_model.StatusBlocked && len(needs) == 0 {
122+
var err error
123+
runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = evaluateJobConcurrency(run, runJob, vars, map[string]*jobparser.JobResult{})
124+
if err != nil {
125+
return fmt.Errorf("evaluate job concurrency: %w", err)
126+
}
127+
}
128+
}
129+
130+
runJobs = append(runJobs, runJob)
131+
}
132+
133+
if err := db.Insert(ctx, runJobs); err != nil {
134+
return err
135+
}
136+
137+
// if there is a job in the waiting status, increase tasks version.
138+
if hasWaiting {
139+
if err := actions_model.IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
140+
return err
141+
}
142+
}
143+
144+
return committer.Commit()
145+
}

services/actions/schedule_tasks.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,9 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
143143
if err != nil {
144144
return err
145145
}
146-
runJobs, err := PrepareConcurrencyForRunAndJobs(ctx, cron.Content, run, workflows)
147-
if err != nil {
148-
return err
149-
}
150146

151147
// Insert the action run and its associated jobs into the database
152-
if err := actions_model.InsertRun(ctx, run, runJobs); err != nil {
148+
if err := InsertRun(ctx, run, workflows); err != nil {
153149
return err
154150
}
155151

0 commit comments

Comments
 (0)