@@ -32,17 +32,35 @@ var (
32
32
ErrNoHealthyInstance = errors .New ("can't find healthy instance in pool" )
33
33
)
34
34
35
+ // ConnectionListener provides an observer semantics for components interested
36
+ // in knowing the role changes of connections in a ConnectionPool.
37
+ type ConnectionListener interface {
38
+ // Added is called before a connection with a role added into a list of
39
+ // active connections.
40
+ Added (conn * tarantool.Connection , role Role )
41
+ // Removed is called after a connection with a role removed from a list
42
+ // of active connections.
43
+ Removed (conn * tarantool.Connection , role Role )
44
+ // Updated is called if a connection with a role updated in a list
45
+ // of active connections.
46
+ Updated (conn * tarantool.Connection , oldRole , newRole Role )
47
+ }
48
+
35
49
/*
36
50
Additional options (configurable via ConnectWithOpts):
37
51
38
52
- CheckTimeout - time interval to check for connection timeout and try to switch connection.
53
+
54
+ - ConnectionListener - a listener for connection updates.
39
55
*/
40
56
type OptsPool struct {
41
57
// timeout for timer to reopen connections
42
58
// that have been closed by some events and
43
59
// to relocate connection between subpools
44
60
// if ro/rw role has been updated
45
61
CheckTimeout time.Duration
62
+ // ConnectionListener provides an ability to observe connection updates.
63
+ ConnectionListener ConnectionListener
46
64
}
47
65
48
66
/*
@@ -174,9 +192,22 @@ func (connPool *ConnectionPool) closeImpl() []error {
174
192
if err := conn .Close (); err != nil {
175
193
errs = append (errs , err )
176
194
}
195
+ if conn := connPool .rwPool .DeleteConnByAddr (addr ); conn != nil {
196
+ if connPool .opts .ConnectionListener != nil {
197
+ connPool .opts .ConnectionListener .Removed (conn , MasterRole )
198
+ }
199
+ continue
200
+ }
201
+ if conn := connPool .roPool .DeleteConnByAddr (addr ); conn != nil {
202
+ if connPool .opts .ConnectionListener != nil {
203
+ connPool .opts .ConnectionListener .Removed (conn , ReplicaRole )
204
+ }
205
+ continue
206
+ }
207
+ if connPool .opts .ConnectionListener != nil {
208
+ connPool .opts .ConnectionListener .Removed (conn , UnknownRole )
209
+ }
177
210
}
178
- connPool .rwPool .DeleteConnByAddr (addr )
179
- connPool .roPool .DeleteConnByAddr (addr )
180
211
}
181
212
182
213
return errs
@@ -660,10 +691,22 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C
660
691
}
661
692
662
693
func (connPool * ConnectionPool ) deleteConnectionFromPool (addr string ) {
663
- _ = connPool .anyPool .DeleteConnByAddr (addr )
664
- conn := connPool .rwPool .DeleteConnByAddr (addr )
665
- if conn == nil {
666
- connPool .roPool .DeleteConnByAddr (addr )
694
+ if conn := connPool .anyPool .DeleteConnByAddr (addr ); conn != nil {
695
+ if conn := connPool .rwPool .DeleteConnByAddr (addr ); conn != nil {
696
+ if connPool .opts .ConnectionListener != nil {
697
+ connPool .opts .ConnectionListener .Removed (conn , MasterRole )
698
+ }
699
+ return
700
+ }
701
+ if conn := connPool .roPool .DeleteConnByAddr (addr ); conn != nil {
702
+ if connPool .opts .ConnectionListener != nil {
703
+ connPool .opts .ConnectionListener .Removed (conn , ReplicaRole )
704
+ }
705
+ return
706
+ }
707
+ if connPool .opts .ConnectionListener != nil {
708
+ connPool .opts .ConnectionListener .Removed (conn , UnknownRole )
709
+ }
667
710
}
668
711
}
669
712
@@ -673,6 +716,10 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool
673
716
return err
674
717
}
675
718
719
+ if connPool .opts .ConnectionListener != nil {
720
+ connPool .opts .ConnectionListener .Added (conn , role )
721
+ }
722
+
676
723
connPool .anyPool .AddConn (addr , conn )
677
724
678
725
switch role {
@@ -685,16 +732,32 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool
685
732
return nil
686
733
}
687
734
735
+ func (connPool * ConnectionPool ) updateConnectionInPool (addr string , conn * tarantool.Connection , oldRole Role , newRole Role ) {
736
+ switch oldRole {
737
+ case MasterRole :
738
+ connPool .rwPool .DeleteConnByAddr (addr )
739
+ case ReplicaRole :
740
+ connPool .roPool .DeleteConnByAddr (addr )
741
+ }
742
+
743
+ if connPool .opts .ConnectionListener != nil {
744
+ connPool .opts .ConnectionListener .Updated (conn , oldRole , newRole )
745
+ }
746
+
747
+ switch newRole {
748
+ case MasterRole :
749
+ connPool .rwPool .AddConn (addr , conn )
750
+ case ReplicaRole :
751
+ connPool .roPool .AddConn (addr , conn )
752
+ }
753
+ }
754
+
688
755
func (connPool * ConnectionPool ) refreshConnection (addr string ) {
689
756
if conn , oldRole := connPool .getConnectionFromPool (addr ); conn != nil {
690
757
if ! conn .ClosedNow () {
691
- curRole , _ := connPool .getConnectionRole (conn )
692
- if oldRole != curRole {
693
- connPool .deleteConnectionFromPool (addr )
694
- err := connPool .setConnectionToPool (addr , conn )
695
- if err != nil {
696
- conn .Close ()
697
- log .Printf ("tarantool: storing connection to %s failed: %s\n " , addr , err .Error ())
758
+ if curRole , err := connPool .getConnectionRole (conn ); err == nil {
759
+ if oldRole != curRole {
760
+ connPool .updateConnectionInPool (addr , conn , oldRole , curRole )
698
761
}
699
762
}
700
763
}
0 commit comments