diff --git a/CHANGELOG.md b/CHANGELOG.md index 1177e4a8e..11b0e54e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Added +- Support queue 1.2.0 (#177) + ### Changed ### Fixed diff --git a/Makefile b/Makefile index 418b3c89f..7149e05ed 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ clean: .PHONY: deps deps: clean - ( cd ./queue; tarantoolctl rocks install queue 1.1.0 ) + ( cd ./queue; tarantoolctl rocks install queue 1.2.0 ) .PHONY: datetime-timezones datetime-timezones: diff --git a/queue/config.lua b/queue/config.lua index cb64f4df8..df28496a3 100644 --- a/queue/config.lua +++ b/queue/config.lua @@ -9,6 +9,7 @@ box.cfg{ box.once("init", function() box.schema.user.create('test', {password = 'test'}) + box.schema.func.create('queue.tube.test_queue:touch') box.schema.func.create('queue.tube.test_queue:ack') box.schema.func.create('queue.tube.test_queue:put') box.schema.func.create('queue.tube.test_queue:drop') @@ -17,7 +18,10 @@ box.cfg{ box.schema.func.create('queue.tube.test_queue:take') box.schema.func.create('queue.tube.test_queue:delete') box.schema.func.create('queue.tube.test_queue:release') + box.schema.func.create('queue.tube.test_queue:release_all') box.schema.func.create('queue.tube.test_queue:bury') + box.schema.func.create('queue.identify') + box.schema.func.create('queue.state') box.schema.func.create('queue.statistics') box.schema.user.grant('test', 'create', 'space') box.schema.user.grant('test', 'write', 'space', '_schema') @@ -33,6 +37,7 @@ box.cfg{ box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') box.schema.user.grant('test', 'read,write', 'space', '_priv') box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2') + box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions') if box.space._trigger ~= nil then box.schema.user.grant('test', 'read', 'space', '_trigger') end diff --git a/queue/const.go b/queue/const.go index f984fac62..0e0eadcc7 100644 --- a/queue/const.go +++ b/queue/const.go @@ -16,3 +16,22 @@ const ( UTUBE queueType = "utube" UTUBE_TTL queueType = "utubettl" ) + +type State int + +const ( + UnknownState State = iota + InitState + StartupState + RunningState + EndingState + WaitingState +) + +var strToState = map[string]State{ + "INIT": InitState, + "STARTUP": StartupState, + "RUNNING": RunningState, + "ENDING": EndingState, + "WAITING": WaitingState, +} diff --git a/queue/queue.go b/queue/queue.go index c40b8a0df..cbe80e057 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -12,14 +12,23 @@ import ( "fmt" "time" + "github.com/google/uuid" "github.com/tarantool/go-tarantool" ) // Queue is a handle to Tarantool queue's tube. type Queue interface { + // Set queue settings. + Cfg(opts CfgOpts) error // Exists checks tube for existence. // Note: it uses Eval, so user needs 'execute universe' privilege. Exists() (bool, error) + // Identify to a shared session. + // In the queue the session has a unique UUID and many connections may + // share one logical session. Also, the consumer can reconnect to the + // existing session during the ttr time. + // To get the UUID of the current session, call the Queue.Identify(nil). + Identify(u *uuid.UUID) (uuid.UUID, error) // Create creates new tube with configuration. // Note: it uses Eval, so user needs 'execute universe' privilege // Note: you'd better not use this function in your application, cause it is @@ -29,6 +38,8 @@ type Queue interface { // Note: you'd better not use this function in your application, cause it is // administrative task to create or delete queue. Drop() error + // ReleaseAll forcibly returns all taken tasks to a ready state. + ReleaseAll() error // Put creates new task in a tube. Put(data interface{}) (*Task, error) // PutWithOpts creates new task with options different from tube's defaults. @@ -64,6 +75,8 @@ type Queue interface { Kick(count uint64) (uint64, error) // Delete the task identified by its id. Delete(taskId uint64) error + // State returns a current queue state. + State() (State, error) // Statistic returns some statistic about queue. Statistic() (interface{}, error) } @@ -79,11 +92,16 @@ type cmd struct { take string drop string peek string + touch string ack string delete string bury string kick string release string + releaseAll string + cfg string + identify string + state string statistics string } @@ -110,6 +128,26 @@ func (cfg Cfg) getType() string { return kind } +// CfgOpts is argument type for the Queue.Cfg() call. +type CfgOpts struct { + // Enable replication mode. Must be true if the queue is used in master and + // replica mode. With replication mode enabled, the potential loss of + // performance can be ~20% compared to single mode. Default value is false. + InReplicaset bool + // Time to release in seconds. The time after which, if there is no active + // connection in the session, it will be released with all its tasks. + Ttr time.Duration +} + +func (opts CfgOpts) toMap() map[string]interface{} { + ret := make(map[string]interface{}) + ret["in_replicaset"] = opts.InReplicaset + if opts.Ttr != 0 { + ret["ttr"] = opts.Ttr + } + return ret +} + type Opts struct { Pri int // Task priorities. Ttl time.Duration // Task time to live. @@ -161,6 +199,12 @@ func (q *queue) Create(cfg Cfg) error { return err } +// Set queue settings. +func (q *queue) Cfg(opts CfgOpts) error { + _, err := q.conn.Call17(q.cmds.cfg, []interface{}{opts.toMap()}) + return err +} + // Exists checks existance of a tube. func (q *queue) Exists() (bool, error) { cmd := "local name = ... ; return queue.tube[name] ~= nil" @@ -173,6 +217,36 @@ func (q *queue) Exists() (bool, error) { return exist, nil } +// Identify to a shared session. +// In the queue the session has a unique UUID and many connections may share +// one logical session. Also, the consumer can reconnect to the existing +// session during the ttr time. +// To get the UUID of the current session, call the Queue.Identify(nil). +func (q *queue) Identify(u *uuid.UUID) (uuid.UUID, error) { + // Unfortunately we can't use go-tarantool/uuid here: + // https://github.com/tarantool/queue/issues/182 + var args []interface{} + if u == nil { + args = []interface{}{} + } else { + if bytes, err := u.MarshalBinary(); err != nil { + return uuid.UUID{}, err + } else { + args = []interface{}{bytes} + } + } + + if resp, err := q.conn.Call17(q.cmds.identify, args); err == nil { + if us, ok := resp.Data[0].(string); ok { + return uuid.FromBytes([]byte(us)) + } else { + return uuid.UUID{}, fmt.Errorf("unexpected response: %v", resp.Data) + } + } else { + return uuid.UUID{}, err + } +} + // Put data to queue. Returns task. func (q *queue) Put(data interface{}) (*Task, error) { return q.put(data) @@ -251,6 +325,12 @@ func (q *queue) Drop() error { return err } +// ReleaseAll forcibly returns all taken tasks to a ready state. +func (q *queue) ReleaseAll() error { + _, err := q.conn.Call17(q.cmds.releaseAll, []interface{}{}) + return err +} + // Look at a task without changing its state. func (q *queue) Peek(taskId uint64) (*Task, error) { qd := queueData{q: q} @@ -260,6 +340,10 @@ func (q *queue) Peek(taskId uint64) (*Task, error) { return qd.task, nil } +func (q *queue) _touch(taskId uint64, increment time.Duration) (string, error) { + return q.produce(q.cmds.touch, taskId, increment.Seconds()) +} + func (q *queue) _ack(taskId uint64) (string, error) { return q.produce(q.cmds.ack, taskId) } @@ -312,6 +396,22 @@ func (q *queue) Delete(taskId uint64) error { return err } +// State returns a current queue state. +func (q *queue) State() (State, error) { + resp, err := q.conn.Call17(q.cmds.state, []interface{}{}) + if err != nil { + return UnknownState, err + } + + if respState, ok := resp.Data[0].(string); ok { + if state, ok := strToState[respState]; ok { + return state, nil + } + return UnknownState, fmt.Errorf("unknown state: %v", resp.Data[0]) + } + return UnknownState, fmt.Errorf("unexpected response: %v", resp.Data) +} + // Return the number of tasks in a queue broken down by task_state, and the // number of requests broken down by the type of request. func (q *queue) Statistic() (interface{}, error) { @@ -333,11 +433,16 @@ func makeCmd(q *queue) { take: "queue.tube." + q.name + ":take", drop: "queue.tube." + q.name + ":drop", peek: "queue.tube." + q.name + ":peek", + touch: "queue.tube." + q.name + ":touch", ack: "queue.tube." + q.name + ":ack", delete: "queue.tube." + q.name + ":delete", bury: "queue.tube." + q.name + ":bury", kick: "queue.tube." + q.name + ":kick", release: "queue.tube." + q.name + ":release", + releaseAll: "queue.tube." + q.name + ":release_all", + cfg: "queue.cfg", + identify: "queue.identify", + state: "queue.state", statistics: "queue.statistics", } } diff --git a/queue/queue_test.go b/queue/queue_test.go index 807ac79e0..ef59156da 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -22,6 +22,25 @@ var opts = Opts{ //RateLimit: 4*1024, } +func createQueue(t *testing.T, conn *Connection, name string, cfg queue.Cfg) queue.Queue { + t.Helper() + + q := queue.New(conn, name) + if err := q.Create(cfg); err != nil { + t.Fatalf("Failed to create queue: %s", err) + } + + return q +} + +func dropQueue(t *testing.T, q queue.Queue) { + t.Helper() + + if err := q.Drop(); err != nil { + t.Fatalf("Failed to drop queue: %s", err) + } +} + /////////QUEUE///////// func TestFifoQueue(t *testing.T) { @@ -29,56 +48,174 @@ func TestFifoQueue(t *testing.T) { defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err := q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) +} - //Drop - if err := q.Drop(); err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) +func TestQueue_Cfg(t *testing.T) { + conn := test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + name := "test_queue" + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) + + err := q.Cfg(queue.CfgOpts{InReplicaset: false, Ttr: 5 * time.Second}) + if err != nil { + t.Fatalf("Unexpected q.Cfg() error: %s", err) } } -func TestFifoQueue_GetExist_Statistic(t *testing.T) { +func TestQueue_Identify(t *testing.T) { conn := test_helpers.ConnectWithValidation(t, server, opts) defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err := q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) + + uuid, err := q.Identify(nil) + if err != nil { + t.Fatalf("Failed to identify: %s", err) + } + cpy := uuid + + uuid, err = q.Identify(&cpy) + if err != nil { + t.Fatalf("Failed to identify with uuid %s: %s", cpy, err) } + if cpy.String() != uuid.String() { + t.Fatalf("Unequal UUIDs after re-identify: %s, expected %s", uuid, cpy) + } +} + +func TestQueue_ReIdentify(t *testing.T) { + conn := test_helpers.ConnectWithValidation(t, server, opts) defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) + if conn != nil { + conn.Close() } }() + name := "test_queue" + cfg := queue.Cfg{ + Temporary: true, + Kind: queue.FIFO_TTL, + Opts: queue.Opts{Ttl: 5 * time.Second}, + } + q := createQueue(t, conn, name, cfg) + q.Cfg(queue.CfgOpts{InReplicaset: false, Ttr: 5 * time.Second}) + defer func() { + dropQueue(t, q) + }() + + uuid, err := q.Identify(nil) + if err != nil { + t.Fatalf("Failed to identify: %s", err) + } + newuuid, err := q.Identify(&uuid) + if err != nil { + t.Fatalf("Failed to identify: %s", err) + } + if newuuid.String() != uuid.String() { + t.Fatalf("Unequal UUIDs after re-identify: %s, expected %s", newuuid, uuid) + } + //Put + putData := "put_data" + task, err := q.Put(putData) + if err != nil { + conn.Close() + t.Fatalf("Failed put to queue: %s", err) + } else if err == nil && task == nil { + t.Fatalf("Task is nil after put") + } else { + if task.Data() != putData { + t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) + } + } + + //Take + task, err = q.TakeTimeout(2 * time.Second) + if err != nil { + t.Fatalf("Failed take from queue: %s", err) + } else if task == nil { + t.Fatalf("Task is nil after take") + } + + conn.Close() + conn = nil + + conn = test_helpers.ConnectWithValidation(t, server, opts) + q = queue.New(conn, name) + + //Identify in another connection + newuuid, err = q.Identify(&uuid) + if err != nil { + t.Fatalf("Failed to identify: %s", err) + } + if newuuid.String() != uuid.String() { + t.Fatalf("Unequal UUIDs after re-identify: %s, expected %s", newuuid, uuid) + } + + //Peek in another connection + task, err = q.Peek(task.Id()) + if err != nil { + t.Fatalf("Failed take from queue: %s", err) + } else if task == nil { + t.Fatalf("Task is nil after take") + } + + //Ack in another connection + err = task.Ack() + if err != nil { + t.Errorf("Failed ack %s", err) + } else if !task.IsDone() { + t.Errorf("Task status after take is not done. Status = %s", task.Status()) + } +} + +func TestQueue_State(t *testing.T) { + conn := test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + name := "test_queue" + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) + + state, err := q.State() + if err != nil { + t.Fatalf("Failed to get queue state: %s", err) + } + if state != queue.InitState && state != queue.RunningState { + t.Fatalf("Unexpected state: %d", state) + } +} + +func TestFifoQueue_GetExist_Statistic(t *testing.T) { + conn := test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + name := "test_queue" + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) + ok, err := q.Exists() if err != nil { - t.Errorf("Failed to get exist queue: %s", err.Error()) - return + t.Fatalf("Failed to get exist queue: %s", err) } if !ok { - t.Error("Queue is not found") - return + t.Fatal("Queue is not found") } putData := "put_data" _, err = q.Put(putData) if err != nil { - t.Errorf("Failed to put queue: %s", err.Error()) - return + t.Fatalf("Failed to put queue: %s", err) } stat, err := q.Statistic() if err != nil { - t.Errorf("Failed to get statistic queue: %s", err.Error()) + t.Errorf("Failed to get statistic queue: %s", err) } else if stat == nil { t.Error("Statistic is nil") } @@ -89,29 +226,16 @@ func TestFifoQueue_Put(t *testing.T) { defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err := q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) //Put putData := "put_data" task, err := q.Put(putData) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && task == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { if task.Data() != putData { t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) @@ -124,29 +248,16 @@ func TestFifoQueue_Take(t *testing.T) { defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err := q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) //Put putData := "put_data" task, err := q.Put(putData) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && task == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { if task.Data() != putData { t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) @@ -156,7 +267,7 @@ func TestFifoQueue_Take(t *testing.T) { //Take task, err = q.TakeTimeout(2 * time.Second) if err != nil { - t.Errorf("Failed take from queue: %s", err.Error()) + t.Errorf("Failed take from queue: %s", err) } else if task == nil { t.Errorf("Task is nil after take") } else { @@ -171,7 +282,7 @@ func TestFifoQueue_Take(t *testing.T) { err = task.Ack() if err != nil { - t.Errorf("Failed ack %s", err.Error()) + t.Errorf("Failed ack %s", err) } else if !task.IsDone() { t.Errorf("Task status after take is not done. Status = %s", task.Status()) } @@ -212,29 +323,16 @@ func TestFifoQueue_TakeTyped(t *testing.T) { defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err := q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) //Put putData := &customData{customField: "put_data"} task, err := q.Put(putData) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && task == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { typedData, ok := task.Data().(*customData) if !ok { @@ -249,7 +347,7 @@ func TestFifoQueue_TakeTyped(t *testing.T) { takeData := &customData{} task, err = q.TakeTypedTimeout(2*time.Second, takeData) if err != nil { - t.Errorf("Failed take from queue: %s", err.Error()) + t.Errorf("Failed take from queue: %s", err) } else if task == nil { t.Errorf("Task is nil after take") } else { @@ -269,7 +367,7 @@ func TestFifoQueue_TakeTyped(t *testing.T) { err = task.Ack() if err != nil { - t.Errorf("Failed ack %s", err.Error()) + t.Errorf("Failed ack %s", err) } else if !task.IsDone() { t.Errorf("Task status after take is not done. Status = %s", task.Status()) } @@ -281,29 +379,16 @@ func TestFifoQueue_Peek(t *testing.T) { defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err := q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) //Put putData := "put_data" task, err := q.Put(putData) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && task == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { if task.Data() != putData { t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) @@ -313,7 +398,7 @@ func TestFifoQueue_Peek(t *testing.T) { //Peek task, err = q.Peek(task.Id()) if err != nil { - t.Errorf("Failed peek from queue: %s", err.Error()) + t.Errorf("Failed peek from queue: %s", err) } else if task == nil { t.Errorf("Task is nil after peek") } else { @@ -332,29 +417,16 @@ func TestFifoQueue_Bury_Kick(t *testing.T) { defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err := q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) //Put putData := "put_data" task, err := q.Put(putData) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && task == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { if task.Data() != putData { t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) @@ -364,8 +436,7 @@ func TestFifoQueue_Bury_Kick(t *testing.T) { //Bury err = task.Bury() if err != nil { - t.Errorf("Failed bury task %s", err.Error()) - return + t.Fatalf("Failed bury task %s", err) } else if !task.IsBuried() { t.Errorf("Task status after bury is not buried. Status = %s", task.Status()) } @@ -373,17 +444,15 @@ func TestFifoQueue_Bury_Kick(t *testing.T) { //Kick count, err := q.Kick(1) if err != nil { - t.Errorf("Failed kick task %s", err.Error()) - return + t.Fatalf("Failed kick task %s", err) } else if count != 1 { - t.Errorf("Kick result != 1") - return + t.Fatalf("Kick result != 1") } //Take task, err = q.TakeTimeout(2 * time.Second) if err != nil { - t.Errorf("Failed take from queue: %s", err.Error()) + t.Errorf("Failed take from queue: %s", err) } else if task == nil { t.Errorf("Task is nil after take") } else { @@ -397,7 +466,7 @@ func TestFifoQueue_Bury_Kick(t *testing.T) { err = task.Ack() if err != nil { - t.Errorf("Failed ack %s", err.Error()) + t.Errorf("Failed ack %s", err) } else if !task.IsDone() { t.Errorf("Task status after take is not done. Status = %s", task.Status()) } @@ -411,19 +480,8 @@ func TestFifoQueue_Delete(t *testing.T) { defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err = q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) //Put var putData = "put_data" @@ -432,11 +490,9 @@ func TestFifoQueue_Delete(t *testing.T) { for i := 0; i < 2; i++ { tasks[i], err = q.Put(putData) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && tasks[i] == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { if tasks[i].Data() != putData { t.Errorf("Task data after put not equal with example. %s != %s", tasks[i].Data(), putData) @@ -447,8 +503,7 @@ func TestFifoQueue_Delete(t *testing.T) { //Delete by task method err = tasks[0].Delete() if err != nil { - t.Errorf("Failed bury task %s", err.Error()) - return + t.Fatalf("Failed bury task %s", err) } else if !tasks[0].IsDone() { t.Errorf("Task status after delete is not done. Status = %s", tasks[0].Status()) } @@ -456,8 +511,7 @@ func TestFifoQueue_Delete(t *testing.T) { //Delete by task ID err = q.Delete(tasks[1].Id()) if err != nil { - t.Errorf("Failed bury task %s", err.Error()) - return + t.Fatalf("Failed bury task %s", err) } else if !tasks[0].IsDone() { t.Errorf("Task status after delete is not done. Status = %s", tasks[0].Status()) } @@ -466,7 +520,7 @@ func TestFifoQueue_Delete(t *testing.T) { for i := 0; i < 2; i++ { tasks[i], err = q.TakeTimeout(2 * time.Second) if err != nil { - t.Errorf("Failed take from queue: %s", err.Error()) + t.Errorf("Failed take from queue: %s", err) } else if tasks[i] != nil { t.Errorf("Task is not nil after take. Task is %d", tasks[i].Id()) } @@ -478,28 +532,77 @@ func TestFifoQueue_Release(t *testing.T) { defer conn.Close() name := "test_queue" - q := queue.New(conn, name) - if err := q.Create(queue.Cfg{Temporary: true, Kind: queue.FIFO}); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) + + putData := "put_data" + task, err := q.Put(putData) + if err != nil { + t.Fatalf("Failed put to queue: %s", err) + } else if err == nil && task == nil { + t.Fatalf("Task is nil after put") + } else { + if task.Data() != putData { + t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) + } } - defer func() { - //Drop - err := q.Drop() + //Take + task, err = q.Take() + if err != nil { + t.Fatalf("Failed take from queue: %s", err) + } else if task == nil { + t.Fatal("Task is nil after take") + } + + //Release + err = task.Release() + if err != nil { + t.Fatalf("Failed release task %s", err) + } + + if !task.IsReady() { + t.Fatalf("Task status is not ready, but %s", task.Status()) + } + + //Take + task, err = q.Take() + if err != nil { + t.Fatalf("Failed take from queue: %s", err) + } else if task == nil { + t.Fatal("Task is nil after take") + } else { + if task.Data() != putData { + t.Errorf("Task data after take not equal with example. %s != %s", task.Data(), putData) + } + + if !task.IsTaken() { + t.Errorf("Task status after take is not taken. Status = %s", task.Status()) + } + + err = task.Ack() if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) + t.Errorf("Failed ack %s", err) + } else if !task.IsDone() { + t.Errorf("Task status after take is not done. Status = %s", task.Status()) } - }() + } +} + +func TestQueue_ReleaseAll(t *testing.T) { + conn := test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + name := "test_queue" + q := createQueue(t, conn, name, queue.Cfg{Temporary: true, Kind: queue.FIFO}) + defer dropQueue(t, q) putData := "put_data" task, err := q.Put(putData) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && task == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { if task.Data() != putData { t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) @@ -509,33 +612,31 @@ func TestFifoQueue_Release(t *testing.T) { //Take task, err = q.Take() if err != nil { - t.Errorf("Failed take from queue: %s", err.Error()) - return + t.Fatalf("Failed take from queue: %s", err) } else if task == nil { - t.Error("Task is nil after take") - return + t.Fatal("Task is nil after take") } - //Release - err = task.Release() + //ReleaseAll + err = q.ReleaseAll() if err != nil { - t.Errorf("Failed release task %s", err.Error()) - return + t.Fatalf("Failed release task %s", err) } + task, err = q.Peek(task.Id()) + if err != nil { + t.Fatalf("Failed to peek task %s", err) + } if !task.IsReady() { - t.Errorf("Task status is not ready, but %s", task.Status()) - return + t.Fatalf("Task status is not ready, but %s", task.Status()) } //Take task, err = q.Take() if err != nil { - t.Errorf("Failed take from queue: %s", err.Error()) - return + t.Fatalf("Failed take from queue: %s", err) } else if task == nil { - t.Error("Task is nil after take") - return + t.Fatal("Task is nil after take") } else { if task.Data() != putData { t.Errorf("Task data after take not equal with example. %s != %s", task.Data(), putData) @@ -547,7 +648,7 @@ func TestFifoQueue_Release(t *testing.T) { err = task.Ack() if err != nil { - t.Errorf("Failed ack %s", err.Error()) + t.Errorf("Failed ack %s", err) } else if !task.IsDone() { t.Errorf("Task status after take is not done. Status = %s", task.Status()) } @@ -564,28 +665,15 @@ func TestTtlQueue(t *testing.T) { Kind: queue.FIFO_TTL, Opts: queue.Opts{Ttl: 5 * time.Second}, } - q := queue.New(conn, name) - if err := q.Create(cfg); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, cfg) + defer dropQueue(t, q) putData := "put_data" task, err := q.Put(putData) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && task == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { if task.Data() != putData { t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) @@ -597,7 +685,7 @@ func TestTtlQueue(t *testing.T) { //Take task, err = q.TakeTimeout(2 * time.Second) if err != nil { - t.Errorf("Failed take from queue: %s", err.Error()) + t.Errorf("Failed take from queue: %s", err) } else if task != nil { t.Errorf("Task is not nil after sleep") } @@ -613,28 +701,15 @@ func TestTtlQueue_Put(t *testing.T) { Kind: queue.FIFO_TTL, Opts: queue.Opts{Ttl: 5 * time.Second}, } - q := queue.New(conn, name) - if err := q.Create(cfg); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, cfg) + defer dropQueue(t, q) putData := "put_data" task, err := q.PutWithOpts(putData, queue.Opts{Ttl: 10 * time.Second}) if err != nil { - t.Errorf("Failed put to queue: %s", err.Error()) - return + t.Fatalf("Failed put to queue: %s", err) } else if err == nil && task == nil { - t.Errorf("Task is nil after put") - return + t.Fatalf("Task is nil after put") } else { if task.Data() != putData { t.Errorf("Task data after put not equal with example. %s != %s", task.Data(), putData) @@ -646,7 +721,7 @@ func TestTtlQueue_Put(t *testing.T) { //Take task, err = q.TakeTimeout(2 * time.Second) if err != nil { - t.Errorf("Failed take from queue: %s", err.Error()) + t.Errorf("Failed take from queue: %s", err) } else if task == nil { t.Errorf("Task is nil after sleep") } else { @@ -660,7 +735,7 @@ func TestTtlQueue_Put(t *testing.T) { err = task.Ack() if err != nil { - t.Errorf("Failed ack %s", err.Error()) + t.Errorf("Failed ack %s", err) } else if !task.IsDone() { t.Errorf("Task status after take is not done. Status = %s", task.Status()) } @@ -677,42 +752,32 @@ func TestUtube_Put(t *testing.T) { Kind: queue.UTUBE, IfNotExists: true, } - q := queue.New(conn, name) - if err := q.Create(cfg); err != nil { - t.Errorf("Failed to create queue: %s", err.Error()) - return - } - defer func() { - //Drop - err := q.Drop() - if err != nil { - t.Errorf("Failed drop queue: %s", err.Error()) - } - }() + q := createQueue(t, conn, name, cfg) + defer dropQueue(t, q) data1 := &customData{"test-data-0"} _, err := q.PutWithOpts(data1, queue.Opts{Utube: "test-utube-consumer-key"}) if err != nil { - t.Fatalf("Failed put task to queue: %s", err.Error()) + t.Fatalf("Failed put task to queue: %s", err) } data2 := &customData{"test-data-1"} _, err = q.PutWithOpts(data2, queue.Opts{Utube: "test-utube-consumer-key"}) if err != nil { - t.Fatalf("Failed put task to queue: %s", err.Error()) + t.Fatalf("Failed put task to queue: %s", err) } errChan := make(chan struct{}) go func() { t1, err := q.TakeTimeout(2 * time.Second) if err != nil { - t.Errorf("Failed to take task from utube: %s", err.Error()) + t.Errorf("Failed to take task from utube: %s", err) errChan <- struct{}{} return } time.Sleep(2 * time.Second) if err := t1.Ack(); err != nil { - t.Errorf("Failed to ack task: %s", err.Error()) + t.Errorf("Failed to ack task: %s", err) errChan <- struct{}{} return } @@ -724,10 +789,10 @@ func TestUtube_Put(t *testing.T) { start := time.Now() t2, err := q.TakeTimeout(2 * time.Second) if err != nil { - t.Fatalf("Failed to take task from utube: %s", err.Error()) + t.Fatalf("Failed to take task from utube: %s", err) } if err := t2.Ack(); err != nil { - t.Fatalf("Failed to ack task: %s", err.Error()) + t.Fatalf("Failed to ack task: %s", err) } end := time.Now() if _, ok := <-errChan; ok { @@ -738,6 +803,86 @@ func TestUtube_Put(t *testing.T) { } } +func TestTask_Touch(t *testing.T) { + conn := test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + tests := []struct { + name string + cfg queue.Cfg + ok bool + }{ + {"test_queue", + queue.Cfg{ + Temporary: true, + Kind: queue.FIFO, + }, + false, + }, + {"test_queue_ttl", + queue.Cfg{ + Temporary: true, + Kind: queue.FIFO_TTL, + Opts: queue.Opts{Ttl: 5 * time.Second}, + }, + true, + }, + {"test_utube", + queue.Cfg{ + Temporary: true, + Kind: queue.UTUBE, + }, + false, + }, + {"test_utube_ttl", + queue.Cfg{ + Temporary: true, + Kind: queue.UTUBE_TTL, + Opts: queue.Opts{Ttl: 5 * time.Second}, + }, + true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var task *queue.Task + + q := createQueue(t, conn, tc.name, tc.cfg) + defer func() { + if task != nil { + if err := task.Ack(); err != nil { + t.Fatalf("Failed to Ack: %s", err) + } + } + dropQueue(t, q) + }() + + putData := "put_data" + _, err := q.PutWithOpts(putData, + queue.Opts{ + Ttl: 10 * time.Second, + Utube: "test_utube", + }) + if err != nil { + t.Fatalf("Failed put a task: %s", err) + } + + task, err = q.TakeTimeout(2 * time.Second) + if err != nil { + t.Fatalf("Failed to take task from utube: %s", err) + } + + err = task.Touch(1 * time.Second) + if tc.ok && err != nil { + t.Fatalf("Failed to touch: %s", err) + } else if !tc.ok && err == nil { + t.Fatalf("Unexpected success") + } + }) + } +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body diff --git a/queue/task.go b/queue/task.go index 2f4242311..01ab26da6 100644 --- a/queue/task.go +++ b/queue/task.go @@ -2,6 +2,7 @@ package queue import ( "fmt" + "time" ) // Task represents a task from Tarantool queue's tube. @@ -54,6 +55,11 @@ func (t *Task) Status() string { return t.status } +// Touch increases ttr of running task. +func (t *Task) Touch(increment time.Duration) error { + return t.accept(t.q._touch(t.id, increment)) +} + // Ack signals about task completion. func (t *Task) Ack() error { return t.accept(t.q._ack(t.id))