Skip to content

Commit ac49d45

Browse files
committed
queue: add an example with connection_pool
The example demonstrates how to use the queue package with the connection_pool package. Closes #176
1 parent 840e3ed commit ac49d45

File tree

5 files changed

+282
-10
lines changed

5 files changed

+282
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
ConnectionPool (#178)
1616
- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
1717
- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
18+
- An example how to use queue and connection_pool subpackages together (#176)
1819

1920
### Changed
2021

queue/config.lua

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ box.cfg{
77
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
88
}
99

10-
box.once("init", function()
10+
box.once("init", function()
1111
box.schema.user.create('test', {password = 'test'})
1212
box.schema.func.create('queue.tube.test_queue:touch')
1313
box.schema.func.create('queue.tube.test_queue:ack')
@@ -23,21 +23,15 @@ box.cfg{
2323
box.schema.func.create('queue.identify')
2424
box.schema.func.create('queue.state')
2525
box.schema.func.create('queue.statistics')
26-
box.schema.user.grant('test', 'create', 'space')
27-
box.schema.user.grant('test', 'write', 'space', '_schema')
28-
box.schema.user.grant('test', 'write', 'space', '_space')
29-
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
30-
box.schema.user.grant('test', 'write', 'space', '_index')
26+
box.schema.user.grant('test', 'create,read,write,drop', 'space')
3127
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
3228
box.schema.user.grant('test', 'execute', 'universe')
3329
box.schema.user.grant('test', 'read,write', 'space', '_queue')
3430
box.schema.user.grant('test', 'read,write', 'space', '_schema')
31+
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
3532
box.schema.user.grant('test', 'read,write', 'space', '_space')
3633
box.schema.user.grant('test', 'read,write', 'space', '_index')
37-
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
3834
box.schema.user.grant('test', 'read,write', 'space', '_priv')
39-
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
40-
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
4135
if box.space._trigger ~= nil then
4236
box.schema.user.grant('test', 'read', 'space', '_trigger')
4337
end
@@ -56,3 +50,5 @@ end)
5650
box.cfg{
5751
listen = os.getenv("TEST_TNT_LISTEN"),
5852
}
53+
54+
require('console').start()

queue/example_connection_pool.lua

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
local queue = require('queue')
2+
rawset(_G, 'queue', queue)
3+
4+
allowed = {"localhost:3016", "localhost:3017"}
5+
listen = os.getenv("TEST_TNT_LISTEN")
6+
7+
founded = false
8+
replication = {}
9+
for i, l in ipairs(allowed) do
10+
if l == listen then
11+
founded = true
12+
end
13+
replication[i] = "test:test@" .. l
14+
end
15+
16+
if not founded then
17+
print("Invalid TEST_TNT_LISTEN")
18+
os.exit()
19+
end
20+
21+
-- Set listen only when every other thing is configured.
22+
box.cfg{
23+
listen = listen,
24+
replication = replication,
25+
read_only = listen == allowed[1],
26+
}
27+
28+
box.once("init", function()
29+
box.schema.user.create('test', {password = 'test'})
30+
box.schema.func.create('queue.tube.test_queue:touch')
31+
box.schema.func.create('queue.tube.test_queue:ack')
32+
box.schema.func.create('queue.tube.test_queue:put')
33+
box.schema.func.create('queue.tube.test_queue:drop')
34+
box.schema.func.create('queue.tube.test_queue:peek')
35+
box.schema.func.create('queue.tube.test_queue:kick')
36+
box.schema.func.create('queue.tube.test_queue:take')
37+
box.schema.func.create('queue.tube.test_queue:delete')
38+
box.schema.func.create('queue.tube.test_queue:release')
39+
box.schema.func.create('queue.tube.test_queue:release_all')
40+
box.schema.func.create('queue.tube.test_queue:bury')
41+
box.schema.func.create('queue.identify')
42+
box.schema.func.create('queue.state')
43+
box.schema.func.create('queue.statistics')
44+
box.schema.user.grant('test', 'replication')
45+
box.schema.user.grant('test', 'create,read,write,drop', 'space')
46+
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
47+
box.schema.user.grant('test', 'execute', 'universe')
48+
box.schema.user.grant('test', 'read,write', 'space', '_queue')
49+
box.schema.user.grant('test', 'read,write', 'space', '_schema')
50+
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
51+
box.schema.user.grant('test', 'read,write', 'space', '_space')
52+
box.schema.user.grant('test', 'read,write', 'space', '_index')
53+
box.schema.user.grant('test', 'read,write', 'space', '_priv')
54+
if box.space._trigger ~= nil then
55+
box.schema.user.grant('test', 'read', 'space', '_trigger')
56+
end
57+
if box.space._fk_constraint ~= nil then
58+
box.schema.user.grant('test', 'read', 'space', '_fk_constraint')
59+
end
60+
if box.space._ck_constraint ~= nil then
61+
box.schema.user.grant('test', 'read', 'space', '_ck_constraint')
62+
end
63+
if box.space._func_index ~= nil then
64+
box.schema.user.grant('test', 'read', 'space', '_func_index')
65+
end
66+
end)
67+
68+
require('console').start()

queue/example_connection_pool_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package queue_test
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"sync"
7+
"time"
8+
9+
"github.com/google/uuid"
10+
"github.com/tarantool/go-tarantool"
11+
"github.com/tarantool/go-tarantool/connection_pool"
12+
"github.com/tarantool/go-tarantool/queue"
13+
)
14+
15+
type QueueConnectionHandler struct {
16+
name string
17+
cfg queue.Cfg
18+
19+
uuid uuid.UUID
20+
registered bool
21+
err error
22+
registerMutex sync.Mutex
23+
done chan struct{}
24+
}
25+
26+
var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
27+
28+
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
29+
return &QueueConnectionHandler{
30+
name: name,
31+
cfg: cfg,
32+
done: make(chan struct{}),
33+
}
34+
}
35+
36+
func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
37+
role connection_pool.Role) error {
38+
// The queue only works with a master instance.
39+
if role != connection_pool.MasterRole {
40+
return nil
41+
}
42+
43+
// We need to register a shared session only once.
44+
h.registerMutex.Lock()
45+
46+
if h.err != nil {
47+
h.registerMutex.Unlock()
48+
return h.err
49+
}
50+
51+
q := queue.New(conn, h.name)
52+
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
53+
if !h.registered {
54+
if h.err = q.Cfg(opts); h.err == nil {
55+
if h.uuid, h.err = q.Identify(nil); h.err == nil {
56+
h.err = q.Create(h.cfg)
57+
}
58+
}
59+
h.registered = h.err == nil
60+
close(h.done)
61+
h.registerMutex.Unlock()
62+
return h.err
63+
} else {
64+
// We already registered. We don't need the lock any more.
65+
h.registerMutex.Unlock()
66+
if err := q.Cfg(opts); err == nil {
67+
return err
68+
}
69+
if _, err := q.Identify(&h.uuid); err != nil {
70+
return err
71+
}
72+
if err := q.Create(h.cfg); err != nil {
73+
return err
74+
}
75+
}
76+
return nil
77+
}
78+
79+
func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
80+
role connection_pool.Role) error {
81+
return nil
82+
}
83+
84+
// Example demonstrates how to use the queue package with the connection_pool
85+
// package. First of all, you need to create a connection handler for the
86+
// a ConnectionPool object to process new connections from RW-instances.
87+
//
88+
// You need to register a shared session UUID at a first master connection.
89+
// It needs to be used to re-identify as the shared session on new
90+
// RW-instances. See QueueConnectionHandler.Discovered() implementation.
91+
//
92+
// After that, you need to create a ConnectorAdapter object with RW mode for
93+
// the ConnectionPool to send requests into RW-instances. This adapter can
94+
// be used to create a ready-to-work queue object.
95+
func Example_connectionPool() {
96+
// You could to use example_connection_pool.lua to play with the code
97+
// manually. You need to start a master and a replica instance first.
98+
//
99+
// 1 terminal:
100+
// $ mkdir master && cd master
101+
// $ TEST_TNT_LISTEN="localhost:3017" tarantool ../example_connection_pool.lua
102+
//
103+
// 2 terminal:
104+
// $ mkdir replica && cd replica
105+
// $ TEST_TNT_LISTEN="localhost:3017" tarantool ../example_connection_pool.lua
106+
//
107+
// Then you need to update connection pool servers to:
108+
//
109+
// servers := []string{
110+
// "127.0.0.1:3016",
111+
// "127.0.0.1:3017",
112+
// }
113+
//
114+
// After it you could run the sample:
115+
//
116+
// $ go test . -v -run Example_connectionPool
117+
servers := []string{
118+
"127.0.0.1:3014",
119+
"127.0.0.1:3015",
120+
}
121+
connOpts := tarantool.Opts{
122+
Timeout: 500 * time.Millisecond,
123+
User: "test",
124+
Pass: "test",
125+
}
126+
127+
cfg := queue.Cfg{
128+
Temporary: false,
129+
IfNotExists: true,
130+
Kind: queue.FIFO,
131+
Opts: queue.Opts{
132+
Ttl: 10 * time.Second,
133+
},
134+
}
135+
h := NewQueueConnectionHandler("test_queue", cfg)
136+
poolOpts := connection_pool.OptsPool{
137+
CheckTimeout: 1 * time.Second,
138+
ConnectionHandler: h,
139+
}
140+
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
141+
if err != nil {
142+
log.Fatalf("unable to connect to the pool: %s", err)
143+
}
144+
145+
defer connPool.Close()
146+
147+
<-h.done
148+
149+
if h.err != nil {
150+
log.Fatalf("unable to identify in the pool: %s", h.err)
151+
}
152+
153+
rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
154+
q := queue.New(rw, "test_queue")
155+
156+
fmt.Println("A Queue object is ready to work.")
157+
158+
if _, err = q.Put("test_data"); err != nil {
159+
log.Fatalf("unable to put data into the queue: %s", err)
160+
}
161+
162+
if task, err := q.Take(); err != nil {
163+
log.Fatalf("unable to take data from the queue: %s", err)
164+
} else {
165+
task.Ack()
166+
fmt.Println("data:", task.Data())
167+
}
168+
// Output:
169+
// A Queue object is ready to work.
170+
// data: test_data
171+
}

