diff --git a/enterprise/internal/insights/scheduler/backfill_state_inprogress_handler.go b/enterprise/internal/insights/scheduler/backfill_state_inprogress_handler.go index dca1bf62fb78..b4611c2ae778 100644 --- a/enterprise/internal/insights/scheduler/backfill_state_inprogress_handler.go +++ b/enterprise/internal/insights/scheduler/backfill_state_inprogress_handler.go @@ -42,7 +42,7 @@ func makeInProgressWorker(ctx context.Context, config JobMonitorConfig) (*worker backfillStore := NewBackfillStore(db) name := "backfill_in_progress_worker" - + maxRetries := 3 workerStore := dbworkerstore.New(config.ObservationCtx, db.Handle(), dbworkerstore.Options[*BaseJob]{ Name: fmt.Sprintf("%s_store", name), TableName: "insights_background_jobs", @@ -53,7 +53,7 @@ func makeInProgressWorker(ctx context.Context, config JobMonitorConfig) (*worker MaxNumResets: 100, StalledMaxAge: time.Second * 30, RetryAfter: time.Second * 30, - MaxNumRetries: 3, + MaxNumRetries: maxRetries, }) handlerConfig := newHandlerConfig() @@ -67,6 +67,7 @@ func makeInProgressWorker(ctx context.Context, config JobMonitorConfig) (*worker repoStore: config.RepoStore, clock: glock.NewRealClock(), config: handlerConfig, + maxRetries: maxRetries, } worker := dbworker.NewWorker(ctx, workerStore, workerutil.Handler[*BaseJob](task), workerutil.WorkerOptions{ @@ -117,6 +118,7 @@ type inProgressHandler struct { insightsStore store.Interface backfillRunner pipeline.Backfiller config handlerConfig + maxRetries int clock glock.Clock } @@ -156,14 +158,25 @@ func (h *inProgressHandler) Handle(ctx context.Context, logger log.Logger, job * interrupt, err := h.doExecution(ctx, execution) if err != nil { - return err + return h.fail(ctx, err, job, execution.backfill) } if interrupt { - return h.doInterrupt(ctx, job) + err = h.doInterrupt(ctx, job) + if err != nil { + return h.fail(ctx, err, job, execution.backfill) + } } return nil } +func (h *inProgressHandler) fail(ctx context.Context, err error, job *BaseJob, backfill *SeriesBackfill) error { + if job.NumFailures+1 >= h.maxRetries { + _ = backfill.SetFailed(ctx, h.backfillStore) + h.workerStore.MarkFailed(ctx, job.ID, err.Error(), dbworkerstore.MarkFinalOptions{}) + } + return err +} + type nextNFunc func(pageSize int, config iterator.IterationConfig) ([]api.RepoID, bool, iterator.FinishNFunc) func (h *inProgressHandler) doExecution(ctx context.Context, execution *backfillExecution) (interrupt bool, err error) { diff --git a/enterprise/internal/insights/scheduler/backfill_state_new_handler.go b/enterprise/internal/insights/scheduler/backfill_state_new_handler.go index 190cf1479627..9235df2772c2 100644 --- a/enterprise/internal/insights/scheduler/backfill_state_new_handler.go +++ b/enterprise/internal/insights/scheduler/backfill_state_new_handler.go @@ -34,6 +34,7 @@ type newBackfillHandler struct { repoIterator discovery.SeriesRepoIterator costAnalyzer priority.QueryAnalyzer timeseriesStore store.Interface + maxRetries int } // makeNewBackfillWorker makes a new Worker, Resetter and Store to handle the queue of Backfill jobs that are in the state of "New" @@ -42,7 +43,7 @@ func makeNewBackfillWorker(ctx context.Context, config JobMonitorConfig) (*worke backfillStore := NewBackfillStore(insightsDB) name := "backfill_new_backfill_worker" - + maxRetries := 3 workerStore := dbworkerstore.New(config.ObservationCtx, insightsDB.Handle(), dbworkerstore.Options[*BaseJob]{ Name: fmt.Sprintf("%s_store", name), TableName: "insights_background_jobs", @@ -53,7 +54,7 @@ func makeNewBackfillWorker(ctx context.Context, config JobMonitorConfig) (*worke MaxNumResets: 100, StalledMaxAge: time.Second * 30, RetryAfter: time.Second * 30, - MaxNumRetries: 3, + MaxNumRetries: maxRetries, }) task := newBackfillHandler{ @@ -63,6 +64,7 @@ func makeNewBackfillWorker(ctx context.Context, config JobMonitorConfig) (*worke repoIterator: discovery.NewSeriesRepoIterator(config.AllRepoIterator, config.RepoStore, config.RepoQueryExecutor), costAnalyzer: *config.CostAnalyzer, timeseriesStore: config.InsightStore, + maxRetries: maxRetries, } worker := dbworker.NewWorker(ctx, workerStore, workerutil.Handler[*BaseJob](&task), workerutil.WorkerOptions{ @@ -87,7 +89,6 @@ var _ workerutil.Handler[*BaseJob] = &newBackfillHandler{} func (h *newBackfillHandler) Handle(ctx context.Context, logger log.Logger, job *BaseJob) (err error) { logger.Info("newBackfillHandler called", log.Int("recordId", job.RecordID())) - // setup transactions tx, err := h.backfillStore.Transact(ctx) if err != nil { @@ -98,30 +99,31 @@ func (h *newBackfillHandler) Handle(ctx context.Context, logger log.Logger, job // load backfill and series backfill, err := tx.LoadBackfill(ctx, job.backfillId) if err != nil { - return errors.Wrap(err, "loadBackfill") + // if the backfill can't be loaded we can't fail it + return err } series, err := h.seriesReader.GetDataSeriesByID(ctx, backfill.SeriesId) if err != nil { - return errors.Wrap(err, "GetDataSeriesByID") + return h.fail(ctx, errors.Wrap(err, "GetDataSeriesByID"), job, backfill) } // set backfill repo scope repoIds := []int32{} reposIterator, err := h.repoIterator.ForSeries(ctx, series) if err != nil { - return errors.Wrap(err, "repoIterator.SeriesRepoIterator") + return h.fail(ctx, errors.Wrap(err, "repoIterator.SeriesRepoIterator"), job, backfill) } err = reposIterator.ForEach(ctx, func(repoName string, id api.RepoID) error { repoIds = append(repoIds, int32(id)) return nil }) if err != nil { - return errors.Wrap(err, "reposIterator.ForEach") + return h.fail(ctx, errors.Wrap(err, "reposIterator.ForEach"), job, backfill) } queryPlan, err := parseQuery(*series) if err != nil { - return errors.Wrap(err, "parseQuery") + return h.fail(ctx, errors.Wrap(err, "parseQuery"), job, backfill) } cost := h.costAnalyzer.Cost(&priority.QueryObject{ @@ -129,9 +131,10 @@ func (h *newBackfillHandler) Handle(ctx context.Context, logger log.Logger, job NumberOfRepositories: int64(len(repoIds)), }) + captureBackfill := *backfill backfill, err = backfill.SetScope(ctx, tx, repoIds, cost) if err != nil { - return errors.Wrap(err, "backfill.SetScope") + return h.fail(ctx, errors.Wrap(err, "backfill.SetScope"), job, &captureBackfill) } sampleTimes := timeseries.BuildSampleTimes(12, timeseries.TimeInterval{ @@ -145,26 +148,34 @@ func (h *newBackfillHandler) Handle(ctx context.Context, logger log.Logger, job RecordingTimes: timeseries.MakeRecordingsFromTimes(sampleTimes, false), }, }); err != nil { - return errors.Wrap(err, "NewBackfillHandler.SetInsightSeriesRecordingTimes") + return h.fail(ctx, errors.Wrap(err, "NewBackfillHandler.SetInsightSeriesRecordingTimes"), job, backfill) } // update series state err = backfill.setState(ctx, tx, BackfillStateProcessing) if err != nil { - return errors.Wrap(err, "backfill.setState") - } + return h.fail(ctx, errors.Wrap(err, "backfill.setState"), job, backfill) + } // enqueue backfill for next step in processing err = enqueueBackfill(ctx, tx.Handle(), backfill) if err != nil { - return errors.Wrap(err, "backfill.enqueueBackfill") + return h.fail(ctx, errors.Wrap(err, "backfill.enqueueBackfill"), job, backfill) } // We have to manually manipulate the queue record here to ensure that the new job is written in the same tx // that this job is marked complete. This is how we will ensure there is no desync if the mark complete operation // fails after we've already queued up a new job. _, err = h.workerStore.MarkComplete(ctx, job.RecordID(), dbworkerstore.MarkFinalOptions{}) if err != nil { - return errors.Wrap(err, "backfill.MarkComplete") + return h.fail(ctx, errors.Wrap(err, "backfill.MarkComplete"), job, backfill) + } + return err +} + +func (h *newBackfillHandler) fail(ctx context.Context, err error, job *BaseJob, backfill *SeriesBackfill) error { + if job.NumFailures+1 >= h.maxRetries { + _ = backfill.SetFailed(ctx, h.backfillStore) + h.workerStore.MarkFailed(ctx, job.ID, err.Error(), dbworkerstore.MarkFinalOptions{}) } return err } diff --git a/enterprise/internal/insights/scheduler/backfill_state_new_handler_test.go b/enterprise/internal/insights/scheduler/backfill_state_new_handler_test.go index 43db9afaf074..d067843e8898 100644 --- a/enterprise/internal/insights/scheduler/backfill_state_new_handler_test.go +++ b/enterprise/internal/insights/scheduler/backfill_state_new_handler_test.go @@ -73,6 +73,7 @@ func Test_MovesBackfillFromNewToProcessing(t *testing.T) { repoIterator: discovery.NewSeriesRepoIterator(nil, repos, repoQueryExecutor), costAnalyzer: *config.CostAnalyzer, timeseriesStore: seriesStore, + maxRetries: 3, } err = handler.Handle(ctx, logger, newDequeue) require.NoError(t, err) @@ -150,6 +151,7 @@ func Test_MovesBackfillFromNewToProcessing_ScopedInsight(t *testing.T) { repoIterator: discovery.NewSeriesRepoIterator(nil, repos, repoQueryExecutor), costAnalyzer: *config.CostAnalyzer, timeseriesStore: seriesStore, + maxRetries: 3, } err = handler.Handle(ctx, logger, newDequeue) require.NoError(t, err) @@ -174,3 +176,65 @@ func Test_MovesBackfillFromNewToProcessing_ScopedInsight(t *testing.T) { t.Fatal(errors.New("recording times should have been saved after success")) } } + +func Test_MovesBackfillToErrored_ScopedInsight(t *testing.T) { + logger := logtest.Scoped(t) + ctx := context.Background() + insightsDB := edb.NewInsightsDB(dbtest.NewInsightsDB(logger, t), logger) + repos := database.NewMockRepoStore() + repos.ListFunc.SetDefaultReturn([]*itypes.Repo{}, errors.New("the repo store should not be called")) + now := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + clock := glock.NewMockClockAt(now) + bfs := newBackfillStoreWithClock(insightsDB, clock) + insightsStore := store.NewInsightStore(insightsDB) + permStore := store.NewInsightPermissionStore(database.NewMockDB()) + seriesStore := store.New(insightsDB, permStore) + repoQueryExecutor := NewMockRepoQueryExecutor() + repoQueryExecutor.ExecuteRepoListFunc.SetDefaultReturn(nil, errors.New("test error")) + config := JobMonitorConfig{ + InsightsDB: insightsDB, + RepoStore: repos, + ObservationCtx: &observation.TestContext, + CostAnalyzer: priority.NewQueryAnalyzer(), + InsightStore: seriesStore, + RepoQueryExecutor: repoQueryExecutor, + } + var err error + monitor := NewBackgroundJobMonitor(ctx, config) + + repoCriteria := "repo:sourcegraph" + series, err := insightsStore.CreateSeries(ctx, types.InsightSeries{ + SeriesID: "series1", + Query: "asdf", + SampleIntervalUnit: string(types.Month), + RepositoryCriteria: &repoCriteria, + SampleIntervalValue: 1, + GenerationMethod: types.Search, + }) + require.NoError(t, err) + + backfill, err := bfs.NewBackfill(ctx, series) + require.NoError(t, err) + + err = enqueueBackfill(ctx, bfs.Handle(), backfill) + require.NoError(t, err) + + newDequeue, _, err := monitor.newBackfillStore.Dequeue(ctx, "test", nil) + require.NoError(t, err) + handler := newBackfillHandler{ + workerStore: monitor.newBackfillStore, + backfillStore: bfs, + seriesReader: store.NewInsightStore(insightsDB), + repoIterator: discovery.NewSeriesRepoIterator(nil, repos, repoQueryExecutor), + costAnalyzer: *config.CostAnalyzer, + timeseriesStore: seriesStore, + maxRetries: 1, + } + err = handler.Handle(ctx, logger, newDequeue) + require.Error(t, err) + + handledBackfill, err := bfs.LoadBackfill(ctx, backfill.Id) + require.NoError(t, err) + require.Equal(t, BackfillStateFailed, handledBackfill.State) + +}