Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Mar 13, 2016
1 parent 20fab3c commit e51b374
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 51 deletions.
60 changes: 43 additions & 17 deletions include/leo_object_storage.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,24 @@

%% Default Values
-define(AVS_FILE_EXT, ".avs").
-define(DEF_METADATA_DB, 'bitcask').
-define(DEF_OBJECT_STORAGE_SUB_DIR, "object/").
-define(DEF_METADATA_DB, 'bitcask').
-define(DEF_OBJECT_STORAGE_SUB_DIR, "object/").
-define(DEF_METADATA_STORAGE_SUB_DIR, "metadata/").
-define(DEF_STATE_SUB_DIR, "state/").
-define(DEF_STATE_SUB_DIR, "state/").

-define(SERVER_OBJ_STORAGE, 'object_storage').
-define(SERVER_METADATA_DB, 'metadata_db').

-define(DEF_NUM_OF_OBJ_STORAGE_READ_PROCS, 3).

%% ETS-Table
-define(ETS_CONTAINERS_TABLE, 'leo_object_storage_containers').
-define(ETS_INFO_TABLE, 'leo_object_storage_info').
-define(ETS_INFO_TABLE, 'leo_object_storage_info').

%% regarding compaction
-define(ENV_COMPACTION_STATUS, 'compaction_status').
-define(ENV_COMPACTION_STATUS, 'compaction_status').
-define(STATE_RUNNING_COMPACTION, 'compacting').
-define(STATE_ACTIVE, 'active').
-define(STATE_ACTIVE, 'active').
-type(storage_status() :: ?STATE_RUNNING_COMPACTION | ?STATE_ACTIVE).

-define(DEF_LIMIT_COMPACTION_PROCS, 4).
Expand Down Expand Up @@ -184,6 +186,7 @@
-define(MD5_EMPTY_BIN, 281949768489412648962353822266799178366).
-define(MAX_KEY_SIZE, 1024 * 4).


%%--------------------------------------------------------------------
%% AVS-Related
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -245,6 +248,7 @@
-define(DEF_POS_START, -1).
-define(DEF_POS_END, -1).


%%--------------------------------------------------------------------
%% Records
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -389,31 +393,50 @@
%% apllication-env
-define(env_metadata_db(),
case application:get_env(?APP_NAME, metadata_storage) of
{ok, EnvMetadataDB} -> EnvMetadataDB;
_ -> ?DEF_METADATA_DB
{ok, EnvMetadataDB} ->
EnvMetadataDB;
_ ->
?DEF_METADATA_DB
end).

-ifdef(TEST).
-define(env_strict_check(), true).
-else.
-define(env_strict_check(),
case application:get_env(?APP_NAME, strict_check) of
{ok, EnvStrictCheck} -> EnvStrictCheck;
_ -> false
{ok, EnvStrictCheck} ->
EnvStrictCheck;
_ ->
false
end).
-endif.

