Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various enhancements to the API. #115

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions include/riak_control.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
-type plan() :: [] | legacy | ring_not_ready | unavailable.
-type transfer() :: riak_core_ring:pending_change().
-type transfers() :: [transfer()].
-type partition() :: list({integer(), integer(), integer()}).
-type partitions() :: [partition()].

-type stage_error() :: nodedown
| already_leaving
Expand All @@ -55,14 +57,7 @@

-type change() :: {node(), action()}.

-record(partition_info,
{ index :: index(),
partition :: integer(),
owner :: owner(),
vnodes :: services(),
handoffs :: handoffs()
}).

%% Node membership records.
-record(member_info,
{ node :: atom(),
status :: status(),
Expand All @@ -75,11 +70,27 @@
mem_used :: integer(),
mem_erlang :: integer(),
action :: action(),
replacement :: node()
}).
replacement :: node() }).

-record(member_info_v2,
{ node :: atom(),
status :: status(),
reachable :: boolean(),
vnodes :: vnodes(),
handoffs :: handoffs(),
ring_pct :: float(),
pending_pct :: float(),
mem_total :: integer(),
mem_used :: integer(),
mem_erlang :: integer(),
action :: action(),
stats :: [{atom(), [{atom(), term()}]}],
replacement :: node() }).

-type member() :: #member_info{} | #member_info_v2{}.
-type members() :: [member()].

