Skip to content

Commit 0f45fe5

Browse files
committed
update
1 parent 4df785d commit 0f45fe5

File tree

7 files changed

+66
-70
lines changed

7 files changed

+66
-70
lines changed

models/actions/run.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type ActionRun struct {
4747
Status Status `xorm:"index"`
4848
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
4949
ConcurrencyGroup string
50+
ConcurrencyCancel bool
5051
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5152
Started timeutil.TimeStamp
5253
Stopped timeutil.TimeStamp
@@ -270,7 +271,7 @@ func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error
270271

271272
// InsertRun inserts a run
272273
// The title will be cut off at 255 characters if it's longer than 255 characters.
273-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow, blockedByConcurrency bool) error {
274+
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
274275
ctx, committer, err := db.TxContext(ctx)
275276
if err != nil {
276277
return err
@@ -284,6 +285,32 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
284285
run.Index = index
285286
run.Title, _ = util.SplitStringAtByteN(run.Title, 255)
286287

288+
blockedByWorkflowConcurrency := false
289+
if len(run.ConcurrencyGroup) > 0 {
290+
if run.ConcurrencyCancel {
291+
if err := CancelPreviousJobsWithOpts(ctx, &FindRunOptions{
292+
RepoID: run.RepoID,
293+
ConcurrencyGroup: run.ConcurrencyGroup,
294+
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
295+
}); err != nil {
296+
return err
297+
}
298+
} else {
299+
waitingConcurrentRunsNum, err := db.Count[ActionRun](ctx, &FindRunOptions{
300+
RepoID: run.RepoID,
301+
ConcurrencyGroup: run.ConcurrencyGroup,
302+
Status: []Status{StatusWaiting},
303+
})
304+
if err != nil {
305+
return err
306+
}
307+
blockedByWorkflowConcurrency = waitingConcurrentRunsNum > 0
308+
}
309+
}
310+
if blockedByWorkflowConcurrency {
311+
run.Status = StatusBlocked
312+
}
313+
287314
if err := db.Insert(ctx, run); err != nil {
288315
return err
289316
}
@@ -310,13 +337,13 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
310337
}
311338
payload, _ := v.Marshal()
312339
status := StatusWaiting
313-
if len(needs) > 0 || run.NeedApproval {
340+
if len(needs) > 0 || run.NeedApproval || blockedByWorkflowConcurrency {
314341
status = StatusBlocked
315342
} else {
316343
hasWaiting = true
317344
}
318345
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
319-
runJobs = append(runJobs, &ActionRunJob{
346+
runJob := &ActionRunJob{
320347
RunID: run.ID,
321348
RepoID: run.RepoID,
322349
OwnerID: run.OwnerID,
@@ -328,7 +355,12 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
328355
Needs: needs,
329356
RunsOn: job.RunsOn(),
330357
Status: status,
331-
})
358+
}
359+
if job.RawConcurrency != nil {
360+
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
361+
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
362+
}
363+
runJobs = append(runJobs, runJob)
332364
}
333365
if err := db.Insert(ctx, runJobs); err != nil {
334366
return err

models/actions/run_job.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,16 @@ type ActionRunJob struct {
3333
RunsOn []string `xorm:"JSON TEXT"`
3434
TaskID int64 // the latest task of the job
3535
Status Status `xorm:"index"`
36-
Started timeutil.TimeStamp
37-
Stopped timeutil.TimeStamp
38-
Created timeutil.TimeStamp `xorm:"created"`
39-
Updated timeutil.TimeStamp `xorm:"updated index"`
36+
37+
RawConcurrencyGroup string // raw concurrency.group
38+
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
39+
ConcurrencyGroup string // interpolated concurrency.group
40+
ConcurrencyCancel bool // interpolated concurrency.cancel-in-progress
41+
42+
Started timeutil.TimeStamp
43+
Stopped timeutil.TimeStamp
44+
Created timeutil.TimeStamp `xorm:"created"`
45+
Updated timeutil.TimeStamp `xorm:"updated index"`
4046
}
4147

4248
func init() {

modules/actions/workflows.go

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import (
2121
)
2222

2323
type DetectedWorkflow struct {
24-
EntryName string
25-
TriggerEvent *jobparser.Event
26-
Content []byte
27-
Concurrency *jobparser.UninterpolatedConcurrency
24+
EntryName string
25+
TriggerEvent *jobparser.Event
26+
Content []byte
27+
RawConcurrency *model.RawConcurrency
2828
}
2929

3030
func init() {
@@ -96,17 +96,12 @@ func GetEventsFromContent(content []byte) ([]*jobparser.Event, error) {
9696
return events, nil
9797
}
9898

99-
func GetConcurrencyFromContent(content []byte) (*jobparser.UninterpolatedConcurrency, error) {
99+
func GetConcurrencyFromContent(content []byte) (*model.RawConcurrency, error) {
100100
workflow, err := model.ReadWorkflow(bytes.NewReader(content))
101101
if err != nil {
102102
return nil, err
103103
}
104-
uc, err := jobparser.ParseRawConcurrency(&workflow.RawConcurrency)
105-
if err != nil {
106-
return nil, err
107-
}
108-
109-
return uc, nil
104+
return workflow.RawConcurrency, nil
110105
}
111106

112107
func DetectWorkflows(
@@ -153,10 +148,10 @@ func DetectWorkflows(
153148
}
154149
} else if detectMatched(gitRepo, commit, triggedEvent, payload, evt) {
155150
dwf := &DetectedWorkflow{
156-
EntryName: entry.Name(),
157-
TriggerEvent: evt,
158-
Content: content,
159-
Concurrency: concurrency,
151+
EntryName: entry.Name(),
152+
TriggerEvent: evt,
153+
Content: content,
154+
RawConcurrency: concurrency,
160155
}
161156
workflows = append(workflows, dwf)
162157
}

routers/web/repo/actions/view.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ func Approve(ctx *context_module.Context) {
546546
}
547547
for _, job := range jobs {
548548
if len(job.Needs) == 0 && job.Status.IsBlocked() {
549+
// TODO: check concurrency
549550
job.Status = actions_model.StatusWaiting
550551
_, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
551552
if err != nil {
@@ -899,7 +900,7 @@ func Run(ctx *context_module.Context) {
899900
}
900901

901902
// Insert the action run and its associated jobs into the database
902-
if err := actions_model.InsertRun(ctx, run, workflows, false); err != nil {
903+
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
903904
ctx.ServerError("workflow", err)
904905
return
905906
}

services/actions/concurrency.go

Lines changed: 0 additions & 22 deletions
This file was deleted.

services/actions/notifier_helper.go

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -333,14 +333,12 @@ func handleWorkflows(
333333
}
334334

335335
// check workflow concurrency
336-
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
337-
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.Concurrency, wfGitCtx, vars)
338-
if len(wfConcurrencyGroup) > 0 {
339-
run.ConcurrencyGroup = wfConcurrencyGroup
340-
if wfConcurrencyCancel {
341-
if err := CancelActionRunByConcurrency(ctx, run); err != nil {
342-
log.Error("CancelActionRunByConcurrency: %v", err)
343-
}
336+
if dwf.RawConcurrency != nil {
337+
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
338+
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.RawConcurrency, wfGitCtx, vars)
339+
if len(wfConcurrencyGroup) > 0 {
340+
run.ConcurrencyGroup = wfConcurrencyGroup
341+
run.ConcurrencyCancel = wfConcurrencyCancel
344342
}
345343
}
346344

@@ -350,21 +348,7 @@ func handleWorkflows(
350348
continue
351349
}
352350

353-
// cancel running jobs if the event is push or pull_request_sync
354-
if run.Event == webhook_module.HookEventPush ||
355-
run.Event == webhook_module.HookEventPullRequestSync {
356-
if err := actions_model.CancelPreviousJobs(
357-
ctx,
358-
run.RepoID,
359-
run.Ref,
360-
run.WorkflowID,
361-
run.Event,
362-
); err != nil {
363-
log.Error("CancelPreviousJobs: %v", err)
364-
}
365-
}
366-
367-
if err := actions_model.InsertRun(ctx, run, jobs, !wfConcurrencyCancel); err != nil {
351+
if err := actions_model.InsertRun(ctx, run, jobs); err != nil {
368352
log.Error("InsertRun: %v", err)
369353
continue
370354
}

services/actions/schedule_tasks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
145145
}
146146

147147
// Insert the action run and its associated jobs into the database
148-
if err := actions_model.InsertRun(ctx, run, workflows, false); err != nil {
148+
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
149149
return err
150150
}
151151

0 commit comments

Comments
 (0)