-define(env_enable_diagnosis_log(),
case application:get_env(leo_object_storage,
case application:get_env(?APP_NAME,
is_enable_diagnosis_log) of
{ok, true} ->
true;
_ ->
false
end).

-define(env_num_of_obj_storage_read_procs(),
case application:get_env(?APP_NAME,
num_of_obj_storage_read_procs) of
{ok, EnvNumOfObjStorageReadProcs} ->
EnvNumOfObjStorageReadProcs;
_ ->
?DEF_NUM_OF_OBJ_STORAGE_READ_PROCS
end).

-define(get_obj_storage_read_proc(_PidL,_AddrId),
begin
lists:nth((_AddrId rem erlang:length(_PidL)) + 1,_PidL)
end).


-define(env_limit_num_of_compaction_procs(),
case application:get_env(leo_object_storage,
case application:get_env(?APP_NAME,
limit_num_of_compaction_procs) of
{ok, EnvLimitCompactionProcs} when is_integer(EnvLimitCompactionProcs) ->
EnvLimitCompactionProcs;
Expand All @@ -422,7 +445,7 @@
end).

-define(env_threshold_slow_processing(),
case application:get_env(leo_object_storage,
case application:get_env(?APP_NAME,
threshold_slow_processing) of
{ok, EnvThresholdSlowProc} when is_integer(EnvThresholdSlowProc) ->
EnvThresholdSlowProc;
Expand All @@ -432,7 +455,7 @@

%% [Interval between batch processes]
-define(env_compaction_interval_reg(),
case application:get_env(leo_object_storage,
case application:get_env(?APP_NAME,
compaction_waiting_time_regular) of
{ok, EnvRegCompactionWT} when is_integer(EnvRegCompactionWT) ->
EnvRegCompactionWT;
Expand All @@ -441,7 +464,7 @@
end).

-define(env_compaction_interval_max(),
case application:get_env(leo_object_storage,
case application:get_env(?APP_NAME,
compaction_waiting_time_max) of
{ok, EnvMaxCompactionWT} when is_integer(EnvMaxCompactionWT) ->
EnvMaxCompactionWT;
Expand All @@ -451,15 +474,17 @@

%% [Number of batch processes]
-define(env_compaction_num_of_batch_procs_max(),
case application:get_env(leo_object_storage, batch_procs_max) of
case application:get_env(?APP_NAME,
batch_procs_max) of
{ok, EnvMaxCompactionBP} when is_integer(EnvMaxCompactionBP) ->
EnvMaxCompactionBP;
_ ->
?DEF_MAX_COMPACTION_BP
end).

-define(env_compaction_num_of_batch_procs_reg(),
case application:get_env(leo_object_storage, batch_procs_regular) of
case application:get_env(?APP_NAME,
batch_procs_regular) of
{ok, EnvRegCompactionBP} when is_integer(EnvRegCompactionBP) ->
EnvRegCompactionBP;
_ ->
Expand Down Expand Up @@ -487,6 +512,7 @@
end,_TargetContainers))
end).


%% custom-metadata's items for MDC-replication:
-define(PROP_CMETA_CLUSTER_ID, 'cluster_id').
-define(PROP_CMETA_NUM_OF_REPLICAS, 'num_of_replicas').
Expand Down
24 changes: 22 additions & 2 deletions src/leo_compact_fsm_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -952,8 +952,11 @@ after_execute_1({ok, #compaction_worker_state{
ok ->
ok = leo_object_storage_server:switch_container(
ObjStorageId, FilePath, NumActiveObjs, SizeActiveObjs),
ok = leo_object_storage_server:switch_container(
ObjStorageId_R, FilePath, NumActiveObjs, SizeActiveObjs),
%% @TODO
NumOfObjStorageReadProcs = ?env_num_of_obj_storage_read_procs(),
ok = after_execute_2(NumOfObjStorageReadProcs, ObjStorageId_R,
FilePath, NumActiveObjs, SizeActiveObjs),

ok = leo_backend_db_api:finish_compaction(MetaDBId, true),
ok;
{error,_Cause} ->
Expand All @@ -972,6 +975,23 @@ after_execute_1({_Error, #compaction_worker_state{
ok.


%% @doc Switch an avs container for obj-storage-read
%% @private
after_execute_2(0,_,_,_,_) ->
ok;
after_execute_2(ChildIndex, ObjStorageId_R,
FilePath, NumActiveObjs, SizeActiveObjs) ->
ObjStorageId_R_Child = list_to_atom(
lists:append([atom_to_list(ObjStorageId_R),
"_",
integer_to_list(ChildIndex)
])),
ok = leo_object_storage_server:switch_container(
ObjStorageId_R_Child, FilePath, NumActiveObjs, SizeActiveObjs),
after_execute_2(ChildIndex - 1, ObjStorageId_R,
FilePath, NumActiveObjs, SizeActiveObjs).


%% @doc Output the diagnosis log
%% @private
output_diagnosis_log(undefined,_Metadata) ->
Expand Down
9 changes: 6 additions & 3 deletions src/leo_object_storage_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,8 @@ start_app() ->
do_request(get, [{AddrId, Key}, StartPos, EndPos, IsForcedCheck]) ->
KeyBin = term_to_binary({AddrId, Key}),
case get_object_storage_pid(KeyBin) of
[{_,Pid}|_] ->
[{_,PidL}|_] ->
Pid = ?get_obj_storage_read_proc(PidL, AddrId),
?SERVER_MODULE:get(Pid, {AddrId, Key}, StartPos, EndPos, IsForcedCheck);
_ ->
{error, ?ERROR_PROCESS_NOT_FOUND}
Expand Down Expand Up @@ -570,15 +571,17 @@ do_request(delete, [Key, Object]) ->
do_request(head, [{AddrId, Key}]) ->
KeyBin = term_to_binary({AddrId, Key}),
case get_object_storage_pid(KeyBin) of
[{_,Pid}|_] ->
[{_,PidL}|_] ->
Pid = ?get_obj_storage_read_proc(PidL, AddrId),
?SERVER_MODULE:head(Pid, {AddrId, Key});
_ ->
{error, ?ERROR_PROCESS_NOT_FOUND}
end;
do_request(head_with_calc_md5, [{AddrId, Key}, MD5Context]) ->
KeyBin = term_to_binary({AddrId, Key}),
case get_object_storage_pid(KeyBin) of
[{_,Pid}|_] ->
[{_,PidL}|_] ->
Pid = ?get_obj_storage_read_proc(PidL, AddrId),
?SERVER_MODULE:head_with_calc_md5(
Pid, {AddrId, Key}, MD5Context);
_ ->
Expand Down
87 changes: 58 additions & 29 deletions src/leo_object_storage_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ add_container_1(leo_compact_fsm_worker = Mod,
{error, Cause}
end.

%% @TODO
add_container_1(leo_object_storage_server = Mod, BaseId,
ObjStorageId, MetaDBId, CompactWorkerId, LoggerId, Props) ->
Path = leo_misc:get_value('path', Props),
Expand All @@ -380,33 +379,10 @@ add_container_1(leo_object_storage_server = Mod, BaseId,
case supervisor:start_child(?MODULE, ChildSpec_1) of
{ok,_} ->
%% For GET and HEAD
ObjStorageId_R = gen_id(obj_storage_read, BaseId),
ChildSpec_2 = {ObjStorageId_R,
{Mod, start_link,
[ObjServerState#obj_server_state{id = ObjStorageId_R,
privilege = ?OBJ_PRV_READ_ONLY}]},
permanent, 2000, worker, [Mod]},
case supervisor:start_child(?MODULE, ChildSpec_2) of
{ok,_} ->
true = ets:insert(?ETS_CONTAINERS_TABLE,
{BaseId, [{obj_storage, ObjStorageId},
{obj_storage_read, ObjStorageId_R},
{metadata, MetaDBId},
{compact_worker, CompactWorkerId}]}),
ok = leo_misc:set_env(?APP_NAME,
{?ENV_COMPACTION_STATUS, ObjStorageId},
?STATE_ACTIVE),
ok;
{error,{already_started,_Pid}} ->
ok;
{error, Cause} ->
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "add_container_1/6"},
{line, ?LINE},
{body, Cause}]),
{error, Cause}
end;
NumOfObjStorageReadProcs = ?env_num_of_obj_storage_read_procs(),
add_container_2(NumOfObjStorageReadProcs, Mod, BaseId,
ObjStorageId, MetaDBId,
CompactWorkerId, ObjServerState, []);
{error,{already_started,_Pid}} ->
ok;
{error, Cause} ->
Expand All @@ -419,6 +395,50 @@ add_container_1(leo_object_storage_server = Mod, BaseId,
end.


%% @doc Make obj_storage_read's processes for GET and HEAD operation
%% @private
add_container_2(0,_Mod, BaseId,
ObjStorageId, MetaDBId, CompactWorkerId,_ObjServerState, Acc) ->
true = ets:insert(?ETS_CONTAINERS_TABLE,
{BaseId, [{obj_storage, ObjStorageId},
{obj_storage_read, Acc},
{metadata, MetaDBId},
{compact_worker, CompactWorkerId}]}),
ok = leo_misc:set_env(?APP_NAME,
{?ENV_COMPACTION_STATUS, ObjStorageId},
?STATE_ACTIVE),
ok;
add_container_2(ChildIndex, Mod, BaseId,
ObjStorageId, MetaDBId, CompactWorkerId, ObjServerState, Acc) ->
ObjStorageId_R = gen_id(obj_storage_read, BaseId, ChildIndex),
ChildSpec = {ObjStorageId_R,
{Mod, start_link,
[ObjServerState#obj_server_state{id = ObjStorageId_R,
privilege = ?OBJ_PRV_READ_ONLY}]},
permanent, 2000, worker, [Mod]},
Ret = case supervisor:start_child(?MODULE, ChildSpec) of
{ok,_} ->
ok;
{error,{already_started,_Pid}} ->
ok;
{error, Cause} ->
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "add_container_2/8"},
{line, ?LINE},
{body, Cause}]),
{error, Cause}
end,
case Ret of
ok ->
add_container_2(ChildIndex - 1, Mod, BaseId,
ObjStorageId, MetaDBId, CompactWorkerId,
ObjServerState, [ObjStorageId_R|Acc]);
Other ->
Other
end.


%% @doc Generate Id for obj-storage or metadata
%%
-spec(gen_id(obj_storage | metadata | diagnosis_logger | compact_worker, integer()) ->
Expand All @@ -430,7 +450,8 @@ gen_id(obj_storage, Id) ->
gen_id(obj_storage_read, Id) ->
list_to_atom(lists:append([atom_to_list(?APP_NAME),
"_read_",
integer_to_list(Id)]));
integer_to_list(Id)
]));
gen_id(metadata, Id) ->
list_to_atom(lists:append(["leo_metadata_",
integer_to_list(Id)]));
Expand All @@ -440,3 +461,11 @@ gen_id(diagnosis_logger, Id) ->
gen_id(compact_worker, Id) ->
list_to_atom(lists:append(["leo_compact_worker_",
integer_to_list(Id)])).

gen_id(obj_storage_read, Id, ChildIndex) ->
list_to_atom(lists:append([atom_to_list(?APP_NAME),
"_read_",
integer_to_list(Id),
"_",
integer_to_list(ChildIndex)
])).

1 comment on commit e51b374

@yosukehara
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memo:

Please sign in to comment.