Skip to content

Commit cfce2f1

Browse files
committed
api: support queue 1.2.0
* bump queue package version to 1.2.0 [1] * add Task.Touch(): increases TTR and/or TTL for tasks [2] * add Queue.ReleaseAll(): releases all taken tasks [3] * add Queue.State(): returns a current queue state [4] * add Queue.Identify(): identifies a shared session [5] 1. https://github.com/tarantool/queue/releases/tag/1.2.0 2. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L562-L576 3. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L698-L704 4. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L377-L391 5. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L465-L494 Closes #110 Closes #177
1 parent 0588b16 commit cfce2f1

File tree

7 files changed

+293
-1
lines changed

7 files changed

+293
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2121
- Optional msgpack.v5 usage (#124)
2222
- TZ support for datetime (#163)
2323
- Interval support for datetime (#165)
24+
- Support queue 1.2.0 (#177)
2425

2526
### Changed
2627

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ clean:
2222

2323
.PHONY: deps
2424
deps: clean
25-
( cd ./queue; tarantoolctl rocks install queue 1.1.0 )
25+
( cd ./queue; tarantoolctl rocks install queue 1.2.0 )
2626

2727
.PHONY: datetime-timezones
2828
datetime-timezones:

queue/config.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ box.cfg{
99

1010
box.once("init", function()
1111
box.schema.user.create('test', {password = 'test'})
12+
box.schema.func.create('queue.tube.test_queue:touch')
1213
box.schema.func.create('queue.tube.test_queue:ack')
1314
box.schema.func.create('queue.tube.test_queue:put')
1415
box.schema.func.create('queue.tube.test_queue:drop')
@@ -17,7 +18,10 @@ box.cfg{
1718
box.schema.func.create('queue.tube.test_queue:take')
1819
box.schema.func.create('queue.tube.test_queue:delete')
1920
box.schema.func.create('queue.tube.test_queue:release')
21+
box.schema.func.create('queue.tube.test_queue:release_all')
2022
box.schema.func.create('queue.tube.test_queue:bury')
23+
box.schema.func.create('queue.identify')
24+
box.schema.func.create('queue.state')
2125
box.schema.func.create('queue.statistics')
2226
box.schema.user.grant('test', 'create', 'space')
2327
box.schema.user.grant('test', 'write', 'space', '_schema')
@@ -33,6 +37,7 @@ box.cfg{
3337
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
3438
box.schema.user.grant('test', 'read,write', 'space', '_priv')
3539
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
40+
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
3641
if box.space._trigger ~= nil then
3742
box.schema.user.grant('test', 'read', 'space', '_trigger')
3843
end

queue/const.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,22 @@ const (
1616
UTUBE queueType = "utube"
1717
UTUBE_TTL queueType = "utubettl"
1818
)
19+
20+
type State int
21+
22+
const (
23+
UnknownState State = iota
24+
InitState
25+
StartupState
26+
RunningState
27+
EndingState
28+
WaitingState
29+
)
30+
31+
var strToState = map[string]State{
32+
"INIT": InitState,
33+
"STARTUP": StartupState,
34+
"RUNNING": RunningState,
35+
"ENDING": EndingState,
36+
"WAITING": WaitingState,
37+
}

queue/queue.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"fmt"
1313
"time"
1414

15+
"github.com/google/uuid"
1516
"github.com/tarantool/go-tarantool"
1617
)
1718

@@ -20,6 +21,12 @@ type Queue interface {
2021
// Exists checks tube for existence.
2122
// Note: it uses Eval, so user needs 'execute universe' privilege.
2223
Exists() (bool, error)
24+
// Identify to a shared session.
25+
// In the queue the session has a unique UUID and many connections may
26+
// share one logical session. Also, the consumer can reconnect to the
27+
// existing session during the ttr time.
28+
// To get the UUID of the current session, call the Queue.Identify(nil).
29+
Identify(u *uuid.UUID) (uuid.UUID, error)
2330
// Create creates new tube with configuration.
2431
// Note: it uses Eval, so user needs 'execute universe' privilege
2532
// Note: you'd better not use this function in your application, cause it is
@@ -29,6 +36,8 @@ type Queue interface {
2936
// Note: you'd better not use this function in your application, cause it is
3037
// administrative task to create or delete queue.
3138
Drop() error
39+
// ReleaseAll forcibly returns all taken tasks to a ready state.
40+
ReleaseAll() error
3241
// Put creates new task in a tube.
3342
Put(data interface{}) (*Task, error)
3443
// PutWithOpts creates new task with options different from tube's defaults.
@@ -64,6 +73,8 @@ type Queue interface {
6473
Kick(count uint64) (uint64, error)
6574
// Delete the task identified by its id.
6675
Delete(taskId uint64) error
76+
// State returns a current queue state.
77+
State() (State, error)
6778
// Statistic returns some statistic about queue.
6879
Statistic() (interface{}, error)
6980
}
@@ -79,11 +90,15 @@ type cmd struct {
7990
take string
8091
drop string
8192
peek string
93+
touch string
8294
ack string
8395
delete string
8496
bury string
8597
kick string
8698
release string
99+
releaseAll string
100+
identify string
101+
state string
87102
statistics string
88103
}
89104

@@ -173,6 +188,36 @@ func (q *queue) Exists() (bool, error) {
173188
return exist, nil
174189
}
175190

191+
// Identify to a shared session.
192+
// In the queue the session has a unique UUID and many connections may share
193+
// one logical session. Also, the consumer can reconnect to the existing
194+
// session during the ttr time.
195+
// To get the UUID of the current session, call the Queue.Identify(nil).
196+
func (q *queue) Identify(u *uuid.UUID) (uuid.UUID, error) {
197+
// Unfortunately we can't use go-tarantool/uuid here:
198+
// https://github.com/tarantool/queue/issues/182
199+
var args []interface{}
200+
if u == nil {
201+
args = []interface{}{}
202+
} else {
203+
if bytes, err := u.MarshalBinary(); err != nil {
204+
return uuid.UUID{}, err
205+
} else {
206+
args = []interface{}{bytes}
207+
}
208+
}
209+
210+
if resp, err := q.conn.Call17(q.cmds.identify, args); err == nil {
211+
if us, ok := resp.Data[0].(string); ok {
212+
return uuid.FromBytes([]byte(us))
213+
} else {
214+
return uuid.UUID{}, fmt.Errorf("unexpected response: %v", resp.Data)
215+
}
216+
} else {
217+
return uuid.UUID{}, err
218+
}
219+
}
220+
176221
// Put data to queue. Returns task.
177222
func (q *queue) Put(data interface{}) (*Task, error) {
178223
return q.put(data)
@@ -251,6 +296,12 @@ func (q *queue) Drop() error {
251296
return err
252297
}
253298

299+
// ReleaseAll forcibly returns all taken tasks to a ready state.
300+
func (q *queue) ReleaseAll() error {
301+
_, err := q.conn.Call17(q.cmds.releaseAll, []interface{}{})
302+
return err
303+
}
304+
254305
// Look at a task without changing its state.
255306
func (q *queue) Peek(taskId uint64) (*Task, error) {
256307
qd := queueData{q: q}
@@ -260,6 +311,10 @@ func (q *queue) Peek(taskId uint64) (*Task, error) {
260311
return qd.task, nil
261312
}
262313

314+
func (q *queue) _touch(taskId uint64, increment time.Duration) (string, error) {
315+
return q.produce(q.cmds.touch, taskId, increment.Seconds())
316+
}
317+
263318
func (q *queue) _ack(taskId uint64) (string, error) {
264319
return q.produce(q.cmds.ack, taskId)
265320
}
@@ -312,6 +367,22 @@ func (q *queue) Delete(taskId uint64) error {
312367
return err
313368
}
314369

370+
// State returns a current queue state.
371+
func (q *queue) State() (State, error) {
372+
resp, err := q.conn.Call17(q.cmds.state, []interface{}{})
373+
if err != nil {
374+
return UnknownState, err
375+
}
376+
377+
if respState, ok := resp.Data[0].(string); ok {
378+
if state, ok := strToState[respState]; ok {
379+
return state, nil
380+
}
381+
return UnknownState, fmt.Errorf("unknown state: %v", resp.Data[0])
382+
}
383+
return UnknownState, fmt.Errorf("unexpected response: %v", resp.Data)
384+
}
385+
315386
// Return the number of tasks in a queue broken down by task_state, and the
316387
// number of requests broken down by the type of request.
317388
func (q *queue) Statistic() (interface{}, error) {
@@ -333,11 +404,15 @@ func makeCmd(q *queue) {
333404
take: "queue.tube." + q.name + ":take",
334405
drop: "queue.tube." + q.name + ":drop",
335406
peek: "queue.tube." + q.name + ":peek",
407+
touch: "queue.tube." + q.name + ":touch",
336408
ack: "queue.tube." + q.name + ":ack",
337409
delete: "queue.tube." + q.name + ":delete",
338410
bury: "queue.tube." + q.name + ":bury",
339411
kick: "queue.tube." + q.name + ":kick",
340412
release: "queue.tube." + q.name + ":release",
413+
releaseAll: "queue.tube." + q.name + ":release_all",
414+
identify: "queue.identify",
415+
state: "queue.state",
341416
statistics: "queue.statistics",
342417
}
343418
}

0 commit comments

Comments
 (0)