There is a potential memory leak in `Workerpool` due to the intricacies of `time.Timer` stopping. Whenever a `time.Timer` is `Stop`ped its channel must be cleared using a `select` if the result of the `Stop()` is `false`. Unfortunately in `Workerpool` these were checked the wrong way round. However, there were a few other places that were not being checked. Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>tags/v1.21.12.1
| @@ -19,6 +19,7 @@ import ( | |||||
| "code.gitea.io/gitea/modules/git" | "code.gitea.io/gitea/modules/git" | ||||
| "code.gitea.io/gitea/modules/private" | "code.gitea.io/gitea/modules/private" | ||||
| "code.gitea.io/gitea/modules/setting" | "code.gitea.io/gitea/modules/setting" | ||||
| "code.gitea.io/gitea/modules/util" | |||||
| "github.com/urfave/cli" | "github.com/urfave/cli" | ||||
| ) | ) | ||||
| @@ -113,15 +114,8 @@ func (d *delayWriter) Close() error { | |||||
| if d == nil { | if d == nil { | ||||
| return nil | return nil | ||||
| } | } | ||||
| stopped := d.timer.Stop() | |||||
| if stopped { | |||||
| return nil | |||||
| } | |||||
| select { | |||||
| case <-d.timer.C: | |||||
| default: | |||||
| } | |||||
| if d.buf == nil { | |||||
| stopped := util.StopTimer(d.timer) | |||||
| if stopped || d.buf == nil { | |||||
| return nil | return nil | ||||
| } | } | ||||
| _, err := d.internal.Write(d.buf.Bytes()) | _, err := d.internal.Write(d.buf.Bytes()) | ||||
| @@ -16,6 +16,7 @@ import ( | |||||
| "code.gitea.io/gitea/modules/log" | "code.gitea.io/gitea/modules/log" | ||||
| "code.gitea.io/gitea/modules/migrations/base" | "code.gitea.io/gitea/modules/migrations/base" | ||||
| "code.gitea.io/gitea/modules/structs" | "code.gitea.io/gitea/modules/structs" | ||||
| "code.gitea.io/gitea/modules/util" | |||||
| "github.com/google/go-github/v24/github" | "github.com/google/go-github/v24/github" | ||||
| "golang.org/x/oauth2" | "golang.org/x/oauth2" | ||||
| @@ -121,7 +122,7 @@ func (g *GithubDownloaderV3) sleep() { | |||||
| timer := time.NewTimer(time.Until(g.rate.Reset.Time)) | timer := time.NewTimer(time.Until(g.rate.Reset.Time)) | ||||
| select { | select { | ||||
| case <-g.ctx.Done(): | case <-g.ctx.Done(): | ||||
| timer.Stop() | |||||
| util.StopTimer(timer) | |||||
| return | return | ||||
| case <-timer.C: | case <-timer.C: | ||||
| } | } | ||||
| @@ -12,6 +12,7 @@ import ( | |||||
| "time" | "time" | ||||
| "code.gitea.io/gitea/modules/log" | "code.gitea.io/gitea/modules/log" | ||||
| "code.gitea.io/gitea/modules/util" | |||||
| ) | ) | ||||
| // WrappedQueueType is the type for a wrapped delayed starting queue | // WrappedQueueType is the type for a wrapped delayed starting queue | ||||
| @@ -88,7 +89,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h | |||||
| t := time.NewTimer(sleepTime) | t := time.NewTimer(sleepTime) | ||||
| select { | select { | ||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| t.Stop() | |||||
| util.StopTimer(t) | |||||
| case <-t.C: | case <-t.C: | ||||
| } | } | ||||
| } | } | ||||
| @@ -11,6 +11,7 @@ import ( | |||||
| "time" | "time" | ||||
| "code.gitea.io/gitea/modules/log" | "code.gitea.io/gitea/modules/log" | ||||
| "code.gitea.io/gitea/modules/util" | |||||
| ) | ) | ||||
| // WorkerPool represent a dynamically growable worker pool for a | // WorkerPool represent a dynamically growable worker pool for a | ||||
| @@ -92,12 +93,7 @@ func (p *WorkerPool) pushBoost(data Data) { | |||||
| p.lock.Unlock() | p.lock.Unlock() | ||||
| select { | select { | ||||
| case p.dataChan <- data: | case p.dataChan <- data: | ||||
| if timer.Stop() { | |||||
| select { | |||||
| case <-timer.C: | |||||
| default: | |||||
| } | |||||
| } | |||||
| util.StopTimer(timer) | |||||
| case <-timer.C: | case <-timer.C: | ||||
| p.lock.Lock() | p.lock.Lock() | ||||
| if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) { | if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) { | ||||
| @@ -353,12 +349,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { | |||||
| timer := time.NewTimer(delay) | timer := time.NewTimer(delay) | ||||
| select { | select { | ||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| if timer.Stop() { | |||||
| select { | |||||
| case <-timer.C: | |||||
| default: | |||||
| } | |||||
| } | |||||
| util.StopTimer(timer) | |||||
| if len(data) > 0 { | if len(data) > 0 { | ||||
| log.Trace("Handling: %d data, %v", len(data), data) | log.Trace("Handling: %d data, %v", len(data), data) | ||||
| p.handle(data...) | p.handle(data...) | ||||
| @@ -367,12 +358,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { | |||||
| log.Trace("Worker shutting down") | log.Trace("Worker shutting down") | ||||
| return | return | ||||
| case datum, ok := <-p.dataChan: | case datum, ok := <-p.dataChan: | ||||
| if timer.Stop() { | |||||
| select { | |||||
| case <-timer.C: | |||||
| default: | |||||
| } | |||||
| } | |||||
| util.StopTimer(timer) | |||||
| if !ok { | if !ok { | ||||
| // the dataChan has been closed - we should finish up: | // the dataChan has been closed - we should finish up: | ||||
| if len(data) > 0 { | if len(data) > 0 { | ||||
| @@ -0,0 +1,21 @@ | |||||
| // Copyright 2020 The Gitea Authors. All rights reserved. | |||||
| // Use of this source code is governed by a MIT-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package util | |||||
| import ( | |||||
| "time" | |||||
| ) | |||||
| // StopTimer is a utility function to safely stop a time.Timer and clean its channel | |||||
| func StopTimer(t *time.Timer) bool { | |||||
| stopped := t.Stop() | |||||
| if !stopped { | |||||
| select { | |||||
| case <-t.C: | |||||
| default: | |||||
| } | |||||
| } | |||||
| return stopped | |||||
| } | |||||