Skip to content

Commit 4e7b1c3

Browse files
committed
bugfix: prevent recreate connection after Close()
The ConnectionPool.checker() goroutine may still work some time after ConnectionPool.Close() call. It may lead to re-open connection in a concurrent closing pool. The connection still opened after the pool is closed. The patch adds RWLock to protect blocks which work with anyPool, roPool and rwPool. We don't need to protect regular requests because in the worst case, we will send a request into a closed connection. It can happen for other reasons and it seems like we can't avoid it. So it is an expected behavior. Closes #208
1 parent 605abb8 commit 4e7b1c3

File tree

4 files changed

+78
-42
lines changed

4 files changed

+78
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2222
- Segmentation faults in ConnectionPool requests after disconnect (#208)
2323
- Addresses in ConnectionPool may be changed from an external code (#208)
2424
- ConnectionPool recreates connections too often (#208)
25+
- A connection is still opened after ConnectionPool.Close() (#208)
2526

2627
## [1.8.0] - 2022-08-17
2728

connection_pool/connection_pool.go

Lines changed: 51 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"errors"
1515
"fmt"
1616
"log"
17-
"sync/atomic"
17+
"sync"
1818
"time"
1919

2020
"github.com/tarantool/go-tarantool"
@@ -69,12 +69,13 @@ type ConnectionPool struct {
6969
connOpts tarantool.Opts
7070
opts OptsPool
7171

72-
notify chan tarantool.ConnEvent
73-
state State
74-
control chan struct{}
75-
roPool *RoundRobinStrategy
76-
rwPool *RoundRobinStrategy
77-
anyPool *RoundRobinStrategy
72+
notify chan tarantool.ConnEvent
73+
state state
74+
control chan struct{}
75+
roPool *RoundRobinStrategy
76+
rwPool *RoundRobinStrategy
77+
anyPool *RoundRobinStrategy
78+
poolsMutex sync.RWMutex
7879
}
7980

8081
// ConnectWithOpts creates pool for instances with addresses addrs
@@ -100,6 +101,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
100101
connOpts: connOpts,
101102
opts: opts,
102103
notify: notify,
104+
state: unknownState,
103105
control: make(chan struct{}),
104106
rwPool: rwPool,
105107
roPool: roPool,
@@ -109,10 +111,12 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
109111

110112
somebodyAlive := connPool.fillPools()
111113
if !somebodyAlive {
112-
connPool.Close()
114+
connPool.state.set(closedState)
115+
connPool.closeImpl()
113116
return nil, ErrNoConnection
114117
}
115118

119+
connPool.state.set(connectedState)
116120
go connPool.checker()
117121

118122
return connPool, nil
@@ -128,7 +132,10 @@ func Connect(addrs []string, connOpts tarantool.Opts) (connPool *ConnectionPool,
128132

129133
// ConnectedNow gets connected status of pool.
130134
func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) {
131-
if connPool.getState() != connConnected {
135+
connPool.poolsMutex.RLock()
136+
defer connPool.poolsMutex.RUnlock()
137+
138+
if connPool.state.get() != connectedState {
132139
return false, nil
133140
}
134141
switch mode {
@@ -157,10 +164,8 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err
157164
return conn.ConfiguredTimeout(), nil
158165
}
159166

160-
// Close closes connections in pool.
161-
func (connPool *ConnectionPool) Close() []error {
167+
func (connPool *ConnectionPool) closeImpl() []error {
162168
close(connPool.control)
163-
connPool.state = connClosed
164169

165170
errs := make([]error, 0, len(connPool.addrs))
166171

@@ -177,6 +182,17 @@ func (connPool *ConnectionPool) Close() []error {
177182
return errs
178183
}
179184

185+
// Close closes connections in pool.
186+
func (connPool *ConnectionPool) Close() []error {
187+
if connPool.state.cas(connectedState, closedState) {
188+
connPool.poolsMutex.Lock()
189+
defer connPool.poolsMutex.Unlock()
190+
191+
return connPool.closeImpl()
192+
}
193+
return nil
194+
}
195+
180196
// GetAddrs gets addresses of connections in pool.
181197
func (connPool *ConnectionPool) GetAddrs() []string {
182198
cpy := make([]string, len(connPool.addrs))
@@ -188,6 +204,13 @@ func (connPool *ConnectionPool) GetAddrs() []string {
188204
func (connPool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo {
189205
info := make(map[string]*ConnectionInfo)
190206

207+
connPool.poolsMutex.RLock()
208+
defer connPool.poolsMutex.RUnlock()
209+
210+
if connPool.state.get() != connectedState {
211+
return info
212+
}
213+
191214
for _, addr := range connPool.addrs {
192215
conn, role := connPool.getConnectionFromPool(addr)
193216
if conn != nil {
@@ -638,11 +661,9 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C
638661
func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) {
639662
_ = connPool.anyPool.DeleteConnByAddr(addr)
640663
conn := connPool.rwPool.DeleteConnByAddr(addr)
641-
if conn != nil {
642-
return
664+
if conn == nil {
665+
connPool.roPool.DeleteConnByAddr(addr)
643666
}
644-
645-
connPool.roPool.DeleteConnByAddr(addr)
646667
}
647668

648669
func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool.Connection) error {
@@ -689,39 +710,39 @@ func (connPool *ConnectionPool) refreshConnection(addr string) {
689710
}
690711

691712
func (connPool *ConnectionPool) checker() {
692-
693713
timer := time.NewTicker(connPool.opts.CheckTimeout)
694714
defer timer.Stop()
695715

696-
for connPool.getState() != connClosed {
716+
for connPool.state.get() != closedState {
697717
select {
698718
case <-connPool.control:
699719
return
700720
case e := <-connPool.notify:
701-
if connPool.getState() == connClosed {
702-
return
703-
}
704-
if e.Conn.ClosedNow() {
721+
connPool.poolsMutex.Lock()
722+
if connPool.state.get() == connectedState && e.Conn.ClosedNow() {
705723
connPool.deleteConnectionFromPool(e.Conn.Addr())
706724
}
725+
connPool.poolsMutex.Unlock()
707726
case <-timer.C:
708-
for _, addr := range connPool.addrs {
709-
if connPool.getState() == connClosed {
710-
return
727+
connPool.poolsMutex.Lock()
728+
if connPool.state.get() == connectedState {
729+
for _, addr := range connPool.addrs {
730+
// Reopen connection
731+
// Relocate connection between subpools
732+
// if ro/rw was updated
733+
connPool.refreshConnection(addr)
711734
}
712-
713-
// Reopen connection
714-
// Relocate connection between subpools
715-
// if ro/rw was updated
716-
connPool.refreshConnection(addr)
717735
}
736+
connPool.poolsMutex.Unlock()
718737
}
719738
}
720739
}
721740

722741
func (connPool *ConnectionPool) fillPools() bool {
723742
somebodyAlive := false
724743

744+
// It is called before checker() goroutine and before closeImpl() may be
745+
// called so we don't expect concurrency issues here.
725746
for _, addr := range connPool.addrs {
726747
conn, err := tarantool.Connect(addr, connPool.connOpts)
727748
if err != nil {
@@ -740,10 +761,6 @@ func (connPool *ConnectionPool) fillPools() bool {
740761
return somebodyAlive
741762
}
742763

743-
func (connPool *ConnectionPool) getState() uint32 {
744-
return atomic.LoadUint32((*uint32)(&connPool.state))
745-
}
746-
747764
func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) {
748765

749766
switch mode {

connection_pool/const.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,3 @@ const (
3434
MasterRole // The instance is read-write mode.
3535
ReplicaRole // The instance is in read-only mode.
3636
)
37-
38-
type State uint32
39-
40-
// pool state
41-
const (
42-
connConnected = iota
43-
connClosed
44-
)

connection_pool/state.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package connection_pool
2+
3+
import (
4+
"sync/atomic"
5+
)
6+
7+
// pool state
8+
type state uint32
9+
10+
const (
11+
unknownState state = iota
12+
connectedState
13+
closedState
14+
)
15+
16+
func (s *state) set(news state) {
17+
atomic.StoreUint32((*uint32)(s), uint32(news))
18+
}
19+
20+
func (s *state) cas(olds, news state) bool {
21+
return atomic.CompareAndSwapUint32((*uint32)(s), uint32(olds), uint32(news))
22+
}
23+
24+
func (s *state) get() state {
25+
return state(atomic.LoadUint32((*uint32)(s)))
26+
}

0 commit comments

Comments
 (0)