Skip to content

api: support queue 1.2.0 #206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

### Added

- Support queue 1.2.0 (#177)

### Changed

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ clean:

.PHONY: deps
deps: clean
( cd ./queue; tarantoolctl rocks install queue 1.1.0 )
( cd ./queue; tarantoolctl rocks install queue 1.2.0 )

.PHONY: datetime-timezones
datetime-timezones:
Expand Down
5 changes: 5 additions & 0 deletions queue/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ box.cfg{

box.once("init", function()
box.schema.user.create('test', {password = 'test'})
box.schema.func.create('queue.tube.test_queue:touch')
box.schema.func.create('queue.tube.test_queue:ack')
box.schema.func.create('queue.tube.test_queue:put')
box.schema.func.create('queue.tube.test_queue:drop')
Expand All @@ -17,7 +18,10 @@ box.cfg{
box.schema.func.create('queue.tube.test_queue:take')
box.schema.func.create('queue.tube.test_queue:delete')
box.schema.func.create('queue.tube.test_queue:release')
box.schema.func.create('queue.tube.test_queue:release_all')
box.schema.func.create('queue.tube.test_queue:bury')
box.schema.func.create('queue.identify')
box.schema.func.create('queue.state')
box.schema.func.create('queue.statistics')
box.schema.user.grant('test', 'create', 'space')
box.schema.user.grant('test', 'write', 'space', '_schema')
Expand All @@ -33,6 +37,7 @@ box.cfg{
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
box.schema.user.grant('test', 'read,write', 'space', '_priv')
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
if box.space._trigger ~= nil then
box.schema.user.grant('test', 'read', 'space', '_trigger')
end
Expand Down
19 changes: 19 additions & 0 deletions queue/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,22 @@ const (
UTUBE queueType = "utube"
UTUBE_TTL queueType = "utubettl"
)

type State int

const (
UnknownState State = iota
InitState
StartupState
RunningState
EndingState
WaitingState
)

var strToState = map[string]State{
"INIT": InitState,
"STARTUP": StartupState,
"RUNNING": RunningState,
"ENDING": EndingState,
"WAITING": WaitingState,
}
105 changes: 105 additions & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,23 @@ import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool"
)

// Queue is a handle to Tarantool queue's tube.
type Queue interface {
// Set queue settings.
Cfg(opts CfgOpts) error
// Exists checks tube for existence.
// Note: it uses Eval, so user needs 'execute universe' privilege.
Exists() (bool, error)
// Identify to a shared session.
// In the queue the session has a unique UUID and many connections may
// share one logical session. Also, the consumer can reconnect to the
// existing session during the ttr time.
// To get the UUID of the current session, call the Queue.Identify(nil).
Identify(u *uuid.UUID) (uuid.UUID, error)
// Create creates new tube with configuration.
// Note: it uses Eval, so user needs 'execute universe' privilege
// Note: you'd better not use this function in your application, cause it is
Expand All @@ -29,6 +38,8 @@ type Queue interface {
// Note: you'd better not use this function in your application, cause it is
// administrative task to create or delete queue.
Drop() error
// ReleaseAll forcibly returns all taken tasks to a ready state.
ReleaseAll() error
// Put creates new task in a tube.
Put(data interface{}) (*Task, error)
// PutWithOpts creates new task with options different from tube's defaults.
Expand Down Expand Up @@ -64,6 +75,8 @@ type Queue interface {
Kick(count uint64) (uint64, error)
// Delete the task identified by its id.
Delete(taskId uint64) error
// State returns a current queue state.
State() (State, error)
// Statistic returns some statistic about queue.
Statistic() (interface{}, error)
}
Expand All @@ -79,11 +92,16 @@ type cmd struct {
take string
drop string
peek string
touch string
ack string
delete string
bury string
kick string
release string
releaseAll string
cfg string
identify string
state string
statistics string
}

Expand All @@ -110,6 +128,26 @@ func (cfg Cfg) getType() string {
return kind
}

// CfgOpts is argument type for the Queue.Cfg() call.
type CfgOpts struct {
// Enable replication mode. Must be true if the queue is used in master and
// replica mode. With replication mode enabled, the potential loss of
// performance can be ~20% compared to single mode. Default value is false.
InReplicaset bool
// Time to release in seconds. The time after which, if there is no active
// connection in the session, it will be released with all its tasks.
Ttr time.Duration
}

func (opts CfgOpts) toMap() map[string]interface{} {
ret := make(map[string]interface{})
ret["in_replicaset"] = opts.InReplicaset
if opts.Ttr != 0 {
ret["ttr"] = opts.Ttr
}
return ret
}

type Opts struct {
Pri int // Task priorities.
Ttl time.Duration // Task time to live.
Expand Down Expand Up @@ -161,6 +199,12 @@ func (q *queue) Create(cfg Cfg) error {
return err
}

// Set queue settings.
func (q *queue) Cfg(opts CfgOpts) error {
_, err := q.conn.Call17(q.cmds.cfg, []interface{}{opts.toMap()})
return err
}

// Exists checks existance of a tube.
func (q *queue) Exists() (bool, error) {
cmd := "local name = ... ; return queue.tube[name] ~= nil"
Expand All @@ -173,6 +217,36 @@ func (q *queue) Exists() (bool, error) {
return exist, nil
}

// Identify to a shared session.
// In the queue the session has a unique UUID and many connections may share
// one logical session. Also, the consumer can reconnect to the existing
// session during the ttr time.
// To get the UUID of the current session, call the Queue.Identify(nil).
func (q *queue) Identify(u *uuid.UUID) (uuid.UUID, error) {
// Unfortunately we can't use go-tarantool/uuid here:
// https://github.com/tarantool/queue/issues/182
var args []interface{}
if u == nil {
args = []interface{}{}
} else {
if bytes, err := u.MarshalBinary(); err != nil {
return uuid.UUID{}, err
} else {
args = []interface{}{bytes}
}
}

if resp, err := q.conn.Call17(q.cmds.identify, args); err == nil {
if us, ok := resp.Data[0].(string); ok {
return uuid.FromBytes([]byte(us))
} else {
return uuid.UUID{}, fmt.Errorf("unexpected response: %v", resp.Data)
}
} else {
return uuid.UUID{}, err
}
}

// Put data to queue. Returns task.
func (q *queue) Put(data interface{}) (*Task, error) {
return q.put(data)
Expand Down Expand Up @@ -251,6 +325,12 @@ func (q *queue) Drop() error {
return err
}

// ReleaseAll forcibly returns all taken tasks to a ready state.
func (q *queue) ReleaseAll() error {
_, err := q.conn.Call17(q.cmds.releaseAll, []interface{}{})
return err
}

// Look at a task without changing its state.
func (q *queue) Peek(taskId uint64) (*Task, error) {
qd := queueData{q: q}
Expand All @@ -260,6 +340,10 @@ func (q *queue) Peek(taskId uint64) (*Task, error) {
return qd.task, nil
}

func (q *queue) _touch(taskId uint64, increment time.Duration) (string, error) {
return q.produce(q.cmds.touch, taskId, increment.Seconds())
}

func (q *queue) _ack(taskId uint64) (string, error) {
return q.produce(q.cmds.ack, taskId)
}
Expand Down Expand Up @@ -312,6 +396,22 @@ func (q *queue) Delete(taskId uint64) error {
return err
}

// State returns a current queue state.
func (q *queue) State() (State, error) {
resp, err := q.conn.Call17(q.cmds.state, []interface{}{})
if err != nil {
return UnknownState, err
}

if respState, ok := resp.Data[0].(string); ok {
if state, ok := strToState[respState]; ok {
return state, nil
}
return UnknownState, fmt.Errorf("unknown state: %v", resp.Data[0])
}
return UnknownState, fmt.Errorf("unexpected response: %v", resp.Data)
}

// Return the number of tasks in a queue broken down by task_state, and the
// number of requests broken down by the type of request.
func (q *queue) Statistic() (interface{}, error) {
Expand All @@ -333,11 +433,16 @@ func makeCmd(q *queue) {
take: "queue.tube." + q.name + ":take",
drop: "queue.tube." + q.name + ":drop",
peek: "queue.tube." + q.name + ":peek",
touch: "queue.tube." + q.name + ":touch",
ack: "queue.tube." + q.name + ":ack",
delete: "queue.tube." + q.name + ":delete",
bury: "queue.tube." + q.name + ":bury",
kick: "queue.tube." + q.name + ":kick",
release: "queue.tube." + q.name + ":release",
releaseAll: "queue.tube." + q.name + ":release_all",
cfg: "queue.cfg",
identify: "queue.identify",
state: "queue.state",
statistics: "queue.statistics",
}
}
Expand Down
Loading