Skip to content
This repository was archived by the owner on Apr 14, 2022. It is now read-only.

Handle unique index constraint violation in shard #193

Merged
merged 2 commits into from
Jul 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions graphql/accessor_general.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ local DEF_TIMEOUT_MS = 1000
-- save start time at start, calculate current time on each iteration and
-- substract the start time from it, compare with the timeout. With such
-- approch we don't add the timeout in nanoseconds to a start time and can
-- remove the divide by two below.
local TIMEOUT_INFINITY = 18446744073709551615ULL / (2 * 10^6) -- microseconds
-- remove the divide by two below. The value is roughly equal to 292 years.
local TIMEOUT_INFINITY = 18446744073709551615ULL / (2 * 10^6) -- milliseconds

accessor_general.TIMEOUT_INFINITY = TIMEOUT_INFINITY

Expand Down
104 changes: 88 additions & 16 deletions graphql/accessor_shard.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

local json = require('json')
local yaml = require('yaml')
local digest = require('digest')
local utils = require('graphql.utils')
local shard = utils.optional_require('shard')
local accessor_general = require('graphql.accessor_general')
Expand All @@ -22,6 +23,18 @@ local index_info_cache = {}

local function shard_check_error(func_name, result, err)
if result ~= nil then return end

-- avoid json encoding of an error message (when the error is in the known
-- format)
if type(err) == 'table' and type(err.error) == 'string' then
error({
message = err.error,
extensions = {
shard_error = err,
}
})
end

error(('%s: %s'):format(func_name, json.encode(err)))
end

Expand Down Expand Up @@ -215,6 +228,12 @@ local function space_operation(collection_name, nodes, operation, ...)
return master_result
end

local function get_shard_key_hash(key)
local shards_n = #shard.shards
local num = type(key) == 'number' and key or digest.crc32(key)
return 1 + digest.guava(num, shards_n)
end

-- }}}

--- Check whether a collection (it is sharded space for that accessor) exists.
Expand Down Expand Up @@ -395,6 +414,46 @@ end

--- Update a tuple with an update statements.
---
--- In case when the update should change the storage where the tuple stored
--- we perform insert to the new storage and delete from the old one. The
--- order must be 'first insert, then delete', because insert can report an
--- error in case of unique index constraints violation and we must not
--- perform delete in the case.
---
--- This function emulates (more or less preciselly, see below) behaviour of
--- update as if it would be performed on a local tarantool instance. In case
--- when the tuple resides on the same storage the update operation performs
--- a unique index constraints check within the storage, but not on the overall
--- cluster. In case when the tuple changes its storage the insert operation
--- performs the check within the target storage.
---
--- We can consider this as relaxing of the constraints: the function can
--- silently violate cluster-wide uniqueness constraints or report a
--- violation that was introduced by some previous operation, but cannot
--- report a violation when a local tarantool space would not.
---
--- 'Insert, then delete' approach is applicable and do not lead to a false
--- positive unique index constraint violation when storage nodes are different
--- and do not contain same tuples. We check the first condition in the
--- function and the second is guaranteed by the shard module.
---
--- Note: if one want to use this function as basis for a similar one, but
--- allowing update of a primary key the following details should be noticed. A
--- primary key update that **changes a storage** where the tuple saved can be
--- performed with the 'insert, then delete' approach. An update **within one
--- storage** cannot be performed in the following ways:
---
--- * as update (because tarantool forbids update of a primary key),
--- * 'insert, then delete' way (because insert can report a unique index
--- constraint violation due to values in the old version of the tuple),
--- * 'tuple:update(), then replace' (at least because old tuple resides in the
--- storage and because an other tuple can be silently rewritten).
---
--- To support primary key update for **one storage** case one can use 'delete,
--- then insert' way and perform the rollback action (insert old tuple) in case
--- when insert of the new tuple reports an error. There are other ways, e.g.
--- manual unique constraints check.
---
--- @tparam table self accessor_general instance
---
--- @tparam string collection_name
Expand All @@ -420,16 +479,6 @@ local function update_tuple(self, collection_name, key, statements, opts)

shard_check_status(func_name)

local is_shard_key_to_be_updated = false
for _, statement in ipairs(statements) do
-- statement is {operator, field_no, value}
local field_no = statement[2]
if field_no == SHARD_KEY_FIELD_NO then
is_shard_key_to_be_updated = true
break
end
end

-- We follow tarantool convention and disallow update of primary key parts.
local primary_index_info = get_index_info(collection_name, 0)
for _, statement in ipairs(statements) do
Expand All @@ -443,14 +492,37 @@ local function update_tuple(self, collection_name, key, statements, opts)
end
end

local is_shard_key_to_be_updated = false
local new_shard_key_value
for _, statement in ipairs(statements) do
-- statement is {operator, field_no, value}
local field_no = statement[2]
if field_no == SHARD_KEY_FIELD_NO then
is_shard_key_to_be_updated = true
new_shard_key_value = statement[3]
break
end
end

local tuple = opts.tuple or get_tuple(self, collection_name, key)

local is_storage_to_be_changed = false
if is_shard_key_to_be_updated then
local tuple = self.funcs.delete_tuple(self, collection_name, key,
{tuple = opts.tuple})
tuple = tuple:update(statements)
return self.funcs.insert_tuple(self, collection_name, tuple)
local old_shard_key_value = tuple[1]
local old_shard_key_hash = get_shard_key_hash(old_shard_key_value)
local new_shard_key_hash = get_shard_key_hash(new_shard_key_value)
is_storage_to_be_changed = old_shard_key_hash ~= new_shard_key_hash
end

if is_storage_to_be_changed then
-- different storages case
local old_tuple = opts.tuple or get_tuple(self, collection_name, key)
local new_tuple = old_tuple:update(statements)
self.funcs.insert_tuple(self, collection_name, new_tuple)
self.funcs.delete_tuple(self, collection_name, key, {tuple = old_tuple})
return new_tuple
else
local tuple = opts.tuple or get_tuple(self, collection_name,
key)
-- one storage case
local nodes = shard.shard(tuple[SHARD_KEY_FIELD_NO])
local tuple = space_operation(collection_name, nodes, 'update', key,
statements)
Expand Down
Loading