diff --git a/crud.lua b/crud.lua index 2777013e8..cc6a78f19 100644 --- a/crud.lua +++ b/crud.lua @@ -12,7 +12,7 @@ local select = require('crud.select') local truncate = require('crud.truncate') local len = require('crud.len') local borders = require('crud.borders') -local sharding_key = require('crud.common.sharding_key') +local sharding_metadata = require('crud.common.sharding.sharding_metadata') local utils = require('crud.common.utils') local crud = {} @@ -114,7 +114,7 @@ function crud.init_storage() truncate.init() len.init() borders.init() - sharding_key.init() + sharding_metadata.init() end function crud.init_router() diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index b5f5e8df2..a580e2dec 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -4,7 +4,7 @@ local errors = require('errors') local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) local utils = require('crud.common.utils') -local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding = {} @@ -22,7 +22,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local sharding_index_parts = space.index[0].parts - local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space.name) if err ~= nil then return nil, err end diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua new file mode 100644 index 000000000..222c6c695 --- /dev/null +++ b/crud/common/sharding/sharding_metadata.lua @@ -0,0 +1,142 @@ +local fiber = require('fiber') +local errors = require('errors') + +local call = require('crud.common.call') +local const = require('crud.common.const') +local dev_checks = require('crud.common.dev_checks') +local cache = require('crud.common.sharding_key_cache') + +local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false}) + +local FETCH_FUNC_NAME = '_crud.fetch_on_storage' + +local sharding_key_module = {} + +-- Function decorator that is used to prevent _fetch_on_router() from being +-- called concurrently by different fibers. +local function locked(f) + dev_checks('function') + + return function(timeout, ...) + local timeout_deadline = fiber.clock() + timeout + local ok = cache.fetch_lock:put(true, timeout) + -- channel:put() returns false in two cases: when timeout is exceeded + -- or channel has been closed. However error message describes only + -- first reason, I'm not sure we need to disclose to users such details + -- like problems with synchronization objects. + if not ok then + return FetchShardingKeyError:new( + "Timeout for fetching sharding key is exceeded") + end + local timeout = timeout_deadline - fiber.clock() + local status, err = pcall(f, timeout, ...) + cache.fetch_lock:get() + if not status or err ~= nil then + return err + end + end +end + +-- Return a map with metadata or nil when space box.space._ddl_sharding_key is +-- not available on storage. +function sharding_key_module.fetch_on_storage() + local sharding_key_space = box.space._ddl_sharding_key + if sharding_key_space == nil then + return nil + end + + local SPACE_NAME_FIELDNO = 1 + local SPACE_SHARDING_KEY_FIELDNO = 2 + local metadata_map = {} + for _, tuple in sharding_key_space:pairs() do + local space_name = tuple[SPACE_NAME_FIELDNO] + local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] + local space_format = box.space[space_name]:format() + metadata_map[space_name] = { + sharding_key_def = sharding_key_def, + space_format = space_format, + } + end + + return metadata_map +end + +-- Under high load we may get a case when more than one fiber will fetch +-- metadata from storages. It is not good from performance point of view. +-- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches +-- a sharding metadata by a single one, other fibers will wait while +-- cache.fetch_lock become unlocked during timeout passed to +-- _fetch_on_router(). +local _fetch_on_router = locked(function(timeout) + dev_checks('number') + + if cache.sharding_key_as_index_obj_map ~= nil then + return + end + + local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, { + timeout = timeout + }) + if err ~= nil then + return err + end + if metadata_map == nil then + cache.sharding_key_as_index_obj_map = {} + return + end + + cache.sharding_key_as_index_obj_map = {} + for space_name, metadata in pairs(metadata_map) do + local sharding_key_as_index_obj, err = as_index_object(space_name, + metadata.space_format, + metadata.sharding_key_def) + if err ~= nil then + return err + end + cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj + end +end) + +-- Get sharding index for a certain space. +-- +-- Return: +-- - sharding key as index object, when sharding key definition found on +-- storage. +-- - nil, when sharding key definition was not found on storage. Pay attention +-- that nil without error is a successfull return value. +-- - nil and error, when something goes wrong on fetching attempt. +-- +function sharding_key_module.fetch_on_router(space_name, timeout) + dev_checks('string', '?number') + + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + + local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT + local err = _fetch_on_router(timeout) + if err ~= nil then + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + return nil, err + end + + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + + return nil, FetchShardingKeyError:new( + "Fetching sharding key for space '%s' is failed", space_name) +end + +function sharding_key_module.update_cache(space_name) + cache.drop_caches() + return sharding_key_module.fetch_on_router(space_name) +end + +function sharding_key_module.init() + _G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage +end + +return sharding_key_module diff --git a/crud/common/sharding/sharding_key_cache.lua b/crud/common/sharding/sharding_metadata_cache.lua similarity index 100% rename from crud/common/sharding/sharding_key_cache.lua rename to crud/common/sharding/sharding_metadata_cache.lua diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index ed7d2be90..bb968f847 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -7,7 +7,7 @@ local sharding = require('crud.common.sharding.init') local dev_checks = require('crud.common.dev_checks') local common = require('crud.select.compat.common') local schema = require('crud.common.schema') -local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.select.plan') @@ -51,7 +51,7 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() - local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 04483cfd1..18b6bfc17 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -8,7 +8,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding.init') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') -local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.select.plan') @@ -103,7 +103,7 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() - local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) if err ~= nil then return nil, err end diff --git a/test/helper.lua b/test/helper.lua index 9a4779cf3..6661a37a3 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -325,10 +325,10 @@ end function helpers.update_cache(cluster, space_name) return cluster.main_server.net_box:eval([[ - local sharding_key = require('crud.common.sharding.sharding_key') + local sharding_metadata = require('crud.common.sharding.sharding_metadata') local space_name = ... - return sharding_key.update_cache(space_name) + return sharding_metadata.update_cache(space_name) ]], {space_name}) end diff --git a/test/unit/sharding_key_test.lua b/test/unit/sharding_metadata_test.lua similarity index 97% rename from test/unit/sharding_key_test.lua rename to test/unit/sharding_metadata_test.lua index fecdac83c..fb4fb5b8c 100644 --- a/test/unit/sharding_key_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -1,11 +1,12 @@ local t = require('luatest') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding_key_module = require('crud.common.sharding.sharding_key') -local cache = require('crud.common.sharding.sharding_key_cache') +local cache = require('crud.common.sharding.sharding_metadata_cache') local utils = require('crud.common.utils') local helpers = require('test.helper') -local g = t.group('sharding_key') +local g = t.group('sharding_metadata') g.before_each(function() local sharding_key_format = {