Skip to content

Commit c031562

Browse files
committed
api: support queue 1.2.0
* bump queue package version to 1.2.0 [1] * add Task.Touch(): increases TTR and/or TTL for tasks [2] * add Queue.Cfg(): set queue settings [3] * add Queue.ReleaseAll(): releases all taken tasks [4] * add Queue.State(): returns a current queue state [5] * add Queue.Identify(): identifies a shared session [6] 1. https://github.com/tarantool/queue/releases/tag/1.2.0 2. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L562-L576 3. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L450-L463 4. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L698-L704 5. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L377-L391 6. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L465-L494 Closes #110 Closes #177
1 parent 0588b16 commit c031562

File tree

7 files changed

+423
-1
lines changed

7 files changed

+423
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1010

1111
### Added
1212

13+
- Support queue 1.2.0 (#177)
14+
1315
### Changed
1416

1517
### Fixed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ clean:
2222

2323
.PHONY: deps
2424
deps: clean
25-
( cd ./queue; tarantoolctl rocks install queue 1.1.0 )
25+
( cd ./queue; tarantoolctl rocks install queue 1.2.0 )
2626

2727
.PHONY: datetime-timezones
2828
datetime-timezones:

queue/config.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ box.cfg{
99

1010
box.once("init", function()
1111
box.schema.user.create('test', {password = 'test'})
12+
box.schema.func.create('queue.tube.test_queue:touch')
1213
box.schema.func.create('queue.tube.test_queue:ack')
1314
box.schema.func.create('queue.tube.test_queue:put')
1415
box.schema.func.create('queue.tube.test_queue:drop')
@@ -17,7 +18,10 @@ box.cfg{
1718
box.schema.func.create('queue.tube.test_queue:take')
1819
box.schema.func.create('queue.tube.test_queue:delete')
1920
box.schema.func.create('queue.tube.test_queue:release')
21+
box.schema.func.create('queue.tube.test_queue:release_all')
2022
box.schema.func.create('queue.tube.test_queue:bury')
23+
box.schema.func.create('queue.identify')
24+
box.schema.func.create('queue.state')
2125
box.schema.func.create('queue.statistics')
2226
box.schema.user.grant('test', 'create', 'space')
2327
box.schema.user.grant('test', 'write', 'space', '_schema')
@@ -33,6 +37,7 @@ box.cfg{
3337
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
3438
box.schema.user.grant('test', 'read,write', 'space', '_priv')
3539
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
40+
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
3641
if box.space._trigger ~= nil then
3742
box.schema.user.grant('test', 'read', 'space', '_trigger')
3843
end

queue/const.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,22 @@ const (
1616
UTUBE queueType = "utube"
1717
UTUBE_TTL queueType = "utubettl"
1818
)
19+
20+
type State int
21+
22+
const (
23+
UnknownState State = iota
24+
InitState
25+
StartupState
26+
RunningState
27+
EndingState
28+
WaitingState
29+
)
30+
31+
var strToState = map[string]State{
32+
"INIT": InitState,
33+
"STARTUP": StartupState,
34+
"RUNNING": RunningState,
35+
"ENDING": EndingState,
36+
"WAITING": WaitingState,
37+
}

queue/queue.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,23 @@ import (
1212
"fmt"
1313
"time"
1414

15+
"github.com/google/uuid"
1516
"github.com/tarantool/go-tarantool"
1617
)
1718

1819
// Queue is a handle to Tarantool queue's tube.
1920
type Queue interface {
21+
// Set queue settings.
22+
Cfg(opts CfgOpts) error
2023
// Exists checks tube for existence.
2124
// Note: it uses Eval, so user needs 'execute universe' privilege.
2225
Exists() (bool, error)
26+
// Identify to a shared session.
27+
// In the queue the session has a unique UUID and many connections may
28+
// share one logical session. Also, the consumer can reconnect to the
29+
// existing session during the ttr time.
30+
// To get the UUID of the current session, call the Queue.Identify(nil).
31+
Identify(u *uuid.UUID) (uuid.UUID, error)
2332
// Create creates new tube with configuration.
2433
// Note: it uses Eval, so user needs 'execute universe' privilege
2534
// Note: you'd better not use this function in your application, cause it is
@@ -29,6 +38,8 @@ type Queue interface {
2938
// Note: you'd better not use this function in your application, cause it is
3039
// administrative task to create or delete queue.
3140
Drop() error
41+
// ReleaseAll forcibly returns all taken tasks to a ready state.
42+
ReleaseAll() error
3243
// Put creates new task in a tube.
3344
Put(data interface{}) (*Task, error)
3445
// PutWithOpts creates new task with options different from tube's defaults.
@@ -64,6 +75,8 @@ type Queue interface {
6475
Kick(count uint64) (uint64, error)
6576
// Delete the task identified by its id.
6677
Delete(taskId uint64) error
78+
// State returns a current queue state.
79+
State() (State, error)
6780
// Statistic returns some statistic about queue.
6881
Statistic() (interface{}, error)
6982
}
@@ -79,11 +92,16 @@ type cmd struct {
7992
take string
8093
drop string
8194
peek string
95+
touch string
8296
ack string
8397
delete string
8498
bury string
8599
kick string
86100
release string
101+
releaseAll string
102+
cfg string
103+
identify string
104+
state string
87105
statistics string
88106
}
89107

@@ -110,6 +128,26 @@ func (cfg Cfg) getType() string {
110128
return kind
111129
}
112130

131+
// CfgOpts is argument type for the Queue.Cfg() call.
132+
type CfgOpts struct {
133+
// Enable replication mode. Must be true if the queue is used in master and
134+
// replica mode. With replication mode enabled, the potential loss of
135+
// performance can be ~20% compared to single mode. Default value is false.
136+
InReplicaset bool
137+
// Time to release in seconds. The time after which, if there is no active
138+
// connection in the session, it will be released with all its tasks.
139+
Ttr time.Duration
140+
}
141+
142+
func (opts CfgOpts) toMap() map[string]interface{} {
143+
ret := make(map[string]interface{})
144+
ret["in_replicaset"] = opts.InReplicaset
145+
if opts.Ttr != 0 {
146+
ret["ttr"] = opts.Ttr
147+
}
148+
return ret
149+
}
150+
113151
type Opts struct {
114152
Pri int // Task priorities.
115153
Ttl time.Duration // Task time to live.
@@ -161,6 +199,12 @@ func (q *queue) Create(cfg Cfg) error {
161199
return err
162200
}
163201

202+
// Set queue settings.
203+
func (q *queue) Cfg(opts CfgOpts) error {
204+
_, err := q.conn.Call17(q.cmds.cfg, []interface{}{opts.toMap()})
205+
return err
206+
}
207+
164208
// Exists checks existance of a tube.
165209
func (q *queue) Exists() (bool, error) {
166210
cmd := "local name = ... ; return queue.tube[name] ~= nil"
@@ -173,6 +217,36 @@ func (q *queue) Exists() (bool, error) {
173217
return exist, nil
174218
}
175219

220+
// Identify to a shared session.
221+
// In the queue the session has a unique UUID and many connections may share
222+
// one logical session. Also, the consumer can reconnect to the existing
223+
// session during the ttr time.
224+
// To get the UUID of the current session, call the Queue.Identify(nil).
225+
func (q *queue) Identify(u *uuid.UUID) (uuid.UUID, error) {
226+
// Unfortunately we can't use go-tarantool/uuid here:
227+
// https://github.com/tarantool/queue/issues/182
228+
var args []interface{}
229+
if u == nil {
230+
args = []interface{}{}
231+
} else {
232+
if bytes, err := u.MarshalBinary(); err != nil {
233+
return uuid.UUID{}, err
234+
} else {
235+
args = []interface{}{bytes}
236+
}
237+
}
238+
239+
if resp, err := q.conn.Call17(q.cmds.identify, args); err == nil {
240+
if us, ok := resp.Data[0].(string); ok {
241+
return uuid.FromBytes([]byte(us))
242+
} else {
243+
return uuid.UUID{}, fmt.Errorf("unexpected response: %v", resp.Data)
244+
}
245+
} else {
246+
return uuid.UUID{}, err
247+
}
248+
}
249+
176250
// Put data to queue. Returns task.
177251
func (q *queue) Put(data interface{}) (*Task, error) {
178252
return q.put(data)
@@ -251,6 +325,12 @@ func (q *queue) Drop() error {
251325
return err
252326
}
253327

328+
// ReleaseAll forcibly returns all taken tasks to a ready state.
329+
func (q *queue) ReleaseAll() error {
330+
_, err := q.conn.Call17(q.cmds.releaseAll, []interface{}{})
331+
return err
332+
}
333+
254334
// Look at a task without changing its state.
255335
func (q *queue) Peek(taskId uint64) (*Task, error) {
256336
qd := queueData{q: q}
@@ -260,6 +340,10 @@ func (q *queue) Peek(taskId uint64) (*Task, error) {
260340
return qd.task, nil
261341
}
262342

343+
func (q *queue) _touch(taskId uint64, increment time.Duration) (string, error) {
344+
return q.produce(q.cmds.touch, taskId, increment.Seconds())
345+
}
346+
263347
func (q *queue) _ack(taskId uint64) (string, error) {
264348
return q.produce(q.cmds.ack, taskId)
265349
}
@@ -312,6 +396,22 @@ func (q *queue) Delete(taskId uint64) error {
312396
return err
313397
}
314398

399+
// State returns a current queue state.
400+
func (q *queue) State() (State, error) {
401+
resp, err := q.conn.Call17(q.cmds.state, []interface{}{})
402+
if err != nil {
403+
return UnknownState, err
404+
}
405+
406+
if respState, ok := resp.Data[0].(string); ok {
407+
if state, ok := strToState[respState]; ok {
408+
return state, nil
409+
}
410+
return UnknownState, fmt.Errorf("unknown state: %v", resp.Data[0])
411+
}
412+
return UnknownState, fmt.Errorf("unexpected response: %v", resp.Data)
413+
}
414+
315415
// Return the number of tasks in a queue broken down by task_state, and the
316416
// number of requests broken down by the type of request.
317417
func (q *queue) Statistic() (interface{}, error) {
@@ -333,11 +433,16 @@ func makeCmd(q *queue) {
333433
take: "queue.tube." + q.name + ":take",
334434
drop: "queue.tube." + q.name + ":drop",
335435
peek: "queue.tube." + q.name + ":peek",
436+
touch: "queue.tube." + q.name + ":touch",
336437
ack: "queue.tube." + q.name + ":ack",
337438
delete: "queue.tube." + q.name + ":delete",
338439
bury: "queue.tube." + q.name + ":bury",
339440
kick: "queue.tube." + q.name + ":kick",
340441
release: "queue.tube." + q.name + ":release",
442+
releaseAll: "queue.tube." + q.name + ":release_all",
443+
cfg: "queue.cfg",
444+
identify: "queue.identify",
445+
state: "queue.state",
341446
statistics: "queue.statistics",
342447
}
343448
}

0 commit comments

Comments
 (0)