@@ -3,6 +3,7 @@ package queue_test
3
3
import (
4
4
"fmt"
5
5
"sync"
6
+ "sync/atomic"
6
7
"time"
7
8
8
9
"github.com/google/uuid"
@@ -22,6 +23,7 @@ type QueueConnectionHandler struct {
22
23
err error
23
24
mutex sync.Mutex
24
25
masterUpdated chan struct {}
26
+ masterCnt int32
25
27
}
26
28
27
29
// QueueConnectionHandler implements the ConnectionHandler interface.
@@ -87,6 +89,7 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
87
89
}
88
90
}
89
91
92
+ atomic .AddInt32 (& h .masterCnt , 1 )
90
93
fmt .Printf ("Master %s is ready to work!\n " , conn .Addr ())
91
94
92
95
return nil
@@ -95,6 +98,9 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
95
98
// Deactivated doesn't do anything useful for the example.
96
99
func (h * QueueConnectionHandler ) Deactivated (conn * tarantool.Connection ,
97
100
role connection_pool.Role ) error {
101
+ if role == connection_pool .MasterRole {
102
+ atomic .AddInt32 (& h .masterCnt , - 1 )
103
+ }
98
104
return nil
99
105
}
100
106
@@ -184,8 +190,18 @@ func Example_connectionPool() {
184
190
return
185
191
}
186
192
193
+ for i := 0 ; i < 2 && atomic .LoadInt32 (& h .masterCnt ) != 1 ; i ++ {
194
+ // The pool does not immediately detect role switching. It may happen
195
+ // that requests will be sent to RO instances. In that case q.Take()
196
+ // method will return a nil value.
197
+ //
198
+ // We need to make the example test output deterministic so we need to
199
+ // avoid it here. But in real life, you need to take this into account.
200
+ time .Sleep (poolOpts .CheckTimeout )
201
+ }
202
+
187
203
// Take a data from the new master instance.
188
- task , err := q .TakeTimeout ( 1 * time . Second )
204
+ task , err := q .Take ( )
189
205
if err != nil {
190
206
fmt .Println ("Unable to got task:" , err )
191
207
} else if task == nil {
0 commit comments