@@ -22,7 +22,6 @@ import (
22
22
"code.gitea.io/gitea/modules/util"
23
23
webhook_module "code.gitea.io/gitea/modules/webhook"
24
24
25
- "github.com/nektos/act/pkg/jobparser"
26
25
"xorm.io/builder"
27
26
)
28
27
@@ -49,6 +48,8 @@ type ActionRun struct {
49
48
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
50
49
Status Status `xorm:"index"`
51
50
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
51
+ ConcurrencyGroup string `xorm:"index"`
52
+ ConcurrencyCancel bool
52
53
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
53
54
Started timeutil.TimeStamp
54
55
Stopped timeutil.TimeStamp
@@ -170,7 +171,7 @@ func (run *ActionRun) IsSchedule() bool {
170
171
return run .ScheduleID > 0
171
172
}
172
173
173
- func updateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
174
+ func UpdateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
174
175
_ , err := db .GetEngine (ctx ).ID (repo .ID ).
175
176
NoAutoTime ().
176
177
SetExpr ("num_action_runs" ,
@@ -227,121 +228,69 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
227
228
return cancelledJobs , err
228
229
}
229
230
230
- // Iterate over each job and attempt to cancel it.
231
- for _ , job := range jobs {
232
- // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
233
- status := job .Status
234
- if status .IsDone () {
235
- continue
236
- }
231
+ cjs , err := CancelJobs (ctx , jobs )
232
+ if err != nil {
233
+ return cancelledJobs , err
234
+ }
235
+ cancelledJobs = append (cancelledJobs , cjs ... )
236
+ }
237
237
238
- // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
239
- if job .TaskID == 0 {
240
- job .Status = StatusCancelled
241
- job .Stopped = timeutil .TimeStampNow ()
238
+ // Return nil to indicate successful cancellation of all running and waiting jobs.
239
+ return cancelledJobs , nil
240
+ }
242
241
243
- // Update the job's status and stopped time in the database.
244
- n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
245
- if err != nil {
246
- return cancelledJobs , err
247
- }
242
+ func CancelJobs (ctx context.Context , jobs []* ActionRunJob ) ([]* ActionRunJob , error ) {
243
+ cancelledJobs := make ([]* ActionRunJob , 0 , len (jobs ))
244
+ // Iterate over each job and attempt to cancel it.
245
+ for _ , job := range jobs {
246
+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
247
+ status := job .Status
248
+ if status .IsDone () {
249
+ continue
250
+ }
248
251
249
- // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again .
250
- if n == 0 {
251
- return cancelledJobs , errors . New ( " job has changed, try again" )
252
- }
252
+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it .
253
+ if job . TaskID == 0 {
254
+ job . Status = StatusCancelled
255
+ job . Stopped = timeutil . TimeStampNow ()
253
256
254
- cancelledJobs = append (cancelledJobs , job )
255
- // Continue with the next job.
256
- continue
257
+ // Update the job's status and stopped time in the database.
258
+ n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
259
+ if err != nil {
260
+ return cancelledJobs , err
257
261
}
258
262
259
- // If the job has an associated task, try to stop the task, effectively cancelling the job .
260
- if err := StopTask ( ctx , job . TaskID , StatusCancelled ); err != nil {
261
- return cancelledJobs , err
263
+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again .
264
+ if n == 0 {
265
+ return cancelledJobs , errors . New ( "job has changed, try again" )
262
266
}
267
+
263
268
cancelledJobs = append (cancelledJobs , job )
269
+ // Continue with the next job.
270
+ continue
264
271
}
272
+
273
+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
274
+ if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
275
+ return cancelledJobs , err
276
+ }
277
+ cancelledJobs = append (cancelledJobs , job )
265
278
}
266
279
267
280
// Return nil to indicate successful cancellation of all running and waiting jobs.
268
281
return cancelledJobs , nil
269
282
}
270
283
271
- // InsertRun inserts a run
272
- // The title will be cut off at 255 characters if it's longer than 255 characters.
273
- func InsertRun (ctx context.Context , run * ActionRun , jobs []* jobparser.SingleWorkflow ) error {
274
- ctx , committer , err := db .TxContext (ctx )
275
- if err != nil {
276
- return err
277
- }
278
- defer committer .Close ()
279
-
280
- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
284
+ func GetRunByID (ctx context.Context , id int64 ) (* ActionRun , error ) {
285
+ var run ActionRun
286
+ has , err := db .GetEngine (ctx ).Where ("id=?" , id ).Get (& run )
281
287
if err != nil {
282
- return err
283
- }
284
- run .Index = index
285
- run .Title = util .EllipsisDisplayString (run .Title , 255 )
286
-
287
- if err := db .Insert (ctx , run ); err != nil {
288
- return err
289
- }
290
-
291
- if run .Repo == nil {
292
- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
293
- if err != nil {
294
- return err
295
- }
296
- run .Repo = repo
297
- }
298
-
299
- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
300
- return err
301
- }
302
-
303
- runJobs := make ([]* ActionRunJob , 0 , len (jobs ))
304
- var hasWaiting bool
305
- for _ , v := range jobs {
306
- id , job := v .Job ()
307
- needs := job .Needs ()
308
- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
309
- return err
310
- }
311
- payload , _ := v .Marshal ()
312
- status := StatusWaiting
313
- if len (needs ) > 0 || run .NeedApproval {
314
- status = StatusBlocked
315
- } else {
316
- hasWaiting = true
317
- }
318
- job .Name = util .EllipsisDisplayString (job .Name , 255 )
319
- runJobs = append (runJobs , & ActionRunJob {
320
- RunID : run .ID ,
321
- RepoID : run .RepoID ,
322
- OwnerID : run .OwnerID ,
323
- CommitSHA : run .CommitSHA ,
324
- IsForkPullRequest : run .IsForkPullRequest ,
325
- Name : job .Name ,
326
- WorkflowPayload : payload ,
327
- JobID : id ,
328
- Needs : needs ,
329
- RunsOn : job .RunsOn (),
330
- Status : status ,
331
- })
332
- }
333
- if err := db .Insert (ctx , runJobs ); err != nil {
334
- return err
335
- }
336
-
337
- // if there is a job in the waiting status, increase tasks version.
338
- if hasWaiting {
339
- if err := IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
340
- return err
341
- }
288
+ return nil , err
289
+ } else if ! has {
290
+ return nil , fmt .Errorf ("run with id %d: %w" , id , util .ErrNotExist )
342
291
}
343
292
344
- return committer . Commit ()
293
+ return & run , nil
345
294
}
346
295
347
296
func GetRunByRepoAndID (ctx context.Context , repoID , runID int64 ) (* ActionRun , error ) {
@@ -426,7 +375,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
426
375
if err = run .LoadRepo (ctx ); err != nil {
427
376
return err
428
377
}
429
- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
378
+ if err := UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
430
379
return err
431
380
}
432
381
}
@@ -435,3 +384,55 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
435
384
}
436
385
437
386
type ActionRunIndex db.ResourceIndex
387
+
388
+ func ShouldBlockRunByConcurrency (ctx context.Context , actionRun * ActionRun ) (bool , error ) {
389
+ if actionRun .ConcurrencyGroup == "" || actionRun .ConcurrencyCancel {
390
+ return false , nil
391
+ }
392
+
393
+ concurrentRuns , err := db .Find [ActionRun ](ctx , & FindRunOptions {
394
+ RepoID : actionRun .RepoID ,
395
+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
396
+ Status : []Status {StatusWaiting , StatusRunning },
397
+ })
398
+ if err != nil {
399
+ return false , fmt .Errorf ("find running and waiting runs: %w" , err )
400
+ }
401
+ previousRuns := slices .DeleteFunc (concurrentRuns , func (r * ActionRun ) bool { return r .ID == actionRun .ID })
402
+
403
+ return len (previousRuns ) > 0 , nil
404
+ }
405
+
406
+ func CancelPreviousJobsByRunConcurrency (ctx context.Context , actionRun * ActionRun ) ([]* ActionRunJob , error ) {
407
+ var cancelledJobs []* ActionRunJob
408
+
409
+ if actionRun .ConcurrencyGroup != "" && actionRun .ConcurrencyCancel {
410
+ // cancel previous runs in the same concurrency group
411
+ runs , err := db .Find [ActionRun ](ctx , & FindRunOptions {
412
+ RepoID : actionRun .RepoID ,
413
+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
414
+ Status : []Status {StatusRunning , StatusWaiting , StatusBlocked },
415
+ })
416
+ if err != nil {
417
+ return cancelledJobs , fmt .Errorf ("find runs: %w" , err )
418
+ }
419
+ for _ , run := range runs {
420
+ if run .ID == actionRun .ID {
421
+ continue
422
+ }
423
+ jobs , err := db .Find [ActionRunJob ](ctx , FindRunJobOptions {
424
+ RunID : run .ID ,
425
+ })
426
+ if err != nil {
427
+ return cancelledJobs , fmt .Errorf ("find run %d jobs: %w" , run .ID , err )
428
+ }
429
+ cjs , err := CancelJobs (ctx , jobs )
430
+ if err != nil {
431
+ return cancelledJobs , fmt .Errorf ("cancel run %d jobs: %w" , run .ID , err )
432
+ }
433
+ cancelledJobs = append (cancelledJobs , cjs ... )
434
+ }
435
+ }
436
+
437
+ return cancelledJobs , nil
438
+ }
0 commit comments