Skip to content

Commit cde62df

Browse files
committed
queue: add an example with connection_pool
The example demonstrates how to use the queue package with the connection_pool package. Closes #176
1 parent 6173a8d commit cde62df

File tree

5 files changed

+185
-9
lines changed

5 files changed

+185
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
ConnectionPool (#178)
1616
- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
1717
- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
18+
- An example how to use queue and connection_pool subpackages together (#176)
1819

1920
### Changed
2021

connection_pool/connection_pool.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type ConnectionPool struct {
9292
anyPool *RoundRobinStrategy
9393
poolsMutex sync.RWMutex
9494
}
95+
9596
var _ Pooler = (*ConnectionPool)(nil)
9697

9798
type connState struct {

queue/config.lua

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,15 @@ box.cfg{
2323
box.schema.func.create('queue.identify')
2424
box.schema.func.create('queue.state')
2525
box.schema.func.create('queue.statistics')
26-
box.schema.user.grant('test', 'create', 'space')
27-
box.schema.user.grant('test', 'write', 'space', '_schema')
28-
box.schema.user.grant('test', 'write', 'space', '_space')
29-
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
30-
box.schema.user.grant('test', 'write', 'space', '_index')
26+
box.schema.user.grant('test', 'create,read,write,drop', 'space')
3127
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
3228
box.schema.user.grant('test', 'execute', 'universe')
3329
box.schema.user.grant('test', 'read,write', 'space', '_queue')
3430
box.schema.user.grant('test', 'read,write', 'space', '_schema')
31+
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
3532
box.schema.user.grant('test', 'read,write', 'space', '_space')
3633
box.schema.user.grant('test', 'read,write', 'space', '_index')
37-
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
3834
box.schema.user.grant('test', 'read,write', 'space', '_priv')
39-
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
40-
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
4135
if box.space._trigger ~= nil then
4236
box.schema.user.grant('test', 'read', 'space', '_trigger')
4337
end
@@ -56,3 +50,5 @@ end)
5650
box.cfg{
5751
listen = os.getenv("TEST_TNT_LISTEN"),
5852
}
53+
54+
require('console').start()

queue/example_connection_pool_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package queue_test
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"sync"
7+
"time"
8+
9+
"github.com/google/uuid"
10+
"github.com/tarantool/go-tarantool"
11+
"github.com/tarantool/go-tarantool/connection_pool"
12+
"github.com/tarantool/go-tarantool/queue"
13+
)
14+
15+
type QueueConnectionListener struct {
16+
name string
17+
cfg queue.Cfg
18+
queue queue.Queue
19+
20+
uuid uuid.UUID
21+
registered bool
22+
err error
23+
mutex sync.Mutex
24+
done chan struct{}
25+
}
26+
27+
func NewQueueConnectionListener(name string, cfg queue.Cfg) *QueueConnectionListener {
28+
return &QueueConnectionListener{
29+
name: name,
30+
cfg: cfg,
31+
done: make(chan struct{}),
32+
}
33+
}
34+
35+
func (l *QueueConnectionListener) Add(conn *tarantool.Connection,
36+
role connection_pool.Role) error {
37+
if role != connection_pool.MasterRole {
38+
return nil
39+
}
40+
41+
l.mutex.Lock()
42+
defer l.mutex.Unlock()
43+
44+
if l.err != nil {
45+
return l.err
46+
}
47+
48+
l.queue = queue.New(conn, l.name)
49+
if !l.registered {
50+
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
51+
if l.err = l.queue.Cfg(opts); l.err == nil {
52+
if l.uuid, l.err = l.queue.Identify(nil); l.err == nil {
53+
l.err = l.queue.Create(l.cfg)
54+
}
55+
}
56+
l.registered = l.err == nil
57+
close(l.done)
58+
return l.err
59+
} else {
60+
if _, err := l.queue.Identify(&l.uuid); err != nil {
61+
return err
62+
}
63+
if err := l.queue.Create(l.cfg); err != nil {
64+
return err
65+
}
66+
}
67+
return nil
68+
}
69+
70+
func (l *QueueConnectionListener) Removed(conn *tarantool.Connection,
71+
role connection_pool.Role) {
72+
}
73+
74+
// Example demonstrates how to use the queue package with the connection_pool
75+
// package. First of all, you need to create a connection listener for the
76+
// a ConnectionPool object to process new connections from RW-instances.
77+
//
78+
// You need to registry a shared session UUID at a first master connection.
79+
// It needs to be used to re-identify as the shared session on new
80+
// RW-instances. See QueueConnectionListener.Added() implementation.
81+
//
82+
// After that, you need to create a ConnectorAdapter object with RW mode for
83+
// the ConnectionPool to send requests into RW-instances. This adapter can
84+
// be used to create a ready-to-work queue object.
85+
func Example_connectionPool() {
86+
servers := []string{
87+
"127.0.0.1:3014",
88+
"127.0.0.1:3015",
89+
"127.0.0.1:3016",
90+
}
91+
connOpts := tarantool.Opts{
92+
Timeout: 500 * time.Millisecond,
93+
User: "test",
94+
Pass: "test",
95+
}
96+
97+
cfg := queue.Cfg{
98+
Temporary: false,
99+
IfNotExists: true,
100+
Kind: queue.FIFO,
101+
Opts: queue.Opts{
102+
Ttl: 10 * time.Second,
103+
},
104+
}
105+
l := NewQueueConnectionListener("test_queue", cfg)
106+
poolOpts := connection_pool.OptsPool{
107+
CheckTimeout: 1 * time.Second,
108+
ConnectionListener: l,
109+
}
110+
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
111+
if err != nil {
112+
log.Fatalf("unable to connect to the pool: %s", err)
113+
}
114+
115+
defer connPool.Close()
116+
117+
<-l.done
118+
119+
if l.err != nil {
120+
log.Fatalf("unable to identify in the pool: %s", l.err)
121+
}
122+
123+
rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
124+
q := queue.New(rw, "test_queue")
125+
126+
fmt.Println("A Queue object is ready to work.")
127+
128+
if _, err = q.Put("test_data"); err != nil {
129+
log.Fatalf("unable to put data into the queue: %s", err)
130+
}
131+
132+
if task, err := q.Take(); err != nil {
133+
log.Fatalf("unable to take data from the queue: %s", err)
134+
} else {
135+
task.Ack()
136+
fmt.Println("data:", task.Data())
137+
}
138+
// Output:
139+
// A Queue object is ready to work.
140+
// data: test_data
141+
}

queue/queue_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ import (
1414
)
1515

1616
var server = "127.0.0.1:3013"
17+
var serversPool = []string{
18+
"127.0.0.1:3014",
19+
"127.0.0.1:3015",
20+
"127.0.0.1:3016",
21+
}
22+
23+
var instances []test_helpers.TarantoolInstance
24+
1725
var opts = Opts{
1826
Timeout: 2500 * time.Millisecond,
1927
User: "test",
@@ -899,12 +907,41 @@ func runTestMain(m *testing.M) int {
899907
ConnectRetry: 3,
900908
RetryTimeout: 500 * time.Millisecond,
901909
})
902-
defer test_helpers.StopTarantoolWithCleanup(inst)
903910

904911
if err != nil {
905912
log.Fatalf("Failed to prepare test tarantool: %s", err)
906913
}
907914

915+
defer test_helpers.StopTarantoolWithCleanup(inst)
916+
917+
workDirs := []string{"work_dir1", "work_dir2", "work_dir3"}
918+
poolOpts := test_helpers.StartOpts{
919+
InitScript: "config.lua",
920+
User: opts.User,
921+
Pass: opts.Pass,
922+
WaitStart: 100 * time.Millisecond,
923+
ConnectRetry: 3,
924+
RetryTimeout: 500 * time.Millisecond,
925+
}
926+
instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)
927+
928+
if err != nil {
929+
log.Fatalf("Failed to prepare test tarantool pool: %s", err)
930+
}
931+
932+
defer test_helpers.StopTarantoolInstances(instances)
933+
934+
roles := []bool{true, false, false}
935+
connOpts := Opts{
936+
Timeout: 500 * time.Millisecond,
937+
User: "test",
938+
Pass: "test",
939+
}
940+
err = test_helpers.SetClusterRO(serversPool, connOpts, roles)
941+
942+
if err != nil {
943+
log.Fatalf("Failed to set roles in tarantool pool: %s", err)
944+
}
908945
return m.Run()
909946
}
910947

0 commit comments

Comments
 (0)