Skip to content

Commit b9dcbe5

Browse files
committed
x/sync/singleflight/v2: add package
This adds a new package, x/sync/singleflight/v2, which is a version of x/sync/singleflight that uses generics instead of `string`, and `interface{}` in its API. The biggest changes are to the `Do` methods, and the `Result` type. The `Do` methods now take a `comparable` generic type instead of `string` for its inputs (matching that of `map` and similar). The output type of the `Do` method and the `Val` field in `Result` are now types that user can specify instead `interface{}`. Along the way, some tests received modifications to remove some now-unneeded `fmt.Sprintf` calls or add an `empty` struct for nil return tests. Also, `ExampleGroup` also received some additions to make it clear that non-`string` input types are acceptable. This is following a similar pattern as discussed with the `math/rand/v2` project. There is, however, one difference in affordances between packages in the stdlib and outside of the stdlib that we try to accomadate here. Stdlib packages like `math/rand/v2` can rely on the Go compiler version being the one our package is released with and, for packages outside of the stdlib, the errors can sometimes be cryptic when the package needs a more modern Go compile than a user attempted to use. For instance, `singleflight/v2` has a build tag specifying the need for Go 1.18 or later to be compiled in its necessary code files. When an older compiler is used to build it, the user will get an error starting with "build constraints exclude all Go files in". This makes sense when you read all of the files, but a user may be using it as a transitive dependency and won't know whether the build tags aren't matching because their OS is unsupported or their CPU architecture or what. To ameliorate user confusion, an extra `notgo18.go` file has been added with build tags saying that it's only built if the Go compiler isn't version 1.18 or later. And that file has a compile error in it that will error like so: $ go1.17.2 build . # golang.org/x/sync/singleflight/v2 ./notgo18.go:16:25: undefined: REQUIRES_GO_1_18_OR_LATER That error message is an attempt to be more clear to users about what needs to change in their build. Another alternative to that would be to change x/sync's `go.mod` to require Go 1.18 or greater. However, the rest of the x/sync packages don't require Go 1.18, and that seems like too large of a breaking change for singleflight/v2 alone.
1 parent 93782cc commit b9dcbe5

File tree

3 files changed

+627
-0
lines changed

3 files changed

+627
-0
lines changed

singleflight/v2/notgo18.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build !go1.18
6+
// +build !go1.18
7+
8+
package singleflight // import "golang.org/x/sync/singleflight/v2"
9+
10+
// singleflight/v2 requires Go 1.18 or later for generics support. To avoid a
11+
// confusing "build constraints exclude all Go files in" compile error on Go
12+
// 1.17 and earlier, we add this file and the below code. The code will fail to
13+
// compile on Go 1.17 or earlier and should help folks understand what they need
14+
// to do.
15+
16+
const versionRequired = REQUIRES_GO_1_18_OR_LATER

