This repository contains an example riak_core application using the most recent version of riak_core and running on Erlang/OTP 25 with rebar3.
Below is a detailed tutorial that explains the step-by-step process to produce the same code base from scratch.
The project and tutorial structure were largely based on the Little Riak Core Book and the Create a riak_core application in Elixir series.
- Example application usage
- Riak Core Tutorial
Run on three separate terminals:
make dev1
make dev2
make dev3
Join the nodes and ping:
(rc_example2@127.0.0.1)1> riak_core:join('rc_example1@127.0.0.1').
(rc_example2@127.0.0.1)2> rc_example:ping().
(rc_example3@127.0.0.1)1> riak_core:join('rc_example1@127.0.0.1').
(rc_example3@127.0.0.1)2> rc_example:ping().
Check the ring status:
(rc_example3@127.0.0.1)3> rc_example:ring_status().
Try the key/value commands:
(rc_example1@127.0.0.1)1> rc_example:put(k1, v1).
ok
(rc_example1@127.0.0.1)2> rc_example:put(k2, v2).
ok
(rc_example2@127.0.0.1)1> rc_example:get(k2).
v2
Riak Core is the distributed
systems framework used by the Riak data store
to distribute data and scale. More generally, it can be thought of as
a toolkit for building distributed, scalable, fault-tolerant
applications. In practical terms, Riak Core is an Erlang/OTP application, and most
of the user defined work is done in the riak_core_vnode
behavior.
What makes Riak Core so interesting and useful is that it implements the ideas of the Amazon's Dyamo architecture and exposes its infrastructure as a reusable library, allowing to easily apply them in any context that can benefit from decentralized distribution of work (including but not limited to data stores).
As you will see, it provides the basic blocks to build distributed services, consistent hashing, routing, support for sharding and replicating, distributed queries, etc. They need not all be used. For example, a game server which handles requests from players could partition players to handle load, and ensure that players requests are always handled on the same vnode to ensure data locality.
A distributed batch job handling system could also use consistent hashing and routing to ensure jobs from the same batch are always handled by the same node, or distribute the jobs across several partitions and then use the distributed map-reduce queries to gather results.
We're using Basho's Riak Core for this tutorial, you can check it here riak_core, as it seems to be maintained at the time of writing this.
As part of our interest in this technology and our intention to use it in new projects we had to struggle a bit with scarce and outdated documenatation, stale dependencies, etc. The intention is thus to provide a tutorial on how to use Riak Core today, on an Erlang 25 and rebar3 project, with minimal dependencies and operational sugar. You'll notice the structure borrows heavily from the Little Riak Core Book and the riak_core in Elixir series, which were our main references.
- Introducing Riak Core
- Riak Core Wiki
- Masterless Distributed Computing with Riak Core
- Ryan Zezeski's "working" blog: First, multinode and The vnode
- Little Riak Core Book
- riak_core in Elixir: Part I, Part II, Part III, Part IV and Part V
- A Gentle Introduction to Riak Core
- Understanding Riak Core: Handoff, Building Handoff and The visit fun
- udon_ng example application.
Riak Core is based on the Dynamo architecture, meaning it scales and distributes the work in a decentralized manner, using Consistent Hashing.
Most operations are applied to an object which is identified by some data value. In the context of a Key/Value store, for example, the identifier is the Key used in get, put and delete operations.
Before performing the operation, a hashing function is applied to the key. The key hash will be used to decide which node in the cluster should be responsible for executing the operation. The range of possible values the key hash can take (the keyspace, usually depicted as a ring), is partitioned in equally sized buckets, which are assigned to virtual nodes, also known as vnodes.
The number of vnodes is fixed at cluster creation and a given hash value will always belong to the same partition (i.e. the same vnode). The vnodes in turn are evenly distributed across all available physical nodes. Note this distribution isn't fixed as the keyspace partitioning is: the vnode distribution can change if a physical node is added to the cluster or goes down.
You can find a more detailed demonstration of consistent hashing here.
This architecture enables several desirable properties in our system: high avalability, incremental scalability and decentralization, with a low operational cost. You can find a detailed discussion of these properties in the Dynamo paper.
In this tutorial we'll build an in-memory, distributed key/value store. Let's start by creating a new project with rebar3:
$ rebar3 new app rc_example
===> Writing rc_example/src/rc_example_app.erl
===> Writing rc_example/src/rc_example_sup.erl
===> Writing rc_example/src/rc_example.app.src
===> Writing rc_example/rebar.config
===> Writing rc_example/.gitignore
===> Writing rc_example/LICENSE
===> Writing rc_example/README.md
Note there's a rebar3 template for riak core. The reason we don't use it here is that it's outdated and it generates a lot of operational code that would take a lot of effort to figure out and fix. Instead, we'll start with an empty project and build our way up, although the code generated by the template can serve as a good reference along the way.
Next up we'll fill up some of the rebar.config file. We'll add the riak_core dependency and lager, which we'll use for logging:
{erl_opts, [debug_info]}.
{deps, [{riak_core, {git, "https://github.com/basho/riak_core", {branch, "develop"}}}]}.
At this point you should be able to compile your project running rebar3 compile
.
Now that the project compiles, let's try to build and run a
release. First we need to add lager and riak_core to
src/rc_example.app.src
, so they're started along with our
application. We also need to add compiler and cuttlefish, which is a system riak uses
for its internal configuration:
{applications,
[kernel,
stdlib,
lager,
compiler,
cuttlefish,
riak_core
]}
Then, add the release configuration for
development in rebar.config
:
{relx, [{release, {rc_example, "0.1.0"}, [rc_example]},
{dev_mode, true},
{include_erts, false},
{sys_config, "conf/sys.config"},
{vm_args, "conf/vm.args"},
{extended_start_script, false}]}.
Note we won't be using the rebar3 shell
command, which doesn't play
along nicely with riak_core; we need a proper release instead (although we can
use dev_mode). Thus, we can build and run the release with:
$ rebar3 release && _build/default/rel/rc_example/bin/rc_example
If you go ahead and run that you'll see an error like Failed to load ring file: "no such file or directory"
. We need to add some configuration to
conf/sys.config
and conf/vm.args
to properly start riak_core:
%% vm.args
-name rc_example@127.0.0.1
%% conf/sys.config
[{riak_core,
[{ring_state_dir, "./data/ring"},
{web_port, 8098},
{handoff_port, 8099},
{schema_dirs, ["lib/rc_example-0.1.0/priv"]}
]}].
vm.args
just sets the node name; in sys.config
we set a data
directory for riak core (ring_state_dir
) and a
couple of ports; we also need to point riak to its schema (by setting
schema_dirs
). For this to work we have to copy
this file
to priv/riak_core.schema
.
At this point we should have a runnable release (if you see errors, try removing the _build directory):
$ rebar3 release && _build/default/rel/rc_example/bin/rc_example
So far we've got a single Erlang node running a release with riak_core in it, but we didn't really write any code to test it. So, before getting into the distributed aspects of riak_core, let's add the simplest possible functionality: a ping command.
Recall from the overview, that the keyspace (the range of all possible results of hashing a key) is partitioned, and each partition is assigned to a virtual node. The vnode is a worker process which handles incoming requests known as commands and is implemented as an OTP behavior. In our initial example we'll create an empty vnode that only knows how to handle a ping command. A detailed explanation of vnodes can be found here.
Let's add a src/rc_example_vnode.erl
module that will implement the
riak_core_vnode
behavior:
-module(rc_example_vnode).
-behaviour(riak_core_vnode).
-export([start_vnode/1,
init/1,
terminate/2,
handle_command/3,
is_empty/1,
delete/1,
handle_handoff_command/3,
handoff_starting/2,
handoff_cancelled/1,
handoff_finished/2,
handle_handoff_data/2,
encode_handoff_item/2,
handle_coverage/4,
handle_exit/3]).
start_vnode(I) ->
riak_core_vnode_master:get_vnode_pid(I, ?MODULE).
init([Partition]) ->
{ok, #{partition => Partition}}.
handle_command(ping, _Sender, State = #{partition := Partition}) ->
log("Received ping command ~p", [Partition], State),
{reply, {pong, Partition}, State};
handle_command(Message, _Sender, State) ->
log("unhandled_command ~p", [Message], State),
{noreply, State}.
First off, the start_vnode
function. This is not a riak_vnode
behavior callback, but it's nevertheless required for the vnode to work.
This function isn't documented, and to my knowledge it will always
have the same implementation: riak_core_vnode_master:get_vnode_pid(I, ?MODULE).
, so it could probably be handled internally by
riak_core. Since it isn't, we copy paste that line everytime ¯\_(ツ)_/¯
The init
callback initializes the state of the vnode, much like in a
gen_server. In the code above we intialize a state map that only
contains the id of the partition assigned to the vnode.
The next interesting callback is handle_command
, which as you may
expect handles the requests that are assigned to the vnode. The nature
of the command will be defined by the Message parameter. In the case of
our simple ping command, we add a new handle_command
clause that
just replies with the partition id of the vnode.
That's all we need to get started, the rest of the riak_vnode
callbacks will have dummy implementations. We'll get back at those in the
following sections.
handle_handoff_command(_Message, _Sender, State) ->
{noreply, State}.
handoff_starting(_TargetNode, State) ->
{true, State}.
handoff_cancelled(State) ->
{ok, State}.
handoff_finished(_TargetNode, State) ->
{ok, State}.
handle_handoff_data(_Data, State) ->
{reply, ok, State}.
encode_handoff_item(_ObjectName, _ObjectValue) ->
<<>>.
is_empty(State) ->
{true, State}.
delete(State) ->
{ok, State}.
handle_coverage(_Req, _KeySpaces, _Sender, State) ->
{stop, not_implemented, State}.
handle_exit(_Pid, _Reason, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%% internal
%% same as logger:info but prepends the partition
log(String, State) ->
log(String, [], State).
log(String, Args, #{partition := Partition}) ->
String2 = "[~.36B] " ++ String,
Args2 = [Partition | Args],
logger:info(String2, Args2),
ok.
We also added a small log
helper that prepends the partition to all
the vnode logs.
Before moving on we need to add some boilerplate code for riak_core to
find and manage our example vnode. Update the start
callback in
src/rc_example_app.erl
:
start(_StartType, _StartArgs) ->
ok = riak_core:register([{vnode_module, rc_example_vnode}]),
ok = riak_core_node_watcher:service_up(rc_example, self()),
rc_example_sup:start_link().
The first line initialises the ring telling riak_core to use
rc_example_vnode
as a vnode module. The second one starts the
node_watcher, a process responsible for tracking the status of nodes within a riak_core cluster.
We also need to update the supervisor in src/rc_example_sup.erl
, to
start the vnode_master, the process that coordinates the
distribution of work within the physical node: it starts all the
worker vnodes, receives all the requests on that particular physical
node and routes each of them to the vnode that should handle it.
init([]) ->
VMaster = {rc_example_vnode_master,
{riak_core_vnode_master, start_link, [rc_example_vnode]},
permanent, 5000, worker, [riak_core_vnode_master]},
{ok, {{one_for_one, 5, 10}, [VMaster]}}.
So far we have a vnode that knows how to respond to an incoming ping
request, but we still need an API to be able to send that
request. We'll add a src/rc_example.erl
file that will contain the
public interface to our application:
-module(rc_example).
-export([ping/0]).
ping()->
Key = os:timestamp(),
DocIdx = hash_key(Key),
PrefList = riak_core_apl:get_apl(DocIdx, 1, rc_example),
[IndexNode] = PrefList,
Command = ping,
riak_core_vnode_master:sync_spawn_command(IndexNode, Command, rc_example_vnode_master).
%% internal
hash_key(Key) ->
riak_core_util:chash_key({<<"rc_example">>, term_to_binary(Key)}).
Let's go over the ping()
implementation line by line. As stated
before, most operations will be performed over a single object (with the
exception of aggregation operations, like listing all available keys
in a key/value store). That object is usually identified by some key,
which will be hashed to decide what partition (that is what vnode at
what physical node) should receive
the request. In the case of ping
, there isn't any actual object
involved, and thus no key, but we make a random one by using
os:timestamp()
. The nature of the hashing algorithm
distributes values uniformly over the ring, so each new timestamp
should be assigned to a random partition of the ring.
The hash_key
helper calls riak_core_util:chash_key
to produce the
hash of the key. Note chash_key
receives a tuple of two binaries;
the first element is called the bucket, a value
riak_core will use to namespace your keys; you can choose to have a
single one per application, or many, according to your needs.
The result of the hash is passed to riak_core_apl:get_apl
which returns an Active Preference List (APL) for the given key, this
is a list of active vnodes that can handle that request. The amount of
offered vnodes will be determined by the second argument of the
function. We can try these functions in the release shell to get a
better sense of how they work:
(rc_example@127.0.0.1)1> riak_core_util:chash_key({<<"rc_example">>, term_to_binary(os:timestamp())}).
<<233,235,224,243,192,63,109,102,255,125,189,206,164,247,
117,34,94,199,14,184>>
(rc_example@127.0.0.1)2> K1 = riak_core_util:chash_key({<<"rc_example">>, term_to_binary(os:timestamp())}).
<<190,175,151,200,144,123,229,205,94,16,209,140,252,108,
247,20,238,31,6,82>>
(rc_example1@127.0.0.1)3> riak_core_apl:get_apl(K1, 1, rc_example).
[{1096126227998177188652763624537212264741949407232,
'rc_example@127.0.0.1'}]
(rc_example1@127.0.0.1)4> K2 = riak_core_util:chash_key({<<"rc_example">>, term_to_binary(os:timestamp())}).
<<113,53,13,80,4,131,62,95,63,164,211,74,145,83,189,77,
254,224,190,198>>
(rc_example@127.0.0.1)5> riak_core_apl:get_apl(K2, 1, rc_example).
[{662242929415565384811044689824565743281594433536,
'rc_example@127.0.0.1'}]
(rc_example@127.0.0.1)6> riak_core_apl:get_apl(K2, 3, rc_example).
[{662242929415565384811044689824565743281594433536,
'rc_example@127.0.0.1'},
{685078892498860742907977265335757665463718379520,
'rc_example@127.0.0.1'},
{707914855582156101004909840846949587645842325504,
'rc_example@127.0.0.1'}]
We get different partitions every time, always on the same physical node (because we're still running a single one).
The last line of ping/0
sends the ping
command to the selected
vnode through the riak_core_vnode_master
. The function used to do so
is sync_spawn_command
, which acts a bit like a gen_server:call
in
the sense that it blocks the calling process waiting for the
response. There are other functions to send commands to a vnode:
riak_core_vnode_master:command/3
(which works asynchronously like
gen_server:cast
) and riak_core_vnode_master:sync_command/3
(which
is like sync_spawn_command
but blocks the vnode_master process).
You can find more details of the functions used in this
section here. To wrap up
let's run our ping
function from the shell:
(rc_example@127.0.0.1)1> rc_example:ping().
10:19:00.903 [info] Received ping command 479555224749202520035584085735030365824602865664
{pong,479555224749202520035584085735030365824602865664}
(rc_example@127.0.0.1)2> rc_example:ping().
10:19:01.503 [info] Received ping command 479555224749202520035584085735030365824602865664
{pong,502391187832497878132516661246222288006726811648}
At this point we can execute a simple command, but none of the previous effort would make any sense if we keep running stuff on a single node. The whole point of riak_core is to distribute work in a fault-tolerant and decetralized manner. In this section we'll update our configuration so we can run our project in a three-node Erlang cluster. For practical reasons all of the nodes will reside on our local machine, but moving them to separate servers should fairly simple.
If you review our codebase you'll note that the one spot that
has fixed node configuration is conf/vm.args
, where we set the
node name to rc_example@127.0.0.1
. We want to have rc_example1
,
rc_example2
and rc_example3
instead. We'll be running our
nodes in the same machine so we also need to use different ports for
riak_core in each node (the web_port
and handoff_port
tuples in conf/sys.config
).
Since we'll have an almost identical configuration in all of the nodes,
we'll use the overalys feature that rebar3 inherits from relx. You can
read about
it here,
although it's not strictly necessary for the
purposes of this tutorial. First we tell rebar3 that conf/sys.config
and conf/vm.args
should be treated as templates by adding an
overlay
tuple in the relx
configuration:
{overlay, [{template, "conf/sys.config", "releases/{{release_version}}/sys.config"},
{template, "conf/vm.args", "releases/{{release_version}}/vm.args"}]}
If you're having problems with the templates, check your rebar3 version and this github issue.
The template variables' values will be taken from overlay_vars
files. We will
define three
different rebar profiles in
rebar.config
, each pointing to a different overaly_vars
file:
{profiles, [{dev1, [{relx, [{overlay_vars, "conf/vars_dev1.config"}]}]},
{dev2, [{relx, [{overlay_vars, "conf/vars_dev2.config"}]}]},
{dev3, [{relx, [{overlay_vars, "conf/vars_dev3.config"}]}]}]}
Now create
vars_dev1.config
, vars_dev2.config
and vars_dev3.config
in the
conf
directory as follows:
%% conf/vars_dev1.config
{node, "rc_example1@127.0.0.1"}.
{web_port, 8198}.
{handoff_port, 8199}.
%% conf/vars_dev2.config
{node, "rc_example2@127.0.0.1"}.
{web_port, 8298}.
{handoff_port, 8299}.
%% conf/vars_dev3.config
{node, "rc_example3@127.0.0.1"}.
{web_port, 8398}.
{handoff_port, 8399}.
Lastly, update sys.config
and vm.args
to refer to template
variables instead of concrete values:
%% conf/sys.config
[{riak_core,
[{ring_state_dir, "./data/ring"},
{web_port, {{web_port}}},
{handoff_port, {{handoff_port}}},
{schema_dirs, ["lib/rc_example-0.1.0/priv"]}]}].
%% conf/vm.args
-name {{node}}
To run the release we need to tell rebar which profile to use, for example:
rebar3 as dev1 release && _build/dev1/rel/rc_example/bin/rc_example
Let's add a Makefile to easily run any of the nodes:
.PHONY: dev1 dev2 dev3 clean_data
dev1:
./rebar3 as dev1 release && _build/dev1/rel/rc_example/bin/rc_example
dev2:
./rebar3 as dev2 release && _build/dev2/rel/rc_example/bin/rc_example
dev3:
./rebar3 as dev3 release && _build/dev3/rel/rc_example/bin/rc_example
clean_data:
rm -rf _build/dev1/rel/rc_example/data* ; rm -rf _build/dev2/rel/rc_example/data* ; rm -rf _build/dev3/rel/rc_example/data*
Currently, the latest rebar3 release is not built with
We also include a clean_data
target, for the cases when we want to start
with a fresh cluster (riak_core persists cluster
information between runs, so you may need to remove it when you make
changes to your configuration).
Before testing our cluster, let's add a function to inspect its status
in src/rc_example.erl
:
-export([ping/0,
ring_status/0]).
ring_status() ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
riak_core_ring:pretty_print(Ring, [legend]).
Now open three terminals and run one of these commands on each:
$ make dev1
$ make dev2
$ make dev3
If you try the ring_status
function, you'll see something like:
(rc_example1@127.0.0.1)1> rc_example:ring_status().
==================================== Nodes ====================================
Node a: 64 (100.0%) rc_example1@127.0.0.1
==================================== Ring =====================================
aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|aaaa|
ok
Each node only knows about itself. We can fix that by making node 2 and
3 join node 1. riak_core:join
is used for a single node to join a
cluster:
%% node 2
(rc_example2@127.0.0.1)1> riak_core:join('rc_example1@127.0.0.1').
18:46:45.409 [info] 'rc_example2@127.0.0.1' changed from 'joining' to 'valid'
%% node 3
(rc_example3@127.0.0.1)1> riak_core:join('rc_example1@127.0.0.1').
18:46:47.120 [info] 'rc_example3@127.0.0.1' changed from 'joining' to 'valid'
Now ring_status()
should show the three nodes with a third of the
keyspace each (it may take some seconds for the percentages to settle):
(rc_example1@127.0.0.1)2> rc_example:ring_status().
==================================== Nodes ====================================
Node a: 21 ( 32.8%) rc_example1@127.0.0.1
Node b: 22 ( 34.4%) rc_example2@127.0.0.1
Node c: 21 ( 32.8%) rc_example3@127.0.0.1
==================================== Ring =====================================
abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abbc|abba|abba|abba|abba|abba|
ok
If you call rc_example:ping()
a couple of times, you should see that
the log output (received ping command
) is printed in a different
terminal every time, because vnodes from any of the physical nodes can
receive the command.
Now that we have the project layout and distribution setup we can
start working on our in-memory Key/Value store. As you may imagine,
this means modifying our worker vnode to support a new set of
commands: put
, get
and delete
. Here are the relevant parts:
init([Partition]) ->
{ok, #{partition => Partition, data => #{}}}.
handle_command({put, Key, Value}, _Sender, State = #{data := Data}) ->
log("PUT ~p:~p", [Key, Value], State),
NewData = Data#{Key => Value},
{reply, ok, State#{data => NewData}};
handle_command({get, Key}, _Sender, State = #{data := Data}) ->
log("GET ~p", [Key], State),
{reply, maps:get(Key, Data, not_found), State};
handle_command({delete, Key}, _Sender, State = #{data := Data}) ->
log("DELETE ~p", [Key], State),
NewData = maps:remove(Key, Data),
{reply, maps:get(Key, Data, not_found), State#{data => NewData}};
In init
, we update our state map to include a data
map; we'll use
it as our humble data store. Then we
add new handle_command
clauses for each operation: put, get,
set. The command is received as a named tuple and the result is returned in
a reply
, like in a gen_server.
Just like we did with ping
, we'll create public functions in
src/rc_example.erl
to execute the new commands:
-module(rc_example).
-export([ping/0,
ring_status/0,
put/2,
get/1,
delete/1]).
ping() ->
sync_command(os:timestamp(), ping).
ring_status() ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
riak_core_ring:pretty_print(Ring, [legend]).
put(Key, Value) ->
sync_command(Key, {put, Key, Value}).
get(Key) ->
sync_command(Key, {get, Key}).
delete(Key) ->
sync_command(Key, {delete, Key}).
%% internal
hash_key(Key) ->
riak_core_util:chash_key({<<"rc_example">>, term_to_binary(Key)}).
sync_command(Key, Command) ->
DocIdx = hash_key(Key),
PrefList = riak_core_apl:get_apl(DocIdx, 1, rc_example),
[IndexNode] = PrefList,
riak_core_vnode_master:sync_spawn_command(IndexNode, Command, rc_example_vnode_master).
The hashing, vnode selection and command execution is the same
in all cases, so it was extracted into its own sync_command
helper
function.
Let's test the new commands. Stop the three nodes if you are still
running them, and run make dev1
, make dev2
and make dev3
to refresh
the code of the releases (you'll note that the nodes join the cluster
without the need to call riak_core:join
again). In any of the
shells you can try our Key/Value store:
(rc_example1@127.0.0.1)1> rc_example:put(key1, value1).
13:12:53.291 [info] PUT key1:value1
ok
(rc_example1@127.0.0.1)2> rc_example:put(key2, value2).
13:12:59.602 [info] PUT key2:value2
ok
(rc_example1@127.0.0.1)3> rc_example:get(key1).
13:13:30.160 [info] GET key1
value1
(rc_example1@127.0.0.1)4> rc_example:delete(key1).
13:13:37.984 [info] DELETE key1
value1
(rc_example1@127.0.0.1)5> rc_example:get(key1).
not_found
13:13:43.392 [info] GET key1
(rc_example1@127.0.0.1)6> rc_example:put(key3453, value3453).
(rc_example1@127.0.0.1)7> rc_example:get(key3453).
value3453
In the example above, keys key1
and key2
are stored on vnodes that
reside in the first node (and thus we get the log output in the
shell of rc_example1), while key3453
is on another one.
As you can see, even if the initial setup can be a little burdensome, you can get distribution of work and fault-tolerance with little effort by just handling your application-specific logic in a vnode module.
Our application already has some basic functionality so we should start thinking about how to test it. This is a distributed system that requires multiple nodes to work and manual tests will become more difficult as it grows; moreover, since most of the complexity resides in the interaction of its components, we won't benefit much from isolated unit tests, instead we should write an integration suite that provides end-to-end verification of each feature. To accomplish that we will use Common Tests, and a combination of ct_slave and rpc to start multiple nodes and interact with them.
To start off, let's add a new make target that runs the tests with rebar3 (remember to add it in the .PHONY targets):
.PHONY: dev1 dev2 dev3 dev4 clean_data test
test:
./rebar3 ct --name test@127.0.0.1
Now create a test directory with a single module
test/key_value_SUITE.erl
:
-module(key_value_SUITE).
-include_lib("common_test/include/ct.hrl").
-compile(export_all).
all() ->
[ping_test,
key_value_test].
init_per_suite(Config) ->
Node1 = 'node1@127.0.0.1',
Node2 = 'node2@127.0.0.1',
Node3 = 'node3@127.0.0.1',
start_node(Node1, 8198, 8199),
start_node(Node2, 8298, 8299),
start_node(Node3, 8398, 8399),
build_cluster(Node1, Node2, Node3),
[{node1, Node1},
{node2, Node2},
{node3, Node3} | Config].
end_per_suite(Config) ->
Node1 = ?config(node1, Config),
Node2 = ?config(node2, Config),
Node3 = ?config(node3, Config),
stop_node(Node1),
stop_node(Node2),
stop_node(Node3),
ok.
We include the ct header file and declare two tests in the all()
callback, which we'll define shortly. In init_per_suite
we create three nodes using the start_node
helper, and make them
join a cluster with build_cluster
; we keep the node names in the
test configuration so we can later use it to remotely execute
functions in those nodes; finally we stop the nodes in
end_per_suite
using another helper. We'll leave the
implementations, which contain most of the ct_slave magic, for the end
of this section. Now let's focus in the tests:
ping_test(Config) ->
Node1 = ?config(node1, Config),
Node2 = ?config(node2, Config),
Node3 = ?config(node3, Config),
{pong, _Partition1} = rc_command(Node1, ping),
{pong, _Partition2} = rc_command(Node2, ping),
{pong, _Partition3} = rc_command(Node3, ping),
ok.
key_value_test(Config) ->
Node1 = ?config(node1, Config),
Node2 = ?config(node2, Config),
Node3 = ?config(node3, Config),
ok = rc_command(Node1, put, [k1, v1]),
ok = rc_command(Node1, put, [k2, v2]),
ok = rc_command(Node1, put, [k3, v3]),
%% get from any of the nodes
v1 = rc_command(Node1, get, [k1]),
v2 = rc_command(Node1, get, [k2]),
v3 = rc_command(Node1, get, [k3]),
not_found = rc_command(Node1, get, [k10]),
v1 = rc_command(Node2, get, [k1]),
v2 = rc_command(Node2, get, [k2]),
v3 = rc_command(Node2, get, [k3]),
not_found = rc_command(Node2, get, [k10]),
v1 = rc_command(Node3, get, [k1]),
v2 = rc_command(Node3, get, [k2]),
v3 = rc_command(Node3, get, [k3]),
not_found = rc_command(Node3, get, [k10]),
%% test reset and delete
ok = rc_command(Node1, put, [k1, v_new]),
v_new = rc_command(Node1, get, [k1]),
v_new = rc_command(Node1, delete, [k1]),
not_found = rc_command(Node1, get, [k1]),
ok = rc_command(Node1, put, [k1, v_new]),
v_new = rc_command(Node1, get, [k1]),
ok.
The ping_test
just sends a ping
command to each of the nodes, and
makes sure it gets a pong
response every time. Note we use an
rc_command
helper, which executes a command of our rc_example
application in the given node. The key_value_test
puts some keys in
the store through the first node, then makes sure those keys can be
retrieved from any of the nodes (regardless of where they are actually
stored), then it tests the delete command and makes sure the store
generally works as expected.
There's nothing special about these tests when we abstract away the details of setting up the nodes and the riak_core cluster.
This module is being deprecated since OTP 25, and will be removed in OTP 27, in its place, you can use the peer module, for which we wrote a section for below. So, if you're using OTP25 or above, skip this section and if not, keep reading. If you want to know more about this, check out this thread from the Erlang Forums.
Let's look at the implementation of the different helpers we used in the
previous section. We need the start_node
helper to
create a new Erlang node with a given name, and we need it to start our
rc_example application, much like what happens when we run our
development releases; for this to work we should also set up the
required riak_core application environment:
start_node(NodeName, WebPort, HandoffPort) ->
%% need to set the code path so the same modules are available in the peer
CodePath = code:get_path(),
PathFlag = "-pa " ++ lists:concat(lists:join(" ", CodePath)),
{ok, _} = ct_slave:start(NodeName, [{erl_flags, PathFlag}]),
%% set the required environment for riak core
DataDir = "./data/" ++ atom_to_list(NodeName),
rpc:call(NodeName, application, load, [riak_core]),
rpc:call(NodeName, application, set_env, [riak_core, ring_state_dir, DataDir]),
rpc:call(NodeName, application, set_env, [riak_core, platform_data_dir, DataDir]),
rpc:call(NodeName, application, set_env, [riak_core, web_port, WebPort]),
rpc:call(NodeName, application, set_env, [riak_core, handoff_port, HandoffPort]),
rpc:call(NodeName, application, set_env, [riak_core, schema_dirs, ["../../lib/rc_example/priv"]]),
%% start the rc_example app
{ok, _} = rpc:call(NodeName, application, ensure_all_started, [rc_example]),
ok.
stop_node(NodeName) ->
ct_slave:stop(NodeName).
ct_slave makes it pretty simple to manage erlang nodes with the
ct_slave:start
and ct_slave:stop
functions. The gotcha is that when we start a new node we need to
point the code path to Erlang, in order for the node to know where to
look for code dependencies. The best way I've found to do it, based on
this thread,
is to get the path from the master node that runs the test, and pass
it to Erlang with the -pa
flag. There is probably
a more succint way to do this, for example using code:set_path
, but
I couldn't make it work.
First, change the init_per_suite
function like this:
init_per_suite(Config) ->
Host = "127.0.0.1",
Node1 = start_node('node1', Host, 8198, 8199),
Node2 = start_node('node2', Host, 8298, 8299),
Node3 = start_node('node3', Host, 8398, 8399),
build_cluster(Node1, Node2, Node3),
[{node1, Node1},
{node2, Node2},
{node3, Node3} | Config].
The change of arguments being passed to start_node
is because we will be using the ?CT_PEER
macro, which receives a map
like in here, and
behaves like if we were using peer:start
, but adapted to Common Tests.
Then, you should change start_node
to this:
start_node(Name, Host, WebPort, HandoffPort) ->
%% Need to set the code path so the same modules are available in the slave
CodePath = code:get_path(),
%% Arguments to set up the node
NodeArgs = #{name => Name, host => Host, args => ["-pa" | CodePath]},
%% Since OTP 25, ct_slaves nodes are deprecated
%% (and to be removed in OTP 27), so we're
%% using peer nodes instead, with the CT_PEER macro.
{ok, Peer, Node} = ?CT_PEER(NodeArgs),
unlink(Peer),
DataDir = "./data/" ++ atom_to_list(Name),
%% set the required environment for riak core
ok = rpc:call(Node, application, load, [riak_core]),
ok = rpc:call(Node, application, set_env, [riak_core, ring_state_dir, DataDir]),
ok = rpc:call(Node, application, set_env, [riak_core, platform_data_dir, DataDir]),
ok = rpc:call(Node, application, set_env, [riak_core, web_port, WebPort]),
ok = rpc:call(Node, application, set_env, [riak_core, handoff_port, HandoffPort]),
ok = rpc:call(Node, application, set_env, [riak_core, schema_dirs, ["../../lib/rc_example/priv"]]),
%% start the rc_example app
{ok, _} = rpc:call(Node, application, ensure_all_started, [rc_example]),
Node.
One important thing to note is that we unlink the peer. This is due to
start_node
being executed with a process that dies before the test
actually runs.
A convenient thing about the ?CT_PEER
macro is that it kills the
Node when the test ends, so we don't need to manually kill the nodes
anymore. So, go ahead and delete the stop_node
function and redefine
end_per_suite as:
end_per_suite(_) -> ok.
Once the node is up, we can start running functions on it with
rpc:call
. In order for
riak_core to work, we need to load the
application and fill its environment with application:set_env
; we
set the same variables as we did in conf/sys.config
, with the addition of
platform_data_dir
(this is a directory that riak_core uses to store
metadata; we need to set it explicitly here because otherwise the
three nodes would conflict trying to write in the same default directory). With
the configuration in place, we can start the rc_example app remotely
calling application:ensure_all_started
. Lastly, the stop_node
helper just needs to call ct_slave:stop
.
When our three nodes are up with the application running, we need to connect them to build the cluster, like we did from the shell:
build_cluster(Node1, Node2, Node3) ->
rpc:call(Node2, riak_core, join, [Node1]),
rpc:call(Node3, riak_core, join, [Node1]),
ok.
The last helper, rc_command
, is a very simple one, it just remotely
calls one of the functions in the rc_example
module:
rc_command(Node, Command) ->
rc_command(Node, Command, []).
rc_command(Node, Command, Arguments) ->
rpc:call(Node, rc_example, Command, Arguments).
So far we've been working with commands that operate over a single object, like a single Key in a Key/Value store. In those cases the Key was hashed and the key hash determined the vnode responsible for handling the operation. In the case of the ping command, as discussed, we didn't have a key but we faked one by using the current timestamp.
There is another kind of command, one that involves all the vnodes in the ring. What happens, for example, if we want to list all the Keys in our Key/Value store? Each vnode contains a subset of the Keys so the to get the full list we need to ask all the vnodes and join the results. This is what coverage commands consist of: riak_core sends a command to all of the vnodes then process the results as they arrive.
In this section we're going to implement two new commands: keys
and
values
, which as you may guess return the list of keys and values
currently present in the datastore.
The vnode is the easy part. Each vnode just needs to return the list
of Keys or Values it contains in the data
field of its state. This
is done in the handle_coverage
callback:
handle_coverage(keys, _KeySpaces, {_, ReqId, _}, State = #{data := Data}) ->
log("Received keys coverage", State),
Keys = maps:keys(Data),
{reply, {ReqId, Keys}, State};
handle_coverage(values, _KeySpaces, {_, ReqId, _}, State = #{data := Data}) ->
log("Received values coverage", State),
Values = maps:values(Data),
{reply, {ReqId, Values}, State}.
We need to introduce a new component, the one that will be in charge
of managing the coverage command, that is of starting it and gathering
the results sent from each of the vnodes. riak_core provides the
riak_core_coverage_fsm
behavior for this purpose (a finite state
machine). Let's create a src/rc_example_coverage_fsm.erl
module implementing
that behavior and go over each of its functions:
-module(rc_example_coverage_fsm).
-behaviour(riak_core_coverage_fsm).
-export([start_link/4,
init/2,
process_results/2,
finish/2]).
start_link(ReqId, ClientPid, Request, Timeout) ->
riak_core_coverage_fsm:start_link(?MODULE, {pid, ReqId, ClientPid},
[Request, Timeout]).
So far, nothing very special: the start_link will be called by a
supervisor to start the process (see next section) and the
parameters are more or less forwarded to
riak_core_coverage_fsm:start_link
.
init({pid, ReqId, ClientPid}, [Request, Timeout]) ->
logger:info("Starting coverage request ~p ~p", [ReqId, Request]),
State = #{req_id => ReqId,
from => ClientPid,
request => Request,
accum => []},
{Request, allup, 1, 1, rc_example, rc_example_vnode_master, Timeout, State}.
In init
, we initialize the process state as usual. We create a state
map where we put request metadata, the client process id (so we can
later reply to it with the result of the command) and an accumulator list
that we will update with the results coming from each vnode.
init
returns a big tuple with a bunch of parameters that control how
the coverage command should work. Let's briefly explain each of
them (mostly taken from
here;
you'll have to dig around for more details):
- Request: an opaque data structure representing the command to be
handled by the vnodes. In our case it will be either of the
keys
orvalues
atoms. - VNodeSelector: an atom that specifies whether we want to run the
command in all vnodes (
all
) or only in those reachable (allup
). - ReplicationFactor: used to accurately create a minimal covering set of vnodes.
- PrimaryVNodeCoverage: The number of primary VNodes from the preference list to use in creating the coverage plan. Typically just 1.
- NodeCheckService: the service used to check for available nodes. This is the same as the atom passed to the node_watcher at application startup.
- VNodeMaster: The atom to use to reach the vnode master module (
rc_example_vnode_master
). - Timeout: timeout of the coverage request.
- State: the initial state for the module.
process_results({{_ReqId, {_Partition, _Node}}, []}, State ) ->
{done, State};
process_results({{_ReqId, {Partition, Node}}, Data},
State = #{accum := Accum}) ->
NewAccum = [{Partition, Node, Data} | Accum],
{done, State#{accum => NewAccum}}.
The process_results
callback gets called when the coverage module
receives a set of results from a vnode. For our keys
and values
commands we store the partition
and node identifiers along with the data, so we can see where each
piece came from in the final result. Since in our
tests most of the vnodes will be empty, we filter them out
by handling the empty list case in a separate process_results
clause
that leaves the accumulator unchanged.
finish(clean, State = #{req_id := ReqId, from := From, accum := Accum}) ->
logger:info("Finished coverage request ~p", [ReqId]),
%% send the result back to the caller
From ! {ReqId, {ok, Accum}},
{stop, normal, State};
finish({error, Reason}, State = #{req_id := ReqId, from := From, accum := Accum}) ->
logger:warning("Coverage query failed! Reason: ~p", [Reason]),
From ! {ReqId, {partial, Reason, Accum}},
{stop, normal, State}.
Finally, the finish
function will be called when the coverage
command is done. If it goes well, the first argument will be clean
;
in that case we reply the accumulated data to the caller Pid (stored
in from
). If there's an error we handle it in the second finish
clause.
We need to supervise our rc_example_coverage_fsm
processes. Since
these are created on demand, one per each command that needs
to be executed, we are going to use the simple_one_for_one
supervisor strategy. Create a
src/rc_example_coverage_fsm_sup.erl
module:
-module(rc_example_coverage_fsm_sup).
-behavior(supervisor).
-export([start_link/0,
start_fsm/1,
init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
CoverageFSM = {undefined,
{rc_example_coverage_fsm, start_link, []},
temporary, 5000, worker, [rc_example_coverage_fsm]},
{ok, {{simple_one_for_one, 10, 10}, [CoverageFSM]}}.
start_fsm(Args) ->
supervisor:start_child(?MODULE, Args).
When a coverage command needs to be executed, start_fsm
is
called to create a new child of this supervisor.
We also need to add rc_example_coverage_fsm_sup
to the main
application supervisor in src/rc_example_sup.erl
:
init([]) ->
VMaster = {rc_example_vnode_master,
{riak_core_vnode_master, start_link, [rc_example_vnode]},
permanent, 5000, worker, [riak_core_vnode_master]},
CoverageFSM = {rc_example_coverage_fsm_sup,
{rc_example_coverage_fsm_sup, start_link, []},
permanent, infinity, supervisor, [rc_example_coverage_fsm_sup]},
{ok, {{one_for_one, 5, 10}, [VMaster, CoverageFSM]}}.
Now that we have all the components in place, let's add the keys
and
values
functions to src/rc_example.erl
:
keys() ->
coverage_command(keys).
values() ->
coverage_command(values).
%% internal
coverage_command(Command) ->
Timeout = 5000,
ReqId = erlang:phash2(erlang:monotonic_time()),
{ok, _} = rc_example_coverage_fsm_sup:start_fsm([ReqId, self(), Command, Timeout]),
receive
{ReqId, Val} -> Val
end.
We create a ReqId to identify the request and call
rc_example_coverage_fsm_sup:start_fsm
to create a new child for the
supervisor, passing all the parameters the rc_example_coverage_fsm
needs to execute the command. Finally, we receive the result value,
identified by the ReqId.
Restart your releases, fill the store with some values and try the new commands:
(rc_example1@127.0.0.1)1> rc_example:put(k1, v1).
ok
(rc_example1@127.0.0.1)2> rc_example:put(k100, v100).
12:17:25.001 [info] [RKFE4TBE76PR5N05XGK31C8TPLJGJY8] PUT k100:v100
ok
(rc_example1@127.0.0.1)3> rc_example:put(k101, v101).
ok
(rc_example1@127.0.0.1)4> rc_example:put(k444, v444).
ok
(rc_example1@127.0.0.1)5> rc_example:put(k4445, v4445).
ok
(rc_example1@127.0.0.1)6> rc_example:keys().
12:17:45.916 [info] Starting coverage request 63138856 keys
12:17:45.921 [info] [RKFE4TBE76PR5N05XGK31C8TPLJGJY8] Received keys coverage
12:17:45.921 [info] [TFPL6YYQR76578P6XWBOLKEAZ9KS1S0] Received keys coverage
12:17:45.921 [info] [0] Received keys coverage
12:17:45.921 [info] [S18XWCQ8C6TUO1FF6KHZFEA710JSFEO] Received keys coverage
12:17:45.921 [info] [IOTYLKHHK4JWG0YA4DNZM9ISOOD6Y9S] Received keys coverage
(...)
{ok,[{707914855582156101004909840846949587645842325504,
'rc_example3@127.0.0.1',
[k101]},
{1141798154164767904846628775559596109106197299200,
'rc_example2@127.0.0.1',
[k1]},
{890602560248518965780370444936484965102833893376,
'rc_example3@127.0.0.1',
[k444]},
{981946412581700398168100746981252653831329677312,
'rc_example3@127.0.0.1',
[k4445]},
{1347321821914426127719021955160323408745312813056,
'rc_example1@127.0.0.1',
[k100]}]}
As expected, the result contains all the inserted keys and what vnode
and physical node they come from. We also see the received keys coverage
output from every vnode when they receive the command.
We need to test our new functionality. Before writing an
integration test, let's quickly add a clear
coverage command to
empty our database, which will come in handy next. Add a new
handle_coverage
clause in src/rc_example_vnode.erl
:
handle_coverage(clear, _KeySpaces, {_, ReqId, _}, State) ->
log("Received clear coverage", State),
NewState = State#{data => #{}},
{reply, {ReqId, []}, NewState}.
When the clear
command is received, the vnode will empty the data
map in its internal state. Note we return an empty list because our
coverage fsm expects to get a list from each vnode; if the new command
required a different result manipulation we could consider adding a another
process_results
clause in the fsm or even an entirely separate fsm
module; in this case we don't really care about results processing,
just the side effect of clearing the vnode data.
We will also add the clear
function to the public API in
src/rc_example.erl
. It will just call coverage_command
and return ok
.
-export([clear/0]).
clear() ->
{ok, []} = coverage_command(clear),
ok.
With that in place let's test our coverage commands in
test/key_value_SUITE.erl
:
all() ->
[ping_test,
key_value_test,
coverage_test].
%% ...
coverage_test(Config) ->
Node1 = ?config(node1, Config),
Node2 = ?config(node2, Config),
%% clear, should contain no keys and no values
ok = rc_command(Node1, clear),
[] = rc_coverage(Node1, keys),
[] = rc_coverage(Node1, values),
ToKey = fun (N) -> "key" ++ integer_to_list(N) end,
ToValue = fun (N) -> "value" ++ integer_to_list(N) end,
Range = lists:seq(1, 100),
lists:foreach(fun(N) ->
ok = rc_command(Node1, put, [ToKey(N), ToValue(N)])
end, Range),
ActualKeys = rc_coverage(Node2, keys),
ActualValues = rc_coverage(Node2, values),
100 = length(ActualKeys),
100 = length(ActualValues),
true = have_same_elements(ActualKeys, lists:map(ToKey, Range)),
true = have_same_elements(ActualValues, lists:map(ToValue, Range)),
%% store should be empty after a new clear
ok = rc_command(Node1, clear),
[] = rc_coverage(Node1, keys),
[] = rc_coverage(Node1, values),
ok.
%% internal
rc_coverage(Node, Command) ->
{ok, List} = rc_command(Node, Command),
%% convert the coverage result to a plain list
lists:foldl(fun({_Partition, _Node, Values}, Accum) ->
lists:append(Accum, Values)
end, [], List).
have_same_elements(List1, List2) ->
S1 = sets:from_list(List1),
S2 = sets:from_list(List2),
sets:is_subset(S1, S2) andalso sets:is_subset(S2, S1).
The coverage_test
first calls clear
to empty the store, and makes
sure keys
and values
return an empty result. Note that our
integration suite is not ideal in the sense that the tests are not
isolated from each other: they share the same data store and they
couldn't, for example, be run concurrently. In a real world
project we could consider creating new nodes for each test
(although this could be slow) or more likely introduce some sort of
namespacing in our data store (perhaps through the use of buckets). For
the purposes of this tutorial, though, it's enough to clear the store
in this particular test.
The test continues by storing a range of 100 keys and values in our
database and calling the keys
and values
commands. We assert that
the results contain 100 elements each and we use some sets logic to check that
the elements are the same that we originally inserted. Finally, we
clear the store again and check keys
and values
come back
empty. The rc_coverage
helper just calls a command and cleans the
result by removing the partition and node annotations.
In the non-ideal world of distributed systems we need to account for the fact that software and hardware can fail and that networks are unreliable. In other words, we need to build our distributed system such that it keeps functioning when one or more of the nodes becomes unavailable. riak_core provides some useful building blocks to achieve it: it will monitor the cluster, redistribute partitions when nodes go down and even expose a mechanism to move data around when they come back online (see handoff). But some of the work will be implementation specific.
If your system works as a distributed database, that is if your
vnodes hold state that should survive node outages, then you'll have
to replicate each piece of data to multiple vnodes, so a fallback vnode
can take over when the primary is not available. In our Key/Value
store example, this means that put
commands should
be sent to multiple vnodes to replicate the data, and delete
commands
should be sent to all the replicas. This is introduces room for a lot
of design decisions, each with their own tradeoffs:
- How many physical nodes should a cluster consist of?
- How many replicas of each key should be stored?
- How many successful responses are required for a write operation to succeed?
- How many to read data?
- How to handle write conflicts between replicas?
- etc.
These are more related to database design and tuning than to
riak_core itself; riak_core is about distribution mechanics, so we
won't go too fair into the specifics here. For the sake of
completeness, let's briefly mention how the riak_core
API allows us to introduce redundancy. If you review the
src/rc_example.erl
module, you'll recal that we use
:riak_core_apl.get_apl
to obtain a list of vnodes that can handle a
given command; let's say we want to replicate our data to three nodes,
then we can request for that amount:
(rc_example1@127.0.0.1)2> K = riak_core_util:chash_key({<<"rc_example">>, term_to_binary(os:timestamp())}).
(rc_example1@127.0.0.1)3> riak_core_apl:get_apl(K, 3, rc_example).
[{1073290264914881830555831049026020342559825461248,
'rc_example1@127.0.0.1'},
{1096126227998177188652763624537212264741949407232,
'rc_example1@127.0.0.1'},
{1118962191081472546749696200048404186924073353216,
'rc_example2@127.0.0.1'}]
Then to actually send a command, instead of using
riak_core_vnode_master:sync_spawn_command
, we turn to the more
generic riak_core_vnode_master:command
which takes a Preference List
instead of a single target vnode:
replicated_command(Key, Command) ->
DocIdx = hash_key(Key),
PrefList = riak_core_apl:get_apl(DocIdx, 3, rc_example),
ReqId = erlang:phash2(erlang:monotonic_time()),
Sender = {raw, ReqId, self()},
riak_core_vnode_master:command(PrefList, Command, Sender, rc_example_vnode_master),
receive
{ReqId, Reply} -> Reply
end.
Note we need to create a request id and pass the current process in
the Sender
argument so riak_core knows where to send the reply to.
In this case, for demonstration purposes, we just do a blocking
receive
and return the first message that arrives; a more serious
implementation could use a gen_server or a fsm to gather the responses
and achieve some sort of quorum. If you are interested in the
topic, you can review the
Little Riiak Core book and
the
Elixir series,
both of which implement solutions to this problem.
Part of the strength of the Dynamo architectures (and thus, of Riak Core) is how it enables scalability with small operational effort. Because the keyspace is designed as a ring of virtual nodes, adding or removing physical nodes to a cluster means changing the distribution of the vnodes across the physical nodes: a vnode will always handle the same segment of the keyspace (the same chunk of the key hashes), but where does that vnode resides physycally can change.
For example: if we have a one-node cluster, it will necessarily contain the entire ring, that is all of the vnodes. If we start a second physical node and join that cluster, half of the vnodes will be "handed over" to the new physical node, so the keyspace is kept evenly distributed across the cluster.
Riak Core provides the necessary infrastructure to decide where and when a vnode needs to be moved. We only need to fill in the specifics of how to iterate over our particualr vnodes' state, encode it in the giving end, and decode it in the receiving vnode. This process is called handoff. We'll go over all the required steps to support this scenario in our application. For a walkthrough of how handoff is implemented internally, check the riak_core wiki.
Note that if your vnodes are "stateless", for example if you just use riak_core as a mechanism to distribute work and don't need to keep internal state, you don't need to worry about handoff and can just leave the related callbacks empty.
- An
ownership
handoff happens when a physical node joins or leaves the cluster. In this scenario, riak_core reassigns the physical nodes responsible for each vnode and it executes the handoff to move the vnode data from its old home to its new home. hinted
handoffs can occur if there's vnode redundancy (see previous section). When the primary vnode for a particular part of the ring is offline, riak_core still accepts operations on it and routes those to a secondary vnode. When the primary vnode comes back online, riak_core uses handoff to sync the current vnode state from the secondary to the primary. Once the primary is synchronized, operations are routed to it once again.
There are also repair
and resize
related handoffs, which are a advanced
topics that we won't cover. You can read about
them
here,
here and here.
If you check our vnode implementation, you'll notice that half of the callbacks deal with handoff. Let's go over their implementation, in the same order as they are called.
First, we need to include the riak_core_vnode
header file, because
we will refer to a macro defined there:
-include_lib("riak_core/include/riak_core_vnode.hrl").
handoff_starting
is called on the sending vnode before the handoff
begins. If the function returns true, the handoff will proceed through
the normal path. If it returns false, the handoff will be
cancelled. We don't need any special action here, so we just log and
move forward:
handoff_starting(_TargetNode, State) ->
log("starting handoff", State),
{true, State}.
handoff_cancelled
is called on the sending vnode in case the process
is cancelled (usually explcitly by an admin tool). Again, we just log:
handoff_cancelled(State) ->
log("handoff cancelled", State),
{ok, State}.
is_empty
should return a boolean informing if there's any data to
migrate in the vnode; if there's not handoff is finished (calling handoff_finished
).
is_empty(State = #{data := Data}) ->
IsEmpty = maps:size(Data) == 0,
{IsEmpty, State}.
The bulk of the work is done in the handle_handoff_command
callback. This function can be a bit confusing, because it serves
two different purposes depending on its calling arguments: to handle the
request to fold over the vnode's data that needs to be transferred,
and to handle regular vnode commands (e.g. ping
, put
, etc.) that arrive
during handoff (and would otherwise be passed to handle_command
).
Let's focus on the first of those cases. riak_core knows what it needs to do with each piece of data the vnode holds (encode it, transfer it over the network to the new vnode and decode it there), but not what that data looks like or how it's stored (in our case Key/Value pairs on a map), so it gives us a function that encapsulates the processing and we need to apply it to our data:
handle_handoff_command(?FOLD_REQ{foldfun=FoldFun, acc0=Acc0}, _Sender,
State = #{data := Data}) ->
log("Received fold request for handoff", State),
Result = maps:fold(FoldFun, Acc0, Data),
{reply, Result, State};
Nevermind the weird macro wrapper: ?FOLD_REQ
is just a record and we
only care to extract the fold function (FoldFun) and the initial
accumulator (Acc0). When a command with this shape arrives, we
iterate over our vnodes' data, applying the given fold function. Note
that the this function expects to be passed three arguments: key, value,
and accumulator. This means that if your data structure doesn't
already support this form of fold function you'll have to wrap it; in
our case we just need to call maps:fold/3
since our data is a
map. The result of the fold is included in the reply
tuple.
FoldFun is synchronous and in our case the result
of the command is replied right away, but there's also the option to
return an async
tuple; you can check
riak_kv and
riak_search implementations
for reference. Note that if you go down this route, you may need to handle incoming commands that
modify your vnodes' data while you are iterating over it.
The second situation in which handle_handoff_command
can be called
is when a regular command arrives during handoff. If you check the
callback specification you'll
see that the result can be the same as in handle_command
, with two
additional return types: forward
and drop
. The forward reply will
send the request to the target node, while the drop reply
signifies that you won't even attempt to fulfill it. Which one to use
depends on your application and the nature of the command.
Let's reason about the possible situations in the case of our
Key/Value store. When a handle_handoff_command
arrives we can't tell
if handoff has just started or is about to finish; we can't tell if
the value associated with the command's key has been migrated to the
receiving vnode already or the only copy is in this one. So the strategy we can
take to stay consistent and avoid unnecessary effort is: when the
command is a write (a put
or a delete
), we change our local copy
of the code and we forward it to the receiving vnode (that way, if it
was already migrated, the change is applied in that copy too); if the
command is read (a get
), we reply with our local copy of the data
(we know it's up to date because we applied all the writes
locally). Let's see how this looks in the code:
handle_handoff_command({get, Key}, Sender, State) ->
log("GET during handoff, handling locally ~p", [Key], State),
handle_command({get, Key}, Sender, State);
handle_handoff_command(Message, Sender, State) ->
{reply, _Result, NewState} = handle_command(Message, Sender, State),
{forward, NewState}.
We added extra handle_handoff_command
clauses for each of those
cases. The first one handles get
, a read operation; the
implementation just calls handle_command
since we
want to reply with the local copy of the data, as usual.
The second clause catches the rest of the commands, put
and
delete
, which are write operations. In these cases we call
handle_command
as well, to modify our local copy of the data, but
instead of using the result, we return forward
, so the command is
sent to the receiving vnode as well.
That's it for handle_handoff_command
. For a deeper understanding of
the different scenarios we suggest checking this and this comments, along with the
relevant code.
Moving on to the remaining callbacks. encode_handoff_item
is called
on the sending vnode, each time a Key/Value pair is about to be sent
over the wire; we use term_to_binary
to encode it. On the other end,
handle_handoff_data
will be called on the receiving vnode to decode
the Key and Value; we use binary_to_term
and update the data
map with the new pair:
encode_handoff_item(Key, Value) ->
erlang:term_to_binary({Key, Value}).
handle_handoff_data(BinData, State = #{data := Data}) ->
{Key, Value} = erlang:binary_to_term(BinData),
log("received handoff data ~p", [{Key, Value}], State),
NewData = Data#{Key => Value},
{reply, ok, State#{data => NewData}}.
Finally, when handoff is done handoff_finished
is called. After
that, the sending vnode should be deleted; any necessary cleanup can
be done in the delete
callback. We don't do any special work in
these two callbacks, just log and return:
handoff_finished(_TargetNode, State) ->
log("finished handoff", State),
{ok, State}.
delete(State) ->
log("deleting the vnode", State),
{ok, State#{data => #{}}}.
Handoff is a slow process, so it would be inconvenient to test it as part of our integration suite. Instead, let's do some shell experiments to see it in action. Clean the cluster and start three nodes:
# terminal 1
$ make clean_data
$ make dev1
# terminal 2
$ make dev2
# terminal 3
$ make dev3
When the nodes are running, join the cluser as we did before:
%% node 2
(rc_example2@127.0.0.1)1> riak_core:join('rc_example1@127.0.0.1').
18:46:45.409 [info] 'rc_example2@127.0.0.1' changed from 'joining' to 'valid'
%% node 3
(rc_example3@127.0.0.1)1> riak_core:join('rc_example1@127.0.0.1').
18:46:47.120 [info] 'rc_example3@127.0.0.1' changed from 'joining' to 'valid'
You may see a bunch of handoff messages now. Eventually the cluster will settle, with the ring evenly distributed across nodes:
(rc_example1@127.0.0.1)3> rc_example:ring_status().
==================================== Nodes ====================================
Node a: 21 ( 32.8%) rc_example1@127.0.0.1
Node b: 22 ( 34.4%) rc_example2@127.0.0.1
Node c: 21 ( 32.8%) rc_example3@127.0.0.1
==================================== Ring =====================================
abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abbc|abba|abba|abba|abba|abba|
ok
Let's add a key, and use the keys
function to find out what node
it ends up in:
(rc_example1@127.0.0.1)4> rc_example:put(k1, hello).
ok
(rc_example1@127.0.0.1)5> rc_example:keys().
{ok,[{1141798154164767904846628775559596109106197299200,
'rc_example2@127.0.0.1', [k1]}]}
In my case, k1
is routed to the second node 'rc_example2@127.0.0.1'
(it can be a different one in your
machine). Let's see what happens if we make that node (the one that
holds the Key/Value pair) leave the cluster:
(rc_example2@127.0.0.1)2> riak_core:leave().
ok
You'll get a bunch of handoff logs again in your screen. During this
period, rc_example:ring_status()
will show the
percentage of the ring assigned to the node decreasing until it
reaches zero. After this the node will be shutdown, and from any of
the remaining nodes you'll see something like this:
14:53:24.322 [info] 'rc_example2@127.0.0.1' changed from 'leaving' to 'exiting'
14:53:24.351 [info] 'rc_example2@127.0.0.1' removed from cluster
(previously: 'exiting')
(rc_example1@127.0.0.1)6> rc_example:ring_status().
==================================== Nodes ====================================
Node a: 32 ( 50.0%) rc_example1@127.0.0.1
Node b: 32 ( 50.0%) rc_example3@127.0.0.1
==================================== Ring =====================================
abab|abab|abab|abab|abab|abab|abab|abab|abab|abab|abab|abab|abab|abab|abab|abab|
ok
Now the entire ring is distributed among the two remaining nodes. If
we now query for k1
, we'll confirm that another one took ownership
of that key:
(rc_example1@127.0.0.1)7> rc_example:get(k1).
hello
{ok,[{1141798154164767904846628775559596109106197299200,
'rc_example1@127.0.0.1', [k1]}]}
In my case, it is 'rc_example1@127.0.0.1'
that took over that part
of the ring.
The previous section demonstrated what happens when we intentionally change the cluster by removing a node. Now let's see what happens when there's a failure and a node becomes unexpectedly unavailable. We didn't add replication as discussed in the fault-tolerance section, so we can't expect to preserve data from the failing node, but we can see the hinted handoff mechanics anyway: the failing node won't lose ownership of its partitions, but the commands that arrive while it's down will have to be temporarily routed to available nodes. When the failing node comes back online, it will receive handoffs with the data created while it was down.
Repeat the steps from previous section to clean the data, restart the nodes and join the cluster. Set a key again, and check in which node it resides:
(rc_example1@127.0.0.1)3> rc_example:put(k1, hello).
ok
(rc_example1@127.0.0.1)4> rc_example:keys().
{ok,[{1141798154164767904846628775559596109106197299200,
'rc_example2@127.0.0.1', [k1]}]}
In this case, k1
physically resides in
'rc_example2@127.0.0.1'
. Kill that node with ctrl-g q
or a similar
command. At this point the k1
key and its value will be lost,
because we don't have any kind of data replication; but if you put the
key again, you'll notice it will be saved in one of the live nodes:
(rc_example1@127.0.0.1)6> rc_example:get(k1).
not_found
(rc_example1@127.0.0.1)7> rc_example:put(k1, newvalue).
ok
(rc_example1@127.0.0.1)8> rc_example:get(k1).
newvalue
Now start the killed node again, and try to retrieve k1
:
(rc_example1@127.0.0.1)9> rc_example:get(k1).
not_found
The second node recovered ownership of the partition to which k1
belongs, but doesn't (yet) have any value for it. If you wait a while
(around a minute in my laptop), you should see something along these lines:
16:20:56.845 [info] [ND1G8YLUY5OVK16UNH2ZITWHUUGHOU8] starting handoff
16:20:56.860 [info] Starting hinted transfer of rc_example_vnode from 'rc_example1@127.0.0.1' 1141798154164767904846628775559596109106197299200 to 'rc_example2@127.0.0.1' 1141798154164767904846628775559596109106197299200
16:20:56.860 [info] [ND1G8YLUY5OVK16UNH2ZITWHUUGHOU8] Received fold request for handoff
16:20:56.862 [info] hinted transfer of rc_example_vnode from 'rc_example1@127.0.0.1' 1141798154164767904846628775559596109106197299200 to 'rc_example2@127.0.0.1' 1141798154164767904846628775559596109106197299200 completed: sent 32.00 B bytes in 1 of 1 objects in 0.00 seconds (63.78 KB/second)
16:20:56.862 [info] [ND1G8YLUY5OVK16UNH2ZITWHUUGHOU8] finished handoff
The fallback node that temporarily held k1
handed over its data back
to the vnode in rc_example2. If you get the key again, you should see
the new value, this time coming from rc_example2:
(rc_example1@127.0.0.1)10> rc_example:get(k1).
newvalue
(rc_example1@127.0.0.1)11> rc_example:keys().
{ok,[{1141798154164767904846628775559596109106197299200,
'rc_example2@127.0.0.1', [k1]}]}