@@ -12,14 +12,23 @@ import (
12
12
"fmt"
13
13
"time"
14
14
15
+ "github.com/google/uuid"
15
16
"github.com/tarantool/go-tarantool"
16
17
)
17
18
18
19
// Queue is a handle to Tarantool queue's tube.
19
20
type Queue interface {
21
+ // Set queue settings.
22
+ Cfg (opts CfgOpts ) error
20
23
// Exists checks tube for existence.
21
24
// Note: it uses Eval, so user needs 'execute universe' privilege.
22
25
Exists () (bool , error )
26
+ // Identify to a shared session.
27
+ // In the queue the session has a unique UUID and many connections may
28
+ // share one logical session. Also, the consumer can reconnect to the
29
+ // existing session during the ttr time.
30
+ // To get the UUID of the current session, call the Queue.Identify(nil).
31
+ Identify (u * uuid.UUID ) (uuid.UUID , error )
23
32
// Create creates new tube with configuration.
24
33
// Note: it uses Eval, so user needs 'execute universe' privilege
25
34
// Note: you'd better not use this function in your application, cause it is
@@ -29,6 +38,8 @@ type Queue interface {
29
38
// Note: you'd better not use this function in your application, cause it is
30
39
// administrative task to create or delete queue.
31
40
Drop () error
41
+ // ReleaseAll forcibly returns all taken tasks to a ready state.
42
+ ReleaseAll () error
32
43
// Put creates new task in a tube.
33
44
Put (data interface {}) (* Task , error )
34
45
// PutWithOpts creates new task with options different from tube's defaults.
@@ -64,6 +75,8 @@ type Queue interface {
64
75
Kick (count uint64 ) (uint64 , error )
65
76
// Delete the task identified by its id.
66
77
Delete (taskId uint64 ) error
78
+ // State returns a current queue state.
79
+ State () (State , error )
67
80
// Statistic returns some statistic about queue.
68
81
Statistic () (interface {}, error )
69
82
}
@@ -79,11 +92,16 @@ type cmd struct {
79
92
take string
80
93
drop string
81
94
peek string
95
+ touch string
82
96
ack string
83
97
delete string
84
98
bury string
85
99
kick string
86
100
release string
101
+ releaseAll string
102
+ cfg string
103
+ identify string
104
+ state string
87
105
statistics string
88
106
}
89
107
@@ -110,6 +128,26 @@ func (cfg Cfg) getType() string {
110
128
return kind
111
129
}
112
130
131
+ // CfgOpts is argument type for the Queue.Cfg() call.
132
+ type CfgOpts struct {
133
+ // Enable replication mode. Must be true if the queue is used in master and
134
+ // replica mode. With replication mode enabled, the potential loss of
135
+ // performance can be ~20% compared to single mode. Default value is false.
136
+ InReplicaset bool
137
+ // Time to release in seconds. The time after which, if there is no active
138
+ // connection in the session, it will be released with all its tasks.
139
+ Ttr time.Duration
140
+ }
141
+
142
+ func (opts CfgOpts ) toMap () map [string ]interface {} {
143
+ ret := make (map [string ]interface {})
144
+ ret ["in_replicaset" ] = opts .InReplicaset
145
+ if opts .Ttr != 0 {
146
+ ret ["ttr" ] = opts .Ttr
147
+ }
148
+ return ret
149
+ }
150
+
113
151
type Opts struct {
114
152
Pri int // Task priorities.
115
153
Ttl time.Duration // Task time to live.
@@ -161,6 +199,12 @@ func (q *queue) Create(cfg Cfg) error {
161
199
return err
162
200
}
163
201
202
+ // Set queue settings.
203
+ func (q * queue ) Cfg (opts CfgOpts ) error {
204
+ _ , err := q .conn .Call17 (q .cmds .cfg , []interface {}{opts .toMap ()})
205
+ return err
206
+ }
207
+
164
208
// Exists checks existance of a tube.
165
209
func (q * queue ) Exists () (bool , error ) {
166
210
cmd := "local name = ... ; return queue.tube[name] ~= nil"
@@ -173,6 +217,36 @@ func (q *queue) Exists() (bool, error) {
173
217
return exist , nil
174
218
}
175
219
220
+ // Identify to a shared session.
221
+ // In the queue the session has a unique UUID and many connections may share
222
+ // one logical session. Also, the consumer can reconnect to the existing
223
+ // session during the ttr time.
224
+ // To get the UUID of the current session, call the Queue.Identify(nil).
225
+ func (q * queue ) Identify (u * uuid.UUID ) (uuid.UUID , error ) {
226
+ // Unfortunately we can't use go-tarantool/uuid here:
227
+ // https://github.com/tarantool/queue/issues/182
228
+ var args []interface {}
229
+ if u == nil {
230
+ args = []interface {}{}
231
+ } else {
232
+ if bytes , err := u .MarshalBinary (); err != nil {
233
+ return uuid.UUID {}, err
234
+ } else {
235
+ args = []interface {}{bytes }
236
+ }
237
+ }
238
+
239
+ if resp , err := q .conn .Call17 (q .cmds .identify , args ); err == nil {
240
+ if us , ok := resp .Data [0 ].(string ); ok {
241
+ return uuid .FromBytes ([]byte (us ))
242
+ } else {
243
+ return uuid.UUID {}, fmt .Errorf ("unexpected response: %v" , resp .Data )
244
+ }
245
+ } else {
246
+ return uuid.UUID {}, err
247
+ }
248
+ }
249
+
176
250
// Put data to queue. Returns task.
177
251
func (q * queue ) Put (data interface {}) (* Task , error ) {
178
252
return q .put (data )
@@ -251,6 +325,12 @@ func (q *queue) Drop() error {
251
325
return err
252
326
}
253
327
328
+ // ReleaseAll forcibly returns all taken tasks to a ready state.
329
+ func (q * queue ) ReleaseAll () error {
330
+ _ , err := q .conn .Call17 (q .cmds .releaseAll , []interface {}{})
331
+ return err
332
+ }
333
+
254
334
// Look at a task without changing its state.
255
335
func (q * queue ) Peek (taskId uint64 ) (* Task , error ) {
256
336
qd := queueData {q : q }
@@ -260,6 +340,10 @@ func (q *queue) Peek(taskId uint64) (*Task, error) {
260
340
return qd .task , nil
261
341
}
262
342
343
+ func (q * queue ) _touch (taskId uint64 , increment time.Duration ) (string , error ) {
344
+ return q .produce (q .cmds .touch , taskId , increment .Seconds ())
345
+ }
346
+
263
347
func (q * queue ) _ack (taskId uint64 ) (string , error ) {
264
348
return q .produce (q .cmds .ack , taskId )
265
349
}
@@ -312,6 +396,22 @@ func (q *queue) Delete(taskId uint64) error {
312
396
return err
313
397
}
314
398
399
+ // State returns a current queue state.
400
+ func (q * queue ) State () (State , error ) {
401
+ resp , err := q .conn .Call17 (q .cmds .state , []interface {}{})
402
+ if err != nil {
403
+ return UnknownState , err
404
+ }
405
+
406
+ if respState , ok := resp .Data [0 ].(string ); ok {
407
+ if state , ok := strToState [respState ]; ok {
408
+ return state , nil
409
+ }
410
+ return UnknownState , fmt .Errorf ("unknown state: %v" , resp .Data [0 ])
411
+ }
412
+ return UnknownState , fmt .Errorf ("unexpected response: %v" , resp .Data )
413
+ }
414
+
315
415
// Return the number of tasks in a queue broken down by task_state, and the
316
416
// number of requests broken down by the type of request.
317
417
func (q * queue ) Statistic () (interface {}, error ) {
@@ -333,11 +433,16 @@ func makeCmd(q *queue) {
333
433
take : "queue.tube." + q .name + ":take" ,
334
434
drop : "queue.tube." + q .name + ":drop" ,
335
435
peek : "queue.tube." + q .name + ":peek" ,
436
+ touch : "queue.tube." + q .name + ":touch" ,
336
437
ack : "queue.tube." + q .name + ":ack" ,
337
438
delete : "queue.tube." + q .name + ":delete" ,
338
439
bury : "queue.tube." + q .name + ":bury" ,
339
440
kick : "queue.tube." + q .name + ":kick" ,
340
441
release : "queue.tube." + q .name + ":release" ,
442
+ releaseAll : "queue.tube." + q .name + ":release_all" ,
443
+ cfg : "queue.cfg" ,
444
+ identify : "queue.identify" ,
445
+ state : "queue.state" ,
341
446
statistics : "queue.statistics" ,
342
447
}
343
448
}
0 commit comments