Skip to content

Commit 3ddf87d

Browse files
committed
queue: add an example with connection_pool
The example demonstrates how to use the queue package with the connection_pool package. Closes #176
1 parent 12bc5e2 commit 3ddf87d

File tree

8 files changed

+291
-16
lines changed

8 files changed

+291
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
ConnectionPool (#178)
1616
- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
1717
- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
18+
- An example how to use queue and connection_pool subpackages together (#176)
1819

1920
### Changed
2021

connection_pool/connection_pool_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2056,7 +2056,7 @@ func TestStream_TxnIsolationLevel(t *testing.T) {
20562056
func runTestMain(m *testing.M) int {
20572057
initScript := "config.lua"
20582058
waitStart := 100 * time.Millisecond
2059-
var connectRetry uint = 3
2059+
connectRetry := 3
20602060
retryTimeout := 500 * time.Millisecond
20612061
workDirs := []string{
20622062
"work_dir1", "work_dir2",

multi/multi_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ func TestStream_Rollback(t *testing.T) {
556556
func runTestMain(m *testing.M) int {
557557
initScript := "config.lua"
558558
waitStart := 100 * time.Millisecond
559-
var connectRetry uint = 3
559+
connectRetry := 3
560560
retryTimeout := 500 * time.Millisecond
561561

562562
// Tarantool supports streams and interactive transactions since version 2.10.0

queue/example_connection_pool_test.go

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package queue_test
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/tarantool/go-tarantool"
10+
"github.com/tarantool/go-tarantool/connection_pool"
11+
"github.com/tarantool/go-tarantool/queue"
12+
"github.com/tarantool/go-tarantool/test_helpers"
13+
)
14+
15+
type QueueConnectionHandler struct {
16+
name string
17+
cfg queue.Cfg
18+
19+
uuid uuid.UUID
20+
registered bool
21+
err error
22+
mutex sync.Mutex
23+
masterUpdated chan struct{}
24+
}
25+
26+
var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
27+
28+
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
29+
return &QueueConnectionHandler{
30+
name: name,
31+
cfg: cfg,
32+
masterUpdated: make(chan struct{}, 10),
33+
}
34+
}
35+
36+
func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
37+
role connection_pool.Role) error {
38+
h.mutex.Lock()
39+
defer h.mutex.Unlock()
40+
41+
if h.err != nil {
42+
return h.err
43+
}
44+
45+
master := role == connection_pool.MasterRole
46+
if master {
47+
defer func() {
48+
h.masterUpdated <- struct{}{}
49+
}()
50+
}
51+
52+
// Set up a queue module configuration for an instance.
53+
q := queue.New(conn, h.name)
54+
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
55+
56+
if h.err = q.Cfg(opts); h.err != nil {
57+
return fmt.Errorf("unable to configure queue: %w", h.err)
58+
}
59+
60+
// The queue only works with a master instance.
61+
if !master {
62+
return nil
63+
}
64+
65+
if h.err = q.Create(h.cfg); h.err != nil {
66+
return h.err
67+
}
68+
69+
if !h.registered {
70+
// We register a shared session at the first time.
71+
if h.uuid, h.err = q.Identify(nil); h.err != nil {
72+
return h.err
73+
}
74+
h.registered = h.err == nil
75+
} else {
76+
// We re-identify as the shared session.
77+
if _, h.err = q.Identify(&h.uuid); h.err != nil {
78+
return h.err
79+
}
80+
}
81+
82+
fmt.Printf("Master %s is ready to work!\n", conn.Addr())
83+
84+
return nil
85+
}
86+
87+
func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
88+
role connection_pool.Role) error {
89+
return nil
90+
}
91+
92+
// Example demonstrates how to use the queue package with the connection_pool
93+
// package. First of all, you need to create a connection handler for the
94+
// a ConnectionPool object to process new connections from RW-instances.
95+
//
96+
// You need to register a shared session UUID at a first master connection.
97+
// It needs to be used to re-identify as the shared session on new
98+
// RW-instances. See QueueConnectionHandler.Discovered() implementation.
99+
//
100+
// After that, you need to create a ConnectorAdapter object with RW mode for
101+
// the ConnectionPool to send requests into RW-instances. This adapter can
102+
// be used to create a ready-to-work queue object.
103+
func Example_connectionPool() {
104+
// Create a ConnectionHandler object.
105+
cfg := queue.Cfg{
106+
Temporary: false,
107+
IfNotExists: true,
108+
Kind: queue.FIFO,
109+
Opts: queue.Opts{
110+
Ttl: 10 * time.Second,
111+
},
112+
}
113+
h := NewQueueConnectionHandler("test_queue", cfg)
114+
defer close(h.masterUpdated)
115+
116+
// Create a ConnectionPool object.
117+
servers := []string{
118+
"127.0.0.1:3014",
119+
"127.0.0.1:3015",
120+
}
121+
connOpts := tarantool.Opts{
122+
Timeout: 500 * time.Millisecond,
123+
User: "test",
124+
Pass: "test",
125+
}
126+
poolOpts := connection_pool.OptsPool{
127+
CheckTimeout: 1 * time.Second,
128+
ConnectionHandler: h,
129+
}
130+
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
131+
if err != nil {
132+
fmt.Printf("Unable to connect to the pool: %s", err)
133+
return
134+
}
135+
defer connPool.Close()
136+
137+
// Wait for a master instance identify in the queue.
138+
<-h.masterUpdated
139+
if h.err != nil {
140+
fmt.Printf("Unable to identify in the pool: %s", h.err)
141+
return
142+
}
143+
144+
// Create a Queue object from the ConnectionPool object via
145+
// a ConnectorAdapter.
146+
rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
147+
q := queue.New(rw, "test_queue")
148+
fmt.Println("A Queue object is ready to work.")
149+
150+
testData := "test_data"
151+
fmt.Println("Send data:", testData)
152+
if _, err = q.Put(testData); err != nil {
153+
fmt.Printf("Unable to put data into the queue: %s", err)
154+
return
155+
}
156+
157+
// Switch a master instance in the pool.
158+
roles := []bool{true, false}
159+
err = test_helpers.SetClusterRO(servers, connOpts, roles)
160+
if err != nil {
161+
fmt.Printf("Unable to set cluster roles: %s", err)
162+
return
163+
}
164+
165+
// Wait for a master instance re-identify.
166+
<-h.masterUpdated
167+
if h.err != nil {
168+
fmt.Printf("Unable to re-identify in the pool: %s", h.err)
169+
return
170+
}
171+
172+
// Take a data from the new master instance.
173+
if task, err := q.Take(); err == nil {
174+
task.Ack()
175+
fmt.Println("Got data:", task.Data())
176+
} else {
177+
fmt.Println("Unable to got data:", err)
178+
}
179+
180+
// Output:
181+
// Master 127.0.0.1:3014 is ready to work!
182+
// A Queue object is ready to work.
183+
// Send data: test_data
184+
// Master 127.0.0.1:3015 is ready to work!
185+
// Got data: test_data
186+
}

queue/queue_test.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ import (
1414
)
1515

1616
var server = "127.0.0.1:3013"
17+
var serversPool = []string{
18+
"127.0.0.1:3014",
19+
"127.0.0.1:3015",
20+
}
21+
22+
var instances []test_helpers.TarantoolInstance
23+
1724
var opts = Opts{
1825
Timeout: 2500 * time.Millisecond,
1926
User: "test",
@@ -890,7 +897,7 @@ func TestTask_Touch(t *testing.T) {
890897
// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
891898
func runTestMain(m *testing.M) int {
892899
inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{
893-
InitScript: "config.lua",
900+
InitScript: "testdata/config.lua",
894901
Listen: server,
895902
WorkDir: "work_dir",
896903
User: opts.User,
@@ -899,12 +906,40 @@ func runTestMain(m *testing.M) int {
899906
ConnectRetry: 3,
900907
RetryTimeout: 500 * time.Millisecond,
901908
})
902-
defer test_helpers.StopTarantoolWithCleanup(inst)
903909

904910
if err != nil {
905911
log.Fatalf("Failed to prepare test tarantool: %s", err)
906912
}
907913

914+
defer test_helpers.StopTarantoolWithCleanup(inst)
915+
916+
workDirs := []string{"work_dir1", "work_dir2"}
917+
poolOpts := test_helpers.StartOpts{
918+
InitScript: "testdata/pool.lua",
919+
User: opts.User,
920+
Pass: opts.Pass,
921+
WaitStart: 3 * time.Second, // replication_timeout * 3
922+
ConnectRetry: -1,
923+
}
924+
instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)
925+
926+
if err != nil {
927+
log.Fatalf("Failed to prepare test tarantool pool: %s", err)
928+
}
929+
930+
defer test_helpers.StopTarantoolInstances(instances)
931+
932+
roles := []bool{false, true}
933+
connOpts := Opts{
934+
Timeout: 500 * time.Millisecond,
935+
User: "test",
936+
Pass: "test",
937+
}
938+
err = test_helpers.SetClusterRO(serversPool, connOpts, roles)
939+
940+
if err != nil {
941+
log.Fatalf("Failed to set roles in tarantool pool: %s", err)
942+
}
908943
return m.Run()
909944
}
910945

queue/config.lua renamed to queue/testdata/config.lua

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ box.cfg{
77
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
88
}
99

10-
box.once("init", function()
10+
box.once("init", function()
1111
box.schema.user.create('test', {password = 'test'})
1212
box.schema.func.create('queue.tube.test_queue:touch')
1313
box.schema.func.create('queue.tube.test_queue:ack')
@@ -23,21 +23,15 @@ box.cfg{
2323
box.schema.func.create('queue.identify')
2424
box.schema.func.create('queue.state')
2525
box.schema.func.create('queue.statistics')
26-
box.schema.user.grant('test', 'create', 'space')
27-
box.schema.user.grant('test', 'write', 'space', '_schema')
28-
box.schema.user.grant('test', 'write', 'space', '_space')
29-
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
30-
box.schema.user.grant('test', 'write', 'space', '_index')
26+
box.schema.user.grant('test', 'create,read,write,drop', 'space')
3127
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
3228
box.schema.user.grant('test', 'execute', 'universe')
3329
box.schema.user.grant('test', 'read,write', 'space', '_queue')
3430
box.schema.user.grant('test', 'read,write', 'space', '_schema')
31+
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
3532
box.schema.user.grant('test', 'read,write', 'space', '_space')
3633
box.schema.user.grant('test', 'read,write', 'space', '_index')
37-
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
3834
box.schema.user.grant('test', 'read,write', 'space', '_priv')
39-
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
40-
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
4135
if box.space._trigger ~= nil then
4236
box.schema.user.grant('test', 'read', 'space', '_trigger')
4337
end
@@ -56,3 +50,5 @@ end)
5650
box.cfg{
5751
listen = os.getenv("TEST_TNT_LISTEN"),
5852
}
53+
54+
require('console').start()

queue/testdata/pool.lua

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
local queue = require('queue')
2+
rawset(_G, 'queue', queue)
3+
4+
local listen = os.getenv("TEST_TNT_LISTEN")
5+
box.cfg{
6+
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
7+
listen = listen,
8+
replication = {
9+
"test:test@127.0.0.1:3014",
10+
"test:test@127.0.0.1:3015",
11+
},
12+
read_only = listen == "127.0.0.1:3015"
13+
}
14+
15+
box.once("schema", function()
16+
box.schema.user.create('test', {password = 'test'})
17+
box.schema.user.grant('test', 'replication')
18+
19+
box.schema.func.create('queue.tube.test_queue:touch')
20+
box.schema.func.create('queue.tube.test_queue:ack')
21+
box.schema.func.create('queue.tube.test_queue:put')
22+
box.schema.func.create('queue.tube.test_queue:drop')
23+
box.schema.func.create('queue.tube.test_queue:peek')
24+
box.schema.func.create('queue.tube.test_queue:kick')
25+
box.schema.func.create('queue.tube.test_queue:take')
26+
box.schema.func.create('queue.tube.test_queue:delete')
27+
box.schema.func.create('queue.tube.test_queue:release')
28+
box.schema.func.create('queue.tube.test_queue:release_all')
29+
box.schema.func.create('queue.tube.test_queue:bury')
30+
box.schema.func.create('queue.identify')
31+
box.schema.func.create('queue.state')
32+
box.schema.func.create('queue.statistics')
33+
box.schema.user.grant('test', 'create,read,write,drop', 'space')
34+
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
35+
box.schema.user.grant('test', 'execute', 'universe')
36+
box.schema.user.grant('test', 'read,write', 'space', '_queue')
37+
box.schema.user.grant('test', 'read,write', 'space', '_schema')
38+
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
39+
box.schema.user.grant('test', 'read,write', 'space', '_space')
40+
box.schema.user.grant('test', 'read,write', 'space', '_index')
41+
box.schema.user.grant('test', 'read,write', 'space', '_priv')
42+
if box.space._trigger ~= nil then
43+
box.schema.user.grant('test', 'read', 'space', '_trigger')
44+
end
45+
if box.space._fk_constraint ~= nil then
46+
box.schema.user.grant('test', 'read', 'space', '_fk_constraint')
47+
end
48+
if box.space._ck_constraint ~= nil then
49+
box.schema.user.grant('test', 'read', 'space', '_ck_constraint')
50+
end
51+
if box.space._func_index ~= nil then
52+
box.schema.user.grant('test', 'read', 'space', '_func_index')
53+
end
54+
end)
55+
56+
require('console').start()

test_helpers/main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ type StartOpts struct {
6767
// WaitStart is a time to wait before starting to ping tarantool.
6868
WaitStart time.Duration
6969

70-
// ConnectRetry is a count of attempts to ping tarantool.
71-
ConnectRetry uint
70+
// ConnectRetry is a count of retry attempts to ping tarantool. If the
71+
// value < 0 then there will be no ping tarantool at all.
72+
ConnectRetry int
7273

7374
// RetryTimeout is a time between tarantool ping retries.
7475
RetryTimeout time.Duration
@@ -240,7 +241,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) {
240241
Ssl: startOpts.ClientSsl,
241242
}
242243

243-
var i uint
244+
var i int
244245
var server string
245246
if startOpts.ClientServer != "" {
246247
server = startOpts.ClientServer

0 commit comments

Comments
 (0)