-type partitions() :: [#partition_info{}].
-type members() :: [#member_info{}].
-define(MEMBER_INFO, #member_info_v2).

%% These two should always match, in terms of webmachine dispatcher
%% logic, and ADMIN_BASE_PATH should always end with a /
Expand Down
13 changes: 13 additions & 0 deletions priv/admin/js/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ minispade.register('core', function() {
}
};

/** Handle an object type. */
DS.attr.transforms.object = {
from: function(serialized) {
return Em.none(serialized) ? {} : serialized;
},

to: function(deserialized) {
return Em.none(deserialized) ? {} : deserialized;
}
};

/**
* @class
*
Expand Down Expand Up @@ -43,6 +54,8 @@ minispade.register('core', function() {

low_mem: DS.attr("boolean"),

stats: DS.attr("object"),

/**
* This boolean attribute determines if the node
* responsible for the API requests is running
Expand Down
36 changes: 27 additions & 9 deletions priv/admin/js/nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ minispade.register('nodes', function() {
meBinding: 'content.me',

/**
* In order for labels to be clickable, they need to be bound to checks/radios
* by ID. However, since these nodes are cloned by Ember, we need a way to make
* sure all of those elements get id's that don't override each other. This
* function gives us an ID string we can use as a prefix for id's on these other
* elements.
* In order for labels to be clickable, they need to be bound to
* checks/radios by ID. However, since these nodes are cloned by
* Ember, we need a way to make sure all of those elements get id's
* that don't override each other. This function gives us an ID
* string we can use as a prefix for id's on these other elements.
*
* @returns {String}
*/
Expand All @@ -116,14 +116,20 @@ minispade.register('nodes', function() {
}.property(),

/**
* An ID value for the leave normally radio button and corresponding label.
* An ID value for the leave normally radio button and
* corresponding label.
*
* @returns {String}
*/
stopRadio: function() {
return this.get('nodeID') + '_stop_node';
}.property('nodeID'),

/**
* An ID value for the force leave radio button and corresponding label.
* An ID value for the force leave radio button and corresponding
* label.
*
* @returns {String}
*/
downRadio: function() {
return this.get('nodeID') + '_down_node';
Expand All @@ -133,6 +139,8 @@ minispade.register('nodes', function() {
* A node can not be stopped when:
* - It is unreachable.
* - It is down.
*
* @returns {String}
*/
stopRadioClasses: function() {
var status = this.get('status'),
Expand All @@ -148,6 +156,8 @@ minispade.register('nodes', function() {
* A node can not be marked as down when:
* - It is alive and well
* - It is already down.
*
* @returns {String}
*/
downRadioClasses: function() {
var status = this.get('status'),
Expand All @@ -162,17 +172,25 @@ minispade.register('nodes', function() {
/**
* When a node can't be stopped, disable the user
* from clicking the stop radio button.
*
* @returns {String}
*/
stopDisablerClasses: function() {
return 'disabler' + (/\ssemi\-transparent$/.test(this.get('stopRadioClasses')) ? ' show' : '');
return 'disabler' +
(/\ssemi\-transparent$/.test(this.get('stopRadioClasses')) ?
' show' : '');
}.property('stopRadioClasses'),

/**
* When a node can't be downed, disable the user from
* clicking the down radio button.
*
* @returns {String}
*/
downDisablerClasses: function() {
return 'disabler' + (/\ssemi\-transparent$/.test(this.get('downRadioClasses')) ? ' show' : '');
return 'disabler' +
(/\ssemi\-transparent$/.test(this.get('downRadioClasses')) ?
' show' : '');
}.property('downRadioClasses')
});

Expand Down
68 changes: 52 additions & 16 deletions src/riak_control_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,12 @@ update_nodes(State=#state{ring=Ring}) ->
-spec update_partitions(#state{}) -> #state{}.
update_partitions(State=#state{ring=Ring, nodes=Nodes}) ->
Unavailable = [Name ||
#member_info{node=Name, reachable=false} <- Nodes],
?MEMBER_INFO{node=Name, reachable=false} <- Nodes],
Partitions = riak_control_ring:status(Ring, Unavailable),
State#state{partitions=Partitions}.

%% @doc Ping and retrieve vnode workers.
-spec get_member_info({node(), status()}, ring()) -> #member_info{}.
-spec get_member_info({node(), status()}, ring()) -> member().
get_member_info(_Member={Node, Status}, Ring) ->
RingSize = riak_core_ring:num_partitions(Ring),

Expand All @@ -293,43 +293,59 @@ get_member_info(_Member={Node, Status}, Ring) ->
%% try and get a list of all the vnodes running on the node
case rpc:call(Node, riak_control_session, get_my_info, []) of
{badrpc,nodedown} ->
#member_info{node = Node,
?MEMBER_INFO{node = Node,
status = Status,
reachable = false,
vnodes = [],
handoffs = [],
ring_pct = PctRing,
pending_pct = PctPending};
{badrpc,_Reason} ->
#member_info{node = Node,
?MEMBER_INFO{node = Node,
status = incompatible,
reachable = true,
vnodes = [],
handoffs = [],
ring_pct = PctRing,
pending_pct = PctPending};
MemberInfo = #member_info{} ->
%% there is a race condition here, when a node is stopped
%% gracefully (e.g. `riak stop`) the event will reach us
%% before the node is actually down and the rpc call will
%% succeed, but since it's shutting down it won't have any
%% vnode workers running...
MemberInfo#member_info{status = Status,
MemberInfo0 ->
MemberInfo = upgrade_member_info(MemberInfo0),
MemberInfo?MEMBER_INFO{status = Status,
ring_pct = PctRing,
pending_pct = PctPending}
end.

%% @doc Return current nodes information.
-spec get_my_info() -> #member_info{}.
-spec get_my_info() -> member().
get_my_info() ->
{Total, Used} = get_my_memory(),
#member_info{node = node(),
VNodes = riak_core_vnode_manager:all_vnodes(),
VNodeTypes = lists:usort([Type || {Type, _, _} <- VNodes]),
Stats = [stats(Type) || Type <- VNodeTypes],
Handoffs = get_handoff_status(),
?MEMBER_INFO{node = node(),
reachable = true,
mem_total = Total,
mem_used = Used,
mem_erlang = proplists:get_value(total,erlang:memory()),
vnodes = riak_core_vnode_manager:all_vnodes(),
handoffs = get_handoff_status()}.
mem_erlang = proplists:get_value(total, erlang:memory()),
vnodes = VNodes,
handoffs = Handoffs,
stats = Stats}.

%% @doc Retrieve stats for a given vnode type.
-spec stats(atom()) -> {atom(), list({atom(), term()})}.
stats(VNodeType) ->
case VNodeType of
riak_kv_vnode ->
Stats = proplists:delete(disk, riak_kv_stat:get_stats()),
{riak_kv, Stats};
riak_search_vnode ->
{riak_search, []};
riak_pipe_vnode ->
{riak_pipe, []};
riak_core_vnode ->
{riak_core, riak_core_state:get_stats()}
end.

%% @doc Return current nodes memory.
-spec get_my_memory() -> {term(), term()}.
Expand Down Expand Up @@ -443,3 +459,23 @@ maybe_stage_change(Node, Action, Replacement) ->
stop ->
rpc:call(Node, riak_core, stop, [])
end.

%% @doc Conditionally upgrade member info records once they cross node
%% boundaries.
-spec upgrade_member_info(member()) -> member().
upgrade_member_info(MemberInfo = ?MEMBER_INFO{}) ->
MemberInfo;
upgrade_member_info(MemberInfo = #member_info{}) ->
?MEMBER_INFO{
node = MemberInfo#member_info.node,
status = MemberInfo#member_info.status,
reachable = MemberInfo#member_info.reachable,
vnodes = MemberInfo#member_info.vnodes,
handoffs = MemberInfo#member_info.handoffs,
ring_pct = MemberInfo#member_info.ring_pct,
pending_pct = MemberInfo#member_info.pending_pct,
mem_total = MemberInfo#member_info.mem_total,
mem_used = MemberInfo#member_info.mem_used,
mem_erlang = MemberInfo#member_info.mem_erlang,
action = MemberInfo#member_info.action,
replacement = MemberInfo#member_info.replacement}.
48 changes: 24 additions & 24 deletions src/riak_control_wm_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ to_json(ReqData, Context) ->

%% Get the current node list.
{ok, _V, Nodes} = riak_control_session:get_nodes(),
Current = [jsonify_node(Node, Claimant) || Node=#member_info{} <- Nodes],
Current = [jsonify_node(Node, Claimant) || Node=?MEMBER_INFO{} <- Nodes],

%% Get the current list of planned changes and updated claim.
Planned = case riak_control_session:get_plan() of
Expand All @@ -211,39 +211,39 @@ to_json(ReqData, Context) ->
{mochijson2:encode({struct,[{cluster,Clusters}]}), ReqData, Context}.

%% @doc Generate a new "planned" cluster which outlines transitions.
-spec merge_transitions(list(#member_info{}), list(), list(), node()) ->
-spec merge_transitions(list(member()), list(), list(), node()) ->
[{struct, list()}].
merge_transitions(Nodes, Changes, Claim, Claimant) ->
[jsonify_node(apply_changes(Node, Changes, Claim), Claimant) ||
Node <- Nodes].

%% @doc Merge change into member info record.
-spec apply_changes(#member_info{}, list(), list()) -> #member_info{}.
-spec apply_changes(member(), list(), list()) -> member().
apply_changes(Node, Changes, Claim) ->
apply_status_change(apply_claim_change(Node, Claim), Changes).

%% @doc Merge change into member info record.
-spec apply_status_change(#member_info{}, list()) -> #member_info{}.
-spec apply_status_change(member(), list()) -> member().
apply_status_change(Node, Changes) ->
Name = Node#member_info.node,
Name = Node?MEMBER_INFO.node,

case lists:keyfind(Name, 1, Changes) of
false ->
Node;
{_, {Action, Replacement}} ->
Node#member_info{action=Action, replacement=Replacement};
Node?MEMBER_INFO{action=Action, replacement=Replacement};
{_, Action} ->
Node#member_info{action=Action}
Node?MEMBER_INFO{action=Action}
end.

%% @doc Merge change into member info record.
-spec apply_claim_change(#member_info{}, list()) -> #member_info{}.
-spec apply_claim_change(member(), list()) -> member().
apply_claim_change(Node, Claim) ->
Name = Node#member_info.node,
Name = Node?MEMBER_INFO.node,

case lists:keyfind(Name, 1, Claim) of
false ->
Node#member_info{ring_pct=0.0, pending_pct=0.0};
Node?MEMBER_INFO{ring_pct=0.0, pending_pct=0.0};
{_, {_, Future}} ->
%% @doc Hack until core returns normalized values.
Normalized = if
Expand All @@ -252,30 +252,30 @@ apply_claim_change(Node, Claim) ->
true ->
Future
end,
Node#member_info{ring_pct=Normalized, pending_pct=Normalized}
Node?MEMBER_INFO{ring_pct=Normalized, pending_pct=Normalized}
end.

%% @doc Turn a node into a proper struct for serialization.
-spec jsonify_node(#member_info{}, node()) -> {struct, list()}.
-spec jsonify_node(member(), node()) -> {struct, list()}.
jsonify_node(Node, Claimant) ->
LWM=app_helper:get_env(riak_control,low_mem_watermark,0.1),
MemUsed = Node#member_info.mem_used,
MemTotal = Node#member_info.mem_total,
Reachable = Node#member_info.reachable,
MemUsed = Node?MEMBER_INFO.mem_used,
MemTotal = Node?MEMBER_INFO.mem_total,
Reachable = Node?MEMBER_INFO.reachable,
LowMem = low_mem(Reachable, MemUsed, MemTotal, LWM),
{struct,[{"name",Node#member_info.node},
{"status",Node#member_info.status},
{struct,[{"name",Node?MEMBER_INFO.node},
{"status",Node?MEMBER_INFO.status},
{"reachable",Reachable},
{"ring_pct",Node#member_info.ring_pct},
{"pending_pct",Node#member_info.pending_pct},
{"ring_pct",Node?MEMBER_INFO.ring_pct},
{"pending_pct",Node?MEMBER_INFO.pending_pct},
{"mem_total",MemTotal},
{"mem_used",MemUsed},
{"mem_erlang",Node#member_info.mem_erlang},
{"mem_erlang",Node?MEMBER_INFO.mem_erlang},
{"low_mem",LowMem},
{"me",Node#member_info.node == node()},
{"claimant",Node#member_info.node == Claimant},
{"action",Node#member_info.action},
{"replacement",Node#member_info.replacement}]}.
{"me",Node?MEMBER_INFO.node == node()},
{"claimant",Node?MEMBER_INFO.node == Claimant},
{"action",Node?MEMBER_INFO.action},
{"replacement",Node?MEMBER_INFO.replacement}]}.

%% @doc Given a struct/proplist that we've received via JSON,
%% recursively turn the keys into atoms from binaries.
Expand Down
Loading