// Copyright 2022 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. package bots import ( "context" "fmt" bots_model "code.gitea.io/gitea/models/bots" "code.gitea.io/gitea/models/db" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/queue" "xorm.io/builder" ) var jobEmitterQueue queue.UniqueQueue type jobUpdate struct { RunID int64 } func InitJobEmitter() { jobEmitterQueue = queue.CreateUniqueQueue("bots_ready_job", jobEmitterQueueHandle, new(jobUpdate)) go graceful.GetManager().RunWithShutdownFns(jobEmitterQueue.Run) } func EmitJobsIfReady(runID int64) error { return jobEmitterQueue.Push(&jobUpdate{ RunID: runID, }) } func jobEmitterQueueHandle(data ...queue.Data) []queue.Data { ctx := graceful.GetManager().ShutdownContext() var ret []queue.Data for _, d := range data { update := d.(*jobUpdate) if err := checkJobsOfRun(ctx, update.RunID); err != nil { ret = append(ret, d) } } return ret } func checkJobsOfRun(ctx context.Context, runID int64) error { return db.WithTx(func(ctx context.Context) error { jobs, _, err := bots_model.FindRunJobs(ctx, bots_model.FindRunJobOptions{RunID: runID}) if err != nil { return err } idToJobs := make(map[string][]*bots_model.RunJob, len(jobs)) for _, job := range jobs { idToJobs[job.JobID] = append(idToJobs[job.JobID], job) } updates := newJobStatusResolver(jobs).Resolve() for _, job := range jobs { if status, ok := updates[job.ID]; ok { job.Status = status if n, err := bots_model.UpdateRunJob(ctx, job, builder.Eq{"status": bots_model.StatusBlocked}, "status"); err != nil { return err } else if n != 1 { return fmt.Errorf("no affected for updating blocked job %v", job.ID) } } } return nil }, ctx) } type jobStatusResolver struct { statuses map[int64]bots_model.Status needs map[int64][]int64 } func newJobStatusResolver(jobs bots_model.RunJobList) *jobStatusResolver { idToJobs := make(map[string][]*bots_model.RunJob, len(jobs)) for _, job := range jobs { idToJobs[job.JobID] = append(idToJobs[job.JobID], job) } statuses := make(map[int64]bots_model.Status, len(jobs)) needs := make(map[int64][]int64, len(jobs)) for _, job := range jobs { statuses[job.ID] = job.Status for _, need := range job.Needs { for _, v := range idToJobs[need] { needs[job.ID] = append(needs[job.ID], v.ID) } } } return &jobStatusResolver{ statuses: statuses, needs: needs, } } func (r *jobStatusResolver) Resolve() map[int64]bots_model.Status { ret := map[int64]bots_model.Status{} for i := 0; i < len(r.statuses); i++ { updated := r.resolve() if len(updated) == 0 { return ret } for k, v := range updated { ret[k] = v r.statuses[k] = v } } return ret } func (r *jobStatusResolver) resolve() map[int64]bots_model.Status { ret := map[int64]bots_model.Status{} for id, status := range r.statuses { if status != bots_model.StatusBlocked { continue } allDone, allSucceed := true, true for _, need := range r.needs[id] { needStatus := r.statuses[need] if !needStatus.IsDone() { allDone = false } if needStatus.In(bots_model.StatusFailure, bots_model.StatusCancelled, bots_model.StatusSkipped) { allSucceed = false } } if allDone { if allSucceed { ret[id] = bots_model.StatusWaiting } else { ret[id] = bots_model.StatusSkipped } } } return ret }