singleflight/v2/singleflight.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.18
6+
// +build go1.18
7+
8+
// Package singleflight provides a duplicate function call suppression
9+
// mechanism.
10+
package singleflight // import "golang.org/x/sync/singleflight/v2"
11+
12+
import (
13+
"bytes"
14+
"errors"
15+
"fmt"
16+
"runtime"
17+
"runtime/debug"
18+
"sync"
19+
)
20+
21+
// errGoexit indicates the runtime.Goexit was called in
22+
// the user given function.
23+
var errGoexit = errors.New("runtime.Goexit was called")
24+
25+
// A panicError is an arbitrary value recovered from a panic
26+
// with the stack trace during the execution of given function.
27+
type panicError struct {
28+
value any
29+
stack []byte
30+
}
31+
32+
// Error implements error interface.
33+
func (p *panicError) Error() string {
34+
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
35+
}
36+
37+
func newPanicError(v any) error {
38+
stack := debug.Stack()
39+
40+
// The first line of the stack trace is of the form "goroutine N [status]:"
41+
// but by the time the panic reaches Do the goroutine may no longer exist
42+
// and its status will have changed. Trim out the misleading line.
43+
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
44+
stack = stack[line+1:]
45+
}
46+
return &panicError{value: v, stack: stack}
47+
}
48+
49+
// call is an in-flight or completed singleflight.Do call
50+
type call[V any] struct {
51+
wg sync.WaitGroup
52+
53+
// These fields are written once before the WaitGroup is done
54+
// and are only read after the WaitGroup is done.
55+
val V
56+
err error
57+
58+
// These fields are read and written with the singleflight
59+
// mutex held before the WaitGroup is done, and are read but
60+
// not written after the WaitGroup is done.
61+
dups int
62+
chans []chan<- Result[V]
63+
}
64+
65+
// Group represents a class of work and forms a namespace in
66+
// which units of work can be executed with duplicate suppression.
67+
type Group[K comparable, V any] struct {
68+
mu sync.Mutex // protects m
69+
m map[K]*call[V] // lazily initialized
70+
}
71+
72+
// Result holds the results of Do, so they can be passed
73+
// on a channel.
74+
type Result[V any] struct {
75+
Val V
76+
Err error
77+
Shared bool
78+
}
79+
80+
// Do executes and returns the results of the given function, making
81+
// sure that only one execution is in-flight for a given key at a
82+
// time. If a duplicate comes in, the duplicate caller waits for the
83+
// original to complete and receives the same results.
84+
// The return value shared indicates whether v was given to multiple callers.
85+
func (g *Group[K, V]) Do(key K, fn func() (V, error)) (V, error, bool) {
86+
g.mu.Lock()
87+
if g.m == nil {
88+
g.m = make(map[K]*call[V])
89+
}
90+
if c, ok := g.m[key]; ok {
91+
c.dups++
92+
g.mu.Unlock()
93+
c.wg.Wait()
94+
95+
if e, ok := c.err.(*panicError); ok {
96+
panic(e)
97+
} else if c.err == errGoexit {
98+
runtime.Goexit()
99+
}
100+
return c.val, c.err, true
101+
}
102+
c := new(call[V])
103+
c.wg.Add(1)
104+
g.m[key] = c
105+
g.mu.Unlock()
106+
107+
g.doCall(c, key, fn)
108+
return c.val, c.err, c.dups > 0
109+
}
110+
111+
// DoChan is like Do but returns a channel that will receive the
112+
// results when they are ready.
113+
//
114+
// The returned channel will not be closed.
115+
func (g *Group[K, V]) DoChan(key K, fn func() (V, error)) <-chan Result[V] {
116+
ch := make(chan Result[V], 1)
117+
g.mu.Lock()
118+
if g.m == nil {
119+
g.m = make(map[K]*call[V])
120+
}
121+
if c, ok := g.m[key]; ok {
122+
c.dups++
123+
c.chans = append(c.chans, ch)
124+
g.mu.Unlock()
125+
return ch
126+
}
127+
c := &call[V]{chans: []chan<- Result[V]{ch}}
128+
c.wg.Add(1)
129+
g.m[key] = c
130+
g.mu.Unlock()
131+
132+
go g.doCall(c, key, fn)
133+
134+
return ch
135+
}
136+
137+
// doCall handles the single call for a key.
138+
func (g *Group[K, V]) doCall(c *call[V], key K, fn func() (V, error)) {
139+
normalReturn := false
140+
recovered := false
141+
142+
// use double-defer to distinguish panic from runtime.Goexit,
143+
// more details see https://golang.org/cl/134395
144+
defer func() {
145+
// the given function invoked runtime.Goexit
146+
if !normalReturn && !recovered {
147+
c.err = errGoexit
148+
}
149+
150+
g.mu.Lock()
151+
defer g.mu.Unlock()
152+
c.wg.Done()
153+
if g.m[key] == c {
154+
delete(g.m, key)
155+
}
156+
157+
if e, ok := c.err.(*panicError); ok {
158+
// In order to prevent the waiting channels from being blocked forever,
159+
// needs to ensure that this panic cannot be recovered.
160+
if len(c.chans) > 0 {
161+
go panic(e)
162+
select {} // Keep this goroutine around so that it will appear in the crash dump.
163+
} else {
164+
panic(e)
165+
}
166+
} else if c.err == errGoexit {
167+
// Already in the process of goexit, no need to call again
168+
} else {
169+
// Normal return
170+
for _, ch := range c.chans {
171+
ch <- Result[V]{c.val, c.err, c.dups > 0}
172+
}
173+
}
174+
}()
175+
176+
func() {
177+
defer func() {
178+
if !normalReturn {
179+
// Ideally, we would wait to take a stack trace until we've determined
180+
// whether this is a panic or a runtime.Goexit.
181+
//
182+
// Unfortunately, the only way we can distinguish the two is to see
183+
// whether the recover stopped the goroutine from terminating, and by
184+
// the time we know that, the part of the stack trace relevant to the
185+
// panic has been discarded.
186+
if r := recover(); r != nil {
187+
c.err = newPanicError(r)
188+
}
189+
}
190+
}()
191+
192+
c.val, c.err = fn()
193+
normalReturn = true
194+
}()
195+
196+
if !normalReturn {
197+
recovered = true
198+
}
199+
}
200+
201+
// Forget tells the singleflight to forget about a key. Future calls
202+
// to Do for this key will call the function rather than waiting for
203+
// an earlier call to complete.
204+
func (g *Group[K, V]) Forget(key K) {
205+
g.mu.Lock()
206+
delete(g.m, key)
207+
g.mu.Unlock()
208+
}

0 commit comments

Comments
 (0)