Skip to content

Commit 2a91428

Browse files
committed
api: add ConnectionListener to the connection_pool
ConnectionListener provides an observer semantics for components interested in knowing the role changes of connections in a ConnectionPool. Closes #178
1 parent adb2bfd commit 2a91428

File tree

3 files changed

+200
-13
lines changed

3 files changed

+200
-13
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1111
### Added
1212

1313
- Support queue 1.2.0 (#177)
14+
- RoleListener interface for knowing the role changes of connections in
15+
ConnectionPool (#178)
1416

1517
### Changed
1618

connection_pool/connection_pool.go

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,35 @@ var (
3232
ErrNoHealthyInstance = errors.New("can't find healthy instance in pool")
3333
)
3434

35+
// ConnectionListener provides an observer semantics for components interested
36+
// in knowing the role changes of connections in a ConnectionPool.
37+
type ConnectionListener interface {
38+
// Added is called before a connection with a role added into a list of
39+
// active connections.
40+
Added(conn *tarantool.Connection, role Role)
41+
// Removed is called after a connection with a role removed from a list
42+
// of active connections.
43+
Removed(conn *tarantool.Connection, role Role)
44+
// Updated is called if a connection with a role updated in a list
45+
// of active connections.
46+
Updated(conn *tarantool.Connection, oldRole, newRole Role)
47+
}
48+
3549
/*
3650
Additional options (configurable via ConnectWithOpts):
3751
3852
- CheckTimeout - time interval to check for connection timeout and try to switch connection.
53+
54+
- ConnectionListener - a listener for connection updates.
3955
*/
4056
type OptsPool struct {
4157
// timeout for timer to reopen connections
4258
// that have been closed by some events and
4359
// to relocate connection between subpools
4460
// if ro/rw role has been updated
4561
CheckTimeout time.Duration
62+
// ConnectionListener provides an ability to observe connection updates.
63+
ConnectionListener ConnectionListener
4664
}
4765

4866
/*
@@ -174,9 +192,22 @@ func (connPool *ConnectionPool) closeImpl() []error {
174192
if err := conn.Close(); err != nil {
175193
errs = append(errs, err)
176194
}
195+
if conn := connPool.rwPool.DeleteConnByAddr(addr); conn != nil {
196+
if connPool.opts.ConnectionListener != nil {
197+
connPool.opts.ConnectionListener.Removed(conn, MasterRole)
198+
}
199+
continue
200+
}
201+
if conn := connPool.roPool.DeleteConnByAddr(addr); conn != nil {
202+
if connPool.opts.ConnectionListener != nil {
203+
connPool.opts.ConnectionListener.Removed(conn, ReplicaRole)
204+
}
205+
continue
206+
}
207+
if connPool.opts.ConnectionListener != nil {
208+
connPool.opts.ConnectionListener.Removed(conn, UnknownRole)
209+
}
177210
}
178-
connPool.rwPool.DeleteConnByAddr(addr)
179-
connPool.roPool.DeleteConnByAddr(addr)
180211
}
181212

182213
return errs
@@ -659,10 +690,22 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C
659690
}
660691

661692
func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) {
662-
_ = connPool.anyPool.DeleteConnByAddr(addr)
663-
conn := connPool.rwPool.DeleteConnByAddr(addr)
664-
if conn == nil {
665-
connPool.roPool.DeleteConnByAddr(addr)
693+
if conn := connPool.anyPool.DeleteConnByAddr(addr); conn != nil {
694+
if conn := connPool.rwPool.DeleteConnByAddr(addr); conn != nil {
695+
if connPool.opts.ConnectionListener != nil {
696+
connPool.opts.ConnectionListener.Removed(conn, MasterRole)
697+
}
698+
return
699+
}
700+
if conn := connPool.roPool.DeleteConnByAddr(addr); conn != nil {
701+
if connPool.opts.ConnectionListener != nil {
702+
connPool.opts.ConnectionListener.Removed(conn, ReplicaRole)
703+
}
704+
return
705+
}
706+
if connPool.opts.ConnectionListener != nil {
707+
connPool.opts.ConnectionListener.Removed(conn, UnknownRole)
708+
}
666709
}
667710
}
668711

@@ -672,6 +715,10 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool
672715
return err
673716
}
674717

718+
if connPool.opts.ConnectionListener != nil {
719+
connPool.opts.ConnectionListener.Added(conn, role)
720+
}
721+
675722
connPool.anyPool.AddConn(addr, conn)
676723

677724
switch role {
@@ -684,16 +731,32 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool
684731
return nil
685732
}
686733

734+
func (connPool *ConnectionPool) updateConnectionInPool(addr string, conn *tarantool.Connection, oldRole Role, newRole Role) {
735+
switch oldRole {
736+
case MasterRole:
737+
connPool.rwPool.DeleteConnByAddr(addr)
738+
case ReplicaRole:
739+
connPool.roPool.DeleteConnByAddr(addr)
740+
}
741+
742+
if connPool.opts.ConnectionListener != nil {
743+
connPool.opts.ConnectionListener.Updated(conn, oldRole, newRole)
744+
}
745+
746+
switch newRole {
747+
case MasterRole:
748+
connPool.rwPool.AddConn(addr, conn)
749+
case ReplicaRole:
750+
connPool.roPool.AddConn(addr, conn)
751+
}
752+
}
753+
687754
func (connPool *ConnectionPool) refreshConnection(addr string) {
688755
if conn, oldRole := connPool.getConnectionFromPool(addr); conn != nil {
689756
if !conn.ClosedNow() {
690-
curRole, _ := connPool.getConnectionRole(conn)
691-
if oldRole != curRole {
692-
connPool.deleteConnectionFromPool(addr)
693-
err := connPool.setConnectionToPool(addr, conn)
694-
if err != nil {
695-
conn.Close()
696-
log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error())
757+
if curRole, err := connPool.getConnectionRole(conn); err == nil {
758+
if oldRole != curRole {
759+
connPool.updateConnectionInPool(addr, conn, oldRole, curRole)
697760
}
698761
}
699762
}

connection_pool/connection_pool_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,128 @@ func TestClose(t *testing.T) {
208208
require.Nil(t, err)
209209
}
210210

211+
type testListener struct {
212+
added, closed, updated int
213+
errs []error
214+
}
215+
216+
func (tl *testListener) addErr(err error) {
217+
tl.errs = append(tl.errs, err)
218+
}
219+
220+
func (tl *testListener) Added(conn *tarantool.Connection, role connection_pool.Role) {
221+
tl.added++
222+
223+
if conn == nil {
224+
tl.addErr(fmt.Errorf("added conn == nil"))
225+
return
226+
}
227+
228+
addr := conn.Addr()
229+
if addr == servers[0] {
230+
if role != connection_pool.MasterRole {
231+
tl.addErr(fmt.Errorf("unexpected added role %d for addr %s", role, addr))
232+
}
233+
} else if addr == servers[1] {
234+
if role != connection_pool.ReplicaRole {
235+
tl.addErr(fmt.Errorf("unexpected added role %d for addr %s", role, addr))
236+
}
237+
} else {
238+
tl.addErr(fmt.Errorf("unexpected added addr %s", addr))
239+
}
240+
}
241+
242+
func (tl *testListener) Removed(conn *tarantool.Connection, role connection_pool.Role) {
243+
tl.closed++
244+
245+
if conn == nil {
246+
tl.addErr(fmt.Errorf("removed conn == nil"))
247+
return
248+
}
249+
250+
addr := conn.Addr()
251+
if addr == servers[0] || addr == servers[1] {
252+
if role != connection_pool.ReplicaRole {
253+
tl.addErr(fmt.Errorf("unexpected removed role %d for addr %s", role, addr))
254+
}
255+
} else {
256+
tl.addErr(fmt.Errorf("unexpected removed addr %s", addr))
257+
}
258+
}
259+
260+
func (tl *testListener) Updated(conn *tarantool.Connection, oldRole, newRole connection_pool.Role) {
261+
tl.updated++
262+
263+
if oldRole != connection_pool.MasterRole {
264+
tl.addErr(fmt.Errorf("unexpected oldRole = %d", oldRole))
265+
}
266+
267+
if newRole != connection_pool.ReplicaRole {
268+
tl.addErr(fmt.Errorf("unexpected newRole = %d", newRole))
269+
}
270+
271+
if conn == nil {
272+
tl.addErr(fmt.Errorf("updated conn == nil"))
273+
return
274+
}
275+
276+
addr := conn.Addr()
277+
if addr != servers[0] {
278+
tl.addErr(fmt.Errorf("unexpected updated addr: %s", addr))
279+
}
280+
}
281+
282+
func TestConnectionListener(t *testing.T) {
283+
poolServers := []string{servers[0], servers[1]}
284+
roles := []bool{false, true}
285+
286+
err := test_helpers.SetClusterRO(poolServers, connOpts, roles)
287+
require.Nilf(t, err, "fail to set roles for cluster")
288+
289+
tl := &testListener{}
290+
poolOpts := connection_pool.OptsPool{
291+
CheckTimeout: 100 * time.Microsecond,
292+
ConnectionListener: tl,
293+
}
294+
connPool, err := connection_pool.ConnectWithOpts(poolServers, connOpts, poolOpts)
295+
require.Nilf(t, err, "failed to connect")
296+
require.NotNilf(t, connPool, "conn is nil after Connect")
297+
298+
_, err = connPool.Call17("box.cfg", []interface{}{map[string]bool{
299+
"read_only": true,
300+
}}, connection_pool.RW)
301+
require.Nilf(t, err, "failed to make ro")
302+
for i := 0; i < 10; i++ {
303+
if tl.updated == 1 {
304+
break
305+
}
306+
time.Sleep(poolOpts.CheckTimeout)
307+
}
308+
309+
connPool.Close()
310+
311+
args := test_helpers.CheckStatusesArgs{
312+
ConnPool: connPool,
313+
Mode: connection_pool.ANY,
314+
Servers: poolServers,
315+
ExpectedPoolStatus: false,
316+
ExpectedStatuses: map[string]bool{
317+
poolServers[0]: false,
318+
poolServers[1]: false,
319+
},
320+
}
321+
322+
err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry)
323+
require.Nil(t, err)
324+
325+
for _, err := range tl.errs {
326+
t.Errorf("Unexpected error: %s", err)
327+
}
328+
require.Equalf(t, tl.added, len(poolServers), "unexpected added count")
329+
require.Equalf(t, tl.closed, len(poolServers), "unexpected closed count")
330+
require.Equalf(t, tl.updated, 1, "unexpected updated count")
331+
}
332+
211333
func TestRequestOnClosed(t *testing.T) {
212334
server1 := servers[0]
213335
server2 := servers[1]

0 commit comments

Comments
 (0)