diff --git a/go.mod b/go.mod index 06c2fbd0c8534..cc0bc6d7ae6b7 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( gitea.com/lunny/dingtalk_webhook v0.0.0-20171025031554-e3534c89ef96 gitea.com/lunny/levelqueue v0.4.2-0.20230414023320-3c0159fe0fe4 github.com/42wim/sshsig v0.0.0-20211121163825-841cf5bbc121 + github.com/6543/cicd_feedback v0.0.0-20240625213231-0f894fa6f0f9 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 diff --git a/go.sum b/go.sum index dbbbf342d6c99..eed9474c3aa49 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a h1:lSA0F4e9A2NcQSqGq gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0pAQhH8yz+DNjUbjppKQzKFAn28TMYPB6IU= github.com/42wim/sshsig v0.0.0-20211121163825-841cf5bbc121 h1:r3qt8PCHnfjOv9PN3H+XXKmDA1dfFMIN1AislhlA/ps= github.com/42wim/sshsig v0.0.0-20211121163825-841cf5bbc121/go.mod h1:Ock8XgA7pvULhIaHGAk/cDnRfNrF9Jey81nPcc403iU= +github.com/6543/cicd_feedback v0.0.0-20240625213231-0f894fa6f0f9 h1:/QqqEnwNhZk4GwAk0GE6F6lfdIUfjWTod+zCa/jBQSI= +github.com/6543/cicd_feedback v0.0.0-20240625213231-0f894fa6f0f9/go.mod h1:xJjRB6hyl1f8XMjBajWkP98wk4Jq+Kxm+eIGy+Piozo= github.com/6543/go-version v1.3.1 h1:HvOp+Telns7HWJ2Xo/05YXQSB2bE0WmVgbHqwMPZT4U= github.com/6543/go-version v1.3.1/go.mod h1:oqFAHCwtLVUTLdhQmVZWYvaHXTdsbB4SY85at64SQEo= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0 h1:1nGuui+4POelzDwI7RG56yfQJHCnKvwfMoU7VsEp+Zg= diff --git a/models/actions/run.go b/models/actions/run.go index 4f886999e9cd2..71fcc3c1c019b 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -53,6 +53,8 @@ type ActionRun struct { PreviousDuration time.Duration Created timeutil.TimeStamp `xorm:"created"` Updated timeutil.TimeStamp `xorm:"updated"` + // External is true if it's an cicd_feedback pipeline + External bool `xorm:"NOT NULL DEFAULT false"` } func init() { @@ -286,6 +288,10 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork run.Repo = repo } + if run.External { + return committer.Commit() + } + if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { return err } diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 4b8664077dca9..551501b7af15b 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -37,6 +37,7 @@ type ActionRunJob struct { Stopped timeutil.TimeStamp Created timeutil.TimeStamp `xorm:"created"` Updated timeutil.TimeStamp `xorm:"updated index"` + External bool `xorm:"-"` } func init() { diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 2c62c8d9ec0fa..41eace6b9af32 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -22,13 +22,16 @@ import ( "code.gitea.io/gitea/models/unit" "code.gitea.io/gitea/modules/actions" "code.gitea.io/gitea/modules/base" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/web" actions_service "code.gitea.io/gitea/services/actions" + "code.gitea.io/gitea/services/cicdfeedback" context_module "code.gitea.io/gitea/services/context" + "github.com/6543/cicd_feedback" "xorm.io/builder" ) @@ -49,11 +52,13 @@ func View(ctx *context_module.Context) { } type ViewRequest struct { - LogCursors []struct { - Step int `json:"step"` - Cursor int64 `json:"cursor"` - Expanded bool `json:"expanded"` - } `json:"logCursors"` + LogCursors []LogCursor `json:"logCursors"` +} + +type LogCursor struct { + Step int `json:"step"` + Cursor int64 `json:"cursor"` + Expanded bool `json:"expanded"` } type ViewResponse struct { @@ -133,7 +138,7 @@ func ViewPost(ctx *context_module.Context) { runIndex := ctx.PathParamInt64("run") jobIndex := ctx.PathParamInt64("job") - current, jobs := getRunJobs(ctx, runIndex, jobIndex) + current, jobs, externalInfo := getRunJobs(ctx, runIndex, jobIndex) if ctx.Written() { return } @@ -162,7 +167,7 @@ func ViewPost(ctx *context_module.Context) { ID: v.ID, Name: v.Name, Status: v.Status.String(), - CanRerun: v.Status.IsDone() && ctx.Repo.CanWrite(unit.TypeActions), + CanRerun: v.Status.IsDone() && ctx.Repo.CanWrite(unit.TypeActions) && !v.External, Duration: v.Duration().String(), }) } @@ -205,64 +210,140 @@ func ViewPost(ctx *context_module.Context) { resp.State.CurrentJob.Steps = make([]*ViewJobStep, 0) // marshal to '[]' instead fo 'null' in json resp.Logs.StepsLog = make([]*ViewStepLog, 0) // marshal to '[]' instead fo 'null' in json if task != nil { - steps := actions.FullSteps(task) - - for _, v := range steps { - resp.State.CurrentJob.Steps = append(resp.State.CurrentJob.Steps, &ViewJobStep{ - Summary: v.Name, - Duration: v.Duration().String(), - Status: v.Status.String(), - }) + steps, logs, err := convertToViewModel(ctx, req.LogCursors, task) + if err != nil { + ctx.Error(http.StatusInternalServerError, err.Error()) + return } + resp.State.CurrentJob.Steps = append(resp.State.CurrentJob.Steps, steps...) + resp.Logs.StepsLog = append(resp.Logs.StepsLog, logs...) + } + if current.External { + loadExternalTask(ctx, req, resp, current, externalInfo) + if ctx.Written() { + return + } + } - for _, cursor := range req.LogCursors { - if !cursor.Expanded { - continue + ctx.JSON(http.StatusOK, resp) +} + +func loadExternalTask(ctx *context_module.Context, req *ViewRequest, resp *ViewResponse, job *actions_model.ActionRunJob, info *cicdfeedback.WorkflowInfo) { + externalSteps := make([]*cicd_feedback.Step, 0, 4) + if err := json.Unmarshal(job.WorkflowPayload, &externalSteps); err != nil { + ctx.Error(http.StatusInternalServerError, err.Error()) + return + } + + for _, v := range externalSteps { + status, _ := cicdfeedback.ConvertStatus(v.Status) + + resp.State.CurrentJob.Steps = append(resp.State.CurrentJob.Steps, &ViewJobStep{ + Summary: v.Name, + Status: status.String(), + }) + } + + for _, cursor := range req.LogCursors { + if !cursor.Expanded { + continue + } + + logLines := make([]*ViewStepLogLine, 0) // marshal to '[]' instead fo 'null' in json + + // we don't support live update atm + if job.Status != actions_model.StatusRunning { + if cursor.Step >= len(externalSteps) { + // out of bounds + ctx.Error(http.StatusForbidden, "out ouf bounds step cursor") + return + } + step := externalSteps[cursor.Step] + logs, err := cicdfeedback.LoadLogs(ctx, step, info) + if err != nil { + ctx.Error(http.StatusInternalServerError, "could not fetch external logs") + return + } + if logs[len(logs)-1] == '\n' { + logs = logs[:len(logs)-1] } - step := steps[cursor.Step] - - logLines := make([]*ViewStepLogLine, 0) // marshal to '[]' instead fo 'null' in json - - index := step.LogIndex + cursor.Cursor - validCursor := cursor.Cursor >= 0 && - // !(cursor.Cursor < step.LogLength) when the frontend tries to fetch next line before it's ready. - // So return the same cursor and empty lines to let the frontend retry. - cursor.Cursor < step.LogLength && - // !(index < task.LogIndexes[index]) when task data is older than step data. - // It can be fixed by making sure write/read tasks and steps in the same transaction, - // but it's easier to just treat it as fetching the next line before it's ready. - index < int64(len(task.LogIndexes)) - - if validCursor { - length := step.LogLength - cursor.Cursor - offset := task.LogIndexes[index] - var err error - logRows, err := actions.ReadLogs(ctx, task.LogInStorage, task.LogFilename, offset, length) - if err != nil { - ctx.Error(http.StatusInternalServerError, err.Error()) - return - } + for i, line := range strings.Split(logs, "\n") { + logLines = append(logLines, &ViewStepLogLine{ + Index: int64(i + 1), + Message: line, + }) + } + } - for i, row := range logRows { - logLines = append(logLines, &ViewStepLogLine{ - Index: cursor.Cursor + int64(i) + 1, // start at 1 - Message: row.Content, - Timestamp: float64(row.Time.AsTime().UnixNano()) / float64(time.Second), - }) - } + resp.Logs.StepsLog = []*ViewStepLog{{ + Step: cursor.Step, + Cursor: cursor.Cursor + int64(len(logLines)), + Lines: logLines, + }} + } +} + +func convertToViewModel(ctx *context_module.Context, cursors []LogCursor, task *actions_model.ActionTask) ([]*ViewJobStep, []*ViewStepLog, error) { + var viewJobs []*ViewJobStep + var logs []*ViewStepLog + + steps := actions.FullSteps(task) + + for _, v := range steps { + viewJobs = append(viewJobs, &ViewJobStep{ + Summary: v.Name, + Duration: v.Duration().String(), + Status: v.Status.String(), + }) + } + + for _, cursor := range cursors { + if !cursor.Expanded { + continue + } + + step := steps[cursor.Step] + + logLines := make([]*ViewStepLogLine, 0) // marshal to '[]' instead fo 'null' in json + + index := step.LogIndex + cursor.Cursor + validCursor := cursor.Cursor >= 0 && + // !(cursor.Cursor < step.LogLength) when the frontend tries to fetch next line before it's ready. + // So return the same cursor and empty lines to let the frontend retry. + cursor.Cursor < step.LogLength && + // !(index < task.LogIndexes[index]) when task data is older than step data. + // It can be fixed by making sure write/read tasks and steps in the same transaction, + // but it's easier to just treat it as fetching the next line before it's ready. + index < int64(len(task.LogIndexes)) + + if validCursor { + length := step.LogLength - cursor.Cursor + offset := task.LogIndexes[index] + var err error + logRows, err := actions.ReadLogs(ctx, task.LogInStorage, task.LogFilename, offset, length) + if err != nil { + return nil, nil, err } - resp.Logs.StepsLog = append(resp.Logs.StepsLog, &ViewStepLog{ - Step: cursor.Step, - Cursor: cursor.Cursor + int64(len(logLines)), - Lines: logLines, - Started: int64(step.Started), - }) + for i, row := range logRows { + logLines = append(logLines, &ViewStepLogLine{ + Index: cursor.Cursor + int64(i) + 1, // start at 1 + Message: row.Content, + Timestamp: float64(row.Time.AsTime().UnixNano()) / float64(time.Second), + }) + } } + + logs = append(logs, &ViewStepLog{ + Step: cursor.Step, + Cursor: cursor.Cursor + int64(len(logLines)), + Lines: logLines, + Started: int64(step.Started), + }) } - ctx.JSON(http.StatusOK, resp) + return viewJobs, logs, nil } // Rerun will rerun jobs in the given run @@ -281,6 +362,11 @@ func Rerun(ctx *context_module.Context) { return } + if run.External { + ctx.Error(http.StatusForbidden, "can not control external run") + return + } + // can not rerun job when workflow is disabled cfgUnit := ctx.Repo.Repository.MustGetUnit(ctx, unit.TypeActions) cfg := cfgUnit.ActionsConfig() @@ -300,11 +386,15 @@ func Rerun(ctx *context_module.Context) { } } - job, jobs := getRunJobs(ctx, runIndex, jobIndex) + job, jobs, _ := getRunJobs(ctx, runIndex, jobIndex) if ctx.Written() { return } + if job.External { + ctx.Error(http.StatusForbidden, "can not control external run") + } + if jobIndexStr == "" { // rerun all jobs for _, j := range jobs { // if the job has needs, it should be set to "blocked" status to wait for other jobs @@ -361,7 +451,7 @@ func Logs(ctx *context_module.Context) { runIndex := ctx.PathParamInt64("run") jobIndex := ctx.PathParamInt64("job") - job, _ := getRunJobs(ctx, runIndex, jobIndex) + job, _, externalInfo := getRunJobs(ctx, runIndex, jobIndex) if ctx.Written() { return } @@ -370,6 +460,11 @@ func Logs(ctx *context_module.Context) { return } + if job.External || externalInfo != nil { + ctx.Error(http.StatusForbidden, "streaming of external logs not implemented") + return + } + err := job.LoadRun(ctx) if err != nil { ctx.Error(http.StatusInternalServerError, err.Error()) @@ -409,10 +504,13 @@ func Logs(ctx *context_module.Context) { func Cancel(ctx *context_module.Context) { runIndex := ctx.PathParamInt64("run") - _, jobs := getRunJobs(ctx, runIndex, -1) + _, jobs, _ := getRunJobs(ctx, runIndex, -1) if ctx.Written() { return } + if jobs[0].External { + ctx.Error(http.StatusForbidden, "can not control external run") + } if err := db.WithTx(ctx, func(ctx context.Context) error { for _, job := range jobs { @@ -450,10 +548,15 @@ func Cancel(ctx *context_module.Context) { func Approve(ctx *context_module.Context) { runIndex := ctx.PathParamInt64("run") - current, jobs := getRunJobs(ctx, runIndex, -1) + current, jobs, _ := getRunJobs(ctx, runIndex, -1) if ctx.Written() { return } + + if current.External { + ctx.Error(http.StatusForbidden, "can not control external run") + } + run := current.Run doer := ctx.Doer @@ -486,26 +589,33 @@ func Approve(ctx *context_module.Context) { // getRunJobs gets the jobs of runIndex, and returns jobs[jobIndex], jobs. // Any error will be written to the ctx. // It never returns a nil job of an empty jobs, if the jobIndex is out of range, it will be treated as 0. -func getRunJobs(ctx *context_module.Context, runIndex, jobIndex int64) (*actions_model.ActionRunJob, []*actions_model.ActionRunJob) { +func getRunJobs(ctx *context_module.Context, runIndex, jobIndex int64) (*actions_model.ActionRunJob, []*actions_model.ActionRunJob, *cicdfeedback.WorkflowInfo) { run, err := actions_model.GetRunByIndex(ctx, ctx.Repo.Repository.ID, runIndex) if err != nil { if errors.Is(err, util.ErrNotExist) { ctx.Error(http.StatusNotFound, err.Error()) - return nil, nil + return nil, nil, nil } ctx.Error(http.StatusInternalServerError, err.Error()) - return nil, nil + return nil, nil, nil } run.Repo = ctx.Repo.Repository - jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID) + var jobs []*actions_model.ActionRunJob + var externalInfo *cicdfeedback.WorkflowInfo + if run.External { + jobs, externalInfo, err = cicdfeedback.GetExternalRunJobs(ctx, run) + } else { + jobs, err = actions_model.GetRunJobsByRunID(ctx, run.ID) + } if err != nil { ctx.Error(http.StatusInternalServerError, err.Error()) - return nil, nil + return nil, nil, nil } + if len(jobs) == 0 { ctx.Error(http.StatusNotFound) - return nil, nil + return nil, nil, nil } for _, v := range jobs { @@ -513,9 +623,9 @@ func getRunJobs(ctx *context_module.Context, runIndex, jobIndex int64) (*actions } if jobIndex >= 0 && jobIndex < int64(len(jobs)) { - return jobs[jobIndex], jobs + return jobs[jobIndex], jobs, externalInfo } - return jobs[0], jobs + return jobs[0], jobs, externalInfo } type ArtifactsViewResponse struct { diff --git a/services/cicdfeedback/client.go b/services/cicdfeedback/client.go new file mode 100644 index 0000000000000..efc37d1dc716c --- /dev/null +++ b/services/cicdfeedback/client.go @@ -0,0 +1,114 @@ +package cicdfeedback + +import ( + "context" + "fmt" + "io" + "net/http" + "strings" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/modules/json" + + "github.com/6543/cicd_feedback" +) + +const timeout = time.Second * 2 + +func DetectHeaders(h http.Header) (*WorkflowInfo, bool) { + detect := false + info := &WorkflowInfo{} + for k, v := range h { + if strings.EqualFold(k, cicd_feedback.HeaderFeedback) && len(v) > 0 && len(v[0]) > 0 { + info.PipelineURI = v[0] + detect = true + } + if strings.EqualFold(k, cicd_feedback.HeaderAuthorization) && len(v) > 0 && len(v[0]) > 0 { + info.PipelineURI = v[0] + } + if detect && info.Authorization != "" { + return info, detect + } + } + return info, detect +} + +func GetFeedbackToActionRun(ctx context.Context, draft *actions_model.ActionRun, draftInfo *WorkflowInfo) ([]*actions_model.ActionRun, error) { + feedbackResponse, err := doAPICall(ctx, draftInfo) + if err != nil { + return nil, err + } + return ConvertFeedbackToActionRun(ctx, feedbackResponse, draft, draftInfo) +} + +func GetExternalRunJobs(ctx context.Context, run *actions_model.ActionRun) ([]*actions_model.ActionRunJob, *WorkflowInfo, error) { + if !run.External { + return nil, nil, fmt.Errorf("ActionRun is not an external one") + } + + info := &WorkflowInfo{} + if err := json.Unmarshal([]byte(run.EventPayload), &info); err != nil { + return nil, nil, fmt.Errorf("could parse payload: %w", err) + } + + feedbackResponse, err := doAPICall(ctx, info) + if err != nil { + return nil, nil, err + } + + jobs, err := ExtractJobsFromFeedback(ctx, feedbackResponse, info, run) + return jobs, info, err +} + +func doAPICall(ctx context.Context, info *WorkflowInfo) (*cicd_feedback.PipelineResponse, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, info.PipelineURI, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %w", err) + } + if info.Authorization != "" { + req.Header.Set(cicd_feedback.HeaderAuthorization, info.Authorization) + } + + payload, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("could not fetch external cicd_feedback pipeline response: %w", err) + } + + payloadBytes, err := io.ReadAll(payload.Body) + _ = payload.Body.Close() + if err != nil { + return nil, fmt.Errorf("could not read external cicd_feedback pipeline response: %w", err) + } + feedbackResponse := cicd_feedback.PipelineResponse{} + if err := json.Unmarshal(payloadBytes, &feedbackResponse); err != nil { + return nil, fmt.Errorf("could not deserialize external cicd_feedback pipeline response: %w", err) + } + return &feedbackResponse, nil +} + +func LoadLogs(ctx context.Context, step *cicd_feedback.Step, info *WorkflowInfo) (string, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + builder := &strings.Builder{} + for i := range step.Outputs.Logs { + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, step.Outputs.Logs[i].URI, nil) + if err != nil { + return "", fmt.Errorf("error creating request: %w", err) + } + + payload, err := http.DefaultClient.Do(req) + if err != nil { + return "", fmt.Errorf("could not fetch external cicd_feedback log: %w", err) + } + + if _, err := io.Copy(builder, payload.Body); err != nil { + return "", fmt.Errorf("could not copy external cicd_feedback log: %w", err) + } + } + return builder.String(), nil +} diff --git a/services/cicdfeedback/convert.go b/services/cicdfeedback/convert.go new file mode 100644 index 0000000000000..ad1dca06bc11b --- /dev/null +++ b/services/cicdfeedback/convert.go @@ -0,0 +1,127 @@ +package cicdfeedback + +import ( + "context" + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/modules/json" + + "github.com/6543/cicd_feedback" +) + +func ExtractJobsFromFeedback(ctx context.Context, in *cicd_feedback.PipelineResponse, info *WorkflowInfo, run *actions_model.ActionRun) ([]*actions_model.ActionRunJob, error) { + if info == nil { + if err := json.Unmarshal([]byte(run.EventPayload), &info); err != nil { + return nil, fmt.Errorf("could parse payload: %w", err) + } + } + + workflows := FlatWorkflows(in.Workflows) + var workflow *cicd_feedback.Workflow + for i := range workflows { + if workflows[i].ID == info.ID { + workflow = &workflows[i] + break + } + } + if workflow == nil { + return nil, fmt.Errorf("try to extract workflow '%s' but could not find one for %s", run.WorkflowID, run.EventPayload) + } + + result := make([]*actions_model.ActionRunJob, 0, len(workflow.SubWorkflows)) + for _, job := range workflow.SubWorkflows { + status, err := ConvertStatus(job.Status) + if err != nil { + return nil, err + } + + stepData, err := json.Marshal(job.Steps) + if err != nil { + return nil, err + } + + result = append(result, &actions_model.ActionRunJob{ + // workflow infos + RunID: run.ID, + Run: run, + RepoID: run.RepoID, + OwnerID: run.OwnerID, + CommitSHA: run.CommitSHA, + // job infos + Name: job.Name, + Status: status, + External: true, + WorkflowPayload: stepData, + }) + } + + return result, nil +} + +func ConvertFeedbackToActionRun(ctx context.Context, in *cicd_feedback.PipelineResponse, draft *actions_model.ActionRun, draftInfo *WorkflowInfo) ([]*actions_model.ActionRun, error) { + workflows := FlatWorkflows(in.Workflows) + result := make([]*actions_model.ActionRun, 0, len(workflows)) + + for _, workflow := range workflows { + status, err := ConvertStatus(workflow.Status) + if err != nil { + return nil, err + } + + info, err := json.Marshal(&WorkflowInfo{ + ID: workflow.ID, + PipelineURI: draftInfo.PipelineURI, + Authorization: draftInfo.Authorization, + }) + if err != nil { + return nil, err + } + + title := in.Title + if in.Title == "" { + title = workflow.Name + } + title = "[extern] " + title + + result = append(result, &actions_model.ActionRun{ + External: true, + Title: title, + WorkflowID: workflow.Name, + EventPayload: string(info), + Status: status, + NeedApproval: in.RequiresManualAction, + // we copy gitea integration stuff ... + Event: draft.Event, + RepoID: draft.RepoID, + Repo: draft.Repo, + OwnerID: draft.OwnerID, + TriggerUserID: draft.TriggerUserID, + TriggerEvent: draft.TriggerEvent, + CommitSHA: draft.CommitSHA, + Ref: draft.Ref, + }) + } + return result, nil +} + +func ConvertStatus(in cicd_feedback.Status) (actions_model.Status, error) { + switch in { + case cicd_feedback.StatusSuccess: + return actions_model.StatusSuccess, nil + case cicd_feedback.StatusFailed: + return actions_model.StatusFailure, nil + case cicd_feedback.StatusKilled, cicd_feedback.StatusDeclined: + return actions_model.StatusCancelled, nil + case cicd_feedback.StatusSkipped: + return actions_model.StatusSkipped, nil + case cicd_feedback.StatusPending: + return actions_model.StatusWaiting, nil + case cicd_feedback.StatusRunning: + return actions_model.StatusRunning, nil + case cicd_feedback.StatusManual: + return actions_model.StatusBlocked, nil + default: + return actions_model.StatusUnknown, fmt.Errorf("unknown status: %v", in) + } +} diff --git a/services/cicdfeedback/flat.go b/services/cicdfeedback/flat.go new file mode 100644 index 0000000000000..7e51a4ad4a30c --- /dev/null +++ b/services/cicdfeedback/flat.go @@ -0,0 +1,51 @@ +package cicdfeedback + +import "github.com/6543/cicd_feedback" + +const ( + simulatedWorkflow = "simulated_workflow/" + maxLevel = 5 +) + +// FlatWorkflows converts the cicd_feedback.Workflow witch can have any level of hierarchy into a three level one +func FlatWorkflows(workflows []cicd_feedback.Workflow) []cicd_feedback.Workflow { + for i := range workflows { + // gitea can not have steps in level 0 so we move them in a sub workflow + if workflows[i].Steps != nil { + workflows[i].SubWorkflows = []cicd_feedback.Workflow{{ + ID: workflows[i].ID, + Name: workflows[i].Name, + Status: workflows[i].Status, + Steps: workflows[i].Steps, + }} + workflows[i].ID = simulatedWorkflow + workflows[i].ID + workflows[i].Steps = nil + } + + // ok and all this sub workflows should have only steps + for j := range workflows[i].SubWorkflows { + workflows[i].SubWorkflows[j].Steps = flatSubWorkflowsToSteps(&workflows[i].SubWorkflows[j], 0, "") + workflows[i].SubWorkflows[j].SubWorkflows = nil // would be invalid but we do sanitize it non the less + } + } + return workflows +} + +func flatSubWorkflowsToSteps(workflows *cicd_feedback.Workflow, level int, prefix string) []cicd_feedback.Step { + if level > maxLevel { + return nil + } else if workflows.Steps != nil { + if prefix != "" { + for i := range workflows.Steps { + workflows.Steps[i].Name = prefix + workflows.Steps[i].Name + } + } + return workflows.Steps + } + + var result []cicd_feedback.Step + for _, subFlow := range workflows.SubWorkflows { + result = append(result, flatSubWorkflowsToSteps(&subFlow, level+1, workflows.Name+" > ")...) + } + return result +} diff --git a/services/cicdfeedback/flat_test.go b/services/cicdfeedback/flat_test.go new file mode 100644 index 0000000000000..e1f5ddc7cc628 --- /dev/null +++ b/services/cicdfeedback/flat_test.go @@ -0,0 +1,86 @@ +package cicdfeedback + +import ( + "testing" + + "github.com/6543/cicd_feedback" + "github.com/stretchr/testify/assert" +) + +func TestFlatWorkflows(t *testing.T) { + workflows := FlatWorkflows([]cicd_feedback.Workflow{{ + ID: "w1", + Name: "lint", + Status: cicd_feedback.StatusSuccess, + Steps: []cicd_feedback.Step{{ + ID: "s1", + Name: "clone", + }}, + }, { + ID: "w2", + Name: "test", + Status: cicd_feedback.StatusDeclined, + SubWorkflows: []cicd_feedback.Workflow{{ + ID: "sw1", + Name: "test-backend", + Steps: []cicd_feedback.Step{{ + ID: "s2", + Name: "clone2", + }}, + }}, + }, { + ID: "w3", + Name: "frontend", + Status: cicd_feedback.StatusDeclined, + SubWorkflows: []cicd_feedback.Workflow{{ + ID: "sw2", + Name: "test-frontend", + SubWorkflows: []cicd_feedback.Workflow{{ + ID: "ssw2", + Name: "e2e", + Steps: []cicd_feedback.Step{{ + ID: "s3", + Name: "clone3", + }}, + }}, + }}, + }}) + assert.EqualValues(t, []cicd_feedback.Workflow{{ + ID: "simulated_workflow/w1", + Name: "lint", + Status: cicd_feedback.StatusSuccess, + SubWorkflows: []cicd_feedback.Workflow{{ + ID: "w1", + Name: "lint", + Status: cicd_feedback.StatusSuccess, + Steps: []cicd_feedback.Step{{ + ID: "s1", + Name: "clone", + }}, + }}, + }, { + ID: "w2", + Name: "test", + Status: cicd_feedback.StatusDeclined, + SubWorkflows: []cicd_feedback.Workflow{{ + ID: "sw1", + Name: "test-backend", + Steps: []cicd_feedback.Step{{ + ID: "s2", + Name: "clone2", + }}, + }}, + }, { + ID: "w3", + Name: "frontend", + Status: cicd_feedback.StatusDeclined, + SubWorkflows: []cicd_feedback.Workflow{{ + ID: "sw2", + Name: "test-frontend", + Steps: []cicd_feedback.Step{{ + ID: "s3", + Name: "test-frontend > clone3", + }}, + }}, + }}, workflows) +} diff --git a/services/cicdfeedback/types.go b/services/cicdfeedback/types.go new file mode 100644 index 0000000000000..18db9995ea680 --- /dev/null +++ b/services/cicdfeedback/types.go @@ -0,0 +1,7 @@ +package cicdfeedback + +type WorkflowInfo struct { + ID string `json:"id"` + PipelineURI string `json:"pipeline_uri"` + Authorization string `json:"authorization,omitempty"` +} diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index b2c0a73784d94..1f9dd72bdcac7 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -18,6 +18,8 @@ import ( "sync" "time" + actions_model "code.gitea.io/gitea/models/actions" + repo_model "code.gitea.io/gitea/models/repo" webhook_model "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/hostmatcher" @@ -28,6 +30,7 @@ import ( "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" webhook_module "code.gitea.io/gitea/modules/webhook" + "code.gitea.io/gitea/services/cicdfeedback" "github.com/gobwas/glob" ) @@ -251,9 +254,46 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err) } t.ResponseInfo.Body = string(p) + + if feedbackInfo, detect := cicdfeedback.DetectHeaders(resp.Header); detect { + go handleFeedbackAPI(feedbackInfo, w, t) + } + return nil } +func handleFeedbackAPI(info *cicdfeedback.WorkflowInfo, w *webhook_model.Webhook, t *webhook_model.HookTask) { + time.Sleep(2 * time.Second) // we wait some time to let the external cicd parse stuff + + ctx := context.Background() + draft := &actions_model.ActionRun{ + Event: t.EventType, + TriggerEvent: t.EventType.Event(), + RepoID: w.RepoID, + // TODO: extract from Payload?!? + // Ref: "branch name" + // CommitSHA: "e.g. 0b0780ed5b4175147f46475d3bd240278b1248ca" + // OwnerID: + // TriggerUserID: + } + + draft.Repo, _ = repo_model.GetRepositoryByID(ctx, w.RepoID) + draft.OwnerID = draft.Repo.OwnerID + draft.TriggerUserID = draft.Repo.OwnerID + + actionRuns, err := cicdfeedback.GetFeedbackToActionRun(ctx, draft, info) + if err != nil { + log.Error("could not get cicd feedback: %w", err) + return + } + + for _, actionRun := range actionRuns { + if err := actions_model.InsertRun(ctx, actionRun, nil); err != nil { + log.Error("InsertRun: %v", err) + } + } +} + var ( webhookHTTPClient *http.Client once sync.Once