From 6950f283e58e246a8e2208bdd3291223737627aa Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Tue, 2 Jun 2015 15:24:25 -0400 Subject: [PATCH] *add solr upgrade test which tests upgrade to 4.10.4 from 4.7.0 and also tests a downgrade when previous was riak-2.1.1 *clean-up yokozuna_rt for reusability --- src/rt.erl | 2 + src/yokozuna_rt.erl | 143 ++++++++++++++++++--- tests/yz_core_properties_create_unload.erl | 27 +--- tests/yz_solr_upgrade.erl | 122 ++++++++++++++++++ 4 files changed, 256 insertions(+), 38 deletions(-) create mode 100644 tests/yz_solr_upgrade.erl diff --git a/src/rt.erl b/src/rt.erl index 7a090660b..66922c9f2 100644 --- a/src/rt.erl +++ b/src/rt.erl @@ -173,6 +173,8 @@ wait_until_bucket_type_status/3, whats_up/0 ]). +-export_type([interfaces/0, + conn_info/0]). -type strings() :: [string(),...] | []. -type capability() :: atom() | {atom(), tuple()}. diff --git a/src/yokozuna_rt.erl b/src/yokozuna_rt.erl index 8ab45d796..268769cb2 100644 --- a/src/yokozuna_rt.erl +++ b/src/yokozuna_rt.erl @@ -22,14 +22,33 @@ -include_lib("eunit/include/eunit.hrl"). -include("yokozuna_rt.hrl"). --export([expire_trees/1, +-export([check_exists/2, + expire_trees/1, + host_entries/1, + remove_index_dirs/2, rolling_upgrade/2, rolling_upgrade/3, + search/4, + search/5, + search_expect/5, + search_expect/6, + search_expect/7, verify_num_found_query/3, wait_for_aae/1, wait_for_full_exchange_round/2, write_data/5]). +-type host() :: string(). +-type portnum() :: integer(). +-type count() :: non_neg_integer(). +-type json_string() :: atom | string() | binary(). + +-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))). + +-spec host_entries(rt:conn_info()) -> [{host(), portnum()}]. +host_entries(ClusterConnInfo) -> + [riak_http(I) || {_,I} <- ClusterConnInfo]. + %% @doc Write `Keys' via the PB inteface to a `Bucket' and have them %% searchable in an `Index'. -spec write_data([node()], pid(), index_name(), bucket(), [binary()]) -> ok. @@ -79,21 +98,6 @@ rolling_upgrade(Cluster, Vsn, YZCfgChanges) -> end || {SolrPort, Node} <- Cluster2], ok. --spec config_merge(proplists:proplist(), proplists:proplist()) -> - orddict:orddict() | proplists:proplist(). -config_merge(DefaultCfg, NewCfg) when NewCfg /= [] -> - orddict:update(yokozuna, - fun(V) -> - orddict:merge(fun(_, _X, Y) -> Y end, - orddict:from_list(V), - orddict:from_list( - orddict:fetch( - yokozuna, NewCfg))) - end, - DefaultCfg); -config_merge(DefaultCfg, _NewCfg) -> - DefaultCfg. - %% @doc Use AAE status to verify that exchange has occurred for all %% partitions since the time this function was invoked. -spec wait_for_aae([node()]) -> ok. @@ -162,6 +166,27 @@ expire_trees(Cluster) -> timer:sleep(100), ok. +%% @doc Remove index directories, removing the index. +-spec remove_index_dirs([node()], index_name()) -> ok. +remove_index_dirs(Nodes, IndexName) -> + IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) || + Node <- Nodes], + lager:info("Remove index dirs: ~p, on nodes: ~p~n", + [IndexDirs, Nodes]), + [rt:stop(ANode) || ANode <- Nodes], + [rt:del_dir(binary_to_list(IndexDir)) || IndexDir <- IndexDirs], + [rt:start(ANode) || ANode <- Nodes], + ok. + +%% @doc Check if index/core exists in metadata, disk via yz_index:exists. +-spec check_exists([node()], index_name()) -> ok. +check_exists(Nodes, IndexName) -> + rt:wait_until(Nodes, + fun(N) -> + rpc:call(N, yz_index, exists, [IndexName]) + end). + +-spec verify_num_found_query([node()], index_name(), count()) -> ok. verify_num_found_query(Cluster, Index, ExpectedCount) -> F = fun(Node) -> Pid = rt:pbc(Node), @@ -172,3 +197,89 @@ verify_num_found_query(Cluster, Index, ExpectedCount) -> end, rt:wait_until(Cluster, F), ok. + +search_expect(HP, Index, Name, Term, Expect) -> + search_expect(yokozuna, HP, Index, Name, Term, Expect). + +search_expect(Type, HP, Index, Name, Term, Expect) -> + {ok, "200", _, R} = search(Type, HP, Index, Name, Term), + verify_count_http(Expect, R). + +search_expect(solr, {Host, Port}, Index, Name0, Term0, Shards, Expect) + when is_list(Shards), length(Shards) > 0 -> + Name = quote_unicode(Name0), + Term = quote_unicode(Term0), + URL = internal_solr_url(Host, Port, Index, Name, Term, Shards), + lager:info("Run search ~s", [URL]), + Opts = [{response_format, binary}], + {ok, "200", _, R} = ibrowse:send_req(URL, [], get, [], Opts), + verify_count_http(Expect, R). + +search(HP, Index, Name, Term) -> + search(yokozuna, HP, Index, Name, Term). + +search(Type, {Host, Port}, Index, Name, Term) when is_integer(Port) -> + search(Type, {Host, integer_to_list(Port)}, Index, Name, Term); + +search(Type, {Host, Port}, Index, Name0, Term0) -> + Name = quote_unicode(Name0), + Term = quote_unicode(Term0), + FmtStr = case Type of + solr -> + "http://~s:~s/internal_solr/~s/select?q=~s:~s&wt=json"; + yokozuna -> + "http://~s:~s/search/query/~s?q=~s:~s&wt=json" + end, + URL = ?FMT(FmtStr, [Host, Port, Index, Name, Term]), + lager:info("Run search ~s", [URL]), + Opts = [{response_format, binary}], + ibrowse:send_req(URL, [], get, [], Opts). + +%%%=================================================================== +%%% Private +%%%=================================================================== + +-spec verify_count_http(count(), json_string()) -> boolean(). +verify_count_http(Expected, Resp) -> + Count = get_count_http(Resp), + lager:info("Expected: ~p, Actual: ~p", [Expected, Count]), + Expected == Count. + +-spec get_count_http(json_string()) -> count(). +get_count_http(Resp) -> + Struct = mochijson2:decode(Resp), + kvc:path([<<"response">>, <<"numFound">>], Struct). + +-spec riak_http({node(), rt:interfaces()} | rt:interfaces()) -> + {host(), portnum()}. +riak_http({_Node, ConnInfo}) -> + riak_http(ConnInfo); +riak_http(ConnInfo) -> + proplists:get_value(http, ConnInfo). + +-spec config_merge(proplists:proplist(), proplists:proplist()) -> + orddict:orddict() | proplists:proplist(). +config_merge(DefaultCfg, NewCfg) when NewCfg /= [] -> + orddict:update(yokozuna, + fun(V) -> + orddict:merge(fun(_, _X, Y) -> Y end, + orddict:from_list(V), + orddict:from_list( + orddict:fetch( + yokozuna, NewCfg))) + end, + DefaultCfg); +config_merge(DefaultCfg, _NewCfg) -> + DefaultCfg. + +internal_solr_url(Host, Port, Index) -> + ?FMT("http://~s:~B/internal_solr/~s", [Host, Port, Index]). +internal_solr_url(Host, Port, Index, Name, Term, Shards) -> + Ss = [internal_solr_url(Host, ShardPort, Index) + || {_, ShardPort} <- Shards], + ?FMT("http://~s:~B/internal_solr/~s/select?wt=json&q=~s:~s&shards=~s", + [Host, Port, Index, Name, Term, string:join(Ss, ",")]). + +quote_unicode(Value) -> + mochiweb_util:quote_plus(binary_to_list( + unicode:characters_to_binary(Value))). diff --git a/tests/yz_core_properties_create_unload.erl b/tests/yz_core_properties_create_unload.erl index 50821950b..eef47618e 100644 --- a/tests/yz_core_properties_create_unload.erl +++ b/tests/yz_core_properties_create_unload.erl @@ -83,7 +83,7 @@ test_core_props_removal(Cluster, RandNodes, KeyCount, Pid) -> lager:info("Remove core.properties file in each index data dir"), remove_core_props(RandNodes, ?INDEX), - check_exists(Cluster, ?INDEX), + yokozuna_rt:check_exists(Cluster, ?INDEX), lager:info("Write one more piece of data"), ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"), @@ -93,9 +93,9 @@ test_core_props_removal(Cluster, RandNodes, KeyCount, Pid) -> test_remove_index_dirs(Cluster, RandNodes, KeyCount, Pid) -> lager:info("Remove index directories on each node and let them recreate/reindex"), - remove_index_dirs(RandNodes, ?INDEX), + yokozuna_rt:remove_index_dirs(RandNodes, ?INDEX), - check_exists(Cluster, ?INDEX), + yokozuna_rt:check_exists(Cluster, ?INDEX), yokozuna_rt:expire_trees(Cluster), yokozuna_rt:wait_for_aae(Cluster), @@ -112,9 +112,9 @@ test_remove_segment_infos_and_rebuild(Cluster, RandNodes, KeyCount, Pid) -> lager:info("To fix, we remove index directories on each node and let them recreate/reindex"), - remove_index_dirs(RandNodes, ?INDEX), + yokozuna_rt:remove_index_dirs(RandNodes, ?INDEX), - check_exists(Cluster, ?INDEX), + yokozuna_rt:check_exists(Cluster, ?INDEX), yokozuna_rt:expire_trees(Cluster), yokozuna_rt:wait_for_aae(Cluster), @@ -147,23 +147,6 @@ remove_core_props(Nodes, IndexName) -> [file:delete(PropsFile) || PropsFile <- PropsFiles], ok. -%% @doc Check if index/core exists in metadata, disk via yz_index:exists. -check_exists(Nodes, IndexName) -> - rt:wait_until(Nodes, - fun(N) -> - rpc:call(N, yz_index, exists, [IndexName]) - end). - -%% @doc Remove index directories, removing the index. -remove_index_dirs(Nodes, IndexName) -> - IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) || - Node <- Nodes], - lager:info("Remove index dirs: ~p, on nodes: ~p~n", - [IndexDirs, Nodes]), - [rt:stop(ANode) || ANode <- Nodes], - [rt:del_dir(binary_to_list(IndexDir)) || IndexDir <- IndexDirs], - [rt:start(ANode) || ANode <- Nodes]. - %% @doc Remove lucence segment info files to check if reindexing will occur %% on re-creation/re-indexing. remove_segment_infos(Nodes, IndexName) -> diff --git a/tests/yz_solr_upgrade.erl b/tests/yz_solr_upgrade.erl new file mode 100644 index 000000000..e81b7d26f --- /dev/null +++ b/tests/yz_solr_upgrade.erl @@ -0,0 +1,122 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%%-------------------------------------------------------------------- +-module(yz_solr_upgrade). +-compile(export_all). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("riakc/include/riakc.hrl"). + +-define(INDEX, <<"test_solr_upgrade_idx">>). +-define(BUCKET, <<"test_solr_upgrade_bucket">>). +-define(SEQMAX, 1000). +-define(CFG, + [{riak_core, + [ + {ring_creation_size, 16}, + {anti_entropy_build_limit, {100, 1000}}, + {anti_entropy_concurrency, 8} + ]}, + {yokozuna, + [ + {anti_entropy_tick, 1000}, + {enabled, true} + ]} + ]). + +confirm() -> + TestMetaData = riak_test_runner:metadata(), + OldVsn = proplists:get_value(upgrade_version, TestMetaData, previous), + + [Node1, Node2|RestNodes] = Cluster = rt:build_cluster(lists:duplicate(4, {OldVsn, ?CFG})), + + %% Generate keys, YZ only supports UTF-8 compatible keys + GenKeys = [<> || N <- lists:seq(1, ?SEQMAX), + not lists:any( + fun(E) -> E > 127 end, + binary_to_list(<>))], + KeyCount = length(GenKeys), + lager:info("KeyCount ~p", [KeyCount]), + + OldPid = rt:pbc(rt:select_random(RestNodes)), + + yokozuna_rt:write_data(Cluster, OldPid, ?INDEX, ?BUCKET, GenKeys), + %% wait for solr soft commit + timer:sleep(1100), + + riakc_pb_socket:stop(OldPid), + + HP1 = random_hp([Node1, Node2]), + yokozuna_rt:search_expect(HP1, ?INDEX, <<"*">>, <<"*">>, KeyCount), + + %% Upgrade Node 1 and 2 + lager:info("Upgrade to solr version 4.10.4 on Nodes 1 - 2"), + upgrade_to_current([Node1, Node2]), + + lager:info("Write one more piece of data"), + Pid2 = rt:pbc(Node2), + ok = rt:pbc_write(Pid2, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"), + timer:sleep(1100), + riakc_pb_socket:stop(Pid2), + + HP2 = random_hp(RestNodes), + yokozuna_rt:search_expect(HP2, ?INDEX, <<"*">>, <<"*">>, KeyCount + 1), + + %% Upgrade Rest + lager:info("Upgrade to solr version 4.10.4 on Nodes 3 - 4"), + upgrade_to_current(RestNodes), + + lager:info("Write one more piece of data"), + RandPid = rt:pbc(rt:select_random(RestNodes)), + ok = rt:pbc_write(RandPid, ?BUCKET, <<"food">>, <<"food">>, "text/plain"), + timer:sleep(1100), + riakc_pb_socket:stop(RandPid), + + yokozuna_rt:expire_trees(Cluster), + yokozuna_rt:wait_for_aae(Cluster), + HP3 = random_hp(Cluster), + yokozuna_rt:search_expect(HP3, ?INDEX, <<"*">>, <<"*">>, KeyCount + 2), + + lager:info("Downgrade cluster to previous version. Once upgraded, the + the index format will change, throwing an error, unless you + reindex (& resync the AAE trees) that core/search_index again."), + yokozuna_rt:rolling_upgrade(Cluster, previous), + + yokozuna_rt:remove_index_dirs(Cluster, ?INDEX), + yokozuna_rt:check_exists(Cluster, ?INDEX), + yokozuna_rt:expire_trees(Cluster), + yokozuna_rt:wait_for_aae(Cluster), + + HP4 = random_hp(Cluster), + yokozuna_rt:search_expect(HP4, ?INDEX, <<"*">>, <<"*">>, KeyCount + 2), + + pass. + +random_hp(Nodes) -> + rt:select_random(yokozuna_rt:host_entries( + rt:connection_info(Nodes))). + +upgrade_to_current(Nodes) -> + [rt:upgrade(ANode, current) || ANode <- Nodes], + [rt:wait_for_service(ANode, riak_kv) || ANode <- Nodes], + [rt:wait_for_service(ANode, yokozuna) || ANode <- Nodes]. + +downgrade_to_previous(Nodes) -> + [rt:upgrade(ANode, previous) || ANode <- Nodes], + [rt:wait_for_service(ANode, riak_kv) || ANode <- Nodes], + [rt:wait_for_service(ANode, yokozuna) || ANode <- Nodes].