From 5b3fa6decd2ebe8c35bcf1a3784ef5c8cb7e850d Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 7 Dec 2019 16:04:49 +0000 Subject: [PATCH 01/32] Graceful: use GetManager instead of global --- cmd/web.go | 8 +- cmd/web_graceful.go | 4 +- integrations/integration_test.go | 6 ++ modules/graceful/context.go | 6 +- modules/graceful/manager.go | 126 +++++++++++++++++++++------- modules/graceful/manager_unix.go | 30 +++++-- modules/graceful/manager_windows.go | 22 +++-- modules/graceful/net_unix.go | 2 +- modules/graceful/restart_unix.go | 2 +- modules/graceful/server.go | 6 +- modules/graceful/server_hooks.go | 6 +- modules/indexer/code/bleve.go | 48 ++++++++--- modules/indexer/issues/indexer.go | 2 +- modules/ssh/ssh_graceful.go | 2 +- 14 files changed, 195 insertions(+), 75 deletions(-) diff --git a/cmd/web.go b/cmd/web.go index e0e47a181f5ab..9c9d4b3547e3a 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -5,6 +5,7 @@ package cmd import ( + "context" "fmt" "net/http" _ "net/http/pprof" // Used for debugging if enabled and a web server is running @@ -96,6 +97,10 @@ func runLetsEncryptFallbackHandler(w http.ResponseWriter, r *http.Request) { } func runWeb(ctx *cli.Context) error { + managerCtx, cancel := context.WithCancel(context.Background()) + graceful.InitManager(managerCtx) + defer cancel() + if os.Getppid() > 1 && len(os.Getenv("LISTEN_FDS")) > 0 { log.Info("Restarting Gitea on PID: %d from parent PID: %d", os.Getpid(), os.Getppid()) } else { @@ -195,8 +200,7 @@ func runWeb(ctx *cli.Context) error { log.Critical("Failed to start server: %v", err) } log.Info("HTTP Listener: %s Closed", listenAddr) - graceful.Manager.WaitForServers() - graceful.Manager.WaitForTerminate() + <-graceful.GetManager().Done() log.Info("PID: %d Gitea Web Finished", os.Getpid()) log.Close() return nil diff --git a/cmd/web_graceful.go b/cmd/web_graceful.go index e303f71510ee6..10d3c32185c49 100644 --- a/cmd/web_graceful.go +++ b/cmd/web_graceful.go @@ -28,13 +28,13 @@ func runHTTPSWithTLSConfig(network, listenAddr string, tlsConfig *tls.Config, m // NoHTTPRedirector tells our cleanup routine that we will not be using a fallback http redirector func NoHTTPRedirector() { - graceful.Manager.InformCleanup() + graceful.GetManager().InformCleanup() } // NoMainListener tells our cleanup routine that we will not be using a possibly provided listener // for our main HTTP/HTTPS service func NoMainListener() { - graceful.Manager.InformCleanup() + graceful.GetManager().InformCleanup() } func runFCGI(listenAddr string, m http.Handler) error { diff --git a/integrations/integration_test.go b/integrations/integration_test.go index 4177493930b7a..55d318fd800fe 100644 --- a/integrations/integration_test.go +++ b/integrations/integration_test.go @@ -6,6 +6,7 @@ package integrations import ( "bytes" + "context" "database/sql" "encoding/json" "fmt" @@ -24,6 +25,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/routers" "code.gitea.io/gitea/routers/routes" @@ -55,6 +57,10 @@ func NewNilResponseRecorder() *NilResponseRecorder { } func TestMain(m *testing.M) { + managerCtx, cancel := context.WithCancel(context.Background()) + graceful.InitManager(managerCtx) + defer cancel() + initIntegrationTest() mac = routes.NewMacaron() routes.RegisterRoutes(mac) diff --git a/modules/graceful/context.go b/modules/graceful/context.go index a4a4df7dea9c3..1ad1109b4e5bd 100644 --- a/modules/graceful/context.go +++ 
b/modules/graceful/context.go @@ -62,7 +62,7 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // ShutdownContext returns a context.Context that is Done at shutdown // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. -func (g *gracefulManager) ShutdownContext() context.Context { +func (g *Manager) ShutdownContext() context.Context { return &ChannelContext{ done: g.IsShutdown(), err: ErrShutdown, @@ -72,7 +72,7 @@ func (g *gracefulManager) ShutdownContext() context.Context { // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. -func (g *gracefulManager) HammerContext() context.Context { +func (g *Manager) HammerContext() context.Context { return &ChannelContext{ done: g.IsHammer(), err: ErrHammer, @@ -82,7 +82,7 @@ func (g *gracefulManager) HammerContext() context.Context { // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. -func (g *gracefulManager) TerminateContext() context.Context { +func (g *Manager) TerminateContext() context.Context { return &ChannelContext{ done: g.IsTerminate(), err: ErrTerminate, diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index b9a56ca9c6be1..19b8e4d266085 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -6,6 +6,7 @@ package graceful import ( "context" + "sync" "time" "code.gitea.io/gitea/modules/git" @@ -34,14 +35,27 @@ const ( const numberOfServersToCreate = 3 // Manager represents the graceful server manager interface -var Manager *gracefulManager - -func init() { - Manager = newGracefulManager(context.Background()) - // Set the git default context to the HammerContext - git.DefaultContext = Manager.HammerContext() - // Set the process default context to the HammerContext - process.DefaultContext = Manager.HammerContext() +var manager *Manager + +var initOnce = sync.Once{} + +// GetManager returns the Manager +func GetManager() *Manager { + InitManager(context.Background()) + return manager +} + +// InitManager creates the graceful manager in the provided context +func InitManager(ctx context.Context) { + initOnce.Do(func() { + manager = newGracefulManager(ctx) + + // Set the git default context to the HammerContext + git.DefaultContext = manager.HammerContext() + + // Set the process default context to the HammerContext + process.DefaultContext = manager.HammerContext() + }) } // CallbackWithContext is combined runnable and context to watch to see if the caller has finished @@ -61,7 +75,7 @@ type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // - users must therefore be careful to only call these as necessary. // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. 
-func (g *gracefulManager) RunWithShutdownFns(run RunnableWithShutdownFns) { +func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(func(ctx context.Context, atShutdown func()) { @@ -90,7 +104,7 @@ type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate Callb // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // The callback function provided to atTerminate must return once termination is complete. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. -func (g *gracefulManager) RunWithShutdownChan(run RunnableWithShutdownChan) { +func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { @@ -101,14 +115,14 @@ func (g *gracefulManager) RunWithShutdownChan(run RunnableWithShutdownChan) { // RunWithShutdownContext takes a function that has a context to watch for shutdown. // After the provided context is Done(), the main function must return once shutdown is complete. // (Optionally the HammerContext may be obtained and waited for however, this should be avoided if possible.) -func (g *gracefulManager) RunWithShutdownContext(run func(context.Context)) { +func (g *Manager) RunWithShutdownContext(run func(context.Context)) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(g.ShutdownContext()) } // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination -func (g *gracefulManager) RunAtTerminate(ctx context.Context, terminate func()) { +func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { g.terminateWaitGroup.Add(1) go func() { select { @@ -121,7 +135,7 @@ func (g *gracefulManager) RunAtTerminate(ctx context.Context, terminate func()) } // RunAtShutdown creates a go-routine to run the provided function at shutdown -func (g *gracefulManager) RunAtShutdown(ctx context.Context, shutdown func()) { +func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { go func() { select { case <-g.IsShutdown(): @@ -132,7 +146,7 @@ func (g *gracefulManager) RunAtShutdown(ctx context.Context, shutdown func()) { } // RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *gracefulManager) RunAtHammer(ctx context.Context, hammer func()) { +func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { go func() { select { case <-g.IsHammer(): @@ -141,11 +155,14 @@ func (g *gracefulManager) RunAtHammer(ctx context.Context, hammer func()) { } }() } -func (g *gracefulManager) doShutdown() { +func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } g.lock.Lock() + if g.shutdown == nil { + g.shutdown = make(chan struct{}) + } close(g.shutdown) g.lock.Unlock() @@ -158,37 +175,54 @@ func (g *gracefulManager) doShutdown() { g.doHammerTime(0) <-time.After(1 * time.Second) g.doTerminate() + g.WaitForTerminate() + g.lock.Lock() + if g.done == nil { + g.done = make(chan struct{}) + } + close(g.done) + g.lock.Unlock() }() } -func (g *gracefulManager) doHammerTime(d time.Duration) { +func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) + g.lock.Lock() + if g.hammer == nil { + g.hammer = make(chan struct{}) + } + 
g.lock.Unlock() select { case <-g.hammer: default: log.Warn("Setting Hammer condition") + g.lock.Lock() close(g.hammer) + g.lock.Unlock() } } -func (g *gracefulManager) doTerminate() { +func (g *Manager) doTerminate() { if !g.setStateTransition(stateShuttingDown, stateTerminate) { return } g.lock.Lock() + if g.terminate == nil { + g.terminate = make(chan struct{}) + } close(g.terminate) g.lock.Unlock() } // IsChild returns if the current process is a child of previous Gitea process -func (g *gracefulManager) IsChild() bool { +func (g *Manager) IsChild() bool { return g.isChild } // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate -func (g *gracefulManager) IsShutdown() <-chan struct{} { +func (g *Manager) IsShutdown() <-chan struct{} { g.lock.RLock() if g.shutdown == nil { g.lock.RUnlock() @@ -207,7 +241,7 @@ func (g *gracefulManager) IsShutdown() <-chan struct{} { // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // Servers running within the running server wait group should respond to IsHammer // if not shutdown already -func (g *gracefulManager) IsHammer() <-chan struct{} { +func (g *Manager) IsHammer() <-chan struct{} { g.lock.RLock() if g.hammer == nil { g.lock.RUnlock() @@ -225,7 +259,7 @@ func (g *gracefulManager) IsHammer() <-chan struct{} { // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped -func (g *gracefulManager) IsTerminate() <-chan struct{} { +func (g *Manager) IsTerminate() <-chan struct{} { g.lock.RLock() if g.terminate == nil { g.lock.RUnlock() @@ -243,29 +277,29 @@ func (g *gracefulManager) IsTerminate() <-chan struct{} { // ServerDone declares a running server done and subtracts one from the // running server wait group. Users probably do not want to call this // and should use one of the RunWithShutdown* functions -func (g *gracefulManager) ServerDone() { +func (g *Manager) ServerDone() { g.runningServerWaitGroup.Done() } // WaitForServers waits for all running servers to finish. Users should probably // instead use AtTerminate or IsTerminate -func (g *gracefulManager) WaitForServers() { +func (g *Manager) WaitForServers() { g.runningServerWaitGroup.Wait() } // WaitForTerminate waits for all terminating actions to finish. 
// Only the main go-routine should use this -func (g *gracefulManager) WaitForTerminate() { +func (g *Manager) WaitForTerminate() { g.terminateWaitGroup.Wait() } -func (g *gracefulManager) getState() state { +func (g *Manager) getState() state { g.lock.RLock() defer g.lock.RUnlock() return g.state } -func (g *gracefulManager) setStateTransition(old, new state) bool { +func (g *Manager) setStateTransition(old, new state) bool { if old != g.getState() { return false } @@ -279,7 +313,7 @@ func (g *gracefulManager) setStateTransition(old, new state) bool { return true } -func (g *gracefulManager) setState(st state) { +func (g *Manager) setState(st state) { g.lock.Lock() defer g.lock.Unlock() @@ -288,6 +322,42 @@ func (g *gracefulManager) setState(st state) { // InformCleanup tells the cleanup wait group that we have either taken a listener // or will not be taking a listener -func (g *gracefulManager) InformCleanup() { +func (g *Manager) InformCleanup() { g.createServerWaitGroup.Done() } + +// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating +func (g *Manager) Done() <-chan struct{} { + g.lock.RLock() + if g.done == nil { + g.lock.RUnlock() + g.lock.Lock() + if g.done == nil { + g.done = make(chan struct{}) + } + defer g.lock.Unlock() + return g.done + } + defer g.lock.RUnlock() + return g.done +} + +// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +func (g *Manager) Err() error { + select { + case <-g.Done(): + return ErrTerminate + default: + return nil + } +} + +// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values +func (g *Manager) Value(key interface{}) interface{} { + return nil +} + +// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context +func (g *Manager) Deadline() (deadline time.Time, ok bool) { + return +} diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 1ffc59f0df707..bb33775ca8693 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -19,7 +19,8 @@ import ( "code.gitea.io/gitea/modules/setting" ) -type gracefulManager struct { +// Manager manages the graceful shutdown process +type Manager struct { isChild bool forked bool lock *sync.RWMutex @@ -27,22 +28,23 @@ type gracefulManager struct { shutdown chan struct{} hammer chan struct{} terminate chan struct{} + done chan struct{} runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup } -func newGracefulManager(ctx context.Context) *gracefulManager { - manager := &gracefulManager{ +func newGracefulManager(ctx context.Context) *Manager { + manager := &Manager{ isChild: len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1, lock: &sync.RWMutex{}, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.Run(ctx) + manager.run(ctx) return manager } -func (g *gracefulManager) Run(ctx context.Context) { +func (g *Manager) run(ctx context.Context) { g.setState(stateRunning) go g.handleSignals(ctx) c := make(chan struct{}) @@ -61,6 +63,16 @@ func (g *gracefulManager) Run(ctx context.Context) { case <-c: return case <-g.IsShutdown(): + func() { + // When waitgroup counter goes negative it will panic - we don't care about this so we can just ignore it. 
+ defer func() { + _ = recover() + }() + // Ensure that the createServerWaitGroup stops waiting + for { + g.createServerWaitGroup.Done() + } + }() return case <-time.After(setting.StartupTimeout): log.Error("Startup took too long! Shutting down") @@ -70,7 +82,7 @@ func (g *gracefulManager) Run(ctx context.Context) { } } -func (g *gracefulManager) handleSignals(ctx context.Context) { +func (g *Manager) handleSignals(ctx context.Context) { signalChannel := make(chan os.Signal, 1) signal.Notify( @@ -123,7 +135,7 @@ func (g *gracefulManager) handleSignals(ctx context.Context) { } } -func (g *gracefulManager) doFork() error { +func (g *Manager) doFork() error { g.lock.Lock() if g.forked { g.lock.Unlock() @@ -139,7 +151,9 @@ func (g *gracefulManager) doFork() error { return err } -func (g *gracefulManager) RegisterServer() { +// RegisterServer registers the running of a listening server, in the case of unix this means that the parent process can now die. +// Any call to RegisterServer must be matched by a call to ServerDone +func (g *Manager) RegisterServer() { KillParent() g.runningServerWaitGroup.Add(1) } diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 26c791e6ed0db..f4f7a64f38a26 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -30,7 +30,8 @@ const ( acceptHammerCode = svc.Accepted(hammerCode) ) -type gracefulManager struct { +// Manager manages the graceful shutdown process +type Manager struct { ctx context.Context isChild bool lock *sync.RWMutex @@ -38,23 +39,24 @@ type gracefulManager struct { shutdown chan struct{} hammer chan struct{} terminate chan struct{} + done chan struct{} runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup } -func newGracefulManager(ctx context.Context) *gracefulManager { - manager := &gracefulManager{ +func newGracefulManager(ctx context.Context) *Manager { + manager := &Manager{ isChild: false, lock: &sync.RWMutex{}, ctx: ctx, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.Run() + manager.run() return manager } -func (g *gracefulManager) Run() { +func (g *Manager) run() { g.setState(stateRunning) if skip, _ := strconv.ParseBool(os.Getenv("SKIP_MINWINSVC")); skip { return @@ -71,8 +73,8 @@ func (g *gracefulManager) Run() { go run(WindowsServiceName, g) } -// Execute makes gracefulManager implement svc.Handler -func (g *gracefulManager) Execute(args []string, changes <-chan svc.ChangeRequest, status chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { +// Execute makes Manager implement svc.Handler +func (g *Manager) Execute(args []string, changes <-chan svc.ChangeRequest, status chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { if setting.StartupTimeout > 0 { status <- svc.Status{State: svc.StartPending} } else { @@ -141,11 +143,13 @@ hammerLoop: return false, 0 } -func (g *gracefulManager) RegisterServer() { +// RegisterServer registers the running of a listening server. 
+// Any call to RegisterServer must be matched by a call to ServerDone +func (g *Manager) RegisterServer() { g.runningServerWaitGroup.Add(1) } -func (g *gracefulManager) awaitServer(limit time.Duration) bool { +func (g *Manager) awaitServer(limit time.Duration) bool { c := make(chan struct{}) go func() { defer close(c) diff --git a/modules/graceful/net_unix.go b/modules/graceful/net_unix.go index 5550c09f427ce..1e496e9d916ba 100644 --- a/modules/graceful/net_unix.go +++ b/modules/graceful/net_unix.go @@ -101,7 +101,7 @@ func CloseProvidedListeners() error { // creates a new one using net.Listen. func GetListener(network, address string) (net.Listener, error) { // Add a deferral to say that we've tried to grab a listener - defer Manager.InformCleanup() + defer GetManager().InformCleanup() switch network { case "tcp", "tcp4", "tcp6": tcpAddr, err := net.ResolveTCPAddr(network, address) diff --git a/modules/graceful/restart_unix.go b/modules/graceful/restart_unix.go index 3fc4f0511d1df..9a94e5fa677bd 100644 --- a/modules/graceful/restart_unix.go +++ b/modules/graceful/restart_unix.go @@ -22,7 +22,7 @@ var killParent sync.Once // KillParent sends the kill signal to the parent process if we are a child func KillParent() { killParent.Do(func() { - if Manager.IsChild() { + if GetManager().IsChild() { ppid := syscall.Getppid() if ppid > 1 { _ = syscall.Kill(ppid, syscall.SIGTERM) diff --git a/modules/graceful/server.go b/modules/graceful/server.go index c6692cbb7511b..30fb8cdffa3af 100644 --- a/modules/graceful/server.go +++ b/modules/graceful/server.go @@ -47,7 +47,7 @@ type Server struct { // NewServer creates a server on network at provided address func NewServer(network, address string) *Server { - if Manager.IsChild() { + if GetManager().IsChild() { log.Info("Restarting new server: %s:%s on PID: %d", network, address, os.Getpid()) } else { log.Info("Starting new server: %s:%s on PID: %d", network, address, os.Getpid()) @@ -138,12 +138,12 @@ func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFun func (srv *Server) Serve(serve ServeFunction) error { defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid()) srv.setState(stateRunning) - Manager.RegisterServer() + GetManager().RegisterServer() err := serve(srv.listener) log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid()) srv.wg.Wait() srv.setState(stateTerminate) - Manager.ServerDone() + GetManager().ServerDone() // use of closed means that the listeners are closed - i.e. 
we should be shutting down - return nil if err != nil && strings.Contains(err.Error(), "use of closed") { return nil diff --git a/modules/graceful/server_hooks.go b/modules/graceful/server_hooks.go index 74b0fcb885814..c634905711f34 100644 --- a/modules/graceful/server_hooks.go +++ b/modules/graceful/server_hooks.go @@ -14,15 +14,15 @@ import ( // awaitShutdown waits for the shutdown signal from the Manager func (srv *Server) awaitShutdown() { select { - case <-Manager.IsShutdown(): + case <-GetManager().IsShutdown(): // Shutdown srv.doShutdown() - case <-Manager.IsHammer(): + case <-GetManager().IsHammer(): // Hammer srv.doShutdown() srv.doHammer() } - <-Manager.IsHammer() + <-GetManager().IsHammer() srv.doHammer() } diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 4e7eaa21b73cb..7ee392036a432 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -6,6 +6,7 @@ package code import ( "fmt" + "os" "strconv" "strings" "time" @@ -38,7 +39,7 @@ func InitRepoIndexer() { repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) go func() { start := time.Now() - log.Info("Initializing Repository Indexer") + log.Info("PID: %d: Initializing Repository Indexer", os.Getpid()) indexer.InitRepoIndexer(populateRepoIndexerAsynchronously) go processRepoIndexerOperationQueue() waitChannel <- time.Since(start) @@ -46,7 +47,7 @@ func InitRepoIndexer() { if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { @@ -90,10 +91,19 @@ func populateRepoIndexerAsynchronously() error { // should only be run when the indexer is created for the first time. func populateRepoIndexer(maxRepoID int64) { log.Info("Populating the repo indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + // start with the maximum existing repo ID and work backwards, so that we // don't include repos that are created after gitea starts; such repos will // already be added to the indexer, and we don't need to add them again. 
for maxRepoID > 0 { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize) err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos) if err != nil { @@ -103,6 +113,12 @@ func populateRepoIndexer(maxRepoID int64) { break } for _, repo := range repos { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } repoIndexerOperationQueue <- repoIndexerOperation{ repoID: repo.ID, deleted: false, @@ -323,20 +339,26 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, func processRepoIndexerOperationQueue() { for { - op := <-repoIndexerOperationQueue - var err error - if op.deleted { - if err = indexer.DeleteRepoFromIndexer(op.repoID); err != nil { - log.Error("DeleteRepoFromIndexer: %v", err) + select { + case op := <-repoIndexerOperationQueue: + var err error + if op.deleted { + if err = indexer.DeleteRepoFromIndexer(op.repoID); err != nil { + log.Error("DeleteRepoFromIndexer: %v", err) + } + } else { + if err = updateRepoIndexer(op.repoID); err != nil { + log.Error("updateRepoIndexer: %v", err) + } } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) + for _, watcher := range op.watchers { + watcher <- err } + case <-graceful.GetManager().IsShutdown(): + log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) + return } - for _, watcher := range op.watchers { - watcher <- err - } + } } diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index f4771136be5f8..edc46d4f016d4 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -172,7 +172,7 @@ func InitIssueIndexer(syncReindex bool) { } else if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { diff --git a/modules/ssh/ssh_graceful.go b/modules/ssh/ssh_graceful.go index 4d7557e2ee4d0..f8370ab4db2e3 100644 --- a/modules/ssh/ssh_graceful.go +++ b/modules/ssh/ssh_graceful.go @@ -24,5 +24,5 @@ func listen(server *ssh.Server) { // Unused informs our cleanup routine that we will not be using a ssh port func Unused() { - graceful.Manager.InformCleanup() + graceful.GetManager().InformCleanup() } From 00ddf85c167c337a9845eebe61a4e3281c38df4a Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 7 Dec 2019 17:05:17 +0000 Subject: [PATCH 02/32] Graceful: Make repo indexer shutdown gracefully --- modules/indexer/repo.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/modules/indexer/repo.go b/modules/indexer/repo.go index 841f29acd7474..9a2541939eab8 100644 --- a/modules/indexer/repo.go +++ b/modules/indexer/repo.go @@ -5,9 +5,12 @@ package indexer import ( + "context" + "os" "strings" "sync" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -104,10 +107,11 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) func InitRepoIndexer(populateIndexer func() error) { indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { - log.Fatal("InitRepoIndexer: 
%v", err) + log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err) } if indexer != nil { indexerHolder.set(indexer) + closeAtTerminate() return } @@ -117,6 +121,21 @@ func InitRepoIndexer(populateIndexer func() error) { if err = populateIndexer(); err != nil { log.Fatal("PopulateRepoIndex: %v", err) } + closeAtTerminate() +} + +func closeAtTerminate() { + graceful.GetManager().RunAtTerminate(context.Background(), func() { + log.Debug("Closing repo indexer") + indexer := indexerHolder.get() + if indexer != nil { + err := indexer.Close() + if err != nil { + log.Error("Error whilst closing the repository indexer: %v", err) + } + } + log.Info("PID: %d Repository Indexer closed", os.Getpid()) + }) } // createRepoIndexer create a repo indexer if one does not already exist From b2dea35351b1513416de151a4d4fa5b33e4aa27f Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 15 Nov 2019 21:52:16 +0000 Subject: [PATCH 03/32] Graceful: Make the cron tasks graceful --- integrations/auth_ldap_test.go | 6 ++- models/branches.go | 4 +- models/repo.go | 69 ++++++++++++++++++++++++++++++---- models/user.go | 35 ++++++++++++++++- modules/cron/cron.go | 9 ++++- modules/migrations/update.go | 26 +++++++++++-- routers/admin/admin.go | 5 ++- services/mirror/mirror.go | 14 ++++--- 8 files changed, 144 insertions(+), 24 deletions(-) diff --git a/integrations/auth_ldap_test.go b/integrations/auth_ldap_test.go index 5cb2bad57d286..80286c09e6f46 100644 --- a/integrations/auth_ldap_test.go +++ b/integrations/auth_ldap_test.go @@ -5,6 +5,7 @@ package integrations import ( + "context" "net/http" "os" "strings" @@ -147,7 +148,7 @@ func TestLDAPUserSync(t *testing.T) { } defer prepareTestEnv(t)() addAuthSourceLDAP(t, "") - models.SyncExternalUsers() + models.SyncExternalUsers(context.Background()) session := loginUser(t, "user1") // Check if users exists @@ -206,7 +207,8 @@ func TestLDAPUserSSHKeySync(t *testing.T) { } defer prepareTestEnv(t)() addAuthSourceLDAP(t, "sshPublicKey") - models.SyncExternalUsers() + + models.SyncExternalUsers(context.Background()) // Check if users has SSH keys synced for _, u := range gitLDAPUsers { diff --git a/models/branches.go b/models/branches.go index cf4b4fe393205..c3999d9672bf0 100644 --- a/models/branches.go +++ b/models/branches.go @@ -5,6 +5,7 @@ package models import ( + "context" "fmt" "time" @@ -525,7 +526,8 @@ func (deletedBranch *DeletedBranch) LoadUser() { } // RemoveOldDeletedBranches removes old deleted branches -func RemoveOldDeletedBranches() { +func RemoveOldDeletedBranches(ctx context.Context) { + // Nothing to do for shutdown or terminate log.Trace("Doing: DeletedBranchesCleanup") deleteBefore := time.Now().Add(-setting.Cron.DeletedBranchesCleanup.OlderThan) diff --git a/models/repo.go b/models/repo.go index e809bafa309f1..ccc4703f5de19 100644 --- a/models/repo.go +++ b/models/repo.go @@ -7,6 +7,7 @@ package models import ( "bytes" + "context" "crypto/md5" "errors" "fmt" @@ -2268,19 +2269,32 @@ func DeleteRepositoryArchives() error { } // DeleteOldRepositoryArchives deletes old repository archives. 
-func DeleteOldRepositoryArchives() { +func DeleteOldRepositoryArchives(ctx context.Context) { log.Trace("Doing: ArchiveCleanup") - if err := x.Where("id > 0").Iterate(new(Repository), deleteOldRepositoryArchives); err != nil { + if err := x.Where("id > 0").Iterate(new(Repository), func(idx int, bean interface{}) error { + return deleteOldRepositoryArchives(ctx, idx, bean) + }); err != nil { log.Error("ArchiveClean: %v", err) } } -func deleteOldRepositoryArchives(idx int, bean interface{}) error { +func deleteOldRepositoryArchives(ctx context.Context, idx int, bean interface{}) error { repo := bean.(*Repository) basePath := filepath.Join(repo.RepoPath(), "archives") + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives in %v ", repo) + default: + } for _, ty := range []string{"zip", "targz"} { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s", repo, ty) + default: + } + path := filepath.Join(basePath, ty) file, err := os.Open(path) if err != nil { @@ -2303,6 +2317,11 @@ func deleteOldRepositoryArchives(idx int, bean interface{}) error { minimumOldestTime := time.Now().Add(-setting.Cron.ArchiveCleanup.OlderThan) for _, info := range files { if info.ModTime().Before(minimumOldestTime) && !info.IsDir() { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s - %s", repo, ty, info.Name()) + default: + } toDelete := filepath.Join(path, info.Name()) // This is a best-effort purge, so we do not check error codes to confirm removal. if err = os.Remove(toDelete); err != nil { @@ -2396,13 +2415,17 @@ func SyncRepositoryHooks() error { } // GitFsck calls 'git fsck' to check repository health. -func GitFsck() { +func GitFsck(ctx context.Context) { log.Trace("Doing: GitFsck") - if err := x. Where("id>0 AND is_fsck_enabled=?", true).BufferSize(setting.Database.IterateBufferSize). 
Iterate(new(Repository), func(idx int, bean interface{}) error { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown") + default: + } repo := bean.(*Repository) repoPath := repo.RepoPath() log.Trace("Running health check on repository %s", repoPath) @@ -2448,13 +2471,19 @@ type repoChecker struct { desc string } -func repoStatsCheck(checker *repoChecker) { +func repoStatsCheck(ctx context.Context, checker *repoChecker) { results, err := x.Query(checker.querySQL) if err != nil { log.Error("Select %s: %v", checker.desc, err) return } for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", checker.desc, id) _, err = x.Exec(checker.correctSQL, id, id) @@ -2465,7 +2494,7 @@ func repoStatsCheck(checker *repoChecker) { } // CheckRepoStats checks the repository stats -func CheckRepoStats() { +func CheckRepoStats(ctx context.Context) { log.Trace("Doing: CheckRepoStats") checkers := []*repoChecker{ @@ -2501,7 +2530,13 @@ func CheckRepoStats() { }, } for i := range checkers { - repoStatsCheck(checkers[i]) + select { + case <-ctx.Done(): + log.Warn("Aborting due to shutdown") + return + default: + repoStatsCheck(ctx, checkers[i]) + } } // ***** START: Repository.NumClosedIssues ***** @@ -2511,6 +2546,12 @@ func CheckRepoStats() { log.Error("Select %s: %v", desc, err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", desc, id) _, err = x.Exec("UPDATE `repository` SET num_closed_issues=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, false, id) @@ -2528,6 +2569,12 @@ func CheckRepoStats() { log.Error("Select %s: %v", desc, err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", desc, id) _, err = x.Exec("UPDATE `repository` SET num_closed_pulls=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) 
WHERE id=?", id, true, true, id) @@ -2545,6 +2592,12 @@ func CheckRepoStats() { log.Error("Select repository count 'num_forks': %v", err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating repository count 'num_forks': %d", id) diff --git a/models/user.go b/models/user.go index 2cef2e5deccce..0454158de6863 100644 --- a/models/user.go +++ b/models/user.go @@ -7,6 +7,7 @@ package models import ( "container/list" + "context" "crypto/md5" "crypto/sha256" "crypto/subtle" @@ -1695,7 +1696,7 @@ func synchronizeLdapSSHPublicKeys(usr *User, s *LoginSource, sshPublicKeys []str } // SyncExternalUsers is used to synchronize users with external authorization source -func SyncExternalUsers() { +func SyncExternalUsers(ctx context.Context) { log.Trace("Doing: SyncExternalUsers") ls, err := LoginSources() @@ -1710,6 +1711,12 @@ func SyncExternalUsers() { if !s.IsActived || !s.IsSyncEnabled { continue } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown before update of %s", s.Name) + return + default: + } if s.IsLDAP() { log.Trace("Doing: SyncExternalUsers[%s]", s.Name) @@ -1727,6 +1734,12 @@ func SyncExternalUsers() { log.Error("SyncExternalUsers: %v", err) return } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown before update of %s", s.Name) + return + default: + } sr, err := s.LDAP().SearchEntries() if err != nil { @@ -1735,6 +1748,19 @@ func SyncExternalUsers() { } for _, su := range sr { + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown at update of %s before completed update of users", s.Name) + // Rewrite authorized_keys file if LDAP Public SSH Key attribute is set and any key was added or removed + if sshKeysNeedUpdate { + err = RewriteAllPublicKeys() + if err != nil { + log.Error("RewriteAllPublicKeys: %v", err) + } + } + return + default: + } if len(su.Username) == 0 { continue } @@ -1819,6 +1845,13 @@ func SyncExternalUsers() { } } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown at update of %s before delete users", s.Name) + return + default: + } + // Deactivate users not present in LDAP if updateExisting { for _, usr := range users { diff --git a/modules/cron/cron.go b/modules/cron/cron.go index 795fafb51fdaa..f4511a8e79561 100644 --- a/modules/cron/cron.go +++ b/modules/cron/cron.go @@ -6,9 +6,11 @@ package cron import ( + "context" "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/migrations" "code.gitea.io/gitea/modules/setting" @@ -37,17 +39,19 @@ var taskStatusTable = sync.NewStatusTable() type Func func() // WithUnique wrap a cron func with an unique running check -func WithUnique(name string, body Func) Func { +func WithUnique(name string, body func(context.Context)) Func { return func() { if !taskStatusTable.StartIfNotRunning(name) { return } defer taskStatusTable.Stop(name) - body() + graceful.GetManager().RunWithShutdownContext(body) } } // NewContext begins cron tasks +// Each cron task is run within the shutdown context as a running server +// AtShutdown the cron server is stopped func NewContext() { var ( entry *cron.Entry @@ -129,6 +133,7 @@ func NewContext() { go WithUnique(updateMigrationPosterID, migrations.UpdateMigrationPosterID)() c.Start() + 
graceful.GetManager().RunAtShutdown(context.Background(), c.Stop) } // ListTasks returns all running cron tasks. diff --git a/modules/migrations/update.go b/modules/migrations/update.go index d1465b2baf36c..3d0962657c299 100644 --- a/modules/migrations/update.go +++ b/modules/migrations/update.go @@ -5,21 +5,28 @@ package migrations import ( + "context" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/structs" ) // UpdateMigrationPosterID updates all migrated repositories' issues and comments posterID -func UpdateMigrationPosterID() { +func UpdateMigrationPosterID(ctx context.Context) { for _, gitService := range structs.SupportedFullGitService { - if err := updateMigrationPosterIDByGitService(gitService); err != nil { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterID aborted due to shutdown before %s", gitService.Name()) + default: + } + if err := updateMigrationPosterIDByGitService(ctx, gitService); err != nil { log.Error("updateMigrationPosterIDByGitService failed: %v", err) } } } -func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { +func updateMigrationPosterIDByGitService(ctx context.Context, tp structs.GitServiceType) error { provider := tp.Name() if len(provider) == 0 { return nil @@ -28,6 +35,13 @@ func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { const batchSize = 100 var start int for { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterIDByGitService(%s) aborted due to shutdown", tp.Name()) + return nil + default: + } + users, err := models.FindExternalUsersByProvider(models.FindExternalUserOptions{ Provider: provider, Start: start, @@ -38,6 +52,12 @@ func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { } for _, user := range users { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterIDByGitService(%s) aborted due to shutdown", tp.Name()) + return nil + default: + } externalUserID := user.ExternalID if err := models.UpdateMigrationsByType(tp, externalUserID, user.UserID); err != nil { log.Error("UpdateMigrationsByType type %s external user id %v to local user id %v failed: %v", tp.Name(), user.ExternalID, user.UserID, err) diff --git a/routers/admin/admin.go b/routers/admin/admin.go index 9f155ff008bea..ccedcaf8a62e9 100644 --- a/routers/admin/admin.go +++ b/routers/admin/admin.go @@ -19,6 +19,7 @@ import ( "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/cron" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" @@ -171,10 +172,10 @@ func Dashboard(ctx *context.Context) { err = models.ReinitMissingRepositories() case syncExternalUsers: success = ctx.Tr("admin.dashboard.sync_external_users_started") - go models.SyncExternalUsers() + go graceful.GetManager().RunWithShutdownContext(models.SyncExternalUsers) case gitFsck: success = ctx.Tr("admin.dashboard.git_fsck_started") - go models.GitFsck() + go graceful.GetManager().RunWithShutdownContext(models.GitFsck) case deleteGeneratedRepositoryAvatars: success = ctx.Tr("admin.dashboard.delete_generated_repository_avatars_success") err = models.RemoveRandomAvatars() diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 9c52f1723ba3f..0c461b0f47cfa 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -5,6 +5,7 @@ package mirror import ( + "context" "fmt" "net/url" "strings" @@ -293,18 
+294,21 @@ func Password(m *models.Mirror) string { } // Update checks and updates mirror repositories. -func Update() { +func Update(ctx context.Context) { log.Trace("Doing: Update") - if err := models.MirrorsIterate(func(idx int, bean interface{}) error { m := bean.(*models.Mirror) if m.Repo == nil { log.Error("Disconnected mirror repository found: %d", m.ID) return nil } - - mirrorQueue.Add(m.RepoID) - return nil + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown") + default: + mirrorQueue.Add(m.RepoID) + return nil + } }); err != nil { log.Error("Update: %v", err) } From c0ea8ef27198cc05dbd6a21767ae5d4fe6b5df4c Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 13 Nov 2019 18:27:38 +0000 Subject: [PATCH 04/32] Graceful: Make TestPullRequests shutdownable services/pull/check.go:TestPullRequests run runs within the shutdown context and will respond the manager requesting it be shutdown. In future this should probably become a persitable queue with pooled workers but that restructure can wait. --- services/pull/check.go | 54 ++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/services/pull/check.go b/services/pull/check.go index 0fd3e2a76f47b..1e10abdd82e16 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -6,6 +6,7 @@ package pull import ( + "context" "fmt" "io/ioutil" "os" @@ -16,6 +17,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/sync" @@ -151,7 +153,7 @@ func manuallyMerged(pr *models.PullRequest) bool { // TestPullRequests checks and tests untested patches of pull requests. // TODO: test more pull requests at same time. -func TestPullRequests() { +func TestPullRequests(ctx context.Context) { prs, err := models.GetPullRequestsByCheckStatus(models.PullRequestStatusChecking) if err != nil { log.Error("Find Checking PRs: %v", err) @@ -176,34 +178,46 @@ func TestPullRequests() { } checkAndUpdateStatus(pr) + select { + case <-ctx.Done(): + log.Info("PID: %d Pull Request testing shutdown", os.Getpid()) + return + default: + } } // Start listening on new test requests. 
- for prID := range pullRequestQueue.Queue() { - log.Trace("TestPullRequests[%v]: processing test task", prID) - pullRequestQueue.Remove(prID) + for { + select { + case prID := <-pullRequestQueue.Queue(): + log.Trace("TestPullRequests[%v]: processing test task", prID) + pullRequestQueue.Remove(prID) + + id := com.StrTo(prID).MustInt64() + if _, ok := checkedPRs[id]; ok { + continue + } - id := com.StrTo(prID).MustInt64() - if _, ok := checkedPRs[id]; ok { - continue - } + pr, err := models.GetPullRequestByID(id) + if err != nil { + log.Error("GetPullRequestByID[%s]: %v", prID, err) + continue + } else if manuallyMerged(pr) { + continue + } else if err = pr.TestPatch(); err != nil { + log.Error("testPatch[%d]: %v", pr.ID, err) + continue + } - pr, err := models.GetPullRequestByID(id) - if err != nil { - log.Error("GetPullRequestByID[%s]: %v", prID, err) - continue - } else if manuallyMerged(pr) { - continue - } else if err = pr.TestPatch(); err != nil { - log.Error("testPatch[%d]: %v", pr.ID, err) - continue + checkAndUpdateStatus(pr) + case <-ctx.Done(): + log.Info("PID: %d Pull Request testing shutdown", os.Getpid()) + return } - - checkAndUpdateStatus(pr) } } // Init runs the task queue to test all the checking status pull requests func Init() { - go TestPullRequests() + go graceful.GetManager().RunWithShutdownContext(TestPullRequests) } From 7beda2dbb767d185660339a420639d458faf9978 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 7 Dec 2019 16:35:51 +0000 Subject: [PATCH 05/32] Graceful: AddTestPullRequest run in graceful ctx --- services/pull/pull.go | 63 ++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/services/pull/pull.go b/services/pull/pull.go index 2650dacc116da..76cd88638eeaf 100644 --- a/services/pull/pull.go +++ b/services/pull/pull.go @@ -5,10 +5,12 @@ package pull import ( + "context" "fmt" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification" issue_service "code.gitea.io/gitea/services/issue" @@ -44,6 +46,7 @@ func checkForInvalidation(requests models.PullRequestList, repoID int64, doer *m return fmt.Errorf("git.OpenRepository: %v", err) } go func() { + // FIXME: graceful: We need to tell the manager we're doing something... err := requests.InvalidateCodeComments(doer, gitRepo, branch) if err != nil { log.Error("PullRequestList.InvalidateCodeComments: %v", err) @@ -72,37 +75,43 @@ func addHeadRepoTasks(prs []*models.PullRequest) { // and generate new patch for testing as needed. func AddTestPullRequestTask(doer *models.User, repoID int64, branch string, isSync bool) { log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch) - prs, err := models.GetUnmergedPullRequestsByHeadInfo(repoID, branch) - if err != nil { - log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err) - return - } + graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) { + // There is no sensible way to shut this down ":-(" + // If you don't let it run all the way then you will lose data + // FIXME: graceful: AddTestPullRequestTask needs to become a queue! 
- if isSync { - requests := models.PullRequestList(prs) - if err = requests.LoadAttributes(); err != nil { - log.Error("PullRequestList.LoadAttributes: %v", err) - } - if invalidationErr := checkForInvalidation(requests, repoID, doer, branch); invalidationErr != nil { - log.Error("checkForInvalidation: %v", invalidationErr) + prs, err := models.GetUnmergedPullRequestsByHeadInfo(repoID, branch) + if err != nil { + log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err) + return } - if err == nil { - for _, pr := range prs { - pr.Issue.PullRequest = pr - notification.NotifyPullRequestSynchronized(doer, pr) + + if isSync { + requests := models.PullRequestList(prs) + if err = requests.LoadAttributes(); err != nil { + log.Error("PullRequestList.LoadAttributes: %v", err) + } + if invalidationErr := checkForInvalidation(requests, repoID, doer, branch); invalidationErr != nil { + log.Error("checkForInvalidation: %v", invalidationErr) + } + if err == nil { + for _, pr := range prs { + pr.Issue.PullRequest = pr + notification.NotifyPullRequestSynchronized(doer, pr) + } } } - } - addHeadRepoTasks(prs) + addHeadRepoTasks(prs) - log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch) - prs, err = models.GetUnmergedPullRequestsByBaseInfo(repoID, branch) - if err != nil { - log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err) - return - } - for _, pr := range prs { - AddToTaskQueue(pr) - } + log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch) + prs, err = models.GetUnmergedPullRequestsByBaseInfo(repoID, branch) + if err != nil { + log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err) + return + } + for _, pr := range prs { + AddToTaskQueue(pr) + } + }) } From db022dd351e2f6f055e2f2e880d5e0c748e62c95 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 8 Dec 2019 03:46:41 +0000 Subject: [PATCH 06/32] Graceful: SetDefaultContext for Xorm to be HammerContext --- models/models.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/models/models.go b/models/models.go index 8c10e7abfc1e6..bcacd94de5528 100644 --- a/models/models.go +++ b/models/models.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/setting" // Needed for the MySQL driver @@ -169,6 +170,8 @@ func NewEngine(migrateFunc func(*xorm.Engine) error) (err error) { return err } + x.SetDefaultContext(graceful.GetManager().HammerContext()) + if err = x.Ping(); err != nil { return err } From 1aafb562f397a116579313f3148566b6e0134750 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 8 Dec 2019 04:37:19 +0000 Subject: [PATCH 07/32] Avoid starting graceful for migrate commands and checkout --- cmd/migrate.go | 4 +++- cmd/web.go | 2 +- contrib/pr/checkout.go | 3 ++- integrations/integration_test.go | 2 +- integrations/migration-test/migration_test.go | 3 ++- models/models.go | 6 +++--- modules/git/git.go | 3 ++- modules/git/git_test.go | 3 ++- modules/graceful/manager.go | 4 ---- routers/init.go | 11 ++++++----- routers/install.go | 3 ++- 11 files changed, 24 insertions(+), 20 deletions(-) diff --git a/cmd/migrate.go b/cmd/migrate.go index 1fa1d09e25378..2428925887c9a 100644 --- a/cmd/migrate.go +++ b/cmd/migrate.go @@ -5,6 +5,8 @@ package cmd import ( + "context" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/models/migrations" 
"code.gitea.io/gitea/modules/log" @@ -32,7 +34,7 @@ func runMigrate(ctx *cli.Context) error { log.Trace("Log path: %s", setting.LogRootPath) setting.InitDBConfig() - if err := models.NewEngine(migrations.Migrate); err != nil { + if err := models.NewEngine(context.Background(), migrations.Migrate); err != nil { log.Fatal("Failed to initialize ORM engine: %v", err) return err } diff --git a/cmd/web.go b/cmd/web.go index 9c9d4b3547e3a..98d5c59105776 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -113,7 +113,7 @@ func runWeb(ctx *cli.Context) error { } // Perform global initialization - routers.GlobalInit() + routers.GlobalInit(graceful.GetManager().HammerContext()) // Set up Macaron m := routes.NewMacaron() diff --git a/contrib/pr/checkout.go b/contrib/pr/checkout.go index 9c063572950e8..34cd82ff0a38f 100644 --- a/contrib/pr/checkout.go +++ b/contrib/pr/checkout.go @@ -5,6 +5,7 @@ Checkout a PR and load the tests data into sqlite database */ import ( + "context" "flag" "fmt" "io/ioutil" @@ -92,7 +93,7 @@ func runPR() { //x, err = xorm.NewEngine("sqlite3", "file::memory:?cache=shared") var helper testfixtures.Helper = &testfixtures.SQLite{} - models.NewEngine(func(_ *xorm.Engine) error { + models.NewEngine(context.Background(), func(_ *xorm.Engine) error { return nil }) models.HasEngine = true diff --git a/integrations/integration_test.go b/integrations/integration_test.go index 55d318fd800fe..bb43baa9d7ab6 100644 --- a/integrations/integration_test.go +++ b/integrations/integration_test.go @@ -176,7 +176,7 @@ func initIntegrationTest() { } defer db.Close() } - routers.GlobalInit() + routers.GlobalInit(graceful.GetManager().HammerContext()) } func prepareTestEnv(t testing.TB, skip ...int) func() { diff --git a/integrations/migration-test/migration_test.go b/integrations/migration-test/migration_test.go index 4fe36dc021155..c274d482da147 100644 --- a/integrations/migration-test/migration_test.go +++ b/integrations/migration-test/migration_test.go @@ -6,6 +6,7 @@ package migrations import ( "compress/gzip" + "context" "database/sql" "fmt" "io/ioutil" @@ -220,7 +221,7 @@ func doMigrationTest(t *testing.T, version string) { err := models.SetEngine() assert.NoError(t, err) - err = models.NewEngine(wrappedMigrate) + err = models.NewEngine(context.Background(), wrappedMigrate) assert.NoError(t, err) currentEngine.Close() } diff --git a/models/models.go b/models/models.go index bcacd94de5528..9eb174e200d4a 100644 --- a/models/models.go +++ b/models/models.go @@ -6,11 +6,11 @@ package models import ( + "context" "database/sql" "errors" "fmt" - "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/setting" // Needed for the MySQL driver @@ -165,12 +165,12 @@ func SetEngine() (err error) { } // NewEngine initializes a new xorm.Engine -func NewEngine(migrateFunc func(*xorm.Engine) error) (err error) { +func NewEngine(ctx context.Context, migrateFunc func(*xorm.Engine) error) (err error) { if err = SetEngine(); err != nil { return err } - x.SetDefaultContext(graceful.GetManager().HammerContext()) + x.SetDefaultContext(ctx) if err = x.Ping(); err != nil { return err diff --git a/modules/git/git.go b/modules/git/git.go index 286e1ad8b4c45..d5caaa0912163 100644 --- a/modules/git/git.go +++ b/modules/git/git.go @@ -106,7 +106,8 @@ func SetExecutablePath(path string) error { } // Init initializes git module -func Init() error { +func Init(ctx context.Context) error { + DefaultContext = ctx // Git requires setting user.name and user.email in order to commit changes. 
for configKey, defaultValue := range map[string]string{"user.name": "Gitea", "user.email": "gitea@fake.local"} { if stdout, stderr, err := process.GetManager().Exec("git.Init(get setting)", GitExecutable, "config", "--get", configKey); err != nil || strings.TrimSpace(stdout) == "" { diff --git a/modules/git/git_test.go b/modules/git/git_test.go index 0c6259a9c559f..27951d639bb7d 100644 --- a/modules/git/git_test.go +++ b/modules/git/git_test.go @@ -5,6 +5,7 @@ package git import ( + "context" "fmt" "os" "testing" @@ -16,7 +17,7 @@ func fatalTestError(fmtStr string, args ...interface{}) { } func TestMain(m *testing.M) { - if err := Init(); err != nil { + if err := Init(context.Background()); err != nil { fatalTestError("Init failed: %v", err) } diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 19b8e4d266085..5b8fc95205eee 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" @@ -50,9 +49,6 @@ func InitManager(ctx context.Context) { initOnce.Do(func() { manager = newGracefulManager(ctx) - // Set the git default context to the HammerContext - git.DefaultContext = manager.HammerContext() - // Set the process default context to the HammerContext process.DefaultContext = manager.HammerContext() }) diff --git a/routers/init.go b/routers/init.go index 81418a4ad5a5d..01df15d6c5934 100644 --- a/routers/init.go +++ b/routers/init.go @@ -5,6 +5,7 @@ package routers import ( + "context" "strings" "time" @@ -53,11 +54,11 @@ func NewServices() { } // In case of problems connecting to DB, retry connection. Eg, PGSQL in Docker Container on Synology -func initDBEngine() (err error) { +func initDBEngine(ctx context.Context) (err error) { log.Info("Beginning ORM engine initialization.") for i := 0; i < setting.Database.DBConnectRetries; i++ { log.Info("ORM engine initialization attempt #%d/%d...", i+1, setting.Database.DBConnectRetries) - if err = models.NewEngine(migrations.Migrate); err == nil { + if err = models.NewEngine(ctx, migrations.Migrate); err == nil { break } else if i == setting.Database.DBConnectRetries-1 { return err @@ -71,9 +72,9 @@ func initDBEngine() (err error) { } // GlobalInit is for global configuration reload-able. 
-func GlobalInit() { +func GlobalInit(ctx context.Context) { setting.NewContext() - if err := git.Init(); err != nil { + if err := git.Init(ctx); err != nil { log.Fatal("Git module init failed: %v", err) } setting.CheckLFSVersion() @@ -88,7 +89,7 @@ func GlobalInit() { highlight.NewContext() external.RegisterParsers() markup.Init() - if err := initDBEngine(); err == nil { + if err := initDBEngine(ctx); err == nil { log.Info("ORM engine initialization successful!") } else { log.Fatal("ORM engine initialization failed: %v", err) diff --git a/routers/install.go b/routers/install.go index 53880d2c463b1..7395aeee84b6c 100644 --- a/routers/install.go +++ b/routers/install.go @@ -16,6 +16,7 @@ import ( "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/generate" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/user" @@ -351,7 +352,7 @@ func InstallPost(ctx *context.Context, form auth.InstallForm) { return } - GlobalInit() + GlobalInit(graceful.GetManager().HammerContext()) // Create admin account if len(form.AdminName) > 0 { From 7589a49dbc1e41d776e1c094d9ad3d246548fa87 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 9 Dec 2019 21:38:22 +0000 Subject: [PATCH 08/32] Graceful: DeliverHooks now can be shutdown --- modules/webhook/deliver.go | 67 +++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/modules/webhook/deliver.go b/modules/webhook/deliver.go index b262505cead15..e9a4f92f0736b 100644 --- a/modules/webhook/deliver.go +++ b/modules/webhook/deliver.go @@ -5,6 +5,7 @@ package webhook import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -16,6 +17,7 @@ import ( "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "github.com/gobwas/glob" @@ -145,43 +147,70 @@ func Deliver(t *models.HookTask) error { } // DeliverHooks checks and delivers undelivered hooks. -// TODO: shoot more hooks at same time. -func DeliverHooks() { +// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue +// or a full queue. Then more hooks could be sent at same time. +func DeliverHooks(ctx context.Context) { + select { + case <-ctx.Done(): + return + default: + } tasks, err := models.FindUndeliveredHookTasks() if err != nil { log.Error("DeliverHooks: %v", err) return } + select { + case <-ctx.Done(): + return + default: + } // Update hook task status. for _, t := range tasks { if err = Deliver(t); err != nil { log.Error("deliver: %v", err) } + select { + case <-ctx.Done(): + return + default: + } } // Start listening on new hook requests. 
- for repoIDStr := range hookQueue.Queue() { - log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) - hookQueue.Remove(repoIDStr) + for { + select { + case <-ctx.Done(): + return + case repoIDStr := <-hookQueue.Queue(): + log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) + hookQueue.Remove(repoIDStr) - repoID, err := com.StrTo(repoIDStr).Int64() - if err != nil { - log.Error("Invalid repo ID: %s", repoIDStr) - continue - } + repoID, err := com.StrTo(repoIDStr).Int64() + if err != nil { + log.Error("Invalid repo ID: %s", repoIDStr) + continue + } - tasks, err := models.FindRepoUndeliveredHookTasks(repoID) - if err != nil { - log.Error("Get repository [%d] hook tasks: %v", repoID, err) - continue - } - for _, t := range tasks { - if err = Deliver(t); err != nil { - log.Error("deliver: %v", err) + tasks, err := models.FindRepoUndeliveredHookTasks(repoID) + if err != nil { + log.Error("Get repository [%d] hook tasks: %v", repoID, err) + continue + } + for _, t := range tasks { + select { + case <-ctx.Done(): + return + default: + } + if err = Deliver(t); err != nil { + log.Error("deliver: %v", err) + } } } } + } var ( @@ -234,5 +263,5 @@ func InitDeliverHooks() { }, } - go DeliverHooks() + go graceful.GetManager().RunWithShutdownContext(DeliverHooks) } From 89154f5688f8945ac680e0fc6ba8d59ef23d6b3e Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 9 Dec 2019 21:52:00 +0000 Subject: [PATCH 09/32] Graceful: SyncMirrors shutdown Termination of syncs should take place with closure at hammer time in xorm and git --- services/mirror/mirror.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 0c461b0f47cfa..487def01635bf 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -11,6 +11,8 @@ import ( "strings" "time" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/git" @@ -315,11 +317,16 @@ func Update(ctx context.Context) { } // SyncMirrors checks and syncs mirrors. -// TODO: sync more mirrors at same time. -func SyncMirrors() { +// FIXME: graceful: this should be a persistable queue +func SyncMirrors(ctx context.Context) { // Start listening on new sync requests. 
- for repoID := range mirrorQueue.Queue() { - syncMirror(repoID) + for { + select { + case <-ctx.Done(): + return + case repoID := <-mirrorQueue.Queue(): + syncMirror(repoID) + } } } @@ -419,7 +426,7 @@ func syncMirror(repoID string) { // InitSyncMirrors initializes a go routine to sync the mirrors func InitSyncMirrors() { - go SyncMirrors() + go graceful.GetManager().RunWithShutdownContext(SyncMirrors) } // StartToMirror adds repoID to mirror queue From d7d6c86b03d6c470e677ad11d5e68bb2e844e642 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 12 Dec 2019 09:47:29 +0000 Subject: [PATCH 10/32] Remove unnecessary ctx check --- models/repo.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/models/repo.go b/models/repo.go index ccc4703f5de19..b58381e8d7178 100644 --- a/models/repo.go +++ b/models/repo.go @@ -2282,11 +2282,6 @@ func DeleteOldRepositoryArchives(ctx context.Context) { func deleteOldRepositoryArchives(ctx context.Context, idx int, bean interface{}) error { repo := bean.(*Repository) basePath := filepath.Join(repo.RepoPath(), "archives") - select { - case <-ctx.Done(): - return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives in %v ", repo) - default: - } for _, ty := range []string{"zip", "targz"} { select { From 1a79b2f25e33601dcde72ce1c7f51ae77ac44ca5 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 12 Dec 2019 10:04:25 +0000 Subject: [PATCH 11/32] Fix hammer syncing --- modules/graceful/manager.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 5b8fc95205eee..6d133cacdfbc1 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -187,16 +187,13 @@ func (g *Manager) doHammerTime(d time.Duration) { if g.hammer == nil { g.hammer = make(chan struct{}) } - g.lock.Unlock() select { case <-g.hammer: default: log.Warn("Setting Hammer condition") - g.lock.Lock() close(g.hammer) - g.lock.Unlock() } - + g.lock.Unlock() } func (g *Manager) doTerminate() { From 7ce742c33780028106220ff77f092c03326cfef5 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 12 Dec 2019 10:09:48 +0000 Subject: [PATCH 12/32] rename manager.run to manager.start --- modules/graceful/manager_unix.go | 4 ++-- modules/graceful/manager_windows.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index bb33775ca8693..280e5ebae5153 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -40,11 +40,11 @@ func newGracefulManager(ctx context.Context) *Manager { lock: &sync.RWMutex{}, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.run(ctx) + manager.start(ctx) return manager } -func (g *Manager) run(ctx context.Context) { +func (g *Manager) start(ctx context.Context) { g.setState(stateRunning) go g.handleSignals(ctx) c := make(chan struct{}) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index f4f7a64f38a26..2826d2536cb18 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -52,11 +52,11 @@ func newGracefulManager(ctx context.Context) *Manager { ctx: ctx, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.run() + manager.start() return manager } -func (g *Manager) run() { +func (g *Manager) start() { g.setState(stateRunning) if skip, _ := strconv.ParseBool(os.Getenv("SKIP_MINWINSVC")); skip { return From 
fa751ea69a6f97744ee4af515f0b9da4038ab3ba Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 12 Dec 2019 15:58:02 +0000 Subject: [PATCH 13/32] adjust ctx.Done check --- modules/webhook/deliver.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/modules/webhook/deliver.go b/modules/webhook/deliver.go index e9a4f92f0736b..f982689a48574 100644 --- a/modules/webhook/deliver.go +++ b/modules/webhook/deliver.go @@ -160,22 +160,17 @@ func DeliverHooks(ctx context.Context) { log.Error("DeliverHooks: %v", err) return } - select { - case <-ctx.Done(): - return - default: - } // Update hook task status. for _, t := range tasks { - if err = Deliver(t); err != nil { - log.Error("deliver: %v", err) - } select { case <-ctx.Done(): return default: } + if err = Deliver(t); err != nil { + log.Error("deliver: %v", err) + } } // Start listening on new hook requests. From 19f2ca74068c3f5a091b41c1b0a1f7c9c1cce8ca Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 12 Dec 2019 16:37:25 +0000 Subject: [PATCH 14/32] Prevent deadlock in mirror.Update on shutdown --- modules/sync/unique_queue.go | 29 +++++++++++++++++++++++++++-- services/mirror/mirror.go | 2 +- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go index de694d8560cda..6b007594d2679 100644 --- a/modules/sync/unique_queue.go +++ b/modules/sync/unique_queue.go @@ -1,10 +1,13 @@ // Copyright 2016 The Gogs Authors. All rights reserved. +// Copyright 2019 The Gogs Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. package sync import ( + "context" + "github.com/unknwon/com" ) @@ -45,8 +48,15 @@ func (q *UniqueQueue) Exist(id interface{}) bool { // AddFunc adds new instance to the queue with a custom runnable function, // the queue is blocked until the function exits. func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { + q.AddCtxFunc(nil, id, fn) +} + +// AddCtxFunc adds new instance to the queue with a custom runnable function, +// the queue is blocked until the function exits. If the context is done before +// the id is added to the queue it will not be added and false will be returned. +func (q *UniqueQueue) AddCtxFunc(ctx context.Context, id interface{}, fn func()) bool { if q.Exist(id) { - return + return true } idStr := com.ToStr(id) @@ -56,7 +66,17 @@ func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { fn() } q.table.lock.Unlock() - q.queue <- idStr + if ctx == nil { + q.queue <- idStr + return true + + } + select { + case <-ctx.Done(): + return false + case q.queue <- idStr: + return true + } } // Add adds new instance to the queue. @@ -64,6 +84,11 @@ func (q *UniqueQueue) Add(id interface{}) { q.AddFunc(id, nil) } +// AddCtx adds new instance to the queue with a context - if the context is done before the id is added to the queue it is cancelled +func (q *UniqueQueue) AddCtx(ctx context.Context, id interface{}) bool { + return q.AddCtxFunc(ctx, id, nil) +} + // Remove removes instance from the queue. 
func (q *UniqueQueue) Remove(id interface{}) { q.table.Stop(com.ToStr(id)) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 487def01635bf..b1678da62c531 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -308,7 +308,7 @@ func Update(ctx context.Context) { case <-ctx.Done(): return fmt.Errorf("Aborted due to shutdown") default: - mirrorQueue.Add(m.RepoID) + _ = mirrorQueue.AddCtx(ctx, m.RepoID) return nil } }); err != nil { From ef9db2d16d89c261d423ee8ad7fd098b60ba9ab5 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 12 Dec 2019 16:45:29 +0000 Subject: [PATCH 15/32] Lint doesn't permit passing in ctx nil --- modules/sync/unique_queue.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go index 6b007594d2679..f6e648ab78cd3 100644 --- a/modules/sync/unique_queue.go +++ b/modules/sync/unique_queue.go @@ -48,7 +48,7 @@ func (q *UniqueQueue) Exist(id interface{}) bool { // AddFunc adds new instance to the queue with a custom runnable function, // the queue is blocked until the function exits. func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { - q.AddCtxFunc(nil, id, fn) + q.AddCtxFunc(context.Background(), id, fn) } // AddCtxFunc adds new instance to the queue with a custom runnable function, @@ -66,11 +66,6 @@ func (q *UniqueQueue) AddCtxFunc(ctx context.Context, id interface{}, fn func()) fn() } q.table.lock.Unlock() - if ctx == nil { - q.queue <- idStr - return true - - } select { case <-ctx.Done(): return false From 90bde75b13e6593a249fa7a2de6855f4eaa96fee Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 12 Dec 2019 16:52:40 +0000 Subject: [PATCH 16/32] Add FIXME note to indexer/code/bleve.go --- modules/indexer/code/bleve.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index f67bbe823db0e..ab544e3a2eea0 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -35,6 +35,7 @@ func InitRepoIndexer() { return } waitChannel := make(chan time.Duration) + // FIXME: graceful: This should use a persistable queue repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) go func() { start := time.Now() @@ -88,6 +89,7 @@ func populateRepoIndexerAsynchronously() error { // populateRepoIndexer populate the repo indexer with pre-existing data. This // should only be run when the indexer is created for the first time. 
+// FIXME: graceful: This should use a persistable queue func populateRepoIndexer(maxRepoID int64) { log.Info("Populating the repo indexer with existing repositories") From 13a808df77852a7787b6c1617ee7801da6e7e48a Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 12 Dec 2019 16:55:55 +0000 Subject: [PATCH 17/32] Say CheckRepoStats: Aborting due to Shutdown instead --- models/repo.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/models/repo.go b/models/repo.go index b58381e8d7178..98f94c19a764e 100644 --- a/models/repo.go +++ b/models/repo.go @@ -2475,7 +2475,7 @@ func repoStatsCheck(ctx context.Context, checker *repoChecker) { for _, result := range results { select { case <-ctx.Done(): - log.Warn("Aborting due to shutdown") + log.Warn("CheckRepoStats: Aborting due to shutdown") return default: } @@ -2527,7 +2527,7 @@ func CheckRepoStats(ctx context.Context) { for i := range checkers { select { case <-ctx.Done(): - log.Warn("Aborting due to shutdown") + log.Warn("CheckRepoStats: Aborting due to shutdown") return default: repoStatsCheck(ctx, checkers[i]) @@ -2543,7 +2543,7 @@ func CheckRepoStats(ctx context.Context) { for _, result := range results { select { case <-ctx.Done(): - log.Warn("Aborting due to shutdown") + log.Warn("CheckRepoStats: Aborting due to shutdown") return default: } @@ -2566,7 +2566,7 @@ func CheckRepoStats(ctx context.Context) { for _, result := range results { select { case <-ctx.Done(): - log.Warn("Aborting due to shutdown") + log.Warn("CheckRepoStats: Aborting due to shutdown") return default: } @@ -2589,7 +2589,7 @@ func CheckRepoStats(ctx context.Context) { for _, result := range results { select { case <-ctx.Done(): - log.Warn("Aborting due to shutdown") + log.Warn("CheckRepoStats: Aborting due to shutdown") return default: } From f1c85e05bf2897629cf5e2c5d08fa05a417bf072 Mon Sep 17 00:00:00 2001 From: zeripath Date: Thu, 12 Dec 2019 17:56:57 +0000 Subject: [PATCH 18/32] Update modules/sync/unique_queue.go --- modules/sync/unique_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go index f6e648ab78cd3..ce9bbaa57429a 100644 --- a/modules/sync/unique_queue.go +++ b/modules/sync/unique_queue.go @@ -1,5 +1,5 @@ // Copyright 2016 The Gogs Authors. All rights reserved. -// Copyright 2019 The Gogs Authors. All rights reserved. +// Copyright 2019 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
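The shutdown handling in these patches leans on two small Go idioms: a non-blocking poll of ctx.Done() between units of work (as in DeliverHooks and CheckRepoStats above), and closing a signalling channel at most once while holding the manager lock (as in doHammerTime/doTerminate and the channel set-up in the next patch). A minimal, self-contained sketch of both idioms follows; it is not part of any patch in this series, and the names pollShutdown and closeOnce are illustrative only.

package sketch

import (
	"context"
	"sync"
)

// pollShutdown reports whether ctx has been cancelled, without blocking
// when it has not. Loops call a check like this between units of work so
// they stop promptly at shutdown instead of draining the whole backlog.
func pollShutdown(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// closeOnce closes ch exactly once. The select guards against a panic from
// closing an already-closed channel, and the mutex serialises concurrent
// callers, mirroring how the graceful manager closes its shutdown, hammer
// and terminate channels.
func closeOnce(mu *sync.Mutex, ch chan struct{}) {
	mu.Lock()
	defer mu.Unlock()
	select {
	case <-ch:
		// already closed; nothing to do
	default:
		close(ch)
	}
}
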
From d55db9edde222cf5a39382809ac02b6755e5f6ea Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 13 Dec 2019 09:59:13 +0000 Subject: [PATCH 19/32] Make channels at start up rather than delayed --- modules/graceful/manager.go | 50 +++-------------------------- modules/graceful/manager_unix.go | 15 +++++++-- modules/graceful/manager_windows.go | 9 ++++++ 3 files changed, 26 insertions(+), 48 deletions(-) diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 6d133cacdfbc1..2aa11a391b3b4 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -156,9 +156,6 @@ func (g *Manager) doShutdown() { return } g.lock.Lock() - if g.shutdown == nil { - g.shutdown = make(chan struct{}) - } close(g.shutdown) g.lock.Unlock() @@ -184,9 +181,6 @@ func (g *Manager) doShutdown() { func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) g.lock.Lock() - if g.hammer == nil { - g.hammer = make(chan struct{}) - } select { case <-g.hammer: default: @@ -201,10 +195,12 @@ func (g *Manager) doTerminate() { return } g.lock.Lock() - if g.terminate == nil { - g.terminate = make(chan struct{}) + select { + case <-g.terminate: + default: + log.Warn("Terminating") + close(g.terminate) } - close(g.terminate) g.lock.Unlock() } @@ -217,15 +213,6 @@ func (g *Manager) IsChild() bool { // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate func (g *Manager) IsShutdown() <-chan struct{} { g.lock.RLock() - if g.shutdown == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.shutdown == nil { - g.shutdown = make(chan struct{}) - } - defer g.lock.Unlock() - return g.shutdown - } defer g.lock.RUnlock() return g.shutdown } @@ -236,15 +223,6 @@ func (g *Manager) IsShutdown() <-chan struct{} { // if not shutdown already func (g *Manager) IsHammer() <-chan struct{} { g.lock.RLock() - if g.hammer == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.hammer == nil { - g.hammer = make(chan struct{}) - } - defer g.lock.Unlock() - return g.hammer - } defer g.lock.RUnlock() return g.hammer } @@ -254,15 +232,6 @@ func (g *Manager) IsHammer() <-chan struct{} { // IsTerminate will only close once all running servers have stopped func (g *Manager) IsTerminate() <-chan struct{} { g.lock.RLock() - if g.terminate == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.terminate == nil { - g.terminate = make(chan struct{}) - } - defer g.lock.Unlock() - return g.terminate - } defer g.lock.RUnlock() return g.terminate } @@ -322,15 +291,6 @@ func (g *Manager) InformCleanup() { // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating func (g *Manager) Done() <-chan struct{} { g.lock.RLock() - if g.done == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.done == nil { - g.done = make(chan struct{}) - } - defer g.lock.Unlock() - return g.done - } defer g.lock.RUnlock() return g.done } diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 280e5ebae5153..323c6a4111da2 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -45,11 +45,20 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start(ctx context.Context) { + // Make channels + g.terminate = make(chan struct{}) + g.shutdown = make(chan struct{}) + g.hammer = make(chan struct{}) + g.done = make(chan struct{}) + + // Set the running state & handle signals g.setState(stateRunning) go g.handleSignals(ctx) - c := make(chan struct{}) + + // Handle clean up of unused provided 
listeners and delayed start-up + startupDone := make(chan struct{}) go func() { - defer close(c) + defer close(startupDone) // Wait till we're done getting all of the listeners and then close // the unused ones g.createServerWaitGroup.Wait() @@ -60,7 +69,7 @@ func (g *Manager) start(ctx context.Context) { if setting.StartupTimeout > 0 { go func() { select { - case <-c: + case <-startupDone: return case <-g.IsShutdown(): func() { diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 2826d2536cb18..526fc0bd24c75 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -57,10 +57,19 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start() { + // Make channels + g.terminate = make(chan struct{}) + g.shutdown = make(chan struct{}) + g.hammer = make(chan struct{}) + g.done = make(chan struct{}) + + // Set the running state g.setState(stateRunning) if skip, _ := strconv.ParseBool(os.Getenv("SKIP_MINWINSVC")); skip { return } + + // Make SVC process run := svc.Run isInteractive, err := svc.IsAnInteractiveSession() if err != nil { From c93f1a74b78569602068211a391da9a8b9cc22da Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 13 Dec 2019 16:19:15 +0000 Subject: [PATCH 20/32] Make repo indexer shutdown safely --- models/repo_indexer.go | 29 +++++++++++++++++++++++++++++ modules/indexer/code/bleve.go | 19 +++++-------------- modules/indexer/code/repo.go | 16 +++++++++++++++- 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/models/repo_indexer.go b/models/repo_indexer.go index 138ef54d33d4d..66a6792a03684 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -4,6 +4,10 @@ package models +import ( + "xorm.io/builder" +) + // RepoIndexerStatus status of a repo's entry in the repo indexer // For now, implicitly refers to default branch type RepoIndexerStatus struct { @@ -12,6 +16,31 @@ type RepoIndexerStatus struct { CommitSha string `xorm:"VARCHAR(40)"` } +// GetUnindexedRepos returns repos which do not have an indexer status +func GetUnindexedRepos(maxRepoID int64, page, pageSize int) ([]int64, error) { + ids := make([]int64, 0, 50) + cond := builder.Cond(builder.IsNull{ + "repo_indexer_status.id", + }) + sess := x.Table("repo").Join("LEFT OUTER", "repo_indexer_status", "repo.id = repo_indexer_status.repoID") + if maxRepoID > 0 { + cond = builder.And(cond, builder.Lte{ + "repo.id": maxRepoID, + }) + } + if page >= 0 && pageSize > 0 { + start := 0 + if page > 0 { + start = (page - 1) * pageSize + } + sess.Limit(pageSize, start) + } + + sess.Where(cond).Cols("repo.id") + err := sess.Find(&ids) + return ids, err +} + // GetIndexerStatus loads repo codes indxer status func (repo *Repository) GetIndexerStatus() error { if repo.IndexerStatus != nil { diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index ab544e3a2eea0..7500fe876e7b6 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -72,13 +72,6 @@ func populateRepoIndexerAsynchronously() error { return nil } - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. 
- if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - return err - } - var maxRepoID int64 if maxRepoID, err = models.GetMaxID("repository"); err != nil { return err @@ -105,15 +98,14 @@ func populateRepoIndexer(maxRepoID int64) { return default: } - repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize) - err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos) + ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) if err != nil { log.Error("populateRepoIndexer: %v", err) return - } else if len(repos) == 0 { + } else if len(ids) == 0 { break } - for _, repo := range repos { + for _, id := range ids { select { case <-isShutdown: log.Info("Repository Indexer population shutdown before completion") @@ -121,13 +113,12 @@ func populateRepoIndexer(maxRepoID int64) { default: } repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: repo.ID, + repoID: id, deleted: false, } - maxRepoID = repo.ID - 1 } } - log.Info("Done populating the repo indexer with existing repositories") + log.Info("Done (re)populating the repo indexer with existing repositories") } func updateRepoIndexer(repoID int64) error { diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go index 12304eaf693f8..bc5f317b7d4e4 100644 --- a/modules/indexer/code/repo.go +++ b/modules/indexer/code/repo.go @@ -10,6 +10,7 @@ import ( "strings" "sync" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -112,16 +113,29 @@ func initRepoIndexer(populateIndexer func() error) { if indexer != nil { indexerHolder.set(indexer) closeAtTerminate() + + // Continue population from where left off + if err = populateIndexer(); err != nil { + log.Fatal("PopulateRepoIndex: %v", err) + } return } if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil { log.Fatal("CreateRepoIndexer: %v", err) } + closeAtTerminate() + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + log.Fatal("DeleteAllRepoIndexerStatus: %v", err) + } + if err = populateIndexer(); err != nil { log.Fatal("PopulateRepoIndex: %v", err) } - closeAtTerminate() } func closeAtTerminate() { From 4411cae30310d25f01ae64fb0365e9c8428e4b1a Mon Sep 17 00:00:00 2001 From: zeripath Date: Fri, 13 Dec 2019 20:41:39 +0000 Subject: [PATCH 21/32] Update manager.go Remove unnecessary locking --- modules/graceful/manager.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 2aa11a391b3b4..23fe576e10eca 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -212,8 +212,6 @@ func (g *Manager) IsChild() bool { // IsShutdown returns a channel which will be closed at shutdown. 
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate func (g *Manager) IsShutdown() <-chan struct{} { - g.lock.RLock() - defer g.lock.RUnlock() return g.shutdown } @@ -222,8 +220,6 @@ func (g *Manager) IsShutdown() <-chan struct{} { // Servers running within the running server wait group should respond to IsHammer // if not shutdown already func (g *Manager) IsHammer() <-chan struct{} { - g.lock.RLock() - defer g.lock.RUnlock() return g.hammer } @@ -231,8 +227,6 @@ func (g *Manager) IsHammer() <-chan struct{} { // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped func (g *Manager) IsTerminate() <-chan struct{} { - g.lock.RLock() - defer g.lock.RUnlock() return g.terminate } @@ -290,8 +284,6 @@ func (g *Manager) InformCleanup() { // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating func (g *Manager) Done() <-chan struct{} { - g.lock.RLock() - defer g.lock.RUnlock() return g.done } From cb2ac9b1fe9c595ee18ca3bad3f690c6a73ff39c Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 14 Dec 2019 10:01:02 +0000 Subject: [PATCH 22/32] oops --- models/repo_indexer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/models/repo_indexer.go b/models/repo_indexer.go index 66a6792a03684..1a886a78c3e37 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -22,10 +22,10 @@ func GetUnindexedRepos(maxRepoID int64, page, pageSize int) ([]int64, error) { cond := builder.Cond(builder.IsNull{ "repo_indexer_status.id", }) - sess := x.Table("repo").Join("LEFT OUTER", "repo_indexer_status", "repo.id = repo_indexer_status.repoID") + sess := x.Table("repository").Join("LEFT OUTER", "repo_indexer_status", "repository.id = repo_indexer_status.repo_id") if maxRepoID > 0 { cond = builder.And(cond, builder.Lte{ - "repo.id": maxRepoID, + "repository.id": maxRepoID, }) } if page >= 0 && pageSize > 0 { @@ -36,7 +36,7 @@ func GetUnindexedRepos(maxRepoID int64, page, pageSize int) ([]int64, error) { sess.Limit(pageSize, start) } - sess.Where(cond).Cols("repo.id") + sess.Where(cond).Cols("repository.id") err := sess.Find(&ids) return ids, err } From 607f2c2ac0b0f27f8e9d4c346eb29885ee185470 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 14 Dec 2019 10:22:09 +0000 Subject: [PATCH 23/32] Better error reporting --- models/repo_indexer.go | 11 ++++++++--- modules/indexer/code/bleve.go | 12 ++++++------ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/models/repo_indexer.go b/models/repo_indexer.go index 1a886a78c3e37..08dd2d31bedf7 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -5,6 +5,8 @@ package models import ( + "fmt" + "xorm.io/builder" ) @@ -60,15 +62,18 @@ func (repo *Repository) GetIndexerStatus() error { // UpdateIndexerStatus updates indexer status func (repo *Repository) UpdateIndexerStatus(sha string) error { if err := repo.GetIndexerStatus(); err != nil { - return err + return fmt.Errorf("UpdateIndexerStatus: Unable to getIndexerStatus for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) } if len(repo.IndexerStatus.CommitSha) == 0 { repo.IndexerStatus.CommitSha = sha _, err := x.Insert(repo.IndexerStatus) - return err + return fmt.Errorf("UpdateIndexerStatus: Unable to insert repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) } repo.IndexerStatus.CommitSha = sha _, err := 
x.ID(repo.IndexerStatus.ID).Cols("commit_sha"). Update(repo.IndexerStatus) - return err + if err != nil { + return fmt.Errorf("UpdateIndexerStatus: Unable to update repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) + } + return nil } diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 7500fe876e7b6..57dadf5dad7bc 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -124,16 +124,16 @@ func populateRepoIndexer(maxRepoID int64) { func updateRepoIndexer(repoID int64) error { repo, err := models.GetRepositoryByID(repoID) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err)) } sha, err := getDefaultBranchSha(repo) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err)) } changes, err := getRepoChanges(repo, sha) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) } else if changes == nil { return nil } @@ -141,16 +141,16 @@ func updateRepoIndexer(repoID int64) error { batch := RepoIndexerBatch() for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err)) } } for _, filename := range changes.RemovedFilenames { if err := addDelete(filename, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err) } } if err = batch.Flush(); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) } return repo.UpdateIndexerStatus(sha) } From a4a722fc529d0fcc3f917bf96498070fba16ab20 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 14 Dec 2019 10:27:30 +0000 Subject: [PATCH 24/32] fixup --- modules/indexer/code/bleve.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 57dadf5dad7bc..cefd07cf7f05d 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -124,12 +124,12 @@ func populateRepoIndexer(maxRepoID int64) { func updateRepoIndexer(repoID int64) error { repo, err := models.GetRepositoryByID(repoID) if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err)) + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err) } sha, err := getDefaultBranchSha(repo) if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err)) + return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err) } changes, err := getRepoChanges(repo, sha) if err != nil { @@ -141,7 +141,7 @@ func updateRepoIndexer(repoID int64) error { batch := RepoIndexerBatch() for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate 
to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err)) + return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err) } } for _, filename := range changes.RemovedFilenames { From a9269a5060f7d17661ff64b357ecefda7c80a817 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 14 Dec 2019 11:08:37 +0000 Subject: [PATCH 25/32] Push TestPullRequests to the UniqueQueue and make UniqueQueue closable --- models/pull_list.go | 9 +++--- modules/sync/unique_queue.go | 63 +++++++++++++++++++++++------------- services/mirror/mirror.go | 3 +- services/pull/check.go | 45 ++++++++------------------ 4 files changed, 62 insertions(+), 58 deletions(-) diff --git a/models/pull_list.go b/models/pull_list.go index 49d04ba0b8238..1376978353f63 100644 --- a/models/pull_list.go +++ b/models/pull_list.go @@ -68,11 +68,12 @@ func GetUnmergedPullRequestsByBaseInfo(repoID int64, branch string) ([]*PullRequ Find(&prs) } -// GetPullRequestsByCheckStatus returns all pull requests according the special checking status. -func GetPullRequestsByCheckStatus(status PullRequestStatus) ([]*PullRequest, error) { - prs := make([]*PullRequest, 0, 10) - return prs, x. +// GetPullRequestIDsByCheckStatus returns all pull requests according the special checking status. +func GetPullRequestIDsByCheckStatus(status PullRequestStatus) ([]int64, error) { + prs := make([]int64, 0, 10) + return prs, x.Table("pull_request"). Where("status=?", status). + Cols("pull_request.id"). Find(&prs) } diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go index ce9bbaa57429a..43aa5b6dee77d 100644 --- a/modules/sync/unique_queue.go +++ b/modules/sync/unique_queue.go @@ -6,8 +6,6 @@ package sync import ( - "context" - "github.com/unknwon/com" ) @@ -18,8 +16,9 @@ import ( // This queue is particularly useful for preventing duplicated task // of same purpose. type UniqueQueue struct { - table *StatusTable - queue chan string + table *StatusTable + queue chan string + closed chan struct{} } // NewUniqueQueue initializes and returns a new UniqueQueue object. @@ -29,11 +28,43 @@ func NewUniqueQueue(queueLength int) *UniqueQueue { } return &UniqueQueue{ - table: NewStatusTable(), - queue: make(chan string, queueLength), + table: NewStatusTable(), + queue: make(chan string, queueLength), + closed: make(chan struct{}), + } +} + +// Close closes this queue +func (q *UniqueQueue) Close() { + select { + case <-q.closed: + default: + q.table.lock.Lock() + select { + case <-q.closed: + default: + close(q.closed) + } + q.table.lock.Unlock() } } +// IsClosed returns a channel that is closed when this Queue is closed +func (q *UniqueQueue) IsClosed() <-chan struct{} { + return q.closed +} + +// IDs returns the current ids in the pool +func (q *UniqueQueue) IDs() []interface{} { + q.table.lock.Lock() + defer q.table.lock.Unlock() + ids := make([]interface{}, 0, len(q.table.pool)) + for id := range q.table.pool { + ids = append(ids, id) + } + return ids +} + // Queue returns channel of queue for retrieving instances. func (q *UniqueQueue) Queue() <-chan string { return q.queue @@ -48,15 +79,8 @@ func (q *UniqueQueue) Exist(id interface{}) bool { // AddFunc adds new instance to the queue with a custom runnable function, // the queue is blocked until the function exits. 
func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { - q.AddCtxFunc(context.Background(), id, fn) -} - -// AddCtxFunc adds new instance to the queue with a custom runnable function, -// the queue is blocked until the function exits. If the context is done before -// the id is added to the queue it will not be added and false will be returned. -func (q *UniqueQueue) AddCtxFunc(ctx context.Context, id interface{}, fn func()) bool { if q.Exist(id) { - return true + return } idStr := com.ToStr(id) @@ -67,10 +91,10 @@ func (q *UniqueQueue) AddCtxFunc(ctx context.Context, id interface{}, fn func()) } q.table.lock.Unlock() select { - case <-ctx.Done(): - return false + case <-q.closed: + return case q.queue <- idStr: - return true + return } } @@ -79,11 +103,6 @@ func (q *UniqueQueue) Add(id interface{}) { q.AddFunc(id, nil) } -// AddCtx adds new instance to the queue with a context - if the context is done before the id is added to the queue it is cancelled -func (q *UniqueQueue) AddCtx(ctx context.Context, id interface{}) bool { - return q.AddCtxFunc(ctx, id, nil) -} - // Remove removes instance from the queue. func (q *UniqueQueue) Remove(id interface{}) { q.table.Stop(com.ToStr(id)) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index b1678da62c531..0f7cefe565b9b 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -308,7 +308,7 @@ func Update(ctx context.Context) { case <-ctx.Done(): return fmt.Errorf("Aborted due to shutdown") default: - _ = mirrorQueue.AddCtx(ctx, m.RepoID) + mirrorQueue.Add(m.RepoID) return nil } }); err != nil { @@ -323,6 +323,7 @@ func SyncMirrors(ctx context.Context) { for { select { case <-ctx.Done(): + mirrorQueue.Close() return case repoID := <-mirrorQueue.Queue(): syncMirror(repoID) diff --git a/services/pull/check.go b/services/pull/check.go index dcaa394dc11aa..7344f071ac23e 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -154,37 +154,22 @@ func manuallyMerged(pr *models.PullRequest) bool { // TestPullRequests checks and tests untested patches of pull requests. // TODO: test more pull requests at same time. func TestPullRequests(ctx context.Context) { - prs, err := models.GetPullRequestsByCheckStatus(models.PullRequestStatusChecking) - if err != nil { - log.Error("Find Checking PRs: %v", err) - return - } - - var checkedPRs = make(map[int64]struct{}) - - // Update pull request status. - for _, pr := range prs { - checkedPRs[pr.ID] = struct{}{} - if err := pr.GetBaseRepo(); err != nil { - log.Error("GetBaseRepo: %v", err) - continue - } - if manuallyMerged(pr) { - continue - } - if err := TestPatch(pr); err != nil { - log.Error("testPatch: %v", err) - continue - } - checkAndUpdateStatus(pr) - select { - case <-ctx.Done(): - log.Info("PID: %d Pull Request testing shutdown", os.Getpid()) + go func() { + prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking) + if err != nil { + log.Error("Find Checking PRs: %v", err) return - default: } - } + for _, prID := range prs { + select { + case <-ctx.Done(): + return + default: + pullRequestQueue.Add(prID) + } + } + }() // Start listening on new test requests. 
for { @@ -194,9 +179,6 @@ func TestPullRequests(ctx context.Context) { pullRequestQueue.Remove(prID) id := com.StrTo(prID).MustInt64() - if _, ok := checkedPRs[id]; ok { - continue - } pr, err := models.GetPullRequestByID(id) if err != nil { @@ -210,6 +192,7 @@ func TestPullRequests(ctx context.Context) { } checkAndUpdateStatus(pr) case <-ctx.Done(): + pullRequestQueue.Close() log.Info("PID: %d Pull Request testing shutdown", os.Getpid()) return } From 3b97005f512493a54ad9223873181f0283e1a9a3 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 14 Dec 2019 11:14:25 +0000 Subject: [PATCH 26/32] Ensure webhook queue is also closed to prevent blockage on add here --- modules/webhook/deliver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/webhook/deliver.go b/modules/webhook/deliver.go index f982689a48574..9f5c938f8302c 100644 --- a/modules/webhook/deliver.go +++ b/modules/webhook/deliver.go @@ -177,6 +177,7 @@ func DeliverHooks(ctx context.Context) { for { select { case <-ctx.Done(): + hookQueue.Close() return case repoIDStr := <-hookQueue.Queue(): log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) From c227a34ddffcab05f234385ece77a1b9508e8a62 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 14 Dec 2019 12:19:53 +0000 Subject: [PATCH 27/32] Remove unnecessary channel check --- modules/graceful/manager.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 23fe576e10eca..eec675e297d5f 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -170,9 +170,6 @@ func (g *Manager) doShutdown() { g.doTerminate() g.WaitForTerminate() g.lock.Lock() - if g.done == nil { - g.done = make(chan struct{}) - } close(g.done) g.lock.Unlock() }() From 0e38ea8583392b6135413cd36131b27ad2560bd4 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 14 Dec 2019 12:36:01 +0000 Subject: [PATCH 28/32] Double check that we are not trying to add id to the table again --- modules/sync/unique_queue.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go index 43aa5b6dee77d..86078ee126696 100644 --- a/modules/sync/unique_queue.go +++ b/modules/sync/unique_queue.go @@ -85,6 +85,9 @@ func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { idStr := com.ToStr(id) q.table.lock.Lock() + if _, ok := q.table.pool[idStr]; ok { + return + } q.table.pool[idStr] = struct{}{} if fn != nil { fn() From 244b54a5b842b3e5c8883871bbf8348f9bff255e Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 14 Dec 2019 23:52:29 +0000 Subject: [PATCH 29/32] Sort results of getunindexrepos by descending id --- models/repo_indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/repo_indexer.go b/models/repo_indexer.go index 08dd2d31bedf7..279de4a129467 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -38,7 +38,7 @@ func GetUnindexedRepos(maxRepoID int64, page, pageSize int) ([]int64, error) { sess.Limit(pageSize, start) } - sess.Where(cond).Cols("repository.id") + sess.Where(cond).Cols("repository.id").Desc("repository.id") err := sess.Find(&ids) return ids, err } From 2098dd26a9dfd1ba7c2808e9163a45661a23a675 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 14 Dec 2019 23:56:12 +0000 Subject: [PATCH 30/32] Ensure repos only added once to queue --- modules/indexer/code/bleve.go | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index cefd07cf7f05d..80285f105d42b 100644 --- 
a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -116,6 +116,7 @@ func populateRepoIndexer(maxRepoID int64) { repoID: id, deleted: false, } + maxRepoID = id - 1 } } log.Info("Done (re)populating the repo indexer with existing repositories") From d42cf0da342f8b74b113eeea6601d95e5a00d311 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sun, 15 Dec 2019 00:49:29 +0000 Subject: [PATCH 31/32] D'oh --- models/repo_indexer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/models/repo_indexer.go b/models/repo_indexer.go index 279de4a129467..aee3c74b35d40 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -67,7 +67,10 @@ func (repo *Repository) UpdateIndexerStatus(sha string) error { if len(repo.IndexerStatus.CommitSha) == 0 { repo.IndexerStatus.CommitSha = sha _, err := x.Insert(repo.IndexerStatus) - return fmt.Errorf("UpdateIndexerStatus: Unable to insert repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) + if err != nil { + return fmt.Errorf("UpdateIndexerStatus: Unable to insert repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) + } + return nil } repo.IndexerStatus.CommitSha = sha _, err := x.ID(repo.IndexerStatus.ID).Cols("commit_sha"). From be4388c3fd7d307c2467abea99ed69a84ff7e26c Mon Sep 17 00:00:00 2001 From: zeripath Date: Sun, 15 Dec 2019 01:17:58 +0000 Subject: [PATCH 32/32] Skip duplicate exist check in addfunc As per @guillep2k. We check if the id is in the pool under the write lock, there's no need to pre check under the read lock --- modules/sync/unique_queue.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go index 86078ee126696..14644c7d4e0c6 100644 --- a/modules/sync/unique_queue.go +++ b/modules/sync/unique_queue.go @@ -79,10 +79,6 @@ func (q *UniqueQueue) Exist(id interface{}) bool { // AddFunc adds new instance to the queue with a custom runnable function, // the queue is blocked until the function exits. func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { - if q.Exist(id) { - return - } - idStr := com.ToStr(id) q.table.lock.Lock() if _, ok := q.table.pool[idStr]; ok {