queue/queue_test.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ import (
1414
)
1515

1616
var server = "127.0.0.1:3013"
17+
var serversPool = []string{
18+
"127.0.0.1:3014",
19+
"127.0.0.1:3015",
20+
}
21+
22+
var instances []test_helpers.TarantoolInstance
23+
1724
var opts = Opts{
1825
Timeout: 2500 * time.Millisecond,
1926
User: "test",
@@ -899,12 +906,41 @@ func runTestMain(m *testing.M) int {
899906
ConnectRetry: 3,
900907
RetryTimeout: 500 * time.Millisecond,
901908
})
902-
defer test_helpers.StopTarantoolWithCleanup(inst)
903909

904910
if err != nil {
905911
log.Fatalf("Failed to prepare test tarantool: %s", err)
906912
}
907913

914+
defer test_helpers.StopTarantoolWithCleanup(inst)
915+
916+
workDirs := []string{"work_dir1", "work_dir2"}
917+
poolOpts := test_helpers.StartOpts{
918+
InitScript: "config.lua",
919+
User: opts.User,
920+
Pass: opts.Pass,
921+
WaitStart: 100 * time.Millisecond,
922+
ConnectRetry: 3,
923+
RetryTimeout: 500 * time.Millisecond,
924+
}
925+
instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)
926+
927+
if err != nil {
928+
log.Fatalf("Failed to prepare test tarantool pool: %s", err)
929+
}
930+
931+
defer test_helpers.StopTarantoolInstances(instances)
932+
933+
roles := []bool{false, true}
934+
connOpts := Opts{
935+
Timeout: 500 * time.Millisecond,
936+
User: "test",
937+
Pass: "test",
938+
}
939+
err = test_helpers.SetClusterRO(serversPool, connOpts, roles)
940+
941+
if err != nil {
942+
log.Fatalf("Failed to set roles in tarantool pool: %s", err)
943+
}
908944
return m.Run()
909945
}
910946

0 commit comments

Comments
 (0)