Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

insights: set backfill to failed if the job running it will be failed #50569

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func makeInProgressWorker(ctx context.Context, config JobMonitorConfig) (*worker
backfillStore := NewBackfillStore(db)

name := "backfill_in_progress_worker"

maxRetries := 3
workerStore := dbworkerstore.New(config.ObservationCtx, db.Handle(), dbworkerstore.Options[*BaseJob]{
Name: fmt.Sprintf("%s_store", name),
TableName: "insights_background_jobs",
Expand All @@ -53,7 +53,7 @@ func makeInProgressWorker(ctx context.Context, config JobMonitorConfig) (*worker
MaxNumResets: 100,
StalledMaxAge: time.Second * 30,
RetryAfter: time.Second * 30,
MaxNumRetries: 3,
MaxNumRetries: maxRetries,
})

handlerConfig := newHandlerConfig()
Expand All @@ -67,6 +67,7 @@ func makeInProgressWorker(ctx context.Context, config JobMonitorConfig) (*worker
repoStore: config.RepoStore,
clock: glock.NewRealClock(),
config: handlerConfig,
maxRetries: maxRetries,
}

worker := dbworker.NewWorker(ctx, workerStore, workerutil.Handler[*BaseJob](task), workerutil.WorkerOptions{
Expand Down Expand Up @@ -117,6 +118,7 @@ type inProgressHandler struct {
insightsStore store.Interface
backfillRunner pipeline.Backfiller
config handlerConfig
maxRetries int

clock glock.Clock
}
Expand Down Expand Up @@ -156,14 +158,25 @@ func (h *inProgressHandler) Handle(ctx context.Context, logger log.Logger, job *

interrupt, err := h.doExecution(ctx, execution)
if err != nil {
return err
return h.fail(ctx, err, job, execution.backfill)
}
if interrupt {
return h.doInterrupt(ctx, job)
err = h.doInterrupt(ctx, job)
if err != nil {
return h.fail(ctx, err, job, execution.backfill)
}
}
return nil
}

func (h *inProgressHandler) fail(ctx context.Context, err error, job *BaseJob, backfill *SeriesBackfill) error {
if job.NumFailures+1 >= h.maxRetries {
_ = backfill.SetFailed(ctx, h.backfillStore)
h.workerStore.MarkFailed(ctx, job.ID, err.Error(), dbworkerstore.MarkFinalOptions{})
}
return err
}

type nextNFunc func(pageSize int, config iterator.IterationConfig) ([]api.RepoID, bool, iterator.FinishNFunc)

func (h *inProgressHandler) doExecution(ctx context.Context, execution *backfillExecution) (interrupt bool, err error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type newBackfillHandler struct {
repoIterator discovery.SeriesRepoIterator
costAnalyzer priority.QueryAnalyzer
timeseriesStore store.Interface
maxRetries int
}

// makeNewBackfillWorker makes a new Worker, Resetter and Store to handle the queue of Backfill jobs that are in the state of "New"
Expand All @@ -42,7 +43,7 @@ func makeNewBackfillWorker(ctx context.Context, config JobMonitorConfig) (*worke
backfillStore := NewBackfillStore(insightsDB)

name := "backfill_new_backfill_worker"

maxRetries := 3
workerStore := dbworkerstore.New(config.ObservationCtx, insightsDB.Handle(), dbworkerstore.Options[*BaseJob]{
Name: fmt.Sprintf("%s_store", name),
TableName: "insights_background_jobs",
Expand All @@ -53,7 +54,7 @@ func makeNewBackfillWorker(ctx context.Context, config JobMonitorConfig) (*worke
MaxNumResets: 100,
StalledMaxAge: time.Second * 30,
RetryAfter: time.Second * 30,
MaxNumRetries: 3,
MaxNumRetries: maxRetries,
})

task := newBackfillHandler{
Expand All @@ -63,6 +64,7 @@ func makeNewBackfillWorker(ctx context.Context, config JobMonitorConfig) (*worke
repoIterator: discovery.NewSeriesRepoIterator(config.AllRepoIterator, config.RepoStore, config.RepoQueryExecutor),
costAnalyzer: *config.CostAnalyzer,
timeseriesStore: config.InsightStore,
maxRetries: maxRetries,
}

worker := dbworker.NewWorker(ctx, workerStore, workerutil.Handler[*BaseJob](&task), workerutil.WorkerOptions{
Expand All @@ -87,7 +89,6 @@ var _ workerutil.Handler[*BaseJob] = &newBackfillHandler{}

func (h *newBackfillHandler) Handle(ctx context.Context, logger log.Logger, job *BaseJob) (err error) {
logger.Info("newBackfillHandler called", log.Int("recordId", job.RecordID()))

// setup transactions
tx, err := h.backfillStore.Transact(ctx)
if err != nil {
Expand All @@ -98,40 +99,42 @@ func (h *newBackfillHandler) Handle(ctx context.Context, logger log.Logger, job
// load backfill and series
backfill, err := tx.LoadBackfill(ctx, job.backfillId)
if err != nil {
return errors.Wrap(err, "loadBackfill")
// if the backfill can't be loaded we can't fail it
return err
}
series, err := h.seriesReader.GetDataSeriesByID(ctx, backfill.SeriesId)
if err != nil {
return errors.Wrap(err, "GetDataSeriesByID")
return h.fail(ctx, errors.Wrap(err, "GetDataSeriesByID"), job, backfill)
}

// set backfill repo scope
repoIds := []int32{}
reposIterator, err := h.repoIterator.ForSeries(ctx, series)
if err != nil {
return errors.Wrap(err, "repoIterator.SeriesRepoIterator")
return h.fail(ctx, errors.Wrap(err, "repoIterator.SeriesRepoIterator"), job, backfill)
}
err = reposIterator.ForEach(ctx, func(repoName string, id api.RepoID) error {
repoIds = append(repoIds, int32(id))
return nil
})
if err != nil {
return errors.Wrap(err, "reposIterator.ForEach")
return h.fail(ctx, errors.Wrap(err, "reposIterator.ForEach"), job, backfill)
}

queryPlan, err := parseQuery(*series)
if err != nil {
return errors.Wrap(err, "parseQuery")
return h.fail(ctx, errors.Wrap(err, "parseQuery"), job, backfill)
}

cost := h.costAnalyzer.Cost(&priority.QueryObject{
Query: queryPlan,
NumberOfRepositories: int64(len(repoIds)),
})

captureBackfill := *backfill
backfill, err = backfill.SetScope(ctx, tx, repoIds, cost)
if err != nil {
return errors.Wrap(err, "backfill.SetScope")
return h.fail(ctx, errors.Wrap(err, "backfill.SetScope"), job, &captureBackfill)
}

sampleTimes := timeseries.BuildSampleTimes(12, timeseries.TimeInterval{
Expand All @@ -145,26 +148,34 @@ func (h *newBackfillHandler) Handle(ctx context.Context, logger log.Logger, job
RecordingTimes: timeseries.MakeRecordingsFromTimes(sampleTimes, false),
},
}); err != nil {
return errors.Wrap(err, "NewBackfillHandler.SetInsightSeriesRecordingTimes")
return h.fail(ctx, errors.Wrap(err, "NewBackfillHandler.SetInsightSeriesRecordingTimes"), job, backfill)
}

// update series state
err = backfill.setState(ctx, tx, BackfillStateProcessing)
if err != nil {
return errors.Wrap(err, "backfill.setState")
}
return h.fail(ctx, errors.Wrap(err, "backfill.setState"), job, backfill)

}
// enqueue backfill for next step in processing
err = enqueueBackfill(ctx, tx.Handle(), backfill)
if err != nil {
return errors.Wrap(err, "backfill.enqueueBackfill")
return h.fail(ctx, errors.Wrap(err, "backfill.enqueueBackfill"), job, backfill)
}
// We have to manually manipulate the queue record here to ensure that the new job is written in the same tx
// that this job is marked complete. This is how we will ensure there is no desync if the mark complete operation
// fails after we've already queued up a new job.
_, err = h.workerStore.MarkComplete(ctx, job.RecordID(), dbworkerstore.MarkFinalOptions{})
if err != nil {
return errors.Wrap(err, "backfill.MarkComplete")
return h.fail(ctx, errors.Wrap(err, "backfill.MarkComplete"), job, backfill)
}
return err
}

func (h *newBackfillHandler) fail(ctx context.Context, err error, job *BaseJob, backfill *SeriesBackfill) error {
if job.NumFailures+1 >= h.maxRetries {
_ = backfill.SetFailed(ctx, h.backfillStore)
h.workerStore.MarkFailed(ctx, job.ID, err.Error(), dbworkerstore.MarkFinalOptions{})
}
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func Test_MovesBackfillFromNewToProcessing(t *testing.T) {
repoIterator: discovery.NewSeriesRepoIterator(nil, repos, repoQueryExecutor),
costAnalyzer: *config.CostAnalyzer,
timeseriesStore: seriesStore,
maxRetries: 3,
}
err = handler.Handle(ctx, logger, newDequeue)
require.NoError(t, err)
Expand Down Expand Up @@ -150,6 +151,7 @@ func Test_MovesBackfillFromNewToProcessing_ScopedInsight(t *testing.T) {
repoIterator: discovery.NewSeriesRepoIterator(nil, repos, repoQueryExecutor),
costAnalyzer: *config.CostAnalyzer,
timeseriesStore: seriesStore,
maxRetries: 3,
}
err = handler.Handle(ctx, logger, newDequeue)
require.NoError(t, err)
Expand All @@ -174,3 +176,65 @@ func Test_MovesBackfillFromNewToProcessing_ScopedInsight(t *testing.T) {
t.Fatal(errors.New("recording times should have been saved after success"))
}
}

func Test_MovesBackfillToErrored_ScopedInsight(t *testing.T) {
logger := logtest.Scoped(t)
ctx := context.Background()
insightsDB := edb.NewInsightsDB(dbtest.NewInsightsDB(logger, t), logger)
repos := database.NewMockRepoStore()
repos.ListFunc.SetDefaultReturn([]*itypes.Repo{}, errors.New("the repo store should not be called"))
now := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
clock := glock.NewMockClockAt(now)
bfs := newBackfillStoreWithClock(insightsDB, clock)
insightsStore := store.NewInsightStore(insightsDB)
permStore := store.NewInsightPermissionStore(database.NewMockDB())
seriesStore := store.New(insightsDB, permStore)
repoQueryExecutor := NewMockRepoQueryExecutor()
repoQueryExecutor.ExecuteRepoListFunc.SetDefaultReturn(nil, errors.New("test error"))
config := JobMonitorConfig{
InsightsDB: insightsDB,
RepoStore: repos,
ObservationCtx: &observation.TestContext,
CostAnalyzer: priority.NewQueryAnalyzer(),
InsightStore: seriesStore,
RepoQueryExecutor: repoQueryExecutor,
}
var err error
monitor := NewBackgroundJobMonitor(ctx, config)

repoCriteria := "repo:sourcegraph"
series, err := insightsStore.CreateSeries(ctx, types.InsightSeries{
SeriesID: "series1",
Query: "asdf",
SampleIntervalUnit: string(types.Month),
RepositoryCriteria: &repoCriteria,
SampleIntervalValue: 1,
GenerationMethod: types.Search,
})
require.NoError(t, err)

backfill, err := bfs.NewBackfill(ctx, series)
require.NoError(t, err)

err = enqueueBackfill(ctx, bfs.Handle(), backfill)
require.NoError(t, err)

newDequeue, _, err := monitor.newBackfillStore.Dequeue(ctx, "test", nil)
require.NoError(t, err)
handler := newBackfillHandler{
workerStore: monitor.newBackfillStore,
backfillStore: bfs,
seriesReader: store.NewInsightStore(insightsDB),
repoIterator: discovery.NewSeriesRepoIterator(nil, repos, repoQueryExecutor),
costAnalyzer: *config.CostAnalyzer,
timeseriesStore: seriesStore,
maxRetries: 1,
}
err = handler.Handle(ctx, logger, newDequeue)
require.Error(t, err)

handledBackfill, err := bfs.LoadBackfill(ctx, backfill.Id)
require.NoError(t, err)
require.Equal(t, BackfillStateFailed, handledBackfill.State)

}