Skip to content

Commit

Permalink
Merge pull request #730 from openvstorage/maybe_delete_broken
Browse files Browse the repository at this point in the history
consider removal of broken objects
  • Loading branch information
Romain Slootmaekers authored Jun 1, 2017
2 parents f9efd86 + b5cbd6b commit fec90d9
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 136 deletions.
29 changes: 29 additions & 0 deletions ocaml/src/alba_base_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -502,4 +502,33 @@ class client
(self # deliver_nsm_host_messages)
(self # drop_cache_by_id ~global:true)
~namespace

method delete_object ~namespace_id ~object_name ~may_not_exist =
self # nsm_host_access # get_nsm_by_id ~namespace_id
>>= fun nsm_client ->
Lwt.finalize
(fun () ->
let open Nsm_model in
nsm_client # delete_object
~object_name
~allow_overwrite:(if may_not_exist
then Unconditionally
else AnyPrevious)
>>= function
| None ->
Lwt_log.debug_f
"no object with name %s could be found\n"
object_name
| Some old_manifest ->
(* TODO add en-passant deletion of fragments *)
Lwt_log.debug_f
"object with name %s was deleted\n"
object_name)
(fun () ->
let () =
Manifest_cache.ManifestCache.remove
(self # get_manifest_cache) namespace_id object_name
in
Lwt.return_unit
)
end
29 changes: 2 additions & 27 deletions ocaml/src/alba_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -322,33 +322,8 @@ class alba_client (base_client : Alba_base_client.client)
self # nsm_host_access # with_namespace_id
~namespace
(fun namespace_id ->
self # nsm_host_access # get_nsm_by_id ~namespace_id
>>= fun nsm_client ->
Lwt.finalize
(fun () ->
let open Nsm_model in
nsm_client # delete_object
~object_name
~allow_overwrite:(if may_not_exist
then Unconditionally
else AnyPrevious)
>>= function
| None ->
Lwt_log.debug_f
"no object with name %s could be found\n"
object_name
| Some old_manifest ->
(* TODO add en-passant deletion of fragments *)
Lwt_log.debug_f
"object with name %s was deleted\n"
object_name)
(fun () ->
let () =
Manifest_cache.ManifestCache.remove
(base_client # get_manifest_cache) namespace_id object_name
in
Lwt.return_unit
))
base_client # delete_object ~namespace_id ~object_name ~may_not_exist
)

method download_object_generic'
~namespace_id
Expand Down
13 changes: 10 additions & 3 deletions ocaml/src/alba_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,8 @@ let test_repair_by_policy () =
~nsm_host_id
~namespace ()
>>= fun namespace_id ->

alba_client # mgr_access # get_namespace_by_id ~namespace_id
>>= fun (_, namespace_name, namespace_info) ->
let get_k_m_x manifest =
let open Nsm_model in
let es = match manifest.Manifest.storage_scheme with
Expand Down Expand Up @@ -1179,7 +1180,10 @@ let test_repair_by_policy () =
(fun maintenance_client ->
maintenance_client # repair_by_policy_namespace'
~skip_recent:false
~namespace_id ())
~namespace_id
~namespace
~namespace_info
())
>>= fun () ->

alba_client # get_object_manifest
Expand Down Expand Up @@ -1975,6 +1979,9 @@ let test_update_policies () =
alba_client # mgr_access # create_preset preset_name preset >>= fun () ->
alba_client # create_namespace ~namespace ~preset_name:(Some preset_name) ()
>>= fun namespace_id ->
alba_client # mgr_access # get_namespace_by_id ~namespace_id
>>= fun (_, namespace, namespace_info) ->

_wait_for_osds alba_client namespace_id >>= fun () ->

let assert_k_m mf k m =
Expand Down Expand Up @@ -2019,7 +2026,7 @@ let test_update_policies () =
(fun maintenance_client ->
maintenance_client # repair_by_policy_namespace'
~skip_recent:false
~namespace_id ())
~namespace_id ~namespace ~namespace_info ())
>>= fun () ->

alba_client # get_object_manifest
Expand Down
158 changes: 67 additions & 91 deletions ocaml/src/maintenance.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1134,12 +1134,21 @@ class client ?(retry_timeout = 60.)
end
end

method repair_by_policy_namespace ~namespace_id =
method repair_by_policy_namespace
~namespace_id
~(namespace:string)
~namespace_info
=
if filter namespace_id
then self # repair_by_policy_namespace' ~namespace_id ()
else Lwt.return ()
then self # repair_by_policy_namespace'
~namespace_id ~namespace ~namespace_info ()
else Lwt.return_unit

method repair_by_policy_namespace' ?(skip_recent=true) ~namespace_id () =
method repair_by_policy_namespace' ?(skip_recent=true)
~namespace_id
~namespace
~namespace_info
() =

alba_client # get_ns_preset_info ~namespace_id >>= fun preset ->
let policies = preset.Preset.policies in
Expand Down Expand Up @@ -1167,88 +1176,36 @@ class client ?(retry_timeout = 60.)
nsm_host_access # get_nsm_by_id ~namespace_id >>= fun nsm ->

nsm # get_stats >>= fun stats ->
let (_, bucket_count) = stats.Nsm_model.NamespaceStats.bucket_count in
let open Nsm_model in
let (_, bucket_count) = stats.NamespaceStats.bucket_count in

Lwt_log.debug_f
"Found buckets %s for namespace_id:%Li"
([%show : (Policy.policy * int64) list] bucket_count)
namespace_id >>= fun () ->

let is_cache_namespace =
let open Maintenance_config in
Hashtbl.fold
(fun prefix preset acc ->
acc ||
(let open Albamgr_protocol.Protocol in
preset = namespace_info.Namespace.preset_name
&& String.starts_with namespace prefix
)
)
maintenance_config.cache_eviction_prefix_preset_pairs
false
in
let buckets =
bucket_count
|> List.filter (fun (_, cnt) -> cnt > 0L)
|> List.map fst
|> List.map_filter_rev
(fun ((k, m, fragment_count, max_disks_per_node) as bucket) ->
if (k, m) = (best_k, best_m)
then
begin
let open Compare in
match Int.compare' fragment_count best_actual_fragment_count with
| LT -> Some (bucket, `Regenerate)
| EQ
| GT ->
if max_disks_per_node > best_actual_max_disks_per_node
then Some (bucket, `Rebalance)
else None
end
else
begin
let is_more_preferred_bucket =
let rec inner = function
| [] -> false
| ((k', m', fragment_count', max_disks_per_node') as policy) :: policies ->
if policy = best_policy
then false
else
begin
if
(* does the bucket match this policy? *)
k = k'
&& m = m'
&& fragment_count >= fragment_count'
&& max_disks_per_node <= max_disks_per_node'
then true
else inner policies
end
in
inner policies
in
if is_more_preferred_bucket
then
(* this will be handled by another part of the maintenance process
* (when osds are considered dead/unavailable for writes for
* a long enough time the objects in these buckets will be
* repaired/rewritten)
*)
None
else
Some (bucket, `Rewrite)
end)
|> List.sort
(fun (bucket1, j1) (bucket2, j2) ->
let compare_bucket_safety (k1, _, fragment_count1, _) (k2, _, fragment_count2, _) =
(fragment_count1 - k1) - (fragment_count2 - k2)
in
let get_max_disks_per_node (_, _, _, x) = x in
match j1, j2 with
| `Rebalance, `Rebalance ->
compare
(get_max_disks_per_node bucket1)
(get_max_disks_per_node bucket2)
| `Regenerate, `Rebalance
| `Rewrite, `Rebalance ->
1
| `Rebalance, `Regenerate
| `Rebalance, `Rewrite ->
-1
| `Rewrite, `Rewrite
| `Regenerate, `Rewrite
| `Rewrite, `Regenerate
| `Regenerate, `Regenerate ->
compare_bucket_safety bucket1 bucket2)
Maintenance_helper.categorize_policies
best_policy
best_actual_fragment_count
best_actual_max_disks_per_node
policies
is_cache_namespace
bucket_count
in

let repaired_some = ref false in

let handle_bucket ((k, m, fragment_count, max_disks_per_node), job) =
Expand Down Expand Up @@ -1397,13 +1354,32 @@ class client ?(retry_timeout = 60.)
repaired_some := true;
Lwt.return_unit))
in
let f = match job with
| `Rewrite ->
let maybe_remove mf =
Lwt_extra2.ignore_errors
(fun () ->
let open Nsm_model in
let should_delete = not (Manifest.has_data_fragments k mf) in
if should_delete
then
alba_client # delete_object
~namespace_id
~object_name:mf.Manifest.name
~may_not_exist:true
else
Lwt.return_unit
)
in
let f =
let open Maintenance_helper in
match job with
| Rewrite ->
rewrite
| `Regenerate ->
| Regenerate ->
wrap_rewrite "Regenerate" regenerate
| `Rebalance ->
| Rebalance ->
wrap_rewrite "Rebalance" rebalance
| ConsiderRemoval -> maybe_remove

in

(* filter out recent object uploads to avoid repairing objects
Expand All @@ -1430,7 +1406,8 @@ class client ?(retry_timeout = 60.)
loop_buckets buckets >>= fun () ->

if !repaired_some && filter namespace_id
then self # repair_by_policy_namespace ~namespace_id
then self # repair_by_policy_namespace
~namespace_id ~namespace ~namespace_info
else Lwt.return ()


Expand Down Expand Up @@ -1902,8 +1879,8 @@ class client ?(retry_timeout = 60.)
retry_timeout

method maintenance_for_namespace
~namespace
~namespace_id
~namespace
~namespace_info
=
Lwt_extra2.ignore_errors
Expand Down Expand Up @@ -1974,7 +1951,11 @@ class client ?(retry_timeout = 60.)
~grace_period:gc_grace_period
~namespace_id);
"repair by policy",
(fun () -> self # repair_by_policy_namespace ~namespace_id);
(fun () -> self # repair_by_policy_namespace
~namespace_id
~namespace
~namespace_info
);
"rebalance",
(fun () -> self # rebalance_namespace
~make_first_last_reverse
Expand Down Expand Up @@ -2025,12 +2006,7 @@ class client ?(retry_timeout = 60.)
osd_access

method cache_eviction () : unit Lwt.t =
let get_prefixes () =
Hashtbl.fold
(fun prefix _ acc -> prefix :: acc)
maintenance_config.Maintenance_config.cache_eviction_prefix_preset_pairs
[]
in
let get_prefixes () = Maintenance_config.get_prefixes maintenance_config in

let percentage_from_fill_ratio fill_ratio =
if fill_ratio > 0.93
Expand Down
6 changes: 6 additions & 0 deletions ocaml/src/maintenance_config.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type t = {

let show t = to_yojson t |> Yojson.Safe.pretty_to_string

let get_prefixes t =
Hashtbl.fold
(fun prefix _ acc -> prefix :: acc)
t.cache_eviction_prefix_preset_pairs
[]

let from_buffer buf =
let ser_version = Llio.int8_from buf in
assert (ser_version = 1);
Expand Down
Loading

0 comments on commit fec90d9

Please sign in to comment.