Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consider removal of broken objects #730

Merged
merged 3 commits into from
Jun 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions ocaml/src/alba_base_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -502,4 +502,33 @@ class client
(self # deliver_nsm_host_messages)
(self # drop_cache_by_id ~global:true)
~namespace

method delete_object ~namespace_id ~object_name ~may_not_exist =
self # nsm_host_access # get_nsm_by_id ~namespace_id
>>= fun nsm_client ->
Lwt.finalize
(fun () ->
let open Nsm_model in
nsm_client # delete_object
~object_name
~allow_overwrite:(if may_not_exist
then Unconditionally
else AnyPrevious)
>>= function
| None ->
Lwt_log.debug_f
"no object with name %s could be found\n"
object_name
| Some old_manifest ->
(* TODO add en-passant deletion of fragments *)
Lwt_log.debug_f
"object with name %s was deleted\n"
object_name)
(fun () ->
let () =
Manifest_cache.ManifestCache.remove
(self # get_manifest_cache) namespace_id object_name
in
Lwt.return_unit
)
end
29 changes: 2 additions & 27 deletions ocaml/src/alba_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -322,33 +322,8 @@ class alba_client (base_client : Alba_base_client.client)
self # nsm_host_access # with_namespace_id
~namespace
(fun namespace_id ->
self # nsm_host_access # get_nsm_by_id ~namespace_id
>>= fun nsm_client ->
Lwt.finalize
(fun () ->
let open Nsm_model in
nsm_client # delete_object
~object_name
~allow_overwrite:(if may_not_exist
then Unconditionally
else AnyPrevious)
>>= function
| None ->
Lwt_log.debug_f
"no object with name %s could be found\n"
object_name
| Some old_manifest ->
(* TODO add en-passant deletion of fragments *)
Lwt_log.debug_f
"object with name %s was deleted\n"
object_name)
(fun () ->
let () =
Manifest_cache.ManifestCache.remove
(base_client # get_manifest_cache) namespace_id object_name
in
Lwt.return_unit
))
base_client # delete_object ~namespace_id ~object_name ~may_not_exist
)

method download_object_generic'
~namespace_id
Expand Down
13 changes: 10 additions & 3 deletions ocaml/src/alba_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,8 @@ let test_repair_by_policy () =
~nsm_host_id
~namespace ()
>>= fun namespace_id ->

alba_client # mgr_access # get_namespace_by_id ~namespace_id
>>= fun (_, namespace_name, namespace_info) ->
let get_k_m_x manifest =
let open Nsm_model in
let es = match manifest.Manifest.storage_scheme with
Expand Down Expand Up @@ -1179,7 +1180,10 @@ let test_repair_by_policy () =
(fun maintenance_client ->
maintenance_client # repair_by_policy_namespace'
~skip_recent:false
~namespace_id ())
~namespace_id
~namespace
~namespace_info
())
>>= fun () ->

alba_client # get_object_manifest
Expand Down Expand Up @@ -1975,6 +1979,9 @@ let test_update_policies () =
alba_client # mgr_access # create_preset preset_name preset >>= fun () ->
alba_client # create_namespace ~namespace ~preset_name:(Some preset_name) ()
>>= fun namespace_id ->
alba_client # mgr_access # get_namespace_by_id ~namespace_id
>>= fun (_, namespace, namespace_info) ->

_wait_for_osds alba_client namespace_id >>= fun () ->

let assert_k_m mf k m =
Expand Down Expand Up @@ -2019,7 +2026,7 @@ let test_update_policies () =
(fun maintenance_client ->
maintenance_client # repair_by_policy_namespace'
~skip_recent:false
~namespace_id ())
~namespace_id ~namespace ~namespace_info ())
>>= fun () ->

alba_client # get_object_manifest
Expand Down
158 changes: 67 additions & 91 deletions ocaml/src/maintenance.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1134,12 +1134,21 @@ class client ?(retry_timeout = 60.)
end
end

method repair_by_policy_namespace ~namespace_id =
method repair_by_policy_namespace
~namespace_id
~(namespace:string)
~namespace_info
=
if filter namespace_id
then self # repair_by_policy_namespace' ~namespace_id ()
else Lwt.return ()
then self # repair_by_policy_namespace'
~namespace_id ~namespace ~namespace_info ()
else Lwt.return_unit

method repair_by_policy_namespace' ?(skip_recent=true) ~namespace_id () =
method repair_by_policy_namespace' ?(skip_recent=true)
~namespace_id
~namespace
~namespace_info
() =

alba_client # get_ns_preset_info ~namespace_id >>= fun preset ->
let policies = preset.Preset.policies in
Expand Down Expand Up @@ -1167,88 +1176,36 @@ class client ?(retry_timeout = 60.)
nsm_host_access # get_nsm_by_id ~namespace_id >>= fun nsm ->

nsm # get_stats >>= fun stats ->
let (_, bucket_count) = stats.Nsm_model.NamespaceStats.bucket_count in
let open Nsm_model in
let (_, bucket_count) = stats.NamespaceStats.bucket_count in

Lwt_log.debug_f
"Found buckets %s for namespace_id:%Li"
([%show : (Policy.policy * int64) list] bucket_count)
namespace_id >>= fun () ->

let is_cache_namespace =
let open Maintenance_config in
Hashtbl.fold
(fun prefix preset acc ->
acc ||
(let open Albamgr_protocol.Protocol in
preset = namespace_info.Namespace.preset_name
&& String.starts_with namespace prefix
)
)
maintenance_config.cache_eviction_prefix_preset_pairs
false
in
let buckets =
bucket_count
|> List.filter (fun (_, cnt) -> cnt > 0L)
|> List.map fst
|> List.map_filter_rev
(fun ((k, m, fragment_count, max_disks_per_node) as bucket) ->
if (k, m) = (best_k, best_m)
then
begin
let open Compare in
match Int.compare' fragment_count best_actual_fragment_count with
| LT -> Some (bucket, `Regenerate)
| EQ
| GT ->
if max_disks_per_node > best_actual_max_disks_per_node
then Some (bucket, `Rebalance)
else None
end
else
begin
let is_more_preferred_bucket =
let rec inner = function
| [] -> false
| ((k', m', fragment_count', max_disks_per_node') as policy) :: policies ->
if policy = best_policy
then false
else
begin
if
(* does the bucket match this policy? *)
k = k'
&& m = m'
&& fragment_count >= fragment_count'
&& max_disks_per_node <= max_disks_per_node'
then true
else inner policies
end
in
inner policies
in
if is_more_preferred_bucket
then
(* this will be handled by another part of the maintenance process
* (when osds are considered dead/unavailable for writes for
* a long enough time the objects in these buckets will be
* repaired/rewritten)
*)
None
else
Some (bucket, `Rewrite)
end)
|> List.sort
(fun (bucket1, j1) (bucket2, j2) ->
let compare_bucket_safety (k1, _, fragment_count1, _) (k2, _, fragment_count2, _) =
(fragment_count1 - k1) - (fragment_count2 - k2)
in
let get_max_disks_per_node (_, _, _, x) = x in
match j1, j2 with
| `Rebalance, `Rebalance ->
compare
(get_max_disks_per_node bucket1)
(get_max_disks_per_node bucket2)
| `Regenerate, `Rebalance
| `Rewrite, `Rebalance ->
1
| `Rebalance, `Regenerate
| `Rebalance, `Rewrite ->
-1
| `Rewrite, `Rewrite
| `Regenerate, `Rewrite
| `Rewrite, `Regenerate
| `Regenerate, `Regenerate ->
compare_bucket_safety bucket1 bucket2)
Maintenance_helper.categorize_policies
best_policy
best_actual_fragment_count
best_actual_max_disks_per_node
policies
is_cache_namespace
bucket_count
in

let repaired_some = ref false in

let handle_bucket ((k, m, fragment_count, max_disks_per_node), job) =
Expand Down Expand Up @@ -1397,13 +1354,32 @@ class client ?(retry_timeout = 60.)
repaired_some := true;
Lwt.return_unit))
in
let f = match job with
| `Rewrite ->
let maybe_remove mf =
Lwt_extra2.ignore_errors
(fun () ->
let open Nsm_model in
let should_delete = not (Manifest.has_data_fragments k mf) in
if should_delete
then
alba_client # delete_object
~namespace_id
~object_name:mf.Manifest.name
~may_not_exist:true
else
Lwt.return_unit
)
in
let f =
let open Maintenance_helper in
match job with
| Rewrite ->
rewrite
| `Regenerate ->
| Regenerate ->
wrap_rewrite "Regenerate" regenerate
| `Rebalance ->
| Rebalance ->
wrap_rewrite "Rebalance" rebalance
| ConsiderRemoval -> maybe_remove

in

(* filter out recent object uploads to avoid repairing objects
Expand All @@ -1430,7 +1406,8 @@ class client ?(retry_timeout = 60.)
loop_buckets buckets >>= fun () ->

if !repaired_some && filter namespace_id
then self # repair_by_policy_namespace ~namespace_id
then self # repair_by_policy_namespace
~namespace_id ~namespace ~namespace_info
else Lwt.return ()


Expand Down Expand Up @@ -1902,8 +1879,8 @@ class client ?(retry_timeout = 60.)
retry_timeout

method maintenance_for_namespace
~namespace
~namespace_id
~namespace
~namespace_info
=
Lwt_extra2.ignore_errors
Expand Down Expand Up @@ -1974,7 +1951,11 @@ class client ?(retry_timeout = 60.)
~grace_period:gc_grace_period
~namespace_id);
"repair by policy",
(fun () -> self # repair_by_policy_namespace ~namespace_id);
(fun () -> self # repair_by_policy_namespace
~namespace_id
~namespace
~namespace_info
);
"rebalance",
(fun () -> self # rebalance_namespace
~make_first_last_reverse
Expand Down Expand Up @@ -2025,12 +2006,7 @@ class client ?(retry_timeout = 60.)
osd_access

method cache_eviction () : unit Lwt.t =
let get_prefixes () =
Hashtbl.fold
(fun prefix _ acc -> prefix :: acc)
maintenance_config.Maintenance_config.cache_eviction_prefix_preset_pairs
[]
in
let get_prefixes () = Maintenance_config.get_prefixes maintenance_config in

let percentage_from_fill_ratio fill_ratio =
if fill_ratio > 0.93
Expand Down
6 changes: 6 additions & 0 deletions ocaml/src/maintenance_config.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type t = {

let show t = to_yojson t |> Yojson.Safe.pretty_to_string

let get_prefixes t =
Hashtbl.fold
(fun prefix _ acc -> prefix :: acc)
t.cache_eviction_prefix_preset_pairs
[]

let from_buffer buf =
let ser_version = Llio.int8_from buf in
assert (ser_version = 1);
Expand Down
Loading