From 7bd5593532cec7f086e1fec7f5113f85e3418499 Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Fri, 4 Oct 2013 12:26:00 -0700 Subject: [PATCH 1/5] Add a background lock and task manager module to riak core. * The goal is to allow riak sub-systems to coordinate use of shared resources, * e.g. protect from concurrent vnode folds on the same partition. * Locks and tokens have a settable maximum concurrency limit. * "Taken" locks and tokens are tracked in an ETS table. * max_concurrency is returned when the set limits are reached. * Processes that take locks are monitored. * Locks are released when the taking processes terminate. * Tokens are refreshed at a specified periodic rate. * Token processes are not monitored because tokens never "release". * A table manager is introduced to add persistence across process crashes, * and to allow proper table transfer to occur without losing the table. * An EQC test exercises the majority of the API. see test/bg_manager_eqc.erl * See the original PR for background manager here: https://github.com/basho/riak_core/pull/364 --- include/riak_core_bg_manager.hrl | 88 +++ src/riak_core_bg_manager.erl | 1142 ++++++++++++++++++++++++++++++ src/riak_core_sup.erl | 18 +- src/riak_core_table_manager.erl | 187 +++++ test/bg_manager_eqc.erl | 1003 ++++++++++++++++++++++++++ test/bg_manager_tests.erl | 196 +++++ 6 files changed, 2631 insertions(+), 3 deletions(-) create mode 100644 include/riak_core_bg_manager.hrl create mode 100644 src/riak_core_bg_manager.erl create mode 100644 src/riak_core_table_manager.erl create mode 100644 test/bg_manager_eqc.erl create mode 100644 test/bg_manager_tests.erl diff --git a/include/riak_core_bg_manager.hrl b/include/riak_core_bg_manager.hrl new file mode 100644 index 000000000..762ed9389 --- /dev/null +++ b/include/riak_core_bg_manager.hrl @@ -0,0 +1,88 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. 
+%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% NOTES: +%% The background manager allows tokens and locks to be "acquired" by +%% competing processes in a way that limits the total load on the cluster. +%% +%% The model is different than your typical semaphore. Here, we are +%% interested in coordinating background jobs that start, run, and die. +%% +%% +%% The term "given" is a general version of "held", "acquired", or +%% "allocated" for both locks and tokens. Held doesn't make sense for +%% tokens since they aren't held. So, "given" applies to both locks +%% and tokens, but you can think "held" for locks if that's more fun. +%% +%% Resources are defined by their "names", which is the same as "type" +%% or "kind". A lock name might be the atom 'aae_hashtree_lock' or the +%% tuple '{my_ultimate_lock, 42}'. +%% +%% Usage: +%% 1. register your lock/token and set it's max concurrency/rate. +%% 2. "get" a lock/token by it's resource type/name +%% 3. do stuff +%% 4. let your process die, which gives back a lock. +%% ------------------------------------------------------------------- +-type bg_lock() :: any(). +-type bg_token() :: any(). +-type bg_resource() :: bg_token() | bg_lock(). +-type bg_resource_type() :: lock | token. + +-type bg_meta() :: {atom(), any()}. %% meta data to associate with a lock/token +-type bg_period() :: pos_integer(). %% token refill period in milliseconds +-type bg_count() :: pos_integer(). 
%% token refill tokens to count at each refill period +-type bg_rate() :: undefined | {bg_period(), bg_count()}. %% token refill rate +-type bg_concurrency_limit() :: non_neg_integer() | infinity. %% max lock concurrency allowed +-type bg_state() :: given | blocked | failed. %% state of an instance of a resource. + +%% Results of a "ps" of live given or blocked locks/tokens +-record(bg_stat_live, + { + resource :: bg_resource(), %% resource name, e.g. 'aae_hashtree_lock' + type :: bg_resource_type(), %% resource type, e.g. 'lock' + consumer :: pid(), %% process asking for token + meta :: [bg_meta()], %% associated meta data + state :: bg_state() %% result of last request, e.g. 'given' + }). +-type bg_stat_live() :: #bg_stat_live{}. + +%% Results of a "head" or "tail", per resource. Historical query result. +-record(bg_stat_hist, + { + type :: undefined | bg_resource_type(), %% undefined only on default + limit :: non_neg_integer(), %% maximum available, defined by token rate during interval + refills :: non_neg_integer(), %% number of times a token was refilled during interval. 0 if lock + given :: non_neg_integer(), %% number of times this resource was handed out within interval + blocked :: non_neg_integer() %% number of blocked processes waiting for a token + }). +-type bg_stat_hist() :: #bg_stat_hist{}. +-define(BG_DEFAULT_STAT_HIST, + #bg_stat_hist{type=undefined, limit=undefined, refills=0, given=0, blocked=0}). + +-define(BG_DEFAULT_WINDOW_INTERVAL, 60*1000). %% in milliseconds +-define(BG_DEFAULT_OUTPUT_SAMPLES, 20). %% default number of sample windows displayed +-define(BG_DEFAULT_KEPT_SAMPLES, 10000). %% number of history samples to keep + +-define(BG_INFO_ETS_TABLE, background_mgr_info_table). %% name of private lock/token manager info ETS table +-define(BG_INFO_ETS_OPTS, [private, set]). %% creation time properties of info ETS table + +-define(BG_ENTRY_ETS_TABLE, background_mgr_entry_table). 
%% name of private lock/token manager entry ETS table +-define(BG_ENTRY_ETS_OPTS, [private, bag]). %% creation time properties of entry ETS table + + diff --git a/src/riak_core_bg_manager.erl b/src/riak_core_bg_manager.erl new file mode 100644 index 000000000..92cd56026 --- /dev/null +++ b/src/riak_core_bg_manager.erl @@ -0,0 +1,1142 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% @doc +%% We use two ETS tables to store critical data. In the event this process crashes, +%% the tables will be given back to the table manager and we can reclaim them when +%% we restart. Thus, limits and states are maintained across restarts of the +%% module, but not of the application. Since we are supervised by riak_core_sup, +%% that's fine. +%% +%% === Info Table === +%% The table must be a set and is best if private. See ?BG_INFO_ETS_OPTS in MODULE.hrl. +%% Table Schema... +%% KEY Data Notes +%% --- ---- ----- +%% {info, Resource} #resource_info One token object per key. +%% bypassed boolean() +%% enabled boolean() +%% +%% === Entries Table === +%% The table must be a bag and is best if private. See ?BG_ENTRY_ETS_OPTS in MODULE.hrl. +%% KEY Data Notes +%% --- ---- ----- +%% {given, Resource} #resource_entry Multiple objects per key. 
+%% +%% ------------------------------------------------------------------- +-module(riak_core_bg_manager). + +-behaviour(gen_server). + +-include("riak_core_bg_manager.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export([ + %% Universal + start_link/0, + bypass/1, + bypassed/0, + enabled/0, + enabled/1, + enable/0, + enable/1, + disable/0, + disable/1, + disable/2, + query_resource/3, + all_resources/0, + all_given/0, + %% Locks + concurrency_limit/1, + set_concurrency_limit/2, + set_concurrency_limit/3, + concurrency_limit_reached/1, + get_lock/1, + get_lock/2, + get_lock/3, + lock_info/0, + lock_info/1, + lock_count/1, + all_locks/0, + locks_held/0, + locks_held/1, + %% Tokens + set_token_rate/2, + token_rate/1, + get_token/1, + get_token/2, + get_token/3, + token_info/0, + token_info/1, + all_tokens/0, + tokens_given/0, + tokens_given/1, + %% Testing + start/1 + ]). + +%% reporting +-export([clear_history/0, + head/0, + head/1, + head/2, + head/3, + tail/0, + tail/1, + tail/2, + tail/3, + ps/0, + ps/1 + ]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-define(NOT_TRANSFERED(S), S#state.info_table == undefined orelse S#state.entry_table == undefined). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Starts the server +-spec start_link() -> {ok, pid()} | ignore | {error, term}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%% Test entry point to start stand-alone server +start(Interval) -> + gen_server:start({local, ?SERVER}, ?MODULE, [Interval], []). + +%% @doc Global kill switch - causes all locks/tokens to be given out freely without limits. +%% Nothing will be tracked or recorded. +-spec bypass(boolean()) -> ok. 
+bypass(Switch) ->
+    gen_server:cast(?SERVER, {bypass, Switch}).
+
+%% @doc Return bypass state as boolean.
+-spec bypassed() -> boolean().
+bypassed() ->
+    gen_server:call(?SERVER, bypassed).
+
+%% @doc Enable handing out of all locks and tokens
+-spec enable() -> enabled | bypassed.
+enable() ->
+    gen_server:call(?SERVER, enable).
+
+%% @doc Disable handing out of all locks and tokens
+-spec disable() -> disabled | bypassed.
+disable() ->
+    gen_server:call(?SERVER, disable).
+
+%% @doc Return global enabled status.
+-spec enabled() -> enabled | disabled | bypassed.
+enabled() ->
+    gen_server:call(?SERVER, enabled).
+
+%% @doc Enable handing out resources of the kind specified. If the resource
+%% has not already been registered, this will have no effect.
+-spec enable(bg_resource()) -> enabled | unregistered | bypassed.
+enable(Resource) ->
+    gen_server:call(?SERVER, {enable, Resource}).
+
+%% @doc Disable handing out resource of the given kind.
+-spec disable(bg_resource()) -> disabled | unregistered | bypassed.
+disable(Resource) ->
+    gen_server:call(?SERVER, {disable, Resource}).
+
+-spec enabled(bg_resource()) -> enabled | disabled | bypassed.
+enabled(Resource) ->
+    gen_server:call(?SERVER, {enabled, Resource}).
+
+%% @doc Disable handing out resource of the given kind. If kill == true,
+%% processes that currently hold the given resource will be killed.
+-spec disable(bg_resource(), boolean()) -> disabled | unregistered | bypassed.
+disable(Resource, Kill) ->
+    gen_server:call(?SERVER, {disable, Resource, Kill}).
+
+%% @doc Query the current set of registered resources by name, states, and types.
+%% The special atom 'all' queries all resources. A list of states and a list
+%% of types allows selective query.
+-spec query_resource(bg_resource() | all, [bg_state()], [bg_resource_type()]) -> [bg_stat_live()].
+query_resource(Resource, States, Types) ->
+    gen_server:call(?SERVER, {query_resource, Resource, States, Types}, infinity).
+
+%% @doc Get a list of all resources of all types in the given state
+-spec all_resources() -> [bg_stat_live()].
+all_resources() ->
+    query_resource(all, [given], [token, lock]).
+
+%% @doc Get a list of all resources of all kinds in the given state
+-spec all_given() -> [bg_stat_live()].
+all_given() ->
+    query_resource(all, [given], [token, lock]).
+
+%%%%%%%%%%%
+%% Lock API
+%%%%%%%%%%%
+
+%% @doc Get the current maximum concurrency for the given lock type.
+-spec concurrency_limit(bg_lock()) -> bg_concurrency_limit().
+concurrency_limit(Lock) ->
+    gen_server:call(?MODULE, {concurrency_limit, Lock}, infinity).
+
+%% @doc same as `set_concurrency_limit(Type, Limit, false)'
+-spec set_concurrency_limit(bg_lock(), bg_concurrency_limit()) -> bg_concurrency_limit().
+set_concurrency_limit(Lock, Limit) ->
+    set_concurrency_limit(Lock, Limit, false).
+
+%% @doc Set a new maximum concurrency for the given lock type and return
+%% the previous maximum or default. If more locks are held than the new
+%% limit how they are handled depends on the value of `Kill'. If `true',
+%% then the extra locks are released by killing processes with reason `max_concurrency'.
+%% If `false', then the processes holding the extra locks are allowed to do so until they
+%% are released.
+-spec set_concurrency_limit(bg_lock(), bg_concurrency_limit(), boolean()) -> bg_concurrency_limit().
+set_concurrency_limit(Lock, Limit, Kill) ->
+    gen_server:call(?MODULE, {set_concurrency_limit, Lock, Limit, Kill}, infinity).
+
+%% @doc Returns true if the number of held locks is at the limit for the given lock type
+-spec concurrency_limit_reached(bg_lock()) -> boolean().
+concurrency_limit_reached(Lock) ->
+    gen_server:call(?MODULE, {lock_limit_reached, Lock}, infinity).
+
+%% @doc Acquire a concurrency lock of the given name, if available,
+%% and associate the lock with the calling process. Returns the
+%% reference to the monitored process or max_concurrency.
+-spec get_lock(bg_lock()) -> {ok, reference()} | max_concurrency. +get_lock(Lock) -> + get_lock(Lock, self()). + +%% @doc Acquire a concurrency lock, if available, and associate the +%% lock with the provided pid or metadata. If metadata +%% is provided the lock is associated with the calling process +%% If no locks are available, max_concurrency is returned. +-spec get_lock(bg_lock(), pid() | [{atom(), any()}]) -> {ok, reference()} | max_concurrency. +get_lock(Lock, Pid) when is_pid(Pid) -> + get_lock(Lock, Pid, []); +get_lock(Lock, Opts) when is_list(Opts)-> + get_lock(Lock, self(), Opts). + +%% @doc Acquire a concurrency lock, if available, and associate +%% the lock with the provided pid and metadata. +-spec get_lock(bg_lock(), pid(), [{atom(), any()}]) -> {ok, reference()} | max_concurrency. +get_lock(Lock, Pid, Meta) -> + gen_server:call(?MODULE, {get_lock, Lock, Pid, Meta}, infinity). + +%% @doc Return the current concurrency count of the given lock type. +-spec lock_count(bg_lock()) -> integer() | unregistered. +lock_count(Lock) -> + gen_server:call(?MODULE, {lock_count, Lock}, infinity). + +%% @doc Return list of lock types and associated info. To be returned in this list +%% a lock type must have had its concurrency set or have been enabled/disabled. +-spec lock_info() -> [{bg_lock(), boolean(), bg_concurrency_limit()}]. +lock_info() -> + gen_server:call(?MODULE, lock_info, infinity). + +%% @doc Return the registration info for the named Lock +-spec lock_info(bg_lock()) -> {boolean(), bg_concurrency_limit()} | unregistered. +lock_info(Lock) -> + gen_server:call(?MODULE, {lock_info, Lock}, infinity). + +%% @doc Returns all locks. +-spec all_locks() -> [bg_stat_live()]. +all_locks() -> + query_resource(all, [given], [lock]). + + +%% @doc Returns all currently held locks or those that match Lock +-spec locks_held() -> [bg_stat_live()]. +locks_held() -> + locks_held(all). + +-spec locks_held(bg_lock() | all) -> [bg_stat_live()]. 
+locks_held(Lock) ->
+    query_resource(Lock, [given], [lock]).
+
+%%%%%%%%%%%%
+%% Token API
+%%%%%%%%%%%%
+
+%% @doc Set the refill rate of tokens. Return previous value.
+-spec set_token_rate(bg_token(), bg_rate()) -> bg_rate().
+set_token_rate(_Token, undefined) -> undefined;
+set_token_rate(Token, Rate={_Period, _Count}) ->
+    gen_server:call(?SERVER, {set_token_rate, Token, Rate}, infinity).
+
+%% @doc Get the current refill rate of named token.
+-spec token_rate(bg_token()) -> bg_rate().
+token_rate(Token) ->
+    gen_server:call(?SERVER, {token_rate, Token}, infinity).
+
+%% @doc Get a token without blocking.
+%% Associate token with provided pid or metadata. If metadata
+%% is provided the token is associated with the calling process.
+%% Returns "max_concurrency" if empty.
+-spec get_token(bg_token(), pid() | [{atom(), any()}]) -> ok | max_concurrency.
+get_token(Token, Pid) when is_pid(Pid) ->
+    get_token(Token, Pid, []);
+get_token(Token, Meta) ->
+    get_token(Token, self(), Meta).
+
+-spec get_token(bg_token()) -> ok | max_concurrency.
+get_token(Token) ->
+    get_token(Token, self()).
+
+-spec get_token(bg_token(), pid(), [{atom(), any()}]) -> ok | max_concurrency.
+get_token(Token, Pid, Meta) ->
+    gen_server:call(?SERVER, {get_token, Token, Pid, Meta}, infinity).
+
+%% @doc Return list of token kinds and associated info. To be returned in this list
+%% a token must have had its rate set.
+-spec token_info() -> [{bg_token(), boolean(), bg_rate()}].
+token_info() ->
+    gen_server:call(?MODULE, token_info, infinity).
+
+%% @doc Return the registration info for the named Token
+-spec token_info(bg_token()) -> {boolean(), bg_rate()}.
+token_info(Token) ->
+    gen_server:call(?MODULE, {token_info, Token}, infinity).
+
+-spec all_tokens() -> [bg_stat_live()].
+all_tokens() ->
+    query_resource(all, [given], [token]).
+
+%% @doc Get a list of token resources in the given state.
+tokens_given() ->
+    tokens_given(all).
+-spec tokens_given(bg_token() | all) -> [bg_stat_live()].
+tokens_given(Token) ->
+    query_resource(Token, [given], [token]).
+
+%% Stats/Reporting
+
+clear_history() ->
+    gen_server:cast(?SERVER, clear_history).
+
+%% List history of token manager
+%% @doc show history of token request/grants over default and custom intervals.
+%% offset is forwards-relative to the oldest sample interval
+-spec head() -> [[bg_stat_hist()]].
+head() ->
+    head(all).
+-spec head(bg_token()) -> [[bg_stat_hist()]].
+head(Token) ->
+    head(Token, ?BG_DEFAULT_OUTPUT_SAMPLES).
+-spec head(bg_token(), non_neg_integer()) -> [[bg_stat_hist()]].
+head(Token, NumSamples) ->
+    head(Token, 0, NumSamples).
+-spec head(bg_token(), non_neg_integer(), bg_count()) -> [[bg_stat_hist()]].
+head(Token, Offset, NumSamples) ->
+    gen_server:call(?SERVER, {head, Token, Offset, NumSamples}, infinity).
+
+%% @doc return history of token request/grants over default and custom intervals.
+%% offset is backwards-relative to the newest sample interval
+-spec tail() -> [[bg_stat_hist()]].
+tail() ->
+    tail(all).
+-spec tail(bg_token()) -> [[bg_stat_hist()]].
+tail(Token) ->
+    tail(Token, ?BG_DEFAULT_OUTPUT_SAMPLES).
+-spec tail(bg_token(), bg_count()) -> [[bg_stat_hist()]].
+tail(Token, NumSamples) ->
+    tail(Token, NumSamples, NumSamples).
+-spec tail(bg_token(), bg_count(), bg_count()) -> [[bg_stat_hist()]].
+tail(Token, Offset, NumSamples) ->
+    gen_server:call(?SERVER, {tail, Token, Offset, NumSamples}, infinity).
+
+%% @doc List most recent requests/grants for all tokens and locks
+-spec ps() -> [bg_stat_live()].
+ps() ->
+    ps(all).
+%% @doc List most recent requests/grants for named resource or one of
+%% either 'token' or 'lock'. The latter two options will list all
+%% resources of that type in the given/locked state.
+-spec ps(bg_resource() | token | lock) -> [bg_stat_live()].
+ps(Arg) ->
+    gen_server:call(?SERVER, {ps, Arg}, infinity).
+ +%%%=================================================================== +%%% Data Structures +%%%=================================================================== + +-type bg_limit() :: bg_concurrency_limit() | bg_rate(). + +%% General settings of a lock type. +-record(resource_info, + {type :: bg_resource_type(), + limit :: bg_limit(), + enabled :: boolean()}). + +-define(resource_type(X), (X)#resource_info.type). +-define(resource_limit(X), (X)#resource_info.limit). +-define(resource_enabled(X), (X)#resource_info.enabled). + +-define(DEFAULT_CONCURRENCY, 0). %% DO NOT CHANGE. DEFAULT SET TO 0 TO ENFORCE "REGISTRATION" +-define(DEFAULT_RATE, undefined).%% DO NOT CHANGE. DEFAULT SET TO 0 TO ENFORCE "REGISTRATION" +-define(DEFAULT_LOCK_INFO, #resource_info{type=lock, enabled=true, limit=?DEFAULT_CONCURRENCY}). +-define(DEFAULT_TOKEN_INFO, #resource_info{type= token, enabled=true, limit=?DEFAULT_RATE}). + +%% An instance of a resource entry in "given" +-record(resource_entry, + {resource :: bg_resource(), + type :: bg_resource_type(), + pid :: pid(), %% owning process + meta :: bg_meta(), %% associated metadata + ref :: reference(), %% optional monitor reference to owning process + state :: bg_state() %% state of item on given + }). + +-define(RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref, State), + #resource_entry{resource=Resource, type=Type, pid=Pid, meta=Meta, ref=Ref, state=State}). +-define(e_resource(X), (X)#resource_entry.resource). +-define(e_type(X), (X)#resource_entry.type). +-define(e_pid(X), (X)#resource_entry.pid). +-define(e_meta(X), (X)#resource_entry.meta). +-define(e_ref(X), (X)#resource_entry.ref). +-define(e_state(X), (X)#resource_entry.state). + +%%% +%%% Gen Server State record +%%% + +-record(state, + {info_table:: ets:tid(), %% TableID of ?BG_INFO_ETS_TABLE + entry_table:: ets:tid(), %% TableID of ?BG_ENTRY_ETS_TABLE + %% NOTE: None of the following data is persisted across process crashes. 
+ enabled :: boolean(), %% Global enable/disable switch, true at startup + bypassed:: boolean(), %% Global kill switch. false at startup + %% stats + window :: orddict:orddict(), %% bg_resource() -> bg_stat_hist() + history :: queue(), %% bg_resource() -> queue of bg_stat_hist() + window_interval :: bg_period(), %% history window size in milliseconds + window_tref :: reference() %% reference to history window sampler timer + }). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec init([]) -> {ok, #state{}} | + {ok, #state{}, non_neg_integer() | infinity} | + ignore | + {stop, term()}. +init([]) -> + init([?BG_DEFAULT_WINDOW_INTERVAL]); +init([Interval]) -> + lager:debug("Background Manager starting up."), + %% Claiming a table will result in a handle_info('ETS-TRANSFER', ...) message. + %% We have two to claim... + ok = riak_core_table_manager:claim_table(?BG_INFO_ETS_TABLE), + ok = riak_core_table_manager:claim_table(?BG_ENTRY_ETS_TABLE), + State = #state{info_table=undefined, %% resolved in the ETS-TRANSFER handler + entry_table=undefined, %% resolved in the ETS-TRANSFER handler + window=orddict:new(), + enabled=true, + bypassed=false, + window_interval=Interval, + history=queue:new()}, + State2 = schedule_sample_history(State), + {ok, State2}. + +%% @private +%% @doc Handling call messages +-spec handle_call(term(), {pid(), term()}, #state{}) -> + {reply, term(), #state{}} | + {reply, term(), #state{}, non_neg_integer()} | + {noreply, #state{}} | + {noreply, #state{}, non_neg_integer()} | + {stop, term(), term(), #state{}} | + {stop, term(), #state{}}. 
+ +handle_call(bypassed, _From, State=#state{bypassed=Bypassed}) -> + {reply, Bypassed, State}; +handle_call({enabled, Resource}, _From, State) -> + do_handle_call_exception(fun do_enabled/2, [Resource, State], State); +handle_call({enable, Resource}, _From, State) -> + do_handle_call_exception(fun do_enable_resource/3, [Resource, true, State], State); +handle_call({disable, Resource}, _From, State) -> + do_handle_call_exception(fun do_enable_resource/3, [Resource, false, State], State); +handle_call({disable, Lock, Kill}, _From, State) -> + do_handle_call_exception(fun do_disable_lock/3, [Lock, Kill, State], State); +handle_call(enabled, _From, State) -> + {reply, status_of(true, State), State}; +handle_call(enable, _From, State) -> + State2 = update_enabled(true, State), + {reply, status_of(true, State2), State2}; +handle_call(disable, _From, State) -> + State2 = update_enabled(false, State), + {reply, status_of(true, State2), State2}; +handle_call({query_resource, Resource, States, Types}, _From, State) -> + Result = do_query(Resource, States, Types, State), + {reply, Result, State}; +handle_call({get_lock, Lock, Pid, Meta}, _From, State) -> + do_handle_call_exception(fun do_get_resource/5, [Lock, lock, Pid, Meta, State], State); +handle_call({lock_count, Lock}, _From, State) -> + {reply, held_count(Lock, State), State}; +handle_call({lock_limit_reached, Lock}, _From, State) -> + do_handle_call_exception(fun do_lock_limit_reached/2, [Lock, State], State); +handle_call(lock_info, _From, State) -> + do_handle_call_exception(fun do_get_type_info/2, [lock, State], State); +handle_call({lock_info, Lock}, _From, State) -> + do_handle_call_exception(fun do_resource_info/2, [Lock, State], State); +handle_call({concurrency_limit, Lock}, _From, State) -> + do_handle_call_exception(fun do_resource_limit/3, [lock, Lock, State], State); +handle_call({set_concurrency_limit, Lock, Limit, Kill}, _From, State) -> + do_set_concurrency_limit(Lock, Limit, Kill, State); 
+handle_call({token_rate, Token}, _From, State) -> + do_handle_call_exception(fun do_resource_limit/3, [token, Token, State], State); +handle_call(token_info, _From, State) -> + do_handle_call_exception(fun do_get_type_info/2, [token, State], State); +handle_call({token_info, Token}, _From, State) -> + do_handle_call_exception(fun do_resource_info/2, [Token, State], State); +handle_call({set_token_rate, Token, Rate}, _From, State) -> + do_handle_call_exception(fun do_set_token_rate/3, [Token, Rate, State], State); +handle_call({get_token, Token, Pid, Meta}, _From, State) -> + do_handle_call_exception(fun do_get_resource/5, [Token, token, Pid, Meta, State], State); +handle_call({head, Token, Offset, Count}, _From, State) -> + Result = do_hist(head, Token, Offset, Count, State), + {reply, Result, State}; +handle_call({tail, Token, Offset, Count}, _From, State) -> + Result = do_hist(tail, Token, Offset, Count, State), + {reply, Result, State}; +handle_call({ps, lock}, _From, State) -> + Result = do_query(all, [given], [lock], State), + {reply, Result, State}; +handle_call({ps, token}, _From, State) -> + Result = do_query(all, [given], [token], State), + {reply, Result, State}; +handle_call({ps, Resource}, _From, State) -> + Result = do_query(Resource, [given], [token, lock], State), + {reply, Result, State}. + +%% @private +%% @doc Handling cast messages +-spec handle_cast(term(), #state{}) -> {noreply, #state{}} | + {noreply, #state{}, non_neg_integer()} | + {stop, term(), #state{}}. +handle_cast({bypass, false}, State) -> + {noreply, update_bypassed(false,State)}; +handle_cast({bypass, true}, State) -> + {noreply, update_bypassed(true,State)}; +handle_cast({bypass, _Other}, State) -> + {noreply, State}; +handle_cast(clear_history, State) -> + State2 = do_clear_history(State), + {noreply, State2}. 
+ +%% @private +%% @doc Handling all non call/cast messages +-spec handle_info(term(), #state{}) -> {noreply, #state{}} | + {noreply, #state{}, non_neg_integer()} | + {stop, term(), #state{}}. +%% Handle transfer of ETS table from table manager +handle_info({'ETS-TRANSFER', TableId, Pid, TableName}, State) -> + lager:debug("table_mgr (~p) -> bg_mgr (~p) receiving ownership of TableId: ~p", [Pid, self(), TableId]), + State2 = case TableName of + ?BG_INFO_ETS_TABLE -> State#state{info_table=TableId}; + ?BG_ENTRY_ETS_TABLE -> State#state{entry_table=TableId} + end, + case {State2#state.info_table, State2#state.entry_table} of + {undefined, _E} -> {noreply, State2}; + {_I, undefined} -> {noreply, State2}; + {_I, _E} -> + %% Got both tables, we can proceed with reviving ourself + State3 = validate_holds(State2), + State4 = restore_enabled(true, State3), + State5 = restore_bypassed(false, State4), + reschedule_token_refills(State5), + {noreply, State5} + end; +handle_info({'DOWN', Ref, _, _, _}, State) -> + State2 = release_resource(Ref, State), + {noreply, State2}; +handle_info(sample_history, State) -> + State2 = schedule_sample_history(State), + State3 = do_sample_history(State2), + {noreply, State3}; +handle_info({refill_tokens, Type}, State) -> + State2 = do_refill_tokens(Type, State), + schedule_refill_tokens(Type, State2), + {noreply, State2}; +handle_info(_Info, State) -> + {noreply, State}. + +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec terminate(term(), #state{}) -> term(). +terminate(_Reason, _State) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec code_change(term() | {down, term()}, #state{}, term()) -> {ok, #state{}}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% @doc bypass > enable/disable +status_of(_Enabled, #state{bypassed=true}) -> bypassed; +status_of(true, #state{enabled=true}) -> enabled; +status_of(_E,_S) -> disabled. + +%% @private +%% @doc We must have just gotten the table data back after a crash/restart. +%% Walk through the given resources and release any holds by dead processes. +%% Assumes TableId is always valid (called only after transfer) +validate_holds(State=#state{entry_table=TableId}) -> + [validate_hold(Obj, TableId) || Obj <- ets:match_object(TableId, {{given, '_'},'_'})], + State. + +%% @private +%% @doc If the given entry has no alive process associated with it, +%% remove the hold from the ETS table. If it is alive, then we need +%% to re-monitor it and update the table with the new ref. +validate_hold({Key,Entry}=Obj, TableId) when ?e_type(Entry) == lock -> + case is_process_alive(?e_pid(Entry)) of + true -> + %% Still alive. Re-monitor and update table + Ref = monitor(process, ?e_pid(Entry)), + Entry2 = Entry#resource_entry{ref=Ref}, + ets:delete_object(TableId, Obj), + ets:insert(TableId, {Key, Entry2}); + false -> + %% Process is not alive - release the lock + ets:delete_object(TableId, Obj) + end; +validate_hold(_Obj, _TableId) -> %% tokens don't monitor processes + ok. + +%% @doc Update state with bypassed status and store to ETS +update_bypassed(_Bypassed, State) when ?NOT_TRANSFERED(State) -> + State; +update_bypassed(Bypassed, State=#state{info_table=TableId}) -> + ets:insert(TableId, {bypassed, Bypassed}), + State#state{bypassed=Bypassed}. + +%% @doc Update state with enabled status and store to ETS +update_enabled(_Enabled, State) when ?NOT_TRANSFERED(State) -> + State; +update_enabled(Enabled, State=#state{info_table=TableId}) -> + ets:insert(TableId, {enabled, Enabled}), + State#state{enabled=Enabled}. 
+
+%% Assumes tables have been transferred.
+restore_boolean(Key, Default, #state{info_table=TableId}) ->
+    case ets:lookup(TableId, Key) of
+        [] ->
+            ets:insert(TableId, {Key, Default}),
+            Default;
+        [{_Key,Value} | _Rest] ->
+            Value
+    end.
+
+%% Assumes tables have been transferred.
+restore_bypassed(Default, State) ->
+    State#state{bypassed=restore_boolean(bypassed, Default, State)}.
+
+%% Assumes tables have been transferred.
+restore_enabled(Default, State) ->
+    State#state{enabled=restore_boolean(enabled, Default, State)}.
+
+%% @private
+%% @doc Wrap a call to a function with args, with a try/catch that handles
+%% thrown exceptions, namely '{unregistered, Resource}' and return the
+%% failed error response for a gen server call.
+do_handle_call_exception(Function, Args, State) ->
+    try apply(Function, Args)
+    catch
+        Error ->
+            lager:error("Exception: ~p in function ~p", [Error, Function]),
+            {reply, Error, State}
+    end.
+
+%% @doc Throws {unregistered, Resource} for unknown Lock.
+do_disable_lock(_Lock, _Kill, State) when ?NOT_TRANSFERED(State) ->
+    {noreply, State};
+do_disable_lock(Lock, Kill, State) ->
+    Info = resource_info(Lock, State),
+    enforce_type_or_throw(Lock, lock, Info),
+    maybe_honor_limit(Kill, Lock, 0, State),
+    do_enable_resource(Lock, false, State).
+
+%% @doc Throws unregistered for unknown Token
+do_set_token_rate(Token, Rate, State) ->
+    try
+        Info = resource_info(Token, State),
+        OldRate = Info#resource_info.limit,
+        enforce_type_or_throw(Token, token, Info),
+        State2 = update_limit(Token, Rate, Info, State),
+        schedule_refill_tokens(Token, State2),
+        {reply, OldRate, State2}
+    catch
+        table_id_undefined ->
+            %% This could go into a queue to be played when the transfer happens.
+            {reply, undefined, State};
+        {unregistered, Token} ->
+            {reply, undefined, update_limit(Token, Rate, ?DEFAULT_TOKEN_INFO, State)};
+        {badtype, _Token}=Error ->
+            {reply, Error, State}
+    end.
+
+do_get_type_info(_Type, State) when ?NOT_TRANSFERED(State) ->
+ %% Table not transferred yet; reply with an empty info list (valid gen_server reply).
+ {reply, [], State};
+do_get_type_info(Type, State) ->
+ S = fun({R,_T,E,L}) -> {R,E,L} end,
+ Resources = all_registered_resources(Type, State),
+ Infos = [S(resource_info_tuple(Resource, State)) || Resource <- Resources],
+ {reply, Infos, State}.
+
+%% Returns empty if the ETS table has not been transferred to us yet.
+do_resource_limit(lock, _Resource, State) when ?NOT_TRANSFERED(State) ->
+ {reply, 0, State};
+do_resource_limit(token, _Resource, State) when ?NOT_TRANSFERED(State) ->
+ {reply, {0,0}, State};
+do_resource_limit(_Type, Resource, State) ->
+ Info = resource_info(Resource, State),
+ Rate = ?resource_limit(Info),
+ {reply, Rate, State}.
+
+enforce_type_or_throw(Resource, Type, Info) -> %% throws {badtype, Resource} on mismatch
+ case ?resource_type(Info) of
+ Type -> ok;
+ _Other -> throw({badtype, Resource})
+ end.
+
+do_set_concurrency_limit(Lock, Limit, Kill, State) ->
+ try
+ Info = resource_info(Lock, State),
+ enforce_type_or_throw(Lock, lock, Info),
+ OldLimit = limit(Info),
+ State2 = update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State),
+ maybe_honor_limit(Kill, Lock, Limit, State2),
+ {reply, OldLimit, State2}
+ catch
+ table_id_undefined ->
+ %% This could go into a queue to be played when the transfer happens.
+ {reply, 0, State};
+ {unregistered, Lock} ->
+ {reply, 0, update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State)};
+ {badtype, _Lock}=Error ->
+ {reply, Error, State}
+ end.
+
+%% @doc Throws unregistered for unknown Lock
+do_resource_info(Lock, State) ->
+ {_R,_T,E,L} = resource_info_tuple(Lock, State),
+ {reply, {E,L}, State}.
+
+%% @doc Throws unregistered for unknown Lock
+do_lock_limit_reached(Lock, State) ->
+ Info = resource_info(Lock, State),
+ enforce_type_or_throw(Lock, lock, Info),
+ HeldCount = held_count(Lock, State),
+ Limit = limit(Info),
+ {reply, HeldCount >= Limit, State}.
+ +%% @private +%% @doc Return the maximum allowed number of resources for the given +%% info, which considers the type of resource, e.g. lock vs token. +limit(#resource_info{type=lock, limit=Limit}) -> Limit; +limit(#resource_info{type=token, limit={_Period,MaxCount}}) -> MaxCount. + +%% @private +%% @doc Release the resource associated with the given reference. This is mostly +%% meaningful for locks. +release_resource(_Ref, State) when ?NOT_TRANSFERED(State) -> + State; +release_resource(Ref, State=#state{entry_table=TableId}) -> + %% There should only be one instance of the object, but we'll zap all that match. + Given = [Obj || Obj <- ets:match_object(TableId, {{given, '_'},'_'})], + Matches = [Obj || {_Key,Entry}=Obj <- Given, ?e_ref(Entry) == Ref], + [ets:delete_object(TableId, Obj) || Obj <- Matches], + State. + +maybe_honor_limit(true, Lock, Limit, State) -> + Entries = all_given_entries(State), + Held = [Entry || Entry <- Entries, ?e_type(Entry) == lock, ?e_resource(Entry) == Lock], + case Limit < length(Held) of + true -> + {_Keep, Discards} = lists:split(Limit, Held), + %% killing of processes will generate 'DOWN' messages and release the locks + [erlang:exit(?e_pid(Discard), max_concurrency) || Discard <- Discards], + ok; + false -> + ok + end; +maybe_honor_limit(false, _LockType, _Limit, _State) -> + ok. + +held_count(Resource, State) -> + length(resources_given(Resource, State)). + +do_enabled(Resource, State) -> + Info = resource_info(Resource, State), + {reply, status_of(?resource_enabled(Info), State), State}. + +do_enable_resource(Resource, Enabled, State) -> + Info = resource_info(Resource, State), + State2 = update_resource_enabled(Resource, Enabled, Info, State), + {reply, status_of(Enabled, State2), State2}. + +update_resource_enabled(Resource, Value, Default, State) -> + update_resource_info(Resource, + fun(Info) -> Info#resource_info{enabled=Value} end, + Default#resource_info{enabled=Value}, + State). 
+ +update_limit(Resource, Limit, Default, State) -> + update_resource_info(Resource, + fun(Info) -> Info#resource_info{limit=Limit} end, + Default#resource_info{limit=Limit}, + State). + +update_resource_info(Resource, Fun, Default, State=#state{info_table=TableId}) -> + Key = {info, Resource}, + NewInfo = case ets:lookup(TableId, Key) of + [] -> Default; + [{_Key,Info} | _Rest] -> Fun(Info) + end, + ets:insert(TableId, {Key, NewInfo}), + State. + +%% @doc Throws unregistered for unknown Resource +resource_info(_Resource, State) when ?NOT_TRANSFERED(State) -> + throw(table_id_undefined); +resource_info(Resource, #state{info_table=TableId}) -> + Key = {info,Resource}, + case ets:lookup(TableId, Key) of + [] -> throw({unregistered, Resource}); + [{_Key,Info}] -> Info; + [{_Key,_Info} | _Rest] -> throw({too_many_info_objects, Resource}) + end. + +%% @doc Throws unregistered for unknown Resource +resource_info_tuple(Resource, State) -> + Info = resource_info(Resource, State), + {Resource, ?resource_type(Info), ?resource_enabled(Info), ?resource_limit(Info)}. + +%% @private +%% @doc +%% Get existing token type info from ETS table and schedule all for refill. +%% This is needed because we just reloaded our saved persisent state data +%% after a crash. Assumes table is available. Called only after Transfer. +reschedule_token_refills(State) -> + Tokens = all_registered_resources(token, State), + [schedule_refill_tokens(Token, State) || Token <- Tokens]. + +%% Schedule a timer event to refill tokens of given type +schedule_refill_tokens(_Token, State) when ?NOT_TRANSFERED(State) -> + ok; +schedule_refill_tokens(Token, State) -> + case ?resource_limit(resource_info(Token, State)) of + undefined -> + ok; + {Period, _Count} -> + erlang:send_after(Period, self(), {refill_tokens, Token}) + end. 
+ +%% Schedule a timer event to snapshot the current history +schedule_sample_history(State=#state{window_interval=Interval}) -> + TRef = erlang:send_after(Interval, self(), sample_history), + State#state{window_tref=TRef}. + +%% @doc Update the "limit" history stat for all registered resources into current window. +update_stat_all_limits(State) -> + lists:foldl(fun({Resource, Info}, S) -> + increment_stat_limit(Resource, ?resource_limit(Info), S) + end, + State, + all_resource_info(State)). + +do_sample_history(State) -> + %% Update window with current limits before copying it + State2 = update_stat_all_limits(State), + %% Move the current window of measurements onto the history queues. + %% Trim queue down to ?BG_DEFAULT_KEPT_SAMPLES if too big now. + Queue2 = queue:in(State2#state.window, State2#state.history), + Trimmed = case queue:len(Queue2) > ?BG_DEFAULT_KEPT_SAMPLES of + true -> + {_Discarded, Rest} = queue:out(Queue2), + Rest; + false -> + Queue2 + end, + EmptyWindow = orddict:new(), + State2#state{window=EmptyWindow, history=Trimmed}. + +update_stat_window(Resource, Fun, Default, State=#state{window=Window}) -> + NewWindow = orddict:update(Resource, Fun, Default, Window), + State#state{window=NewWindow}. + +resources_given(Resource, #state{entry_table=TableId}) -> + [Entry || {{given,_R},Entry} <- ets:match_object(TableId, {{given, Resource},'_'})]. + +%% Key = {given, Resource}, +%% [Given || {_K,Given} <- ets:lookup(TableId, Key)]. + +%% @private +%% @doc Add a Resource Entry to the "given" table. Here, we really do want +%% to allow multiple entries because each Resource "name" can be given multiple +%% times. +add_given_entry(Resource, Entry, TableId) -> + Key = {given, Resource}, + ets:insert(TableId, {Key, Entry}). + +remove_given_entries(Resource, State=#state{entry_table=TableId}) -> + Key = {given, Resource}, + ets:delete(TableId, Key), + State. + +%% @private +%% @doc Add a resource queue entry to our given set. 
+give_resource(Entry, State=#state{entry_table=TableId}) -> + Resource = ?e_resource(Entry), + Type = ?e_type(Entry), + add_given_entry(Resource, Entry#resource_entry{state=given}, TableId), + %% update given stats + increment_stat_given(Resource, Type, State). + +%% @private +%% @doc Add Resource to our given set. +give_resource(Resource, Type, Pid, Ref, Meta, State) -> + Entry = ?RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref, given), + give_resource(Entry, State). + +-spec try_get_resource(boolean(), bg_resource(), bg_resource_type(), pid(), [{atom(), any()}], #state{}) -> + {max_concurrency, #state{}} + | {ok, #state{}} + | {{ok, reference()}, #state{}}. +try_get_resource(false, _Resource, _Type, _Pid, _Meta, State) -> + {max_concurrency, State}; +try_get_resource(true, Resource, Type, Pid, Meta, State) -> + case Type of + token -> + Ref = random_bogus_ref(), + {ok, give_resource(Resource, Type, Pid, Ref, Meta, State)}; + lock -> + Ref = monitor(process, Pid), + {{ok,Ref}, give_resource(Resource, Type, Pid, Ref, Meta, State)} + end. + +%% @private +%% @doc reply now if resource is available. Returns max_concurrency +%% if resource not available or globally or specifically disabled. +-spec do_get_resource(bg_resource(), bg_resource_type(), pid(), [{atom(), any()}], #state{}) -> + {reply, max_concurrency, #state{}} + | {reply, {ok, #state{}}} + | {reply, {{ok, reference()}, #state{}}}. +do_get_resource(_Resource, _Type, _Pid, _Meta, State) when ?NOT_TRANSFERED(State) -> + %% Table transfer has not occurred yet. Reply "max_concurrency" so that callers + %% will try back later, hopefully when we have our table back. + {reply, max_concurrency, State}; +%% @doc When the API is bypassed, we ignore concurrency limits. 
+do_get_resource(Resource, Type, Pid, Meta, State=#state{bypassed=true}) -> + {Result, State2} = try_get_resource(true, Resource, Type, Pid, Meta, State), + {reply, Result, State2}; +do_get_resource(_Resource, _Type, _Pid, _Meta, State=#state{enabled=false}) -> + {reply, max_concurrency, State}; +do_get_resource(Resource, Type, Pid, Meta, State) -> + Info = resource_info(Resource, State), + enforce_type_or_throw(Resource, Type, Info), + Enabled = ?resource_enabled(Info), + Limit = limit(Info), + Given = length(resources_given(Resource, State)), + {Result, State2} = try_get_resource(Enabled andalso not (Given >= Limit), + Resource, Type, Pid, Meta, State), + {reply, Result, State2}. + +%% @private +%% @doc This should create a unique reference every time it's called; and should +%% not repeat across process restarts since our ETS table lives across process +%% lifetimes. This is needed to create unique entries in the "given" table. +random_bogus_ref() -> + make_ref(). + +all_resource_info(#state{info_table=TableId}) -> + [{Resource, Info} || {{info, Resource}, Info} <- ets:match_object(TableId, {{info, '_'},'_'})]. + +all_registered_resources(Type, #state{info_table=TableId}) -> + [Resource || {{info, Resource}, Info} <- ets:match_object(TableId, {{info, '_'},'_'}), + ?resource_type(Info) == Type]. + +all_given_entries(#state{entry_table=TableId}) -> + %% multiple entries per resource type, i.e. uses the "bag" + [Entry || {{given, _Resource}, Entry} <- ets:match_object(TableId, {{given, '_'},'_'})]. + +format_entry(Entry) -> + #bg_stat_live + { + resource = ?e_resource(Entry), + type = ?e_type(Entry), + consumer = ?e_pid(Entry), + meta = ?e_meta(Entry), + state = ?e_state(Entry) + }. + +fmt_live_entries(Entries) -> + [format_entry(Entry) || Entry <- Entries]. + +%% States :: [given], Types :: [lock | token] +do_query(_Resource, _States, _Types, State) when ?NOT_TRANSFERED(State) -> + %% Table hasn't been transfered yet. 
+ []; +do_query(all, States, Types, State) -> + E1 = case lists:member(given, States) of + true -> + Entries = all_given_entries(State), + lists:flatten([Entry || Entry <- Entries, + lists:member(?e_type(Entry), Types)]); + false -> + [] + end, + fmt_live_entries(E1); +do_query(Resource, States, Types, State) -> + E1 = case lists:member(given, States) of + true -> + Entries = resources_given(Resource, State), + [Entry || Entry <- Entries, lists:member(?e_type(Entry), Types)]; + false -> + [] + end, + fmt_live_entries(E1). + +%% @private +%% @doc Token refill timer event handler. +%% Capture stats of what was given in the previous period, +%% Clear all tokens of this type from the given set, +do_refill_tokens(Token, State) -> + State2 = increment_stat_refills(Token, State), + remove_given_entries(Token, State2). + +default_refill(Token, State) -> + Limit = limit(resource_info(Token, State)), + ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=token, refills=1, limit=Limit}. + +default_given(Token, Type, State) -> + Limit = limit(resource_info(Token, State)), + ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=Type, given=1, limit=Limit}. + +increment_stat_limit(_Resource, undefined, State) -> + State; +increment_stat_limit(Resource, Limit, State) -> + {Type, Count} = case Limit of + {_Period, C} -> {token, C}; + N -> {lock, N} + end, + update_stat_window(Resource, + fun(Stat) -> Stat#bg_stat_hist{limit=Count} end, + ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=Type, limit=Count}, + State). + +increment_stat_refills(Token, State) -> + update_stat_window(Token, + fun(Stat) -> Stat#bg_stat_hist{refills=1+Stat#bg_stat_hist.refills} end, + default_refill(Token, State), + State). + +increment_stat_given(Token, Type, State) -> + update_stat_window(Token, + fun(Stat) -> Stat#bg_stat_hist{given=1+Stat#bg_stat_hist.given} end, + default_given(Token, Type, State), + State). 
+ +%% erase saved history +do_clear_history(State=#state{window_tref=TRef}) -> + erlang:cancel_timer(TRef), + State2 = State#state{history=queue:new()}, + schedule_sample_history(State2). + +%% Return stats history from head or tail of stats history queue +do_hist(_End, _Resource, _Offset, _Count, State) when ?NOT_TRANSFERED(State) -> + []; +do_hist(End, Resource, Offset, Count, State) when Offset < 0 -> + do_hist(End, Resource, 0, Count, State); +do_hist(_End, _Resource, _Offset, Count, _State) when Count =< 0 -> + []; +do_hist(End, Resource, Offset, Count, #state{history=HistQueue}) -> + QLen = queue:len(HistQueue), + First = max(1, case End of + head -> min(Offset+1, QLen); + tail -> QLen - Offset + 1 + end), + Last = min(QLen, max(First + Count - 1, 1)), + H = case segment_queue(First, Last, HistQueue) of + empty -> []; + {ok, Hist } -> + case Resource of + all -> + StatsDictList = queue:to_list(Hist), + [orddict:to_list(Stat) || Stat <- StatsDictList]; + _T -> + [[{Resource, stat_window(Resource, StatsDict)}] + || StatsDict <- queue:to_list(Hist), stat_window(Resource, StatsDict) =/= undefined] + end + end, + %% Remove empty windows + lists:filter(fun(S) -> S =/= [] end, H). + +segment_queue(First, Last, _Q) when Last < First -> + empty; +segment_queue(First, Last, Queue) -> + QLen = queue:len(Queue), + case QLen >= Last andalso QLen > 0 of + true -> + %% trim off extra tail, then trim head + Front = case QLen == Last of + true -> Queue; + false -> + {QFirst, _QRest} = queue:split(Last, Queue), + QFirst + end, + case First == 1 of + true -> {ok, Front}; + false -> + {_Skip, Back} = queue:split(First-1, Front), + {ok, Back} + end; + false -> + %% empty + empty + end. + +%% @private +%% @doc Get stat history for given token type from sample set +-spec stat_window(bg_resource(), orddict:orddict()) -> bg_stat_hist(). +stat_window(Resource, Window) -> + case orddict:find(Resource, Window) of + error -> undefined; + {ok, StatHist} -> StatHist + end. 
diff --git a/src/riak_core_sup.erl b/src/riak_core_sup.erl index f44926392..be0301a5e 100644 --- a/src/riak_core_sup.erl +++ b/src/riak_core_sup.erl @@ -2,7 +2,7 @@ %% %% riak_core: Core Riak Application %% -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -24,6 +24,8 @@ -behaviour(supervisor). +-include("riak_core_bg_manager.hrl"). + %% API -export([start_link/0]). -export([ensembles_enabled/0]). @@ -32,9 +34,17 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}). +-define(CHILD(I, Type, Timeout, Args), {I, {I, start_link, Args}, permanent, Timeout, Type, [I]}). +-define(CHILD(I, Type, Timeout), ?CHILD(I, Type, Timeout, [])). -define(CHILD(I, Type), ?CHILD(I, Type, 5000)). +%% ETS tables to be created and maintained by riak_core_table_manager, which is not linked +%% to any processes except this supervisor. Please keep it that way so tables don't get lost +%% when their user processes crash. Implement ETS-TRANSFER handler for user processes. +-define(TBL_MGR_ARGS, [{?BG_INFO_ETS_TABLE, ?BG_INFO_ETS_OPTS}, + {?BG_ENTRY_ETS_TABLE, ?BG_ENTRY_ETS_OPTS} + ]). 
+ %% =================================================================== %% API functions %% =================================================================== @@ -56,7 +66,9 @@ init([]) -> permanent, 30000, supervisor, [riak_ensemble_sup]}, Children = lists:flatten( - [?CHILD(riak_core_sysmon_minder, worker), + [?CHILD(riak_core_table_manager, worker, 5000, [?TBL_MGR_ARGS]), + ?CHILD(riak_core_bg_manager, worker), + ?CHILD(riak_core_sysmon_minder, worker), ?CHILD(riak_core_vnode_sup, supervisor, 305000), ?CHILD(riak_core_eventhandler_sup, supervisor), [?CHILD(riak_core_dist_mon, worker) || DistMonEnabled], diff --git a/src/riak_core_table_manager.erl b/src/riak_core_table_manager.erl new file mode 100644 index 000000000..1f1e9afe8 --- /dev/null +++ b/src/riak_core_table_manager.erl @@ -0,0 +1,187 @@ +%% ------------------------------------------------------------------- +%% +%% riak_core_table_manager: ETS table ownership and crash protection +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc A gen_server process that creates and serves as heir to a +%% ETS table; and coordinates with matching user processes. If a user +%% process exits, this server will inherit the ETS table and hand it +%% back to the user process when it restarts. 
+%% +%% For theory of operation, please see the web page: +%% http://steve.vinoski.net/blog/2011/03/23/dont-lose-your-ets-tables/ + +-module(riak_core_table_manager). + +-behaviour(gen_server). + +%% API +-export([start_link/1, + claim_table/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {tables}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @end +%%-------------------------------------------------------------------- +-spec start_link([term()]) -> {ok, Pid::pid()} | ignore | {error, Error::term()}. +start_link(TableSpecs) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [TableSpecs], []). + + +%%-------------------------------------------------------------------- +%% @doc +%% Gives the registration table away to the caller, which should be +%% the registrar process. +%% +%% @end +%%-------------------------------------------------------------------- +-spec claim_table(atom()) -> ok. +claim_table(TableName) -> + gen_server:call(?SERVER, {claim_table, TableName}, infinity). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% @end +%%-------------------------------------------------------------------- +-spec init([term()]) -> {ok, undefined}. +%% Tables :: [{TableName, [props]}] +%% Table specs are provided by the process that creates this table manager, +%% presumably a supervisor such as riak_core_sup. 
+init([TableSpecs]) -> + lager:debug("Table Manager starting up with tables: ~p", [TableSpecs]), + Tables = lists:foldl(fun(Spec, TT) -> create_table(Spec, TT) end, [], TableSpecs), + {ok, #state{tables=Tables}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_call(Msg::term(), From::{pid(), term()}, State::term()) -> + {reply, Reply::term(), State::term()} | + {noreply, State::term()}. +%% TableName :: atom() +handle_call({claim_table, TableName}, {Pid, _Tag}, State) -> + %% The user process is (re-)claiming the table. Give it away. + %% We remain the heir in case the user process exits. + case lookup_table(TableName, State) of + undefined -> + %% Table does not exist, which is madness. + {reply, {undefined_table, TableName}, State}; + TableId -> + lager:debug("Giving away table ~p (~p) to ~p", [TableName, TableId, Pid]), + ets:give_away(TableId, Pid, TableName), + Reply = ok, + {reply, Reply, State} + end; + +handle_call(_Msg, _From, State) -> + {noreply, State}. + + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_cast(term(), term()) -> {noreply, State::term()}. +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info({'ETS-TRANSFER', TableId, FromPid, _HeirData}, State) -> + %% The table's user process exited and transferred the table back to us. 
+ lager:debug("Table user process ~p exited, ~p received table ~p", [FromPid, self(), TableId]), + {noreply, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @end +%%-------------------------------------------------------------------- +-spec terminate(term(), term()) -> ok. +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @end +%%-------------------------------------------------------------------- +-spec code_change(term(), term(), term()) -> {ok, term()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +%% Create the initial table based on a table name and properties. +%% The table will eventually be given away by claim_table, but we +%% want to remain the heir in case the claimer crashes. +create_table({TableName, TableProps}, Tables) -> + TableId = ets:new(TableName, TableProps), + ets:setopts(TableId, [{heir, self(), undefined}]), + [{TableName, TableId} | Tables]. + +-spec lookup_table(TableName::atom(), State::term()) -> ets:tid() | undefined. +lookup_table(TableName, #state{tables=Tables}) -> + proplists:get_value(TableName, Tables). 
diff --git a/test/bg_manager_eqc.erl b/test/bg_manager_eqc.erl new file mode 100644 index 000000000..589310b05 --- /dev/null +++ b/test/bg_manager_eqc.erl @@ -0,0 +1,1003 @@ +%%% @author Jordan West <> +%%% @copyright (C) 2013, Jordan West +%%% @doc +%%% +%%% @end +%%% Created : 13 Nov 2013 by Jordan West <> + +-module(bg_manager_eqc). + +%%-ifdef(TEST). +%% -ifdef(EQC). + +-include("riak_core_bg_manager.hrl"). +-include_lib("eqc/include/eqc.hrl"). +-include_lib("eqc/include/eqc_statem.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-type bg_eqc_type() :: atom(). +-type bg_eqc_limit() :: non_neg_integer(). + +-record(state,{ + %% whether or not the bgmgr is running + alive :: boolean(), + %% whether or not the global bypass switch is engaged + bypassed :: boolean(), + %% whether or not the global enable switch is engaged + enabled :: boolean(), + %% processes started by the test and the processes state + procs :: [{pid(), running | not_running}], + %% resources that are disabled are on the list + disabled :: [{bg_eqc_type()}], + %% concurrency limits for lock types + limits :: [{bg_eqc_type(), bg_eqc_limit()}], + %% max counts per "period" for token types + counts :: [{bg_eqc_type(), bg_eqc_limit()}], + %% locks held (or once held then released) for each lock type + %% and their state + locks :: [{bg_eqc_type(), [{reference(), pid(), [], held | released}]}], + %% number of tokens taken by type + tokens :: [{bg_eqc_type(), non_neg_integer()}], + %% current history samples accumulator: [{resource, limit, refills, given}] + samples :: [{bg_eqc_type(), non_neg_integer(), non_neg_integer(), non_neg_integer()}], + %% snapshot of samples on window interval + history :: [[{bg_eqc_type(), non_neg_integer(), non_neg_integer(), non_neg_integer()}]] + }). + +run_eqc() -> + run_eqc(100). + +run_eqc(Type) when is_atom(Type) -> + run_eqc(100, Type); +run_eqc(N) -> + run_eqc(N, simple). 
+ +run_eqc(N, simple) -> + run_eqc(N, prop_bgmgr()); +run_eqc(N, para) -> + run_eqc(N, parallel); +run_eqc(N, parallel) -> + run_eqc(N, prop_bgmgr_parallel()); +run_eqc(N, Prop) -> + eqc:quickcheck(eqc:numtests(N, Prop)). + +run_check() -> + eqc:check(prop_bgmgr()). + +run_recheck() -> + eqc:recheck(prop_bgmgr()). + + +%% @doc Returns the state in which each test case starts. (Unless a different +%% initial state is supplied explicitly to, e.g. commands/2.) +initial_state() -> + #state{ + alive = true, + bypassed = false, + enabled = true, + disabled = [], + procs = [], + limits = [], + counts = [], + locks = [], + tokens = [], + samples = [], + history = [] + }. + +%% ------ Grouped operator: set_concurrency_limit +%% @doc set_concurrency_limit_command - Command generator +set_concurrency_limit_args(_S) -> + %% TODO: change Kill (3rd arg, to boolean gen) + [lock_type(), lock_limit(), false]. + +%% @doc set_concurrency_limit precondition +set_concurrency_limit_pre(S) -> + is_alive(S). + +%% @doc set_concurreny_limit command +set_concurrency_limit(Type, Limit, false) -> + riak_core_bg_manager:set_concurrency_limit(Type, Limit). + +%% @doc state transition for set_concurrency_limit command +set_concurrency_limit_next(S=#state{limits=Limits}, _Value, [Type,Limit,_Kill]) -> + S#state{ limits = lists:keystore(Type, 1, Limits, {Type, Limit}) }. + +%% @doc set_concurrency_limit_post - Postcondition for set_concurrency_limit +set_concurrency_limit_post(S, [Type,_Limit,_Kill], Res) -> + %% check returned value is equal to value we have in state prior to this call + %% since returned value is promised to be previous one that was set + eq(limit(Type, S), Res). + +%% ------ Grouped operator: concurrency_limit +%% @doc concurrency_limit command arguments generator +concurrency_limit_args(_S) -> + [lock_type()]. + +%% @doc concurrency_limit precondition +concurrency_limit_pre(S) -> + is_alive(S). 
+ +%% @doc concurrency limit command +concurrency_limit(Type) -> + riak_core_bg_manager:concurrency_limit(Type). + +%% @doc Postcondition for concurrency_limit +concurrency_limit_post(S, [Type], Limit) -> + ExpectedLimit = limit(Type, {unregistered, Type}, S), + eq(ExpectedLimit, Limit). + +%% ------ Grouped operator: concurrency_limit_reached +%% @doc concurrency_limit_reached command argument generator +concurrency_limit_reached_args(_S) -> + [lock_type()]. + +%% @doc concurrency_limit_reached precondition +concurrency_limit_reached_pre(S) -> + is_alive(S). + +%% @doc concurrency_limit_reached command +concurrency_limit_reached(Type) -> + riak_core_bg_manager:concurrency_limit_reached(Type). + +%% @doc concurrency_limit_reached_post - Postcondition for concurrency_limit_reached +concurrency_limit_reached_post(S, [Type], {unregistered, Type}) -> + eq(limit(Type, undefined, S), undefined); +concurrency_limit_reached_post(S, [Type], Res) -> + Limit = limit(Type, S), + ExistingCount = length(held_locks(Type, S)), + eq(ExistingCount >= Limit, Res). + + +%% ------ Grouped operator: get_lock +%% @doc argument generator for get_lock command +get_lock_args(S) -> + %% TODO: test getting locks on behalf of calling process instead of other process + %% TODO: test trying to get lock on behalf of killed process? + [lock_type(), oneof(running_procs(S)), []]. + +%% @doc Precondition for generation of get_lock command +get_lock_pre(S) -> + %% need some running procs to get locks on behalf of + RunningProcs = length(running_procs(S)) > 0, + RunningProcs andalso is_alive(S). + +%% @doc Precondition for generation of get_lock command +get_lock_pre(S, [Type, _Pid, _Meta]) -> + %% must call set_concurrency_limit at least once + %% TODO: we can probably remove and test this restriction instead + is_integer(limit(Type, unregistered, S)). + +get_lock(Type, Pid, Meta) -> + case riak_core_bg_manager:get_lock(Type, Pid, Meta) of + {ok, Ref} -> Ref; + Other -> Other + end. 
+ + +%% @doc State transition for get_lock command +%% `Res' is either the lock reference or max_concurrency +get_lock_next(S=#state{enabled=Enabled, bypassed=Bypassed}, Res, [Type, Pid, Meta]) -> + TypeLimit = limit(Type, S), + Held = held_locks(Type, S), + ReallyEnabled = Enabled andalso resource_enabled(Type, S), + case (ReallyEnabled andalso length(Held) < TypeLimit) orelse Bypassed of + %% got lock + true -> add_held_lock(Type, Res, Pid, Meta, S); + %% failed to get lock + false -> S + end. + +%% @doc Postcondition for get_lock +%% We expect to get max_concurrency if globally disabled or we hit the limit. +%% We expect to get ok if bypassed or under the limit. +get_lock_post(#state{bypassed=true}, [_Type, _Pid, _Meta], max_concurrency) -> + 'max_concurrency returned when bypassed'; +get_lock_post(S=#state{enabled=Enabled}, [Type, _Pid, _Meta], max_concurrency) -> + %% Since S reflects the state before we check that it + %% was already at the limit. + Limit = limit(Type, S), + ExistingCount = length(held_locks(Type, S)), + %% check >= because we may have lowered limit *without* + %% forcing some processes to release their locks by killing them + ReallyEnabled = Enabled andalso resource_enabled(Type, S), + case (not ReallyEnabled) orelse ExistingCount >= Limit of + true -> true; + false -> + %% hack to get more informative post-cond failure (like eq) + {ExistingCount, 'not >=', Limit} + end; +get_lock_post(S=#state{bypassed=Bypassed, enabled=Enabled}, [Type, _Pid, _Meta], _LockRef) -> + %% Since S reflects the state before we check that it + %% was not already at the limit. + Limit = limit(Type, S), + ExistingCount = length(held_locks(Type, S)), + ReallyEnabled = Enabled andalso resource_enabled(Type, S), + case (ReallyEnabled andalso ExistingCount < Limit) orelse Bypassed of + true -> true; + false -> + %% hack to get more informative post-cond failure (like eq) + {ExistingCount, 'not <', Limit} + end. 
+ +%% ------ Grouped operator: start_process +%% @doc args generator for start_process +start_process_args(_S) -> + []. + +%% @doc start_process_pre - Precondition for generation +start_process_pre(S) -> + %% limit the number of running processes in the test, we should need an unbounded amount + %% TODO: move "20" to define + length(running_procs(S)) < 5. + +start_process() -> + spawn(fun() -> + receive die -> ok + %% this protects us against leaking too many processes when running + %% prop_bgmgr_parallel(), which doesn't clean up the processes it starts + after 360000 -> timeout + end + end). + +%% @doc state transition for start_process command +start_process_next(S=#state{procs=Procs}, Value, []) -> + S#state{ procs = lists:keystore(Value, 1, Procs, {Value, running}) }. + +%% @doc postcondition for start_process +start_process_post(_S, [], Pid)-> + is_process_alive(Pid). + +%% ------ Grouped operator: stop_process +%% @doc stop_process_command - Argument generator +stop_process_args(S) -> + [oneof(running_procs(S))]. + +%% @doc stop_process_pre - Precondition for generation +stop_process_pre(S) -> + %% need some running procs in order to kill one. + length(running_procs(S)) > 0. + +%% @doc stop_process_pre - Precondition for stop_process +stop_process_pre(S, [Pid]) -> + %% only interesting to kill processes that hold locks + lists:keyfind(Pid, 2, all_locks(S)) /= false. + +%% @doc stop_process command +stop_process(Pid) -> + Pid ! die, + wait_for_pid(Pid). + +%% @doc state transition for stop_process command +stop_process_next(S=#state{procs=Procs}, _Value, [Pid]) -> + %% mark process as no longer running and release all locks held by the process + UpdatedProcs = lists:keystore(Pid, 1, Procs, {Pid, not_running}), + release_locks(Pid, S#state{procs = UpdatedProcs}). + + +%% @doc postcondition for stop_process +stop_process_post(_S, [Pid], ok) -> + not is_process_alive(Pid); +stop_process_post(_S, [Pid], {error, didnotexit}) -> + {error, {didnotexit, Pid}}. 
+ +%% ------ Grouped operator: set_token_rate +%% @doc set_token_rate arguments generator +set_token_rate_args(_S) -> + %% NOTE: change token_type() to lock_type() to provoke failure due to registration of lock/token under same name + %% (waiting for fix in bg mgr). + [token_type(), token_count()]. + +%% @doc set_token_rate precondition +set_token_rate_pre(S) -> + is_alive(S). + +%% @doc set_token_rate state transition +%% Note that set_token_rate takes a rate, which is {Period, Count}, +%% but this test generates it's own refill messages, so rate is not modeled. +set_token_rate_next(S=#state{counts=Counts}, _Value, [Type, Count]) -> + S2 = update_sample(Type, Count, 0, 0, S), + S2#state{ counts = lists:keystore(Type, 1, Counts, {Type, Count}) }. + +%% @doc set_token_rate command +set_token_rate(Type, Count) -> + %% we refill tokens as a command in the model so we use + %% token rate to give us the biggest refill period we can get. + %% no test should run longer than that or we have a problem. + riak_core_bg_manager:set_token_rate(Type, mk_token_rate(Count)). + +%% @doc Postcondition for set_token_rate +set_token_rate_post(S, [Type, _Count], Res) -> + %% check returned value is equal to value we have in state prior to this call + %% since returned value is promised to be previous one that was set + eq(Res, mk_token_rate(max_num_tokens(Type, undefined, S))). + +%% ------ Grouped operator: token_rate +%% @doc token_rate_command +token_rate_args(_S) -> + [token_type()]. + +%% @doc token_rate precondition +token_rate_pre(S) -> + is_alive(S). + +%% @doc token_rate command +token_rate(Type) -> + riak_core_bg_manager:token_rate(Type). + +%% @doc Postcondition for token_rate +token_rate_post(S, [Type], Res) -> + ExpectedRate = mk_token_rate(max_num_tokens(Type, {unregistered, Type}, S)), + eq(ExpectedRate, Res). 
+ +%% ------ Grouped operator: get_token +%% @doc get_token args generator +get_token_args(S) -> + %% TODO: generate meta for future query tests + ArityTwo = [[token_type(), oneof(running_procs(S))] || length(running_procs(S)) > 0], + ArityOne = [[token_type()]], + oneof(ArityTwo ++ ArityOne). + +%% @doc Precondition for get_token +get_token_pre(S, [Type, _Pid]) -> + get_token_pre(S, [Type]); +get_token_pre(S, [Type]) -> + %% must call set_token_rate at least once + %% TODO: we can probably remove and test this restriction instead + is_integer(max_num_tokens(Type, unregistered, S)) andalso is_alive(S). + +%% @doc get_token state transition +get_token_next(S, Value, [Type, _Pid]) -> + get_token_next(S, Value, [Type]); +get_token_next(S=#state{bypassed=Bypassed, enabled=Enabled}, _Value, [Type]) -> + CurCount = num_tokens(Type, S), + %% NOTE: this assumes the precondition requires we call set_token_rate at least once + %% in case we don't we treat the max as 0 + Max = max_num_tokens(Type, unregistered, S), + ReallyEnabled = Enabled andalso resource_enabled(Type, S), + case (ReallyEnabled andalso CurCount < Max) orelse Bypassed of + true -> increment_token_count(Type, S); + false -> S + end. + +get_token(Type) -> + riak_core_bg_manager:get_token(Type). + +get_token(Type, Pid) -> + riak_core_bg_manager:get_token(Type, Pid). + +%% @doc Postcondition for get_token +%% We expect to get max_concurrency if globally disabled or we hit the limit. +%% We expect to get ok if bypassed or under the limit. 
+get_token_post(S, [Type, _Pid], Res) -> + get_token_post(S, [Type], Res); +get_token_post(#state{bypassed=true}, [_Type], max_concurrency) -> + 'max_concurrency returned while bypassed'; +get_token_post(S=#state{enabled=Enabled}, [Type], max_concurrency) -> + CurCount = num_tokens(Type, S), + %% NOTE: this assumes the precondition requires we call set_token_rate at least once + %% in case we don't we treat the max as 0 + Max = max_num_tokens(Type, unregistered, S), + ReallyEnabled = Enabled andalso resource_enabled(Type, S), + case (not ReallyEnabled) orelse CurCount >= Max of + true -> true; + false -> + %% hack to get more info out of postcond failure + {CurCount, 'not >=', Max} + end; +get_token_post(S=#state{bypassed=Bypassed, enabled=Enabled}, [Type], ok) -> + CurCount = num_tokens(Type, S), + %% NOTE: this assumes the precondition requires we call set_token_rate at least once + %% in case we don't we treat the max as 0 + Max = max_num_tokens(Type, unregistered, S), + ReallyEnabled = Enabled andalso resource_enabled(Type, S), + case (ReallyEnabled andalso CurCount < Max) orelse Bypassed of + true -> true; + false -> + {CurCount, 'not <', Max} + end. + +%% ------ Grouped operator: refill_tokens +%% @doc refill_tokens args generator +refill_tokens_args(_S) -> + [token_type()]. + +%% @doc refill_tokens precondition +refill_tokens_pre(S, [Type]) -> + %% only refill tokens if we have registered type (called set_token_rate at least once) + is_integer(max_num_tokens(Type, unregistered, S)) andalso is_alive(S). + +%% @doc refill_tokens state transition +refill_tokens_next(S, _Value, [Type]) -> + reset_token_count(Type, S). + +refill_tokens(Type) -> + riak_core_bg_manager ! {refill_tokens, Type}, + %% TODO: find way to get rid of this timer sleep + timer:sleep(100). + +%% ------ Grouped operator: sample_history +%% @doc sample_history args generator +sample_history_args(_S) -> + []. + +%% @doc sample_history precondition +sample_history_pre(S, []) -> + is_alive(S). 
+ +%% @doc sample_history next state function +sample_history_next(S, _Value, []) -> + do_sample_history(S). + +%% @doc sample_history command +sample_history() -> + riak_core_bg_manager ! sample_history, + timer:sleep(100). + +%% ------ Grouped operator: crash +%% @doc crash args generator +crash_args(_S) -> + []. + +%% @doc precondition for crash command +crash_pre(#state{alive=Alive}) -> + %% only crash if actually running + Alive. + +%% @doc state transition for crash command +crash_next(S, _Value, _Args) -> + S#state{ alive = false }. + +%% @doc crash command +crash() -> + stop_pid(whereis(riak_core_bg_manager)). + +%% @doc crash command post condition +crash_post(_S, _Args, _Res) -> + %% TODO: anything we want to validate here? + true. + +%% ------ Grouped operator: revive +%% @doc revive arguments generator +revive_args(_S) -> + []. + +%% @doc revive precondition +revive_pre(#state{alive=Alive}) -> + %% only revive if we are in a crashed state + not Alive. + +%% @doc revive_next - Next state function +revive_next(S, _Value, _Args) -> + S2 = S#state{ alive = true }, + clear_history(S2). + +%% @doc revive command +revive() -> + {ok, _BgMgr} = riak_core_bg_manager:start(window_interval()). + +%% @doc revive_post - Postcondition for revive +revive_post(_S, _Args, _Res) -> + %% TODO: what to validate here, if anything? + true. + +%% ------ Grouped operator: ps query +%% @doc ps arguments generator +ps_args(_S) -> + [oneof([all, lock_type(), token_type()])]. + +%% @doc ps precondition +ps_pre(S) -> + is_alive(S). + +%% @doc ps next state function +ps_next(S, _Value, _Args) -> + S. + +%% @doc ps command +ps(Resource) -> + riak_core_bg_manager:ps(Resource). 
+ +%% @doc ps postcondition +ps_post(State, [Resource], Result) -> + %% only one of these will have non-zero result unless Resource = all + NumLocks = length(held_locks(Resource, State)), + NumTokens = num_tokens_taken(Resource, State), + %% TODO: could validate record entries in addition to correct counts + eq(length(Result), NumLocks+NumTokens). + +%% ------ Grouped operator: head query +%% @doc head arguments generator +head_args(_S) -> + %% [Resource, Offset, NumSamples] + [oneof([all, lock_type(), token_type()]), 0, choose(0,5)]. + +%% @doc ps precondition +head_pre(S) -> + is_alive(S). + +%% @doc ps next state function +head_next(S, _Value, _Args) -> + S. + +%% @doc head command +head(Resource, Offset, NumSamples) -> + riak_core_bg_manager:head(Resource, Offset, NumSamples). + +%% @doc ps postcondition +head_post(#state{history=RevHistory}, [Resource, Offset, NumSamples], Result) -> + History = lists:reverse(RevHistory), + Start = Offset+1, + Len = min(NumSamples, length(History) - Offset), + Keep = lists:sublist(History, Start, Len), + H2 = [lists:filter(fun({R,_L,_R,_G}) -> Resource == all orelse R == Resource end, Samples) + || Samples <- Keep, Samples =/= []], + H3 = lists:filter(fun(S) -> S =/= [] end, H2), + eq(length(Result), length(H3)). + +%% ------ Grouped operator: tail query +%% @doc tail arguments generator +tail_args(_S) -> + %% [Resource, Offset, NumSamples] + [oneof([all, lock_type(), token_type()]), 0, choose(0,5)]. + +%% @doc ps precondition +tail_pre(S) -> + is_alive(S). + +%% @doc ps next state function +tail_next(S, _Value, _Args) -> + S. + +%% @doc tail command +tail(Resource, Offset, NumSamples) -> + riak_core_bg_manager:tail(Resource, Offset, NumSamples). 
+ +%% @doc ps postcondition +tail_post(#state{history=RevHistory}, [Resource, Offset, NumSamples], Result) -> + History = lists:reverse(RevHistory), + HistLen = length(History), + Start = HistLen - Offset + 1, + Len = min(NumSamples, HistLen - Offset), + Keep = lists:sublist(History, Start, Len), + H2 = [lists:filter(fun({R,_L,_R,_G}) -> Resource == all orelse R == Resource end, Samples) + || Samples <- Keep, Samples =/= []], + H3 = lists:filter(fun(S) -> S =/= [] end, H2), + eq(length(Result), length(H3)). + +%% ------ Grouped operator: bypass +%% @doc bypass arguments generator +bypass_args(_S) -> + [oneof([true, false])]. + +%% @doc bypass precondition +bypass_pre(S) -> + is_alive(S). + +%% @doc bypass next state function +bypass_next(S, _Value, [Switch]) -> + S#state{bypassed=Switch}. + +%% @doc bypass command +bypass(Switch) -> + Res = riak_core_bg_manager:bypass(Switch), %% expect 'ok' + Value = riak_core_bg_manager:bypassed(), %% expect eq(Value, Switch) + {Res, Value}. + +%% @doc bypass postcondition +bypass_post(_S, [Switch], Result) -> + eq(Result, {ok, Switch}). + +%% ------ Grouped operator: bypassed +%% @doc bypass arguments generator +bypassed_args(_S) -> + []. + +%% @doc eanble precondition +bypassed_pre(S) -> + is_alive(S). + +%% @doc bypassed next state function +bypassed_next(S, _Value, []) -> + S. + +%% @doc bypassed command +bypassed() -> + riak_core_bg_manager:bypassed(). + +%% @doc bypassed postcondition +bypassed_post(#state{bypassed=Bypassed}, _Value, Result) -> + eq(Result, Bypassed). + +%% ------ Grouped operator: enable +%% @doc bypass arguments generator +enable_args(_S) -> + [oneof([[], token_type(), lock_type()])]. + +%% @doc enable precondition +%% global enable +enable_pre(S) -> + is_alive(S). + +%% per resource enable +enable_pre(S,[Type]) -> + is_integer(max_num_tokens(Type, unregistered, S)) andalso is_alive(S). 
+ +%% @doc enable next state function +%% global enable +enable_next(S, _Value, []) -> + S#state{enabled=true}; +%% per resource enable +enable_next(S, _Value, [Type]) -> + enable_resource(Type, S). + +%% @doc enable command +enable() -> + riak_core_bg_manager:enable(). + +enable(Resource) -> + riak_core_bg_manager:enable(Resource). + +%% @doc enable postcondition +%% global enable +enable_post(S, [], Result) -> + eq(Result, status_of(true, S#state{enabled=true})); +%% per resource enable +enable_post(S, [_Resource], Result) -> + ResourceEnabled = true, + eq(Result, status_of(ResourceEnabled, S)). + +%% ------ Grouped operator: disable +%% @doc bypass arguments generator +disable_args(_S) -> + [oneof([[], token_type(), lock_type()])]. + +%% @doc eanble precondition +%% global disable +disable_pre(S) -> + is_alive(S). + +%% per resource disable +disable_pre(S,[Type]) -> + is_integer(max_num_tokens(Type, unregistered, S)) andalso is_alive(S). + +%% @doc disable next state function +%% global disable +disable_next(S, _Value, []) -> + S#state{enabled=false}; +%% per resource disable +disable_next(S, _Value, [Type]) -> + disable_resource(Type, S). + +%% @doc disable command +disable() -> + riak_core_bg_manager:disable(). + +disable(Resource) -> + riak_core_bg_manager:disable(Resource). + +%% @doc disable postcondition +%% global +disable_post(S, [], Result) -> + Ignored = true, + eq(Result, status_of(Ignored, S#state{enabled=false})); +%% per resource +disable_post(S, [_Resource], Result) -> + ResourceEnabled = false, + eq(Result, status_of(ResourceEnabled, S)). + +%% ------ Grouped operator: enabled +%% @doc bypass arguments generator +enabled_args(_S) -> + []. + +%% @doc eanble precondition +enabled_pre(S) -> + is_alive(S). + +%% @doc enabled next state function +enabled_next(S, _Value, []) -> + S. + +%% @doc enabled command +enabled() -> + riak_core_bg_manager:enabled(). 
+ +%% @doc enabled postcondition +enabled_post(S, _Value, Result) -> + eq(Result, status_of(true, S)). + +%%------------ helpers ------------------------- +%% @doc resources are disabled iff they appear on the "disabled" list +resource_enabled(Resource, #state{disabled=Disabled}) -> + not lists:member(Resource, Disabled). + +%% @doc enable the resource by removing from the "disabled" list +enable_resource(Resource, State=#state{disabled=Disabled}) -> + State#state{disabled=lists:delete(Resource, Disabled)}. + +disable_resource(Resource, State=#state{disabled=Disabled}) -> + State#state{disabled=[Resource | lists:delete(Resource, Disabled)]}. + +%% @doc return status considering Resource status, enabled, and bypassed +status_of(_Enabled, #state{bypassed=true}) -> bypassed; +status_of(true, #state{enabled=true}) -> enabled; +status_of(_E,_S) -> disabled. + +%% -- Generators +lock_type() -> + oneof([a,b,c,d]). %%,e,f,g,h,i]). + +token_type() -> + oneof(['A','B','C','D']). %%,'E','F','G','H','I']). + +lock_limit() -> + choose(0, 5). + +token_count() -> + choose(0, 5). + +ps_() -> + choose(0, 10). + +%% @doc weight/2 - Distribution of calls +weight(_S, set_concurrency_limit) -> 3; +weight(_S, concurrency_limit) -> 3; +weight(_S, concurrency_limit_reached) -> 3; +weight(_S, start_process) -> 3; +weight(#state{alive=true}, stop_process) -> 3; +weight(#state{alive=false}, stop_process) -> 3; +weight(_S, get_lock) -> 20; +weight(_S, set_token_rate) -> 3; +weight(_S, token_rate) -> 0; +weight(_S, get_token) -> 20; +weight(_S, refill_tokens) -> 10; +weight(_S, sample_history) -> 10; +weight(_S, ps) -> 3; +weight(_S, head) -> 3; +weight(_S, tail) -> 3; +weight(_S, crash) -> 3; +weight(_S, revive) -> 1; +weight(_S, _Cmd) -> 1. + +%% Other Functions +limit(Type, State) -> + limit(Type, 0, State). + +limit(Type, Default, #state{limits=Limits}) -> + case lists:keyfind(Type, 1, Limits) of + false -> Default; + {Type, Limit} -> Limit + end. 
+ +num_tokens(Type, #state{tokens=Tokens}) -> + case lists:keyfind(Type, 1, Tokens) of + false -> 0; + {Type, NumTokens} -> NumTokens + end. + +max_num_tokens(Type, Default, #state{counts=Counts}) -> + case lists:keyfind(Type, 1, Counts) of + false -> Default; + {Type, Limit} -> Limit + end. + +num_tokens_taken(all, #state{tokens=Tokens}) -> + lists:foldl(fun({_Resource, Count}, Sum) -> Count+Sum end, 0, Tokens); +num_tokens_taken(Resource, #state{tokens=Tokens}) -> + lists:foldl(fun(Count, Sum) -> Count+Sum end, + 0, + [Count || {R, Count} <- Tokens, R == Resource]). + +clear_history(State) -> + State#state{history=[], samples=[]}. + +%% @doc Snapshot current history samples and reset samples to empty +do_sample_history(State=#state{limits=Limits, counts=Counts}) -> + %% First, grab updates for all resources + S2 = lists:foldl(fun({Resource, Limit}, S) -> + update_sample(Resource, Limit, 0, 0, S) + end, + State, + Limits++Counts), + NewHistory = [S2#state.samples | S2#state.history], + S2#state{history=NewHistory, samples=[]}. + +%% @doc Update the current samples with supplied increments. +%% Limit is overwritten unless undefined. It's not expected to change too often, +%% hopefully less often than the sampling window (Niquist!). +%% This should probably be called approximately every time the API is called. +update_sample(Resource, Limit, Refill, Given, State=#state{samples=Samples}) -> + %% find sample counts for specified resource and increment per arguments. + {_R, Limit1, Refills1, Given1} = case lists:keyfind(Resource, 1, Samples) of + false -> {Resource, 0, 0, 0}; + S -> S + end, + Sample = {Resource, defined_or_default(Limit, Limit1), Refill+Refills1, + Given+Given1}, + Samples2 = lists:keystore(Resource, 1, Samples, Sample), + State#state{samples=Samples2}. + +defined_or_default(undefined, Default) -> Default; +defined_or_default(Value, _Default) -> Value. + +is_alive(#state{alive=Alive}) -> + Alive. 
+ +mk_token_rate({unregistered, _}=Unreg) -> + Unreg; +mk_token_rate(undefined) -> + undefined; +mk_token_rate(Count) -> + %% erlang:send_after max is used so that we can trigger token refilling from EQC test + {max_send_after(), Count}. + +window_interval() -> + max_send_after(). + +max_send_after() -> + 4294967295. + +running_procs(#state{procs=Procs}) -> + [Pid || {Pid, running} <- Procs]. + +all_locks(#state{locks=Locks}) -> + lists:flatten([ByType || {_Type, ByType} <- Locks]). + +all_locks(all, State) -> + all_locks(State); +all_locks(Type, #state{locks=Locks}) -> + case lists:keyfind(Type, 1, Locks) of + false -> []; + {Type, All} -> All + end. + +held_locks(Type, State) -> + [{Ref, Pid, Meta, held} || {Ref, Pid, Meta, held} <- all_locks(Type, State)]. + +update_locks(Type, TypeLocks, State=#state{locks=Locks}) -> + State#state{ locks = lists:keystore(Type, 1, Locks, {Type, TypeLocks}) }. + +add_held_lock(Type, Ref, Pid, Meta, State) -> + All = all_locks(Type, State), + S2 = update_sample(Type, undefined, 0, 1, State), + update_locks(Type, [{Ref, Pid, Meta, held} | All], S2). + +release_locks(Pid, State=#state{locks=Locks}) -> + lists:foldl(fun({Type, ByType}, StateAcc) -> + NewLocks = mark_locks_released(Pid, ByType), + update_locks(Type, NewLocks, StateAcc) + end, + State, Locks). + +mark_locks_released(Pid, Locks) -> + WithoutPid = [Lock || Lock <- Locks, element(2, Lock) =/= Pid], + MarkedReleased = [{Ref, LockPid, Meta, released} || {Ref, LockPid, Meta, _} <- Locks, LockPid =:= Pid], + MarkedReleased ++ WithoutPid. + +increment_token_count(Type, State=#state{tokens=Tokens}) -> + CurCount = num_tokens(Type, State), + S2 = update_sample(Type, undefined, 0, 1, State), + S2#state{ tokens = lists:keystore(Type, 1, Tokens, {Type, CurCount + 1}) }. + +reset_token_count(Type, State=#state{tokens=Tokens}) -> + S2 = update_sample(Type, undefined, 1, 0, State), + S2#state{ tokens = lists:keystore(Type, 1, Tokens, {Type, 0}) }. 
+ +stop_pid(Other) when not is_pid(Other) -> + ok; +stop_pid(Pid) -> + unlink(Pid), + exit(Pid, shutdown), + ok = wait_for_pid(Pid). + +wait_for_pid(Pid) -> + Mref = erlang:monitor(process, Pid), + receive + {'DOWN', Mref, process, _, _} -> + ok + after + 5000 -> + {error, didnotexit} + end. + +bg_manager_monitors() -> + bg_manager_monitors(whereis(riak_core_bg_manager)). + +bg_manager_monitors(undefined) -> + crashed; +bg_manager_monitors(Pid) -> + process_info(Pid, monitors). + +prop_bgmgr() -> + ?FORALL(Cmds, commands(?MODULE), + aggregate(command_names(Cmds), + ?TRAPEXIT( + begin + stop_pid(whereis(riak_core_table_manager)), + stop_pid(whereis(riak_core_bg_manager)), + {ok, _TableMgr} = riak_core_table_manager:start_link([{?BG_INFO_ETS_TABLE, + [protected, set, named_table]}, + {?BG_ENTRY_ETS_TABLE, + [protected, bag, named_table]}]), + {ok, _BgMgr} = riak_core_bg_manager:start(window_interval()), + {H, S, Res} = run_commands(?MODULE,Cmds), + InfoTable = ets:tab2list(?BG_INFO_ETS_TABLE), + EntryTable = ets:tab2list(?BG_ENTRY_ETS_TABLE), + Monitors = bg_manager_monitors(), + RunnngPids = running_procs(S), + %% cleanup processes not killed during test + [stop_pid(Pid) || Pid <- RunnngPids], + stop_pid(whereis(riak_core_table_manager)), + stop_pid(whereis(riak_core_bg_manager)), + ?WHENFAIL( + begin + io:format("~n~nFinal State: ~n"), + io:format("---------------~n"), + io:format("alive = ~p~n", [S#state.alive]), + io:format("bypassed = ~p~n", [S#state.bypassed]), + io:format("enabled = ~p~n", [S#state.enabled]), + io:format("procs = ~p~n", [S#state.procs]), + io:format("limits = ~p~n", [S#state.limits]), + io:format("locks = ~p~n", [S#state.locks]), + io:format("counts = ~p~n", [S#state.counts]), + io:format("tokens = ~p~n", [S#state.tokens]), + io:format("samples = ~p~n", [S#state.samples]), + io:format("history = ~p~n", [S#state.history]), + io:format("---------------~n"), + io:format("~n~nbackground_mgr tables: ~n"), + io:format("---------------~n"), + 
io:format("~p~n", [InfoTable]), + io:format("---------------~n"), + io:format("~p~n", [EntryTable]), + io:format("---------------~n"), + io:format("~n~nbg_manager monitors: ~n"), + io:format("---------------~n"), + io:format("~p~n", [Monitors]), + io:format("---------------~n") + + end, + pretty_commands(?MODULE, Cmds, {H, S, Res}, + Res == ok)) + end))). + + +prop_bgmgr_parallel() -> + ?FORALL(Cmds, parallel_commands(?MODULE), + aggregate(command_names(Cmds), + ?TRAPEXIT( + begin + stop_pid(whereis(riak_core_table_manager)), + stop_pid(whereis(riak_core_bg_manager)), + {ok, TableMgr} = riak_core_table_manager:start_link([{?BG_INFO_ETS_TABLE, + ?BG_INFO_ETS_OPTS}, + {?BG_ENTRY_ETS_TABLE, + ?BG_ENTRY_ETS_OPTS}]), + {ok, BgMgr} = riak_core_bg_manager:start(window_interval()), + {Seq, Par, Res} = run_parallel_commands(?MODULE,Cmds), + InfoTable = ets:tab2list(?BG_INFO_ETS_TABLE), + EntryTable = ets:tab2list(?BG_ENTRY_ETS_TABLE), + Monitors = bg_manager_monitors(), + stop_pid(TableMgr), + stop_pid(BgMgr), + ?WHENFAIL( + begin + io:format("~n~nbackground_mgr tables: ~n"), + io:format("---------------~n"), + io:format("~p~n", [InfoTable]), + io:format("---------------~n"), + io:format("~p~n", [EntryTable]), + io:format("---------------~n"), + io:format("~n~nbg_manager monitors: ~n"), + io:format("---------------~n"), + io:format("~p~n", [Monitors]), + io:format("---------------~n") + end, + pretty_commands(?MODULE, Cmds, {Seq, Par, Res}, + Res == ok)) + end))). + +%% -endif. +%% -endif. diff --git a/test/bg_manager_tests.erl b/test/bg_manager_tests.erl new file mode 100644 index 000000000..a9b8dd40d --- /dev/null +++ b/test/bg_manager_tests.erl @@ -0,0 +1,196 @@ +-module(bg_manager_tests). +-compile(export_all). + +-include_lib("riak_core_bg_manager.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +-define(BG_MGR, riak_core_bg_manager). +-define(TIMEOUT, 3000). 
%% blocking resource timeout + +bg_mgr_test_() -> + {timeout, 60000, %% Seconds to finish all of the tests + {setup, fun() -> + start_table_mgr(), %% unlinks from our test as well. + start_bg_mgr() %% uses non-linking start. + end, + fun(_) -> ok end, %% cleanup + fun(_) -> + [ %% Tests + { "set/get token rates + verify rates", + fun() -> + setup_token_rates(), + verify_token_rates() + end}, + + { "crash token manager + verify rates persist", + fun() -> + crash_and_restart_token_manager(), + verify_token_rates() + end}, + + {"lock/token separation", + fun() -> + %% Trying to set the rate on a token of the wrong type looks + %% like an unregistered token. + ?assertEqual({unregistered, token_a}, riak_core_bg_manager:get_token(token_a)), + ?assertEqual(undefined, riak_core_bg_manager:set_token_rate(token_a, {1,5})), + ?assertEqual({badtype, token_a}, + riak_core_bg_manager:set_concurrency_limit(token_a, 42)), + + %% Same for locks. + ?assertEqual({unregistered, lock_a}, riak_core_bg_manager:get_lock(lock_a)), + ?assertEqual(0, riak_core_bg_manager:set_concurrency_limit(lock_a, 52)), + ?assertEqual({badtype, lock_a}, + riak_core_bg_manager:set_token_rate(lock_a, {1, 5})), + + %% Don't allow get_token(Lock) + ?assertEqual({badtype, lock_a}, riak_core_bg_manager:get_token(lock_a)), + + %% Don't allow get_lock(Token) + ?assertEqual({badtype, token_a}, riak_core_bg_manager:get_lock(token_a)) + + end}, + + {"failing crash/revive EQC test case", + %% bg_manager_eqc:set_token_rate('B', 2) -> 0 + %% bg_manager_eqc:get_token('B') -> ok + %% bg_manager_eqc:crash() -> ok + %% bg_manager_eqc:revive() -> true + %% bg_manager_eqc:get_token('B') -> ok + %% bg_manager_eqc:get_token('B') -> ok + fun() -> + T = token_b, + Max = 2, + Period = 5000000, + ?BG_MGR:set_token_rate(T, {Period, Max}), + ?assertEqual(ok, ?BG_MGR:get_token(T)), + ?assertEqual(1, length(?BG_MGR:tokens_given(T))), + crash_and_restart_token_manager(), + ?assertEqual(ok, ?BG_MGR:get_token(T)), + ?assertEqual(2, 
length(?BG_MGR:tokens_given(T))), + ?assertEqual(max_concurrency, ?BG_MGR:get_token(T)) + end}, + + {"bypass API", + fun() -> + %% bypass API + riak_core_bg_manager:bypass(true), + + %% reduce token rate to zero and ensure we still get one + riak_core_bg_manager:set_token_rate(token_a, {1,0}), + ok = riak_core_bg_manager:get_token(token_a), + + %% reduce lock limit to zero and ensure we still get one + riak_core_bg_manager:set_concurrency_limit(lock_a, 0), + {ok, _Ref1} = riak_core_bg_manager:get_lock(lock_a), + + %% even if globally disabled, we get a lock + riak_core_bg_manager:disable(), + {ok, _Ref2} = riak_core_bg_manager:get_lock(lock_a), + + %% turn it back on + ?BG_MGR:bypass(false), + ?BG_MGR:enable(), + ?assertEqual(max_concurrency, ?BG_MGR:get_lock(lock_a)) + end} + + ] end} + }. + +registered_token_names() -> + [Token || {Token, _Enabled, _Rate} <- ?BG_MGR:token_info()]. + +check_head_token(Token, StatsList) -> + F1 = fun(Stats) -> filter_stat(Token, Stats) end, + ?assertEqual(?BG_MGR:head(Token), lists:map(F1, StatsList)). + +filter_stat(Token, Stats) -> + lists:filter(fun({T,_Stat}) -> Token==T end, Stats). + +spawn_sync_request(Token, Pid, Meta) -> + spawn(fun() -> ok = ?BG_MGR:get_token_blocking(Token, Pid, Meta, ?TIMEOUT) end). + +make_hist_stat(Type, Limit, Refills, Given, Blocked) -> + #bg_stat_hist + { + type=Type, + limit=Limit, + refills=Refills, + given=Given, + blocked=Blocked + }. + +make_live_stat(Resource, Type, Pid, Meta, Status) -> + #bg_stat_live + { + resource=Resource, + type=Type, + consumer=Pid, + meta=Meta, + state=Status + }. + +-spec some_token_rates() -> [{bg_token(), {bg_period(), bg_count()}}]. +some_token_rates() -> + [ {token1, {1*1000, 5}}, + {token2, {1*1000, 4}}, + {{token3,stuff3}, {1*1000, 3}} + ]. + +expected_token_names() -> + [ Type || {Type, _Rate} <- some_token_rates()]. + +max_token_period() -> + lists:foldl(fun({_T,{Period,_Limit}},Max) -> erlang:max(Period,Max) end, 0, some_token_rates()). 
+ +expected_token_limit(Token) -> + {_Period, Limit} = proplists:get_value(Token, some_token_rates()), + Limit. + +setup_token_rates() -> + [?BG_MGR:set_token_rate(Type, Rate) || {Type, Rate} <- some_token_rates()]. + +verify_token_rate(Type, ExpectedRate) -> + Rate = ?BG_MGR:token_rate(Type), + ?assertEqual(ExpectedRate, Rate). + +verify_token_rates() -> + [verify_token_rate(Type, Rate) || {Type, Rate} <- some_token_rates()], + %% check un-registered token is not setup + Rate = ?BG_MGR:token_rate(bogusToken), + DefaultRate = {unregistered, bogusToken}, + ?assertEqual(DefaultRate, Rate). + +%% start a stand-alone server, not linked, so that when we crash it, it +%% doesn't take down our test too. +start_bg_mgr() -> + %% setup with history window to 1 seconds + ?BG_MGR:start(1*1000), + timer:sleep(100). + +kill_bg_mgr() -> + Pid = erlang:whereis(?BG_MGR), + ?assertNot(Pid == undefined), + erlang:exit(Pid, kill). + +crash_and_restart_token_manager() -> + kill_bg_mgr(), + timer:sleep(100), + start_bg_mgr(), + timer:sleep(100). + +start_table_mgr() -> + {ok,Pid} = riak_core_table_manager:start_link([{?BG_INFO_ETS_TABLE, ?BG_INFO_ETS_OPTS}, + {?BG_ENTRY_ETS_TABLE, ?BG_ENTRY_ETS_OPTS} + ]), + unlink(Pid), + timer:sleep(100). + +kill_table_mgr() -> + Pid = erlang:whereis(riak_core_table_manager), + ?assertNot(Pid == undefined), + erlang:exit(Pid, kill). + +-endif. 
From 390134b9e6aeed3fc1aa04858c5c563aff2f0980 Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Thu, 12 Dec 2013 11:03:42 -0800 Subject: [PATCH 2/5] Concurrency API should return 'undefined' for limits when the ETS table is unavailable * Allows callers to try again later --- src/riak_core_bg_manager.erl | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/riak_core_bg_manager.erl b/src/riak_core_bg_manager.erl index 92cd56026..0bead58bc 100644 --- a/src/riak_core_bg_manager.erl +++ b/src/riak_core_bg_manager.erl @@ -198,12 +198,13 @@ all_given() -> %%%%%%%%%%% %% @doc Get the current maximum concurrency for the given lock type. --spec concurrency_limit(bg_lock()) -> bg_concurrency_limit(). +%% If the background manager is unavailable, undefined is returned. +-spec concurrency_limit(bg_lock()) -> bg_concurrency_limit() | undefined. concurrency_limit(Lock) -> gen_server:call(?MODULE, {concurrency_limit, Lock}, infinity). %% @doc same as `set_concurrency_limit(Type, Limit, false)' --spec set_concurrency_limit(bg_lock(), bg_concurrency_limit()) -> bg_concurrency_limit(). +-spec set_concurrency_limit(bg_lock(), bg_concurrency_limit()) -> bg_concurrency_limit() | undefined. set_concurrency_limit(Lock, Limit) -> set_concurrency_limit(Lock, Limit, false). @@ -213,7 +214,7 @@ set_concurrency_limit(Lock, Limit) -> %% then the extra locks are released by killing processes with reason `max_concurrency'. %% If `false', then the processes holding the extra locks are aloud to do so until they %% are released. --spec set_concurrency_limit(bg_lock(), bg_concurrency_limit(), boolean()) -> bg_concurrency_limit(). +-spec set_concurrency_limit(bg_lock(), bg_concurrency_limit(), boolean()) -> bg_concurrency_limit() | undefined. set_concurrency_limit(Lock, Limit, Kill) -> gen_server:call(?MODULE, {set_concurrency_limit, Lock, Limit, Kill}, infinity). 
@@ -287,7 +288,8 @@ set_token_rate(Token, Rate={_Period, _Count}) -> gen_server:call(?SERVER, {set_token_rate, Token, Rate}, infinity). %% @doc Get the current refill rate of named token. --spec token_rate(bg_token()) -> bg_rate(). +%% If the background manager is unavailable, undefined is returned. +-spec token_rate(bg_token()) -> bg_rate() | undefined. token_rate(Token) -> gen_server:call(?SERVER, {token_rate, Token}, infinity). @@ -720,10 +722,8 @@ do_get_type_info(Type, State) -> {reply, Infos, State}. %% Returns empty if the ETS table has not been transferred to us yet. -do_resource_limit(lock, _Resource, State) when ?NOT_TRANSFERED(State) -> - {reply, 0, State}; -do_resource_limit(token, _Resource, State) when ?NOT_TRANSFERED(State) -> - {reply, {0,0}, State}; +do_resource_limit(_Type, _Resource, State) when ?NOT_TRANSFERED(State) -> + {reply, undefined, State}; do_resource_limit(_Type, Resource, State) -> Info = resource_info(Resource, State), Rate = ?resource_limit(Info), @@ -746,7 +746,7 @@ do_set_concurrency_limit(Lock, Limit, Kill, State) -> catch table_id_undefined -> %% This could go into a queue to be played when the transfer happens. - {reply, 0, State}; + {reply, undefined, State}; {unregistered, Lock} -> {reply, 0, update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State)}; {badtype, _Lock}=Error -> @@ -818,6 +818,7 @@ update_resource_enabled(Resource, Value, Default, State) -> State). update_limit(Resource, Limit, Default, State) -> + lager:info("Set concurrency ~p <- ~p", [Resource, Limit]), update_resource_info(Resource, fun(Info) -> Info#resource_info{limit=Limit} end, Default#resource_info{limit=Limit}, @@ -932,6 +933,7 @@ give_resource(Entry, State=#state{entry_table=TableId}) -> %% @private %% @doc Add Resource to our given set. give_resource(Resource, Type, Pid, Ref, Meta, State) -> + lager:info("Gave ~p/~p to ~p:~p", [Type, Resource, Pid, Meta]), Entry = ?RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref, given), give_resource(Entry, State). 
@@ -939,7 +941,8 @@ give_resource(Resource, Type, Pid, Ref, Meta, State) -> {max_concurrency, #state{}} | {ok, #state{}} | {{ok, reference()}, #state{}}. -try_get_resource(false, _Resource, _Type, _Pid, _Meta, State) -> +try_get_resource(false, Resource, Type, Pid, Meta, State) -> + lager:info("Gave max_concurrency ~p/~p to ~p:~p", [Type, Resource, Pid, Meta]), {max_concurrency, State}; try_get_resource(true, Resource, Type, Pid, Meta, State) -> case Type of @@ -1062,6 +1065,7 @@ increment_stat_limit(Resource, Limit, State) -> State). increment_stat_refills(Token, State) -> + lager:info("Refill token ~p", [Token]), update_stat_window(Token, fun(Stat) -> Stat#bg_stat_hist{refills=1+Stat#bg_stat_hist.refills} end, default_refill(Token, State), From b6f494c18caaae5d03564bb8ce6e0bce928a612f Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Fri, 13 Dec 2013 14:35:48 -0800 Subject: [PATCH 3/5] Remove history API, use unregistered to signal unavailability of bg-mgr, fix set_token_rate bug. --- include/riak_core_bg_manager.hrl | 23 +- src/riak_core_bg_manager.erl | 347 ++++++------------------------- test/bg_manager_eqc.erl | 150 +------------ test/bg_manager_tests.erl | 24 +-- 4 files changed, 76 insertions(+), 468 deletions(-) diff --git a/include/riak_core_bg_manager.hrl b/include/riak_core_bg_manager.hrl index 762ed9389..e53c547b9 100644 --- a/include/riak_core_bg_manager.hrl +++ b/include/riak_core_bg_manager.hrl @@ -49,36 +49,17 @@ -type bg_count() :: pos_integer(). %% token refill tokens to count at each refill period -type bg_rate() :: undefined | {bg_period(), bg_count()}. %% token refill rate -type bg_concurrency_limit() :: non_neg_integer() | infinity. %% max lock concurrency allowed --type bg_state() :: given | blocked | failed. %% state of an instance of a resource. +-type bg_consumer() :: {pid, [bg_meta()]}. 
%% a consumer of a resource %% Results of a "ps" of live given or blocked locks/tokens -record(bg_stat_live, { resource :: bg_resource(), %% resource name, e.g. 'aae_hashtree_lock' type :: bg_resource_type(), %% resource type, e.g. 'lock' - consumer :: pid(), %% process asking for token - meta :: [bg_meta()], %% associated meta data - state :: bg_state() %% result of last request, e.g. 'given' + owner :: bg_consumer() %% this consumer has the lock or token }). -type bg_stat_live() :: #bg_stat_live{}. -%% Results of a "head" or "tail", per resource. Historical query result. --record(bg_stat_hist, - { - type :: undefined | bg_resource_type(), %% undefined only on default - limit :: non_neg_integer(), %% maximum available, defined by token rate during interval - refills :: non_neg_integer(), %% number of times a token was refilled during interval. 0 if lock - given :: non_neg_integer(), %% number of times this resource was handed out within interval - blocked :: non_neg_integer() %% number of blocked processes waiting for a token - }). --type bg_stat_hist() :: #bg_stat_hist{}. --define(BG_DEFAULT_STAT_HIST, - #bg_stat_hist{type=undefined, limit=undefined, refills=0, given=0, blocked=0}). - --define(BG_DEFAULT_WINDOW_INTERVAL, 60*1000). %% in milliseconds --define(BG_DEFAULT_OUTPUT_SAMPLES, 20). %% default number of sample windows displayed --define(BG_DEFAULT_KEPT_SAMPLES, 10000). %% number of history samples to keep - -define(BG_INFO_ETS_TABLE, background_mgr_info_table). %% name of private lock/token manager info ETS table -define(BG_INFO_ETS_OPTS, [private, set]). 
%% creation time properties of info ETS table diff --git a/src/riak_core_bg_manager.erl b/src/riak_core_bg_manager.erl index 0bead58bc..494cbe1e8 100644 --- a/src/riak_core_bg_manager.erl +++ b/src/riak_core_bg_manager.erl @@ -62,7 +62,7 @@ disable/0, disable/1, disable/2, - query_resource/3, + query_resource/2, all_resources/0, all_given/0, %% Locks @@ -91,20 +91,11 @@ tokens_given/0, tokens_given/1, %% Testing - start/1 + start/0 ]). %% reporting --export([clear_history/0, - head/0, - head/1, - head/2, - head/3, - tail/0, - tail/1, - tail/2, - tail/3, - ps/0, +-export([ps/0, ps/1 ]). @@ -126,8 +117,8 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% Test entry point to start stand-alone server -start(Interval) -> - gen_server:start({local, ?SERVER}, ?MODULE, [Interval], []). +start() -> + gen_server:start({local, ?SERVER}, ?MODULE, [], []). %% @doc Global kill switch - causes all locks/tokens to be given out freely without limits. %% Nothing will be tracked or recorded. @@ -176,22 +167,22 @@ enabled(Resource) -> disable(Resource, Kill) -> gen_server:call(?SERVER, {disable, Resource, Kill}). -%% @doc Query the current set of registered resources by name, states, and types. -%% The special atom 'all' querys all resources. A list of states and a list -%% of types allows selective query. --spec query_resource(bg_resource() | all, [bg_state()], [bg_resource_type()]) -> [bg_stat_live()]. -query_resource(Resource, States, Types) -> - gen_server:call(?SERVER, {query_resource, Resource, States, Types}, infinity). +%% @doc Query the current set of registered resources by name and types. +%% The special atom 'all' queries all resources. A list of types allows +%% selective query. +-spec query_resource(bg_resource() | all, [bg_resource_type()]) -> [bg_stat_live()]. +query_resource(Resource, Types) -> + gen_server:call(?SERVER, {query_resource, Resource, Types}, infinity). 
%% @doc Get a list of all resources of all types in all states -spec all_resources() -> [bg_stat_live()]. all_resources() -> - query_resource(all, [given], [token, lock]). + query_resource(all, [token, lock]). %% @doc Get a list of all resources of all kinds in the given state -spec all_given() -> [bg_stat_live()]. all_given() -> - query_resource(all, [given], [token, lock]). + query_resource(all, [token, lock]). %%%%%%%%%%% %% Lock API @@ -204,7 +195,8 @@ concurrency_limit(Lock) -> gen_server:call(?MODULE, {concurrency_limit, Lock}, infinity). %% @doc same as `set_concurrency_limit(Type, Limit, false)' --spec set_concurrency_limit(bg_lock(), bg_concurrency_limit()) -> bg_concurrency_limit() | undefined. +-spec set_concurrency_limit(bg_lock(), bg_concurrency_limit()) -> + bg_concurrency_limit() | undefined | unregistered. set_concurrency_limit(Lock, Limit) -> set_concurrency_limit(Lock, Limit, false). @@ -214,7 +206,8 @@ set_concurrency_limit(Lock, Limit) -> %% then the extra locks are released by killing processes with reason `max_concurrency'. %% If `false', then the processes holding the extra locks are aloud to do so until they %% are released. --spec set_concurrency_limit(bg_lock(), bg_concurrency_limit(), boolean()) -> bg_concurrency_limit() | undefined. +-spec set_concurrency_limit(bg_lock(), bg_concurrency_limit(), boolean()) -> + bg_concurrency_limit() | undefined | unregistered. set_concurrency_limit(Lock, Limit, Kill) -> gen_server:call(?MODULE, {set_concurrency_limit, Lock, Limit, Kill}, infinity). @@ -265,7 +258,7 @@ lock_info(Lock) -> %% @doc Returns all locks. -spec all_locks() -> [bg_stat_live()]. all_locks() -> - query_resource(all, [given], [lock]). + query_resource(all, [lock]). %% @doc Returns all currently held locks or those that match Lock @@ -275,7 +268,7 @@ locks_held() -> -spec locks_held(bg_lock() | all) -> [bg_stat_live()]. locks_held(Lock) -> - query_resource(Lock, [given], [lock]). + query_resource(Lock, [lock]). 
%%%%%%%%%%%% %% Token API @@ -324,50 +317,14 @@ token_info(Token) -> -spec all_tokens() -> [bg_stat_live()]. all_tokens() -> - query_resource(all, [given], [token]). + query_resource(all, [token]). %% @doc Get a list of token resources in the given state. tokens_given() -> tokens_given(all). -spec tokens_given(bg_token() | all) -> [bg_stat_live()]. tokens_given(Token) -> - query_resource(Token, [given], [token]). - -%% Stats/Reporting - -clear_history() -> - gen_server:cast(?SERVER, clear_history). - -%% List history of token manager -%% @doc show history of token request/grants over default and custom intervals. -%% offset is forwards-relative to the oldest sample interval --spec head() -> [[bg_stat_hist()]]. -head() -> - head(all). --spec head(bg_token()) -> [[bg_stat_hist()]]. -head(Token) -> - head(Token, ?BG_DEFAULT_OUTPUT_SAMPLES). --spec head(bg_token(), non_neg_integer()) -> [[bg_stat_hist()]]. -head(Token, NumSamples) -> - head(Token, 0, NumSamples). --spec head(bg_token(), non_neg_integer(), bg_count()) -> [[bg_stat_hist()]]. -head(Token, Offset, NumSamples) -> - gen_server:call(?SERVER, {head, Token, Offset, NumSamples}, infinity). - -%% @doc return history of token request/grants over default and custom intervals. -%% offset is backwards-relative to the newest sample interval --spec tail() -> [[bg_stat_hist()]]. -tail() -> - tail(all). --spec tail(bg_token()) -> [[bg_stat_hist()]]. -tail(Token) -> - tail(Token, ?BG_DEFAULT_OUTPUT_SAMPLES). --spec tail(bg_token(), bg_count()) -> [[bg_stat_hist()]]. -tail(Token, NumSamples) -> - tail(Token, NumSamples, NumSamples). --spec tail(bg_token(), bg_count(), bg_count()) -> [[bg_stat_hist()]]. -tail(Token, Offset, NumSamples) -> - gen_server:call(?SERVER, {tail, Token, Offset, NumSamples}, infinity). + query_resource(Token, [token]). %% @doc List most recent requests/grants for all tokens and locks -spec ps() -> [bg_stat_live()]. 
@@ -407,18 +364,16 @@ ps(Arg) -> type :: bg_resource_type(), pid :: pid(), %% owning process meta :: bg_meta(), %% associated metadata - ref :: reference(), %% optional monitor reference to owning process - state :: bg_state() %% state of item on given + ref :: reference() %% optional monitor reference to owning process }). --define(RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref, State), - #resource_entry{resource=Resource, type=Type, pid=Pid, meta=Meta, ref=Ref, state=State}). +-define(RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref), + #resource_entry{resource=Resource, type=Type, pid=Pid, meta=Meta, ref=Ref}). -define(e_resource(X), (X)#resource_entry.resource). -define(e_type(X), (X)#resource_entry.type). -define(e_pid(X), (X)#resource_entry.pid). -define(e_meta(X), (X)#resource_entry.meta). -define(e_ref(X), (X)#resource_entry.ref). --define(e_state(X), (X)#resource_entry.state). %%% %%% Gen Server State record @@ -429,12 +384,7 @@ ps(Arg) -> entry_table:: ets:tid(), %% TableID of ?BG_ENTRY_ETS_TABLE %% NOTE: None of the following data is persisted across process crashes. enabled :: boolean(), %% Global enable/disable switch, true at startup - bypassed:: boolean(), %% Global kill switch. false at startup - %% stats - window :: orddict:orddict(), %% bg_resource() -> bg_stat_hist() - history :: queue(), %% bg_resource() -> queue of bg_stat_hist() - window_interval :: bg_period(), %% history window size in milliseconds - window_tref :: reference() %% reference to history window sampler timer + bypassed:: boolean() %% Global kill switch. false at startup }). %%%=================================================================== @@ -448,8 +398,6 @@ ps(Arg) -> ignore | {stop, term()}. init([]) -> - init([?BG_DEFAULT_WINDOW_INTERVAL]); -init([Interval]) -> lager:debug("Background Manager starting up."), %% Claiming a table will result in a handle_info('ETS-TRANSFER', ...) message. %% We have two to claim... 
@@ -457,13 +405,9 @@ init([Interval]) -> ok = riak_core_table_manager:claim_table(?BG_ENTRY_ETS_TABLE), State = #state{info_table=undefined, %% resolved in the ETS-TRANSFER handler entry_table=undefined, %% resolved in the ETS-TRANSFER handler - window=orddict:new(), enabled=true, - bypassed=false, - window_interval=Interval, - history=queue:new()}, - State2 = schedule_sample_history(State), - {ok, State2}. + bypassed=false}, + {ok, State}. %% @private %% @doc Handling call messages @@ -493,8 +437,8 @@ handle_call(enable, _From, State) -> handle_call(disable, _From, State) -> State2 = update_enabled(false, State), {reply, status_of(true, State2), State2}; -handle_call({query_resource, Resource, States, Types}, _From, State) -> - Result = do_query(Resource, States, Types, State), +handle_call({query_resource, Resource, Types}, _From, State) -> + Result = do_query(Resource, Types, State), {reply, Result, State}; handle_call({get_lock, Lock, Pid, Meta}, _From, State) -> do_handle_call_exception(fun do_get_resource/5, [Lock, lock, Pid, Meta, State], State); @@ -520,20 +464,14 @@ handle_call({set_token_rate, Token, Rate}, _From, State) -> do_handle_call_exception(fun do_set_token_rate/3, [Token, Rate, State], State); handle_call({get_token, Token, Pid, Meta}, _From, State) -> do_handle_call_exception(fun do_get_resource/5, [Token, token, Pid, Meta, State], State); -handle_call({head, Token, Offset, Count}, _From, State) -> - Result = do_hist(head, Token, Offset, Count, State), - {reply, Result, State}; -handle_call({tail, Token, Offset, Count}, _From, State) -> - Result = do_hist(tail, Token, Offset, Count, State), - {reply, Result, State}; handle_call({ps, lock}, _From, State) -> - Result = do_query(all, [given], [lock], State), + Result = do_query(all, [lock], State), {reply, Result, State}; handle_call({ps, token}, _From, State) -> - Result = do_query(all, [given], [token], State), + Result = do_query(all, [token], State), {reply, Result, State}; handle_call({ps, 
Resource}, _From, State) -> - Result = do_query(Resource, [given], [token, lock], State), + Result = do_query(Resource, [token, lock], State), {reply, Result, State}. %% @private @@ -546,10 +484,7 @@ handle_cast({bypass, false}, State) -> handle_cast({bypass, true}, State) -> {noreply, update_bypassed(true,State)}; handle_cast({bypass, _Other}, State) -> - {noreply, State}; -handle_cast(clear_history, State) -> - State2 = do_clear_history(State), - {noreply, State2}. + {noreply, State}. %% @private %% @doc Handling all non call/cast messages @@ -577,10 +512,6 @@ handle_info({'ETS-TRANSFER', TableId, Pid, TableName}, State) -> handle_info({'DOWN', Ref, _, _, _}, State) -> State2 = release_resource(Ref, State), {noreply, State2}; -handle_info(sample_history, State) -> - State2 = schedule_sample_history(State), - State3 = do_sample_history(State2), - {noreply, State3}; handle_info({refill_tokens, Type}, State) -> State2 = do_refill_tokens(Type, State), schedule_refill_tokens(Type, State2), @@ -696,18 +627,20 @@ do_disable_lock(Lock, Kill, State) -> %% @doc Throws unregistered for unknown Token do_set_token_rate(Token, Rate, State) -> try - Info = resource_info(Token, State), + Info = resource_info(Token, State), %% may throw table_id_undefined or unregistered OldRate = Info#resource_info.limit, - enforce_type_or_throw(Token, token, Info), + enforce_type_or_throw(Token, token, Info), %% may throw badtype State2 = update_limit(Token, Rate, Info, State), schedule_refill_tokens(Token, State2), {reply, OldRate, State2} catch table_id_undefined -> - %% This could go into a queue to be played when the transfer happens. - {reply, undefined, State}; + %% This could go into a queue to be played when the transfer happens. 
+ {reply, unregistered, State}; {unregistered, Token} -> - {reply, undefined, update_limit(Token, Rate, ?DEFAULT_TOKEN_INFO, State)}; + State3 = update_limit(Token, Rate, ?DEFAULT_TOKEN_INFO, State), + schedule_refill_tokens(Token, State3), + {reply, undefined, State3}; {badtype, _Token}=Error -> {reply, Error, State} end. @@ -737,8 +670,8 @@ enforce_type_or_throw(Resource, Type, Info) -> do_set_concurrency_limit(Lock, Limit, Kill, State) -> try - Info = resource_info(Lock, State), - enforce_type_or_throw(Lock, lock, Info), + Info = resource_info(Lock, State), %% may throw table_id_undefined or unregistered + enforce_type_or_throw(Lock, lock, Info), %% may throw badtype OldLimit = limit(Info), State2 = update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State), maybe_honor_limit(Kill, Lock, Limit, State2), @@ -746,9 +679,9 @@ do_set_concurrency_limit(Lock, Limit, Kill, State) -> catch table_id_undefined -> %% This could go into a queue to be played when the transfer happens. - {reply, undefined, State}; + {reply, unregistered, State}; {unregistered, Lock} -> - {reply, 0, update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State)}; + {reply, undefined, update_limit(Lock, Limit, ?DEFAULT_LOCK_INFO, State)}; {badtype, _Lock}=Error -> {reply, Error, State} end. @@ -818,7 +751,6 @@ update_resource_enabled(Resource, Value, Default, State) -> State). update_limit(Resource, Limit, Default, State) -> - lager:info("Set concurrency ~p <- ~p", [Resource, Limit]), update_resource_info(Resource, fun(Info) -> Info#resource_info{limit=Limit} end, Default#resource_info{limit=Limit}, @@ -869,45 +801,9 @@ schedule_refill_tokens(Token, State) -> erlang:send_after(Period, self(), {refill_tokens, Token}) end. -%% Schedule a timer event to snapshot the current history -schedule_sample_history(State=#state{window_interval=Interval}) -> - TRef = erlang:send_after(Interval, self(), sample_history), - State#state{window_tref=TRef}. 
- -%% @doc Update the "limit" history stat for all registered resources into current window. -update_stat_all_limits(State) -> - lists:foldl(fun({Resource, Info}, S) -> - increment_stat_limit(Resource, ?resource_limit(Info), S) - end, - State, - all_resource_info(State)). - -do_sample_history(State) -> - %% Update window with current limits before copying it - State2 = update_stat_all_limits(State), - %% Move the current window of measurements onto the history queues. - %% Trim queue down to ?BG_DEFAULT_KEPT_SAMPLES if too big now. - Queue2 = queue:in(State2#state.window, State2#state.history), - Trimmed = case queue:len(Queue2) > ?BG_DEFAULT_KEPT_SAMPLES of - true -> - {_Discarded, Rest} = queue:out(Queue2), - Rest; - false -> - Queue2 - end, - EmptyWindow = orddict:new(), - State2#state{window=EmptyWindow, history=Trimmed}. - -update_stat_window(Resource, Fun, Default, State=#state{window=Window}) -> - NewWindow = orddict:update(Resource, Fun, Default, Window), - State#state{window=NewWindow}. - resources_given(Resource, #state{entry_table=TableId}) -> [Entry || {{given,_R},Entry} <- ets:match_object(TableId, {{given, Resource},'_'})]. -%% Key = {given, Resource}, -%% [Given || {_K,Given} <- ets:lookup(TableId, Key)]. - %% @private %% @doc Add a Resource Entry to the "given" table. Here, we really do want %% to allow multiple entries because each Resource "name" can be given multiple @@ -925,24 +821,20 @@ remove_given_entries(Resource, State=#state{entry_table=TableId}) -> %% @doc Add a resource queue entry to our given set. give_resource(Entry, State=#state{entry_table=TableId}) -> Resource = ?e_resource(Entry), - Type = ?e_type(Entry), - add_given_entry(Resource, Entry#resource_entry{state=given}, TableId), - %% update given stats - increment_stat_given(Resource, Type, State). + add_given_entry(Resource, Entry, TableId), + State. %% @private %% @doc Add Resource to our given set. 
give_resource(Resource, Type, Pid, Ref, Meta, State) -> - lager:info("Gave ~p/~p to ~p:~p", [Type, Resource, Pid, Meta]), - Entry = ?RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref, given), + Entry = ?RESOURCE_ENTRY(Resource, Type, Pid, Meta, Ref), give_resource(Entry, State). -spec try_get_resource(boolean(), bg_resource(), bg_resource_type(), pid(), [{atom(), any()}], #state{}) -> {max_concurrency, #state{}} | {ok, #state{}} | {{ok, reference()}, #state{}}. -try_get_resource(false, Resource, Type, Pid, Meta, State) -> - lager:info("Gave max_concurrency ~p/~p to ~p:~p", [Type, Resource, Pid, Meta]), +try_get_resource(false, _Resource, _Type, _Pid, _Meta, State) -> {max_concurrency, State}; try_get_resource(true, Resource, Type, Pid, Meta, State) -> case Type of @@ -988,9 +880,6 @@ do_get_resource(Resource, Type, Pid, Meta, State) -> random_bogus_ref() -> make_ref(). -all_resource_info(#state{info_table=TableId}) -> - [{Resource, Info} || {{info, Resource}, Info} <- ets:match_object(TableId, {{info, '_'},'_'})]. - all_registered_resources(Type, #state{info_table=TableId}) -> [Resource || {{info, Resource}, Info} <- ets:match_object(TableId, {{info, '_'},'_'}), ?resource_type(Info) == Type]. @@ -1004,143 +893,29 @@ format_entry(Entry) -> { resource = ?e_resource(Entry), type = ?e_type(Entry), - consumer = ?e_pid(Entry), - meta = ?e_meta(Entry), - state = ?e_state(Entry) + owner = {?e_pid(Entry), ?e_meta(Entry)} }. fmt_live_entries(Entries) -> [format_entry(Entry) || Entry <- Entries]. -%% States :: [given], Types :: [lock | token] -do_query(_Resource, _States, _Types, State) when ?NOT_TRANSFERED(State) -> +do_query(_Resource, _Types, State) when ?NOT_TRANSFERED(State) -> %% Table hasn't been transfered yet. 
[]; -do_query(all, States, Types, State) -> - E1 = case lists:member(given, States) of - true -> - Entries = all_given_entries(State), - lists:flatten([Entry || Entry <- Entries, - lists:member(?e_type(Entry), Types)]); - false -> - [] - end, - fmt_live_entries(E1); -do_query(Resource, States, Types, State) -> - E1 = case lists:member(given, States) of - true -> - Entries = resources_given(Resource, State), - [Entry || Entry <- Entries, lists:member(?e_type(Entry), Types)]; - false -> - [] - end, - fmt_live_entries(E1). +do_query(all, Types, State) -> + Entries = all_given_entries(State), + E = lists:flatten([Entry || Entry <- Entries, + lists:member(?e_type(Entry), Types)]), + + fmt_live_entries(E); +do_query(Resource, Types, State) -> + Entries = resources_given(Resource, State), + E = [Entry || Entry <- Entries, lists:member(?e_type(Entry), Types)], + fmt_live_entries(E). %% @private %% @doc Token refill timer event handler. %% Capture stats of what was given in the previous period, %% Clear all tokens of this type from the given set, do_refill_tokens(Token, State) -> - State2 = increment_stat_refills(Token, State), - remove_given_entries(Token, State2). - -default_refill(Token, State) -> - Limit = limit(resource_info(Token, State)), - ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=token, refills=1, limit=Limit}. - -default_given(Token, Type, State) -> - Limit = limit(resource_info(Token, State)), - ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=Type, given=1, limit=Limit}. - -increment_stat_limit(_Resource, undefined, State) -> - State; -increment_stat_limit(Resource, Limit, State) -> - {Type, Count} = case Limit of - {_Period, C} -> {token, C}; - N -> {lock, N} - end, - update_stat_window(Resource, - fun(Stat) -> Stat#bg_stat_hist{limit=Count} end, - ?BG_DEFAULT_STAT_HIST#bg_stat_hist{type=Type, limit=Count}, - State). 
- -increment_stat_refills(Token, State) -> - lager:info("Refill token ~p", [Token]), - update_stat_window(Token, - fun(Stat) -> Stat#bg_stat_hist{refills=1+Stat#bg_stat_hist.refills} end, - default_refill(Token, State), - State). - -increment_stat_given(Token, Type, State) -> - update_stat_window(Token, - fun(Stat) -> Stat#bg_stat_hist{given=1+Stat#bg_stat_hist.given} end, - default_given(Token, Type, State), - State). - -%% erase saved history -do_clear_history(State=#state{window_tref=TRef}) -> - erlang:cancel_timer(TRef), - State2 = State#state{history=queue:new()}, - schedule_sample_history(State2). - -%% Return stats history from head or tail of stats history queue -do_hist(_End, _Resource, _Offset, _Count, State) when ?NOT_TRANSFERED(State) -> - []; -do_hist(End, Resource, Offset, Count, State) when Offset < 0 -> - do_hist(End, Resource, 0, Count, State); -do_hist(_End, _Resource, _Offset, Count, _State) when Count =< 0 -> - []; -do_hist(End, Resource, Offset, Count, #state{history=HistQueue}) -> - QLen = queue:len(HistQueue), - First = max(1, case End of - head -> min(Offset+1, QLen); - tail -> QLen - Offset + 1 - end), - Last = min(QLen, max(First + Count - 1, 1)), - H = case segment_queue(First, Last, HistQueue) of - empty -> []; - {ok, Hist } -> - case Resource of - all -> - StatsDictList = queue:to_list(Hist), - [orddict:to_list(Stat) || Stat <- StatsDictList]; - _T -> - [[{Resource, stat_window(Resource, StatsDict)}] - || StatsDict <- queue:to_list(Hist), stat_window(Resource, StatsDict) =/= undefined] - end - end, - %% Remove empty windows - lists:filter(fun(S) -> S =/= [] end, H). 
- -segment_queue(First, Last, _Q) when Last < First -> - empty; -segment_queue(First, Last, Queue) -> - QLen = queue:len(Queue), - case QLen >= Last andalso QLen > 0 of - true -> - %% trim off extra tail, then trim head - Front = case QLen == Last of - true -> Queue; - false -> - {QFirst, _QRest} = queue:split(Last, Queue), - QFirst - end, - case First == 1 of - true -> {ok, Front}; - false -> - {_Skip, Back} = queue:split(First-1, Front), - {ok, Back} - end; - false -> - %% empty - empty - end. - -%% @private -%% @doc Get stat history for given token type from sample set --spec stat_window(bg_resource(), orddict:orddict()) -> bg_stat_hist(). -stat_window(Resource, Window) -> - case orddict:find(Resource, Window) of - error -> undefined; - {ok, StatHist} -> StatHist - end. + remove_given_entries(Token, State). diff --git a/test/bg_manager_eqc.erl b/test/bg_manager_eqc.erl index 589310b05..c230efc2e 100644 --- a/test/bg_manager_eqc.erl +++ b/test/bg_manager_eqc.erl @@ -39,11 +39,7 @@ %% and their state locks :: [{bg_eqc_type(), [{reference(), pid(), [], held | released}]}], %% number of tokens taken by type - tokens :: [{bg_eqc_type(), non_neg_integer()}], - %% current history samples accumulator: [{resource, limit, refills, given}] - samples :: [{bg_eqc_type(), non_neg_integer(), non_neg_integer(), non_neg_integer()}], - %% snapshot of samples on window interval - history :: [[{bg_eqc_type(), non_neg_integer(), non_neg_integer(), non_neg_integer()}]] + tokens :: [{bg_eqc_type(), non_neg_integer()}] }). run_eqc() -> @@ -82,9 +78,7 @@ initial_state() -> limits = [], counts = [], locks = [], - tokens = [], - samples = [], - history = [] + tokens = [] }. %% ------ Grouped operator: set_concurrency_limit @@ -298,8 +292,7 @@ set_token_rate_pre(S) -> %% Note that set_token_rate takes a rate, which is {Period, Count}, %% but this test generates it's own refill messages, so rate is not modeled. 
set_token_rate_next(S=#state{counts=Counts}, _Value, [Type, Count]) -> - S2 = update_sample(Type, Count, 0, 0, S), - S2#state{ counts = lists:keystore(Type, 1, Counts, {Type, Count}) }. + S#state{ counts = lists:keystore(Type, 1, Counts, {Type, Count}) }. %% @doc set_token_rate command set_token_rate(Type, Count) -> @@ -418,24 +411,6 @@ refill_tokens(Type) -> %% TODO: find way to get rid of this timer sleep timer:sleep(100). -%% ------ Grouped operator: sample_history -%% @doc sample_history args generator -sample_history_args(_S) -> - []. - -%% @doc sample_history precondition -sample_history_pre(S, []) -> - is_alive(S). - -%% @doc sample_history next state function -sample_history_next(S, _Value, []) -> - do_sample_history(S). - -%% @doc sample_history command -sample_history() -> - riak_core_bg_manager ! sample_history, - timer:sleep(100). - %% ------ Grouped operator: crash %% @doc crash args generator crash_args(_S) -> @@ -471,12 +446,11 @@ revive_pre(#state{alive=Alive}) -> %% @doc revive_next - Next state function revive_next(S, _Value, _Args) -> - S2 = S#state{ alive = true }, - clear_history(S2). + S#state{ alive = true }. %% @doc revive command revive() -> - {ok, _BgMgr} = riak_core_bg_manager:start(window_interval()). + {ok, _BgMgr} = riak_core_bg_manager:start(). %% @doc revive_post - Postcondition for revive revive_post(_S, _Args, _Res) -> @@ -508,65 +482,6 @@ ps_post(State, [Resource], Result) -> %% TODO: could validate record entries in addition to correct counts eq(length(Result), NumLocks+NumTokens). -%% ------ Grouped operator: head query -%% @doc head arguments generator -head_args(_S) -> - %% [Resource, Offset, NumSamples] - [oneof([all, lock_type(), token_type()]), 0, choose(0,5)]. - -%% @doc ps precondition -head_pre(S) -> - is_alive(S). - -%% @doc ps next state function -head_next(S, _Value, _Args) -> - S. - -%% @doc head command -head(Resource, Offset, NumSamples) -> - riak_core_bg_manager:head(Resource, Offset, NumSamples). 
- -%% @doc ps postcondition -head_post(#state{history=RevHistory}, [Resource, Offset, NumSamples], Result) -> - History = lists:reverse(RevHistory), - Start = Offset+1, - Len = min(NumSamples, length(History) - Offset), - Keep = lists:sublist(History, Start, Len), - H2 = [lists:filter(fun({R,_L,_R,_G}) -> Resource == all orelse R == Resource end, Samples) - || Samples <- Keep, Samples =/= []], - H3 = lists:filter(fun(S) -> S =/= [] end, H2), - eq(length(Result), length(H3)). - -%% ------ Grouped operator: tail query -%% @doc tail arguments generator -tail_args(_S) -> - %% [Resource, Offset, NumSamples] - [oneof([all, lock_type(), token_type()]), 0, choose(0,5)]. - -%% @doc ps precondition -tail_pre(S) -> - is_alive(S). - -%% @doc ps next state function -tail_next(S, _Value, _Args) -> - S. - -%% @doc tail command -tail(Resource, Offset, NumSamples) -> - riak_core_bg_manager:tail(Resource, Offset, NumSamples). - -%% @doc ps postcondition -tail_post(#state{history=RevHistory}, [Resource, Offset, NumSamples], Result) -> - History = lists:reverse(RevHistory), - HistLen = length(History), - Start = HistLen - Offset + 1, - Len = min(NumSamples, HistLen - Offset), - Keep = lists:sublist(History, Start, Len), - H2 = [lists:filter(fun({R,_L,_R,_G}) -> Resource == all orelse R == Resource end, Samples) - || Samples <- Keep, Samples =/= []], - H3 = lists:filter(fun(S) -> S =/= [] end, H2), - eq(length(Result), length(H3)). - %% ------ Grouped operator: bypass %% @doc bypass arguments generator bypass_args(_S) -> @@ -754,17 +669,14 @@ weight(_S, set_token_rate) -> 3; weight(_S, token_rate) -> 0; weight(_S, get_token) -> 20; weight(_S, refill_tokens) -> 10; -weight(_S, sample_history) -> 10; weight(_S, ps) -> 3; -weight(_S, head) -> 3; -weight(_S, tail) -> 3; weight(_S, crash) -> 3; weight(_S, revive) -> 1; weight(_S, _Cmd) -> 1. %% Other Functions limit(Type, State) -> - limit(Type, 0, State). + limit(Type, undefined, State). 
limit(Type, Default, #state{limits=Limits}) -> case lists:keyfind(Type, 1, Limits) of @@ -791,38 +703,6 @@ num_tokens_taken(Resource, #state{tokens=Tokens}) -> 0, [Count || {R, Count} <- Tokens, R == Resource]). -clear_history(State) -> - State#state{history=[], samples=[]}. - -%% @doc Snapshot current history samples and reset samples to empty -do_sample_history(State=#state{limits=Limits, counts=Counts}) -> - %% First, grab updates for all resources - S2 = lists:foldl(fun({Resource, Limit}, S) -> - update_sample(Resource, Limit, 0, 0, S) - end, - State, - Limits++Counts), - NewHistory = [S2#state.samples | S2#state.history], - S2#state{history=NewHistory, samples=[]}. - -%% @doc Update the current samples with supplied increments. -%% Limit is overwritten unless undefined. It's not expected to change too often, -%% hopefully less often than the sampling window (Niquist!). -%% This should probably be called approximately every time the API is called. -update_sample(Resource, Limit, Refill, Given, State=#state{samples=Samples}) -> - %% find sample counts for specified resource and increment per arguments. - {_R, Limit1, Refills1, Given1} = case lists:keyfind(Resource, 1, Samples) of - false -> {Resource, 0, 0, 0}; - S -> S - end, - Sample = {Resource, defined_or_default(Limit, Limit1), Refill+Refills1, - Given+Given1}, - Samples2 = lists:keystore(Resource, 1, Samples, Sample), - State#state{samples=Samples2}. - -defined_or_default(undefined, Default) -> Default; -defined_or_default(Value, _Default) -> Value. - is_alive(#state{alive=Alive}) -> Alive. @@ -834,9 +714,6 @@ mk_token_rate(Count) -> %% erlang:send_after max is used so that we can trigger token refilling from EQC test {max_send_after(), Count}. -window_interval() -> - max_send_after(). - max_send_after() -> 4294967295. 
@@ -862,8 +739,7 @@ update_locks(Type, TypeLocks, State=#state{locks=Locks}) -> add_held_lock(Type, Ref, Pid, Meta, State) -> All = all_locks(Type, State), - S2 = update_sample(Type, undefined, 0, 1, State), - update_locks(Type, [{Ref, Pid, Meta, held} | All], S2). + update_locks(Type, [{Ref, Pid, Meta, held} | All], State). release_locks(Pid, State=#state{locks=Locks}) -> lists:foldl(fun({Type, ByType}, StateAcc) -> @@ -879,12 +755,10 @@ mark_locks_released(Pid, Locks) -> increment_token_count(Type, State=#state{tokens=Tokens}) -> CurCount = num_tokens(Type, State), - S2 = update_sample(Type, undefined, 0, 1, State), - S2#state{ tokens = lists:keystore(Type, 1, Tokens, {Type, CurCount + 1}) }. + State#state{ tokens = lists:keystore(Type, 1, Tokens, {Type, CurCount + 1}) }. reset_token_count(Type, State=#state{tokens=Tokens}) -> - S2 = update_sample(Type, undefined, 1, 0, State), - S2#state{ tokens = lists:keystore(Type, 1, Tokens, {Type, 0}) }. + State#state{ tokens = lists:keystore(Type, 1, Tokens, {Type, 0}) }. 
stop_pid(Other) when not is_pid(Other) -> ok; @@ -922,7 +796,7 @@ prop_bgmgr() -> [protected, set, named_table]}, {?BG_ENTRY_ETS_TABLE, [protected, bag, named_table]}]), - {ok, _BgMgr} = riak_core_bg_manager:start(window_interval()), + {ok, _BgMgr} = riak_core_bg_manager:start(), {H, S, Res} = run_commands(?MODULE,Cmds), InfoTable = ets:tab2list(?BG_INFO_ETS_TABLE), EntryTable = ets:tab2list(?BG_ENTRY_ETS_TABLE), @@ -944,8 +818,6 @@ prop_bgmgr() -> io:format("locks = ~p~n", [S#state.locks]), io:format("counts = ~p~n", [S#state.counts]), io:format("tokens = ~p~n", [S#state.tokens]), - io:format("samples = ~p~n", [S#state.samples]), - io:format("history = ~p~n", [S#state.history]), io:format("---------------~n"), io:format("~n~nbackground_mgr tables: ~n"), io:format("---------------~n"), @@ -975,7 +847,7 @@ prop_bgmgr_parallel() -> ?BG_INFO_ETS_OPTS}, {?BG_ENTRY_ETS_TABLE, ?BG_ENTRY_ETS_OPTS}]), - {ok, BgMgr} = riak_core_bg_manager:start(window_interval()), + {ok, BgMgr} = riak_core_bg_manager:start(), {Seq, Par, Res} = run_parallel_commands(?MODULE,Cmds), InfoTable = ets:tab2list(?BG_INFO_ETS_TABLE), EntryTable = ets:tab2list(?BG_ENTRY_ETS_TABLE), diff --git a/test/bg_manager_tests.erl b/test/bg_manager_tests.erl index a9b8dd40d..08ee0882d 100644 --- a/test/bg_manager_tests.erl +++ b/test/bg_manager_tests.erl @@ -41,7 +41,7 @@ bg_mgr_test_() -> %% Same for locks. ?assertEqual({unregistered, lock_a}, riak_core_bg_manager:get_lock(lock_a)), - ?assertEqual(0, riak_core_bg_manager:set_concurrency_limit(lock_a, 52)), + ?assertEqual(undefined, riak_core_bg_manager:set_concurrency_limit(lock_a, 52)), ?assertEqual({badtype, lock_a}, riak_core_bg_manager:set_token_rate(lock_a, {1, 5})), @@ -112,26 +112,6 @@ filter_stat(Token, Stats) -> spawn_sync_request(Token, Pid, Meta) -> spawn(fun() -> ok = ?BG_MGR:get_token_blocking(Token, Pid, Meta, ?TIMEOUT) end). 
-make_hist_stat(Type, Limit, Refills, Given, Blocked) -> - #bg_stat_hist - { - type=Type, - limit=Limit, - refills=Refills, - given=Given, - blocked=Blocked - }. - -make_live_stat(Resource, Type, Pid, Meta, Status) -> - #bg_stat_live - { - resource=Resource, - type=Type, - consumer=Pid, - meta=Meta, - state=Status - }. - -spec some_token_rates() -> [{bg_token(), {bg_period(), bg_count()}}]. some_token_rates() -> [ {token1, {1*1000, 5}}, @@ -167,7 +147,7 @@ verify_token_rates() -> %% doesn't take down our test too. start_bg_mgr() -> %% setup with history window to 1 seconds - ?BG_MGR:start(1*1000), + ?BG_MGR:start(), timer:sleep(100). kill_bg_mgr() -> From 656a19f9051c27d7053abbbca66083a1daea423a Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Tue, 10 Dec 2013 16:50:16 -0800 Subject: [PATCH 4/5] Integrate vnode lock with background manager * Use per-vnode locks to guard concurrent kv folds on the same partition. * Include kv module name to lock to differentiate from pipe/other vnode types. --- include/riak_core_locks.hrl | 8 +++++ src/riak_core_handoff_sender.erl | 49 ++++++++++++++++++++++++++++-- src/riak_core_vnode.erl | 52 ++++++++++++++++++++++++++++++++ src/riak_core_vnode_manager.erl | 1 - 4 files changed, 107 insertions(+), 3 deletions(-) create mode 100644 include/riak_core_locks.hrl diff --git a/include/riak_core_locks.hrl b/include/riak_core_locks.hrl new file mode 100644 index 000000000..4a9345524 --- /dev/null +++ b/include/riak_core_locks.hrl @@ -0,0 +1,8 @@ +%% Copyright(C), Basho Technologies, 2013 +%% +%% @doc Definitions of shared locks/tokens for use with background manager. +%% See @link riak_core_background_mgr:get_lock/1 +%% +%% @doc vnode_lock(Module, PartitionIndex) is a kv per-vnode lock, used possibly, +%% by AAE tree rebuilds, fullsync, and handoff. +-define(VNODE_LOCK(Mod, Idx), {vnode_lock, Mod, Idx}). 
diff --git a/src/riak_core_handoff_sender.erl b/src/riak_core_handoff_sender.erl
index 61e6c1964..b4dbd71e8 100644
--- a/src/riak_core_handoff_sender.erl
+++ b/src/riak_core_handoff_sender.erl
@@ -22,6 +22,7 @@
 -module(riak_core_handoff_sender).
 -export([start_link/4, get_handoff_ssl_options/0]).
+-include("riak_core_locks.hrl").
 -include("riak_core_vnode.hrl").
 -include("riak_core_handoff.hrl").
 -define(ACK_COUNT, 1000).
@@ -92,7 +93,20 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
     SrcNode = node(),
     SrcPartition = get_src_partition(Opts),
     TargetPartition = get_target_partition(Opts),
-    try
+    %% Take per-vnode concurrency limit "lock". It will be released when this process exits/dies.
+    case maybe_get_vnode_lock(Module, SrcPartition) of
+        max_concurrency ->
+            %% shared lock not registered yet or limit reached. Failing with
+            %% max_concurrency will cause this partition to be retried again later.
+            lager:info("Failed to get vnode lock for partition ~p", [SrcPartition]),
+            exit({shutdown, max_concurrency});
+        ok ->
+            %% Got the lock or didn't try to. If we got it, our process is now monitored;
+            %% the lock will be released when we exit.
+            ok
+    end,
+
+    try
     Filter = get_filter(Opts),
     [_Name,Host] = string:tokens(atom_to_list(TargetNode), "@"),
     {ok, Port} = get_handoff_port(TargetNode),
@@ -568,4 +582,35 @@ remote_supports_batching(Node) ->
             lager:debug("remote node doesn't support batching"),
             false
     end.
-
+
+%% @private
+%% @doc Return true iff neither of the "skip background manager" configuration
+%% settings is defined as anything other than false. 'skip_background_manager'
+%% turns off all use of bg-mgr. 'handoff_skip_background_manager' only stops
+%% handoff from using bg-mgr.
+-spec use_bg_mgr() -> boolean().
+use_bg_mgr() ->
+    %% note we're tolerant here of any non-boolean value as well. Iff it's 'false', we don't skip.
+    GlobalSkip = app_helper:get_env(riak_core, skip_background_manager, false) =/= false,
+    HandoffSkip = app_helper:get_env(riak_core, handoff_skip_background_manager, false) =/= false,
+    not (GlobalSkip orelse HandoffSkip).
+
+%% @private
+%% @doc Unless skipping the background manager, try to acquire the per-vnode lock.
+%% Sets our task meta-data in the lock as 'handoff', which is useful for
+%% seeing what's holding the lock via @link riak_core_bg_manager:ps/0.
+-spec maybe_get_vnode_lock(Module::module(), SrcPartition::integer()) -> ok | max_concurrency.
+maybe_get_vnode_lock(riak_kv_vnode=Module, SrcPartition) ->
+    Lock = ?VNODE_LOCK(Module, SrcPartition),
+    case use_bg_mgr() of
+        true ->
+            case riak_core_bg_manager:get_lock(Lock, self(), [{task, handoff}]) of
+                {ok, _Ref} -> ok;
+                max_concurrency -> max_concurrency
+            end;
+        false ->
+            lager:info("Handoff is skipping the background manager vnode lock: ~p", [Lock]),
+            ok
+    end;
+maybe_get_vnode_lock(_Module, _SrcPartition) ->
+    ok.
diff --git a/src/riak_core_vnode.erl b/src/riak_core_vnode.erl
index 437f8e0c6..35d206c34 100644
--- a/src/riak_core_vnode.erl
+++ b/src/riak_core_vnode.erl
@@ -19,6 +19,7 @@
 -module('riak_core_vnode').
 -behaviour(gen_fsm).
 -include("riak_core_vnode.hrl").
+-include("riak_core_locks.hrl").
 -export([behaviour_info/1]).
 -export([start_link/3, start_link/4,
@@ -145,6 +146,7 @@ init([Mod, Index, InitialInactivityTimeout, Forward]) ->
     process_flag(trap_exit, true),
     State = #state{index=Index, mod=Mod, forward=Forward,
                    inactivity_timeout=InitialInactivityTimeout},
+    try_set_vnode_lock_limit(Mod, Index),
     %% Check if parallel disabled, if enabled (default)
     %% we don't care about the actual number, so using magic 2.
     case app_helper:get_env(riak_core, vnode_parallel_start, 2) =< 1 of
@@ -763,6 +765,10 @@ handle_sync_event(core_status, _From, StateName, State=#state{index=Index,
                   end,
     {reply, {Mode, Status}, StateName, State, State#state.inactivity_timeout}.
+handle_info({set_concurrency_limit, Lock, Limit}, StateName, State) ->
+    try_set_concurrency_limit(Lock, Limit),
+    {noreply, StateName, State};
+
 handle_info({'$vnode_proxy_ping', From, Msgs}, StateName, State) ->
     riak_core_vnode_proxy:cast(From, {vnode_proxy_pong, self(), Msgs}),
     {next_state, StateName, State, State#state.inactivity_timeout};
@@ -1001,6 +1007,52 @@ mod_set_forwarding(Forward, State=#state{mod=Mod, modstate=ModState}) ->
             State
     end.
 
+%% @private
+%% @doc Return true iff the 'skip_background_manager' configuration setting
+%% is not defined as anything other than false; that setting
+%% turns off all use of bg-mgr.
+-spec use_bg_mgr() -> boolean().
+use_bg_mgr() ->
+    %% note we're tolerant here of any non-boolean value as well. Iff it's 'false', we don't skip.
+    GlobalSkip = app_helper:get_env(riak_core, skip_background_manager, false) =/= false,
+    not GlobalSkip.
+
+%% @private
+%% @doc Query the application environment for 'vnode_lock_concurrency', and
+%% if it's an integer, use it to set the maximum vnode lock concurrency
+%% for Idx. If the background manager is not available yet, schedule a
+%% retry for later. If the application environment has a non "false"
+%% setting of the key 'skip_background_manager', then this code just
+%% returns ok without registering.
+try_set_vnode_lock_limit(riak_kv_vnode=Mod, Idx) ->
+    %% register per-vnode concurrency limit "lock" with 1 so that only a single participating
+    %% subsystem can run a vnode fold at a time. Participation is voluntary :-)
+    Concurrency = case app_helper:get_env(riak_core, vnode_lock_concurrency, 1) of
+                      N when is_integer(N) -> N;
+                      _NotNumber -> 1
+                  end,
+    try_set_concurrency_limit(?VNODE_LOCK(Mod, Idx), Concurrency);
+try_set_vnode_lock_limit(_Mod, _Idx) ->
+    ok.
+
+try_set_concurrency_limit(Lock, Limit) ->
+    try_set_concurrency_limit(Lock, Limit, use_bg_mgr()).
+ +try_set_concurrency_limit(_Lock, _Limit, false) -> + lager:info("Skipping background manager."), + ok; +try_set_concurrency_limit(Lock, Limit, true) -> + %% this is ok to do more than once + case riak_core_bg_manager:set_concurrency_limit(Lock, Limit) of + undefined -> + %% not ready yet, try again later + lager:info("Background manager unavailable. Will try to set: ~p later.", [Lock]), + erlang:send_after(250, ?MODULE, {set_concurrency_limit, Lock, Limit}); + _ -> + lager:debug("Registered lock: ~p", [Lock]), + ok + end. + %% =================================================================== %% Test API %% =================================================================== diff --git a/src/riak_core_vnode_manager.erl b/src/riak_core_vnode_manager.erl index c12f73927..99346956d 100644 --- a/src/riak_core_vnode_manager.erl +++ b/src/riak_core_vnode_manager.erl @@ -582,7 +582,6 @@ get_vnode(IdxList, Mod, State) -> lager:debug("Will start VNode for partition ~p", [Idx]), {ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx, ForwardTo), - lager:debug("Started VNode, waiting for initialization to complete ~p, ~p ", [Pid, Idx]), ok = riak_core_vnode:wait_for_init(Pid), lager:debug("VNode initialization ready ~p, ~p", [Pid, Idx]), {Idx, Pid} From f6274df75c5e79271c23b1b5544a46e50d2d407a Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Fri, 13 Dec 2013 14:30:13 -0800 Subject: [PATCH 5/5] Use unregistered as a means to detect bg-mgr unavailability --- src/riak_core_handoff_sender.erl | 4 ++-- src/riak_core_vnode.erl | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/riak_core_handoff_sender.erl b/src/riak_core_handoff_sender.erl index b4dbd71e8..291057ff5 100644 --- a/src/riak_core_handoff_sender.erl +++ b/src/riak_core_handoff_sender.erl @@ -98,7 +98,7 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) -> max_concurrency -> %% shared lock not registered yet or limit reached. 
Failing with %% max_concurrency will cause this partition to be retried again later. - lager:info("Failed to get vnode lock for partition ~p", [SrcPartition]), + lager:debug("Failed to get vnode lock for partition ~p", [SrcPartition]), exit({shutdown, max_concurrency}); ok -> %% Got the lock or didn't try to. If we got it, our process is now monitored; @@ -609,7 +609,7 @@ maybe_get_vnode_lock(riak_kv_vnode=Module, SrcPartition) -> max_concurrency -> max_concurrency end; false -> - lager:info("Handoff is skipping the background manager vnode lock: ~p", [Lock]), + lager:debug("Handoff is skipping the background manager vnode lock: ~p", [Lock]), ok end; maybe_get_vnode_lock(_Module, _SrcPartition) -> diff --git a/src/riak_core_vnode.erl b/src/riak_core_vnode.erl index 35d206c34..11f293973 100644 --- a/src/riak_core_vnode.erl +++ b/src/riak_core_vnode.erl @@ -767,7 +767,7 @@ handle_sync_event(core_status, _From, StateName, State=#state{index=Index, handle_info({set_concurrency_limit, Lock, Limit}, StateName, State) -> try_set_concurrency_limit(Lock, Limit), - {noreply, StateName, State}; + {next_state, StateName, State}; handle_info({'$vnode_proxy_ping', From, Msgs}, StateName, State) -> riak_core_vnode_proxy:cast(From, {vnode_proxy_pong, self(), Msgs}), @@ -1044,9 +1044,9 @@ try_set_concurrency_limit(_Lock, _Limit, false) -> try_set_concurrency_limit(Lock, Limit, true) -> %% this is ok to do more than once case riak_core_bg_manager:set_concurrency_limit(Lock, Limit) of - undefined -> + unregistered -> %% not ready yet, try again later - lager:info("Background manager unavailable. Will try to set: ~p later.", [Lock]), + lager:debug("Background manager unavailable. Will try to set: ~p later.", [Lock]), erlang:send_after(250, ?MODULE, {set_concurrency_limit, Lock, Limit}); _ -> lager:debug("Registered lock: ~p", [Lock]),