@@ -14,7 +14,7 @@ import (
14
14
"errors"
15
15
"fmt"
16
16
"log"
17
- "sync/atomic "
17
+ "sync"
18
18
"time"
19
19
20
20
"github.com/tarantool/go-tarantool"
@@ -69,12 +69,13 @@ type ConnectionPool struct {
69
69
connOpts tarantool.Opts
70
70
opts OptsPool
71
71
72
- notify chan tarantool.ConnEvent
73
- state State
74
- control chan struct {}
75
- roPool * RoundRobinStrategy
76
- rwPool * RoundRobinStrategy
77
- anyPool * RoundRobinStrategy
72
+ notify chan tarantool.ConnEvent
73
+ state state
74
+ control chan struct {}
75
+ roPool * RoundRobinStrategy
76
+ rwPool * RoundRobinStrategy
77
+ anyPool * RoundRobinStrategy
78
+ poolsMutex sync.RWMutex
78
79
}
79
80
80
81
// ConnectWithOpts creates pool for instances with addresses addrs
@@ -100,6 +101,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
100
101
connOpts : connOpts ,
101
102
opts : opts ,
102
103
notify : notify ,
104
+ state : unknownState ,
103
105
control : make (chan struct {}),
104
106
rwPool : rwPool ,
105
107
roPool : roPool ,
@@ -109,10 +111,12 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
109
111
110
112
somebodyAlive := connPool .fillPools ()
111
113
if ! somebodyAlive {
112
- connPool .Close ()
114
+ connPool .state .set (closedState )
115
+ connPool .closeImpl ()
113
116
return nil , ErrNoConnection
114
117
}
115
118
119
+ connPool .state .set (connectedState )
116
120
go connPool .checker ()
117
121
118
122
return connPool , nil
@@ -128,7 +132,10 @@ func Connect(addrs []string, connOpts tarantool.Opts) (connPool *ConnectionPool,
128
132
129
133
// ConnectedNow gets connected status of pool.
130
134
func (connPool * ConnectionPool ) ConnectedNow (mode Mode ) (bool , error ) {
131
- if connPool .getState () != connConnected {
135
+ connPool .poolsMutex .RLock ()
136
+ defer connPool .poolsMutex .RUnlock ()
137
+
138
+ if connPool .state .get () != connectedState {
132
139
return false , nil
133
140
}
134
141
switch mode {
@@ -157,10 +164,8 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err
157
164
return conn .ConfiguredTimeout (), nil
158
165
}
159
166
160
- // Close closes connections in pool.
161
- func (connPool * ConnectionPool ) Close () []error {
167
+ func (connPool * ConnectionPool ) closeImpl () []error {
162
168
close (connPool .control )
163
- connPool .state = connClosed
164
169
165
170
errs := make ([]error , 0 , len (connPool .addrs ))
166
171
@@ -177,6 +182,17 @@ func (connPool *ConnectionPool) Close() []error {
177
182
return errs
178
183
}
179
184
185
+ // Close closes connections in pool.
186
+ func (connPool * ConnectionPool ) Close () []error {
187
+ if connPool .state .cas (connectedState , closedState ) {
188
+ connPool .poolsMutex .Lock ()
189
+ defer connPool .poolsMutex .Unlock ()
190
+
191
+ return connPool .closeImpl ()
192
+ }
193
+ return nil
194
+ }
195
+
180
196
// GetAddrs gets addresses of connections in pool.
181
197
func (connPool * ConnectionPool ) GetAddrs () []string {
182
198
cpy := make ([]string , len (connPool .addrs ))
@@ -188,6 +204,13 @@ func (connPool *ConnectionPool) GetAddrs() []string {
188
204
func (connPool * ConnectionPool ) GetPoolInfo () map [string ]* ConnectionInfo {
189
205
info := make (map [string ]* ConnectionInfo )
190
206
207
+ connPool .poolsMutex .RLock ()
208
+ defer connPool .poolsMutex .RUnlock ()
209
+
210
+ if connPool .state .get () != connectedState {
211
+ return info
212
+ }
213
+
191
214
for _ , addr := range connPool .addrs {
192
215
conn , role := connPool .getConnectionFromPool (addr )
193
216
if conn != nil {
@@ -638,11 +661,9 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C
638
661
func (connPool * ConnectionPool ) deleteConnectionFromPool (addr string ) {
639
662
_ = connPool .anyPool .DeleteConnByAddr (addr )
640
663
conn := connPool .rwPool .DeleteConnByAddr (addr )
641
- if conn ! = nil {
642
- return
664
+ if conn = = nil {
665
+ connPool . roPool . DeleteConnByAddr ( addr )
643
666
}
644
-
645
- connPool .roPool .DeleteConnByAddr (addr )
646
667
}
647
668
648
669
func (connPool * ConnectionPool ) setConnectionToPool (addr string , conn * tarantool.Connection ) error {
@@ -689,39 +710,39 @@ func (connPool *ConnectionPool) refreshConnection(addr string) {
689
710
}
690
711
691
712
func (connPool * ConnectionPool ) checker () {
692
-
693
713
timer := time .NewTicker (connPool .opts .CheckTimeout )
694
714
defer timer .Stop ()
695
715
696
- for connPool .getState () != connClosed {
716
+ for connPool .state . get () != closedState {
697
717
select {
698
718
case <- connPool .control :
699
719
return
700
720
case e := <- connPool .notify :
701
- if connPool .getState () == connClosed {
702
- return
703
- }
704
- if e .Conn .ClosedNow () {
721
+ connPool .poolsMutex .Lock ()
722
+ if connPool .state .get () == connectedState && e .Conn .ClosedNow () {
705
723
connPool .deleteConnectionFromPool (e .Conn .Addr ())
706
724
}
725
+ connPool .poolsMutex .Unlock ()
707
726
case <- timer .C :
708
- for _ , addr := range connPool .addrs {
709
- if connPool .getState () == connClosed {
710
- return
727
+ connPool .poolsMutex .Lock ()
728
+ if connPool .state .get () == connectedState {
729
+ for _ , addr := range connPool .addrs {
730
+ // Reopen connection
731
+ // Relocate connection between subpools
732
+ // if ro/rw was updated
733
+ connPool .refreshConnection (addr )
711
734
}
712
-
713
- // Reopen connection
714
- // Relocate connection between subpools
715
- // if ro/rw was updated
716
- connPool .refreshConnection (addr )
717
735
}
736
+ connPool .poolsMutex .Unlock ()
718
737
}
719
738
}
720
739
}
721
740
722
741
func (connPool * ConnectionPool ) fillPools () bool {
723
742
somebodyAlive := false
724
743
744
+ // It is called before checker() goroutine and before closeImpl() may be
745
+ // called so we don't expect concurrency issues here.
725
746
for _ , addr := range connPool .addrs {
726
747
conn , err := tarantool .Connect (addr , connPool .connOpts )
727
748
if err != nil {
@@ -740,10 +761,6 @@ func (connPool *ConnectionPool) fillPools() bool {
740
761
return somebodyAlive
741
762
}
742
763
743
- func (connPool * ConnectionPool ) getState () uint32 {
744
- return atomic .LoadUint32 ((* uint32 )(& connPool .state ))
745
- }
746
-
747
764
func (connPool * ConnectionPool ) getNextConnection (mode Mode ) (* tarantool.Connection , error ) {
748
765
749
766
switch mode {
0 commit comments