Skip to content

Commit 5d1c0ae

Browse files
leastrequest: fix data race in leastrequest picker (#6606)
Co-authored-by: Huang Chong <hchtgh315@gmail.com>
1 parent e26457d commit 5d1c0ae

File tree

1 file changed

+7
-7
lines changed

1 file changed

+7
-7
lines changed

balancer/leastrequest/leastrequest.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (bb) Name() string {
8080
}
8181

8282
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
83-
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)}
83+
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
8484
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
8585
b.Balancer = baseBuilder.Build(cc, bOpts)
8686
return b
@@ -92,7 +92,7 @@ type leastRequestBalancer struct {
9292
balancer.Balancer
9393

9494
choiceCount uint32
95-
scRPCCounts map[balancer.SubConn]*int32 // Hold onto RPC counts to keep track for subsequent picker updates.
95+
scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
9696
}
9797

9898
func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
@@ -108,7 +108,7 @@ func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnStat
108108

109109
type scWithRPCCount struct {
110110
sc balancer.SubConn
111-
numRPCs *int32
111+
numRPCs *atomic.Int32
112112
}
113113

114114
func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
@@ -126,7 +126,7 @@ func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picke
126126
// Create new refs if needed.
127127
for sc := range info.ReadySCs {
128128
if _, ok := lrb.scRPCCounts[sc]; !ok {
129-
lrb.scRPCCounts[sc] = new(int32)
129+
lrb.scRPCCounts[sc] = new(atomic.Int32)
130130
}
131131
}
132132

@@ -162,18 +162,18 @@ func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
162162
pickedSC = &sc
163163
continue
164164
}
165-
if *sc.numRPCs < *pickedSC.numRPCs {
165+
if sc.numRPCs.Load() < pickedSC.numRPCs.Load() {
166166
pickedSC = &sc
167167
}
168168
}
169169
// "The counter for a subchannel should be atomically incremented by one
170170
// after it has been successfully picked by the picker." - A48
171-
atomic.AddInt32(pickedSC.numRPCs, 1)
171+
pickedSC.numRPCs.Add(1)
172172
// "the picker should add a callback for atomically decrementing the
173173
// subchannel counter once the RPC finishes (regardless of Status code)." -
174174
// A48.
175175
done := func(balancer.DoneInfo) {
176-
atomic.AddInt32(pickedSC.numRPCs, -1)
176+
pickedSC.numRPCs.Add(-1)
177177
}
178178
return balancer.PickResult{
179179
SubConn: pickedSC.sc,

0 commit comments

Comments
 (0)