From 231f0bdea478c541f9738e06b10e23d2819d156b Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Tue, 22 Sep 2015 16:31:44 +0200 Subject: [PATCH 01/12] verify namespace (WIP) --- ocaml/src/albamgr_plugin.ml | 18 ++++++---- ocaml/src/albamgr_protocol.ml | 67 +++++++++++++++++++++++++++++++---- ocaml/src/maintenance.ml | 19 +++++++--- ocaml/src/maintenance_test.ml | 7 ++-- 4 files changed, 90 insertions(+), 21 deletions(-) diff --git a/ocaml/src/albamgr_plugin.ml b/ocaml/src/albamgr_plugin.ml index e19a7b61..e3c3f3f1 100644 --- a/ocaml/src/albamgr_plugin.ml +++ b/ocaml/src/albamgr_plugin.ml @@ -1333,17 +1333,23 @@ let albamgr_user_hook : HookRegistry.h = fun (ic, oc, _cid) db backend -> Llio.raw_string_to Llio.int_to) (name, i) in + let progress = + let open Progress in + match action with + | Rewrite -> + (Rewrite { count = 0L; next = Some start; }) + | Verify _ -> + (Verify ({ count = 0L; next = Some start; }, + { fragments_detected_missing = 0L; + fragments_osd_unavailable = 0L; + fragments_checksum_mismatch = 0L; })) in Work.IterNamespaceLeaf (action, namespace_id, name, range), - match action with - | Rewrite -> - Update.Set (Keys.progress name, - serialize - Progress.to_buffer - (Progress.Rewrite (0L, Some start))) + Update.Set (Keys.progress name, + serialize Progress.to_buffer progress) ) (Int.range 0 cnt) in diff --git a/ocaml/src/albamgr_protocol.ml b/ocaml/src/albamgr_protocol.ml index 9456283b..ba522137 100644 --- a/ocaml/src/albamgr_protocol.ml +++ b/ocaml/src/albamgr_protocol.ml @@ -554,14 +554,27 @@ module Protocol = struct module Work = struct type id = Int32.t + type verify_params = { + checksum : bool; + repair_osd_unavailable : bool; + } [@@deriving show] type action = | Rewrite + | Verify of verify_params [@@deriving show] let action_to_buffer buf = function | Rewrite -> Llio.int8_to buf 1 + | Verify { checksum; repair_osd_unavailable; } -> + Llio.int8_to buf 2; + Llio.bool_to buf checksum; + Llio.bool_to buf repair_osd_unavailable let action_from_buffer buf = match Llio.int8_from buf with | 1 -> Rewrite + | 2 -> + let checksum = Llio.bool_from buf in + let repair_osd_unavailable = Llio.bool_from buf in + Verify { checksum; repair_osd_unavailable; } | k -> raise_bad_tag "Work.action" k type range = string * string option [@@deriving show] @@ -707,22 +720,62 @@ module Protocol = struct end module Progress = struct + type base = { + count : int64; + next : string option; + } [@@deriving show] + let base_to_buffer buf { count; next; } = + Llio.int64_to buf count; + Llio.string_option_to buf next + let base_from_buffer buf = + let count = Llio.int64_from buf in + let next = Llio.string_option_from buf in + { count; next } + + type verify_params = { + fragments_detected_missing : int64; + fragments_osd_unavailable : int64; + fragments_checksum_mismatch : int64; + } [@@deriving show] + let verify_params_to_buffer + buf + { fragments_detected_missing; + fragments_osd_unavailable; + fragments_checksum_mismatch; } = + Llio.int64_to buf fragments_detected_missing; + Llio.int64_to buf fragments_osd_unavailable; + Llio.int64_to buf fragments_checksum_mismatch + let verify_params_from_buffer buf = + let fragments_detected_missing = Llio.int64_from buf in + let fragments_osd_unavailable = Llio.int64_from buf in + let fragments_checksum_mismatch = Llio.int64_from buf in + { fragments_detected_missing; + fragments_osd_unavailable; + fragments_checksum_mismatch } + type t = - | Rewrite of int64 * string option + | Rewrite of base + | Verify of base * verify_params [@@deriving show] let to_buffer buf = function - | Rewrite (count, next) -> + | Rewrite b -> Llio.int8_to buf 1; - Llio.int64_to buf count; - Llio.string_option_to buf next + base_to_buffer buf b + | Verify (b, v) -> + Llio.int8_to buf 2; + base_to_buffer buf b; + verify_params_to_buffer buf v let from_buffer buf = match Llio.int8_from buf with | 1 -> - let count = Llio.int64_from buf in - let next = Llio.string_option_from buf in - Rewrite (count, next) + let b = base_from_buffer buf in + Rewrite b + | 2 -> + let b = base_from_buffer buf in + let v = verify_params_from_buffer buf in + Verify (b, v) | k -> raise_bad_tag "Progress" k module Update = struct diff --git a/ocaml/src/maintenance.ml b/ocaml/src/maintenance.ml index 94518164..7f454f10 100644 --- a/ocaml/src/maintenance.ml +++ b/ocaml/src/maintenance.ml @@ -1215,7 +1215,7 @@ class client ?(retry_timeout = 60.) | Some p -> let open Albamgr_protocol.Protocol in match p, action with - | Progress.Rewrite (count, next), Work.Rewrite -> + | Progress.Rewrite { Progress.count; next; }, Work.Rewrite -> let fetch ~first ~last = alba_client # with_nsm_client' ~namespace_id @@ -1260,9 +1260,9 @@ class client ?(retry_timeout = 60.) (List.last objs) else last in - let po = (Some (Progress.Rewrite - (Int64.(add count (of_int cnt)), - next))) + let po = Some (Progress.Rewrite + { Progress.count = Int64.(add count (of_int cnt)); + next; }) in alba_client # mgr_access # update_progress @@ -1273,6 +1273,15 @@ class client ?(retry_timeout = 60.) else if has_more then inner po else Lwt.return () + | Progress.Verify ({ Progress.count; next; }, + { Progress.fragments_detected_missing; + fragments_osd_unavailable; + fragments_checksum_mismatch }), + Work.Verify _ -> + Lwt.fail_with "TODO" + | _, Work.Rewrite + | _, Work.Verify _ -> + Lwt.fail_with "badly set up IterNamespace task!" in alba_client # mgr_access # get_progress name >>= fun po -> inner po @@ -1324,7 +1333,7 @@ class client ?(retry_timeout = 60.) Lwt.return () | `Retry -> Lwt_unix.sleep backoff >>= fun () -> - inner (backoff *. 1.5) + inner (min (backoff *. 1.5) 120.) in Lwt.finalize (fun () -> inner 1.0) diff --git a/ocaml/src/maintenance_test.ml b/ocaml/src/maintenance_test.ml index db492c0a..27e18b14 100644 --- a/ocaml/src/maintenance_test.ml +++ b/ocaml/src/maintenance_test.ml @@ -443,9 +443,10 @@ let test_rewrite_namespace () = (fun acc (i, p) -> let end_key = get_start_key (i+1) cnt in match p with - | Progress.Rewrite (c, end_key') -> - assert (end_key = end_key'); - acc + (Int64.to_int c)) + | Progress.Rewrite { Progress.count; next; } -> + assert (end_key = next); + acc + (Int64.to_int count) + | _ -> assert false) 0 progresses in From 5172a733ca50fca4b8c39456c5b1cc56f50a2895 Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Wed, 23 Sep 2015 11:24:02 +0200 Subject: [PATCH 02/12] add failing test_verify_namespace + some refactoring in maintenance.ml IterNamespaceLeaf --- ocaml/src/maintenance.ml | 104 ++++++++++++++++++++++------------ ocaml/src/maintenance_test.ml | 70 +++++++++++++++++++++++ 2 files changed, 137 insertions(+), 37 deletions(-) diff --git a/ocaml/src/maintenance.ml b/ocaml/src/maintenance.ml index 7f454f10..99f9d719 100644 --- a/ocaml/src/maintenance.ml +++ b/ocaml/src/maintenance.ml @@ -1214,18 +1214,16 @@ class client ?(retry_timeout = 60.) | None -> Lwt.return () | Some p -> let open Albamgr_protocol.Protocol in - match p, action with - | Progress.Rewrite { Progress.count; next; }, Work.Rewrite -> - let fetch ~first ~last = - alba_client # with_nsm_client' - ~namespace_id - (fun client -> - client # list_objects_by_id - ~first ~finc:true - ~last - ~max:100 ~reverse:false) - in - + let fetch ~first ~last = + alba_client # with_nsm_client' + ~namespace_id + (fun client -> + client # list_objects_by_id + ~first ~finc:true + ~last + ~max:100 ~reverse:false) + in + let get_next_batch { Progress.count; next; } = (match next, last with | None, _ -> Lwt.return ((0,[]), false) | Some next, None -> fetch ~first:next ~last:None @@ -1233,8 +1231,34 @@ class client ?(retry_timeout = 60.) if next >= last then Lwt.return ((0,[]), false) else fetch ~first:next ~last:(Some (last, false))) + in + let get_update_progress_base { Progress.count; _; } ((cnt, objs), has_more) = + let next = + if has_more + then Option.map + (fun mf -> mf.Nsm_model.Manifest.object_id ^ "\000") + (List.last objs) + else last + in + { Progress.count = Int64.(add count (of_int cnt)); + next; } + in + let update_progress_and_maybe_continue new_p has_more = + let po = Some new_p in + alba_client # mgr_access # update_progress + name p po >>= fun () -> - >>= fun ((cnt, objs), has_more) -> + if not (filter work_id) + then Lwt.fail NotMyTask + else if has_more + then inner po + else Lwt.return () + in + + match p, action with + | Progress.Rewrite pb, Work.Rewrite -> + + get_next_batch pb >>= fun (((cnt, objs), has_more) as batch) -> Lwt_list.iter_s (fun manifest -> @@ -1253,32 +1277,38 @@ class client ?(retry_timeout = 60.) | exn -> Lwt.fail exn)) objs >>= fun () -> - let next = - if has_more - then Option.map - (fun mf -> mf.Nsm_model.Manifest.object_id ^ "\000") - (List.last objs) - else last - in - let po = Some (Progress.Rewrite - { Progress.count = Int64.(add count (of_int cnt)); - next; }) - in + let new_p = Progress.Rewrite (get_update_progress_base pb batch) in - alba_client # mgr_access # update_progress - name p po >>= fun () -> - - if not (filter work_id) - then Lwt.fail NotMyTask - else if has_more - then inner po - else Lwt.return () - | Progress.Verify ({ Progress.count; next; }, - { Progress.fragments_detected_missing; - fragments_osd_unavailable; - fragments_checksum_mismatch }), + update_progress_and_maybe_continue + new_p + has_more + | Progress.Verify (pb, + ({ Progress.fragments_detected_missing; + fragments_osd_unavailable; + fragments_checksum_mismatch } as progress_verify)), Work.Verify _ -> - Lwt.fail_with "TODO" + get_next_batch pb >>= fun (((cnt, objs), has_more) as batch) -> + + Lwt_list.iter_s + (fun manifest -> + (* TODO + * - get all fragments, check result... + * maybe change return type van download fragment... + * - do inline repair where needed + *) + Lwt.return ()) + objs >>= fun () -> + + let new_p = Progress.Verify + (get_update_progress_base pb batch, + (* TODO update this based the result + * of iterating (mapping) over all + * manifests *) + progress_verify) in + + update_progress_and_maybe_continue + new_p + has_more | _, Work.Rewrite | _, Work.Verify _ -> Lwt.fail_with "badly set up IterNamespace task!" diff --git a/ocaml/src/maintenance_test.ml b/ocaml/src/maintenance_test.ml index 27e18b14..a4762631 100644 --- a/ocaml/src/maintenance_test.ml +++ b/ocaml/src/maintenance_test.ml @@ -461,6 +461,75 @@ let test_rewrite_namespace () = Lwt.return ()) +let test_verify_namespace () = + let test_name = "test_verify_namespace" in + let namespace = test_name in + test_with_alba_client + (fun alba_client -> + alba_client # create_namespace + ~preset_name:None + ~namespace () >>= fun namespace_id -> + + let open Nsm_model in + + let object_name = "abc" in + alba_client # get_base_client # upload_object_from_string + ~namespace + ~object_name + ~object_data:"efg" + ~checksum_o:None + ~allow_overwrite:NoPrevious >>= fun (mf, _) -> + + let object_id = mf.Manifest.object_id in + + (* remove a fragment *) + let locations = List.hd_exn (mf.Manifest.fragment_locations) in + let victim_osd_o, version0 = List.hd_exn locations in + let victim_osd = Option.get_some victim_osd_o in + Alba_test.delete_fragment + alba_client namespace_id object_id + (victim_osd, 0) + 0 0 + >>= fun () -> + + let open Albamgr_protocol.Protocol in + let name = "name" in + let cnt = 10 in + alba_client # mgr_access # add_work_items + [ Work.(IterNamespace + (Verify + { checksum = true; + repair_osd_unavailable = true; }, + namespace_id, + name, + cnt)) ] >>= fun () -> + + Alba_test.wait_for_work alba_client >>= fun () -> + Alba_test.wait_for_work alba_client >>= fun () -> + + alba_client # with_nsm_client' + ~namespace_id + (fun client -> + client # get_object_manifest_by_name object_name) + >>= fun mfo -> + let mf = Option.get_some mfo in + assert (mf.Manifest.version_id = 1); + assert (mf.Manifest.fragment_locations + |> List.hd_exn + |> List.hd_exn + |> snd + = 1); + + (* TODO test + - corrupted fragment + - fragment that can't be downloaded (because asd got corrupted) + - fragment on asd that is offline + + assert all those fragments are now ok + *) + + Lwt.return ()) + open OUnit @@ -471,4 +540,5 @@ let suite = "maintenance_test" >:::[ "test_repair_orange" >:: test_repair_orange; "test_repair_orange2" >:: test_repair_orange2; "test_rewrite_namespace" >:: test_rewrite_namespace; + "test_verify_namespace" >:: test_verify_namespace; ] From 96613239d06ff7393f9102e34cfe4a3d76937253 Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Wed, 23 Sep 2015 16:13:22 +0200 Subject: [PATCH 03/12] extract download_fragment from alba_base_client --- ocaml/src/alba_base_client.ml | 141 +++-------------------------- ocaml/src/alba_client_download.ml | 145 ++++++++++++++++++++++++++++++ ocaml/src/fragment_cache.ml | 11 +-- ocaml/src/fragment_cache_test.ml | 6 +- 4 files changed, 166 insertions(+), 137 deletions(-) diff --git a/ocaml/src/alba_base_client.ml b/ocaml/src/alba_base_client.ml index 3b5f5f8e..731701fb 100644 --- a/ocaml/src/alba_base_client.ml +++ b/ocaml/src/alba_base_client.ml @@ -17,7 +17,6 @@ limitations under the License. open Prelude open Lwt open Checksum -open Slice open Lwt_bytes2 open Alba_statistics open Fragment_cache @@ -55,7 +54,6 @@ class client new osd_access mgr_access ~osd_connection_pool_size ~osd_timeout in let with_osd_from_pool ~osd_id f = osd_access # with_osd ~osd_id f in - let get_osd_info = osd_access # get_osd_info in let get_namespace_osds_info_cache ~namespace_id = nsm_host_access # get_namespace_info ~namespace_id >>= fun (_, osds, _) -> @@ -226,6 +224,7 @@ class client ~checksum_o ~allow_overwrite + (* consumers of this method are responsible for freeing * the returned fragment bigstring *) @@ -239,132 +238,20 @@ class client ~fragment_checksum decompress ~encryption = + Alba_client_download.download_fragment + osd_access + ~osd_id_o + ~namespace_id + ~object_id ~object_name + ~chunk_id ~fragment_id + ~replication + ~version_id + ~fragment_checksum + decompress + ~encryption + fragment_cache + (bad_fragment_callback self) - (match osd_id_o with - | None -> Lwt.fail_with "can't download fragment from None osd" - | Some osd_id -> Lwt.return osd_id) - >>= fun osd_id -> - - let t0_fragment = Unix.gettimeofday () in - - let key_string = - Osd_keys.AlbaInstance.fragment - ~namespace_id - ~object_id ~version_id - ~chunk_id ~fragment_id - in - let key = Slice.wrap_string key_string in - - let retrieve key = - fragment_cache # lookup - namespace_id key_string - read_it - >>= function - | None -> - begin - Lwt_log.debug_f "fragment not in cache, trying osd:%li" osd_id - >>= fun () -> - - Lwt.catch - (fun () -> - with_osd_from_pool - ~osd_id - (fun device_client -> - device_client # get_option key)) - (let open Asd_protocol.Protocol in - function - | (Error.Exn err) as exn -> begin match err with - | Error.Unknown_error _ - | Error.ProtocolVersionMismatch _ -> - bad_fragment_callback - self - ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id - | Error.Full (* a bit silly as this is not an update *) - | Error.Assert_failed _ - | Error.Unknown_operation -> - () - end; - Lwt.fail exn - | exn -> Lwt.fail exn) - >>= function - | None -> - let msg = - Printf.sprintf - "Detected missing fragment namespace_id=%li object_id=%S osd_id=%li (chunk,fragment,version)=(%i,%i,%i)" - namespace_id object_id osd_id - chunk_id fragment_id version_id - in - Lwt_log.warning msg >>= fun () -> - bad_fragment_callback - self - ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id; - (* TODO loopke die queue harvest en nr albamgr duwt *) - (* TODO testje *) - Lwt.fail_with msg - | Some (data:Slice.t) -> - get_osd_info ~osd_id >>= fun (_, state) -> - state.read <- Unix.gettimeofday () :: state.read; - Lwt.ignore_result - (fragment_cache # add - namespace_id key_string - (Slice.get_string_unsafe data) - ); - let hit_or_mis = false in - Lwt.return (hit_or_mis, data) - end - | Some data -> - let hit_or_mis = true in - Lwt.return (hit_or_mis, Slice.wrap_string data) - in - Statistics.with_timing_lwt (fun () -> retrieve key) - - >>= fun (t_retrieve, (hit_or_miss, fragment_data)) -> - - let fragment_data' = Slice.to_bigstring fragment_data in - - Statistics.with_timing_lwt - (fun () -> - Fragment_helper.verify fragment_data' fragment_checksum) - >>= fun (t_verify, checksum_valid) -> - - (if checksum_valid - then Lwt.return () - else - begin - Lwt_bytes.unsafe_destroy fragment_data'; - bad_fragment_callback - self - ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id; - Lwt.fail_with "Checksum mismatch" - end) >>= fun () -> - - Statistics.with_timing_lwt - (fun () -> - Fragment_helper.maybe_decrypt - encryption - ~object_id ~chunk_id ~fragment_id - ~ignore_fragment_id:replication - fragment_data') - >>= fun (t_decrypt, maybe_decrypted) -> - - Statistics.with_timing_lwt - (fun () -> decompress ~release_input:true maybe_decrypted) - >>= fun (t_decompress, (maybe_decompressed : Lwt_bytes.t)) -> - - let t_fragment = Statistics.({ - osd_id; - retrieve = t_retrieve; - hit_or_miss; - verify = t_verify; - decrypt = t_decrypt; - decompress = t_decompress; - total = Unix.gettimeofday () -. t0_fragment; - }) in - - Lwt.return (t_fragment, maybe_decompressed) (* consumers of this method are responsible for freeing * the returned fragment bigstrings diff --git a/ocaml/src/alba_client_download.ml b/ocaml/src/alba_client_download.ml index a715d958..3afc1423 100644 --- a/ocaml/src/alba_client_download.ml +++ b/ocaml/src/alba_client_download.ml @@ -14,6 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. *) +open Lwt_bytes2 +open Slice +open Alba_statistics +open Osd_access open Lwt.Infix let get_object_manifest' @@ -34,3 +38,144 @@ let get_object_manifest' namespace_id object_name lookup_on_nsm_host ~consistent_read ~should_cache + + +(* consumers of this method are responsible for freeing + * the returned fragment bigstring + *) +let download_fragment + (osd_access : osd_access) + (* TODO Nsm_model.location instead? *) + ~osd_id_o ~version_id + ~namespace_id + ~object_id ~object_name + ~chunk_id ~fragment_id + ~replication + ~fragment_checksum + decompress + ~encryption + fragment_cache + bad_fragment_callback + = + (match osd_id_o with + | None -> Lwt.fail_with "can't download fragment from None osd" + | Some osd_id -> Lwt.return osd_id) + >>= fun osd_id -> + + let t0_fragment = Unix.gettimeofday () in + + let key_string = + Osd_keys.AlbaInstance.fragment + ~namespace_id + ~object_id ~version_id + ~chunk_id ~fragment_id + in + let key = Slice.wrap_string key_string in + + let retrieve key = + fragment_cache # lookup + namespace_id key_string + >>= function + | None -> + begin + Lwt_log.debug_f "fragment not in cache, trying osd:%li" osd_id + >>= fun () -> + + Lwt.catch + (fun () -> + osd_access # with_osd + ~osd_id + (fun device_client -> + device_client # get_option key)) + (let open Asd_protocol.Protocol in + function + | (Error.Exn err) as exn -> + begin match err with + | Error.Unknown_error _ + | Error.ProtocolVersionMismatch _ -> + bad_fragment_callback + ~namespace_id ~object_id ~object_name + ~chunk_id ~fragment_id ~version_id + | Error.Full (* a bit silly as this is not an update *) + | Error.Assert_failed _ + | Error.Unknown_operation -> + () + end; + Lwt.fail exn + | exn -> Lwt.fail exn) + >>= function + | None -> + let msg = + Printf.sprintf + "Detected missing fragment namespace_id=%li object_id=%S osd_id=%li (chunk,fragment,version)=(%i,%i,%i)" + namespace_id object_id osd_id + chunk_id fragment_id version_id + in + Lwt_log.warning msg >>= fun () -> + bad_fragment_callback + ~namespace_id ~object_id ~object_name + ~chunk_id ~fragment_id ~version_id; + (* TODO loopke die queue harvest en nr albamgr duwt *) + (* TODO testje *) + Lwt.fail_with msg + | Some (data:Slice.t) -> + osd_access # get_osd_info ~osd_id >>= fun (_, state) -> + state.read <- Unix.gettimeofday () :: state.read; + Lwt.ignore_result + (fragment_cache # add + namespace_id key_string + (Slice.get_string_unsafe data) + ); + let hit_or_mis = false in + Lwt.return (hit_or_mis, data) + end + | Some data -> + let hit_or_mis = true in + Lwt.return (hit_or_mis, Slice.wrap_string data) + in + Statistics.with_timing_lwt (fun () -> retrieve key) + + >>= fun (t_retrieve, (hit_or_miss, fragment_data)) -> + + let fragment_data' = Slice.to_bigstring fragment_data in + + Statistics.with_timing_lwt + (fun () -> + Fragment_helper.verify fragment_data' fragment_checksum) + >>= fun (t_verify, checksum_valid) -> + + (if checksum_valid + then Lwt.return () + else + begin + Lwt_bytes.unsafe_destroy fragment_data'; + bad_fragment_callback + ~namespace_id ~object_id ~object_name + ~chunk_id ~fragment_id ~version_id; + Lwt.fail_with "Checksum mismatch" + end) >>= fun () -> + + Statistics.with_timing_lwt + (fun () -> + Fragment_helper.maybe_decrypt + encryption + ~object_id ~chunk_id ~fragment_id + ~ignore_fragment_id:replication + fragment_data') + >>= fun (t_decrypt, maybe_decrypted) -> + + Statistics.with_timing_lwt + (fun () -> decompress ~release_input:true maybe_decrypted) + >>= fun (t_decompress, (maybe_decompressed : Lwt_bytes.t)) -> + + let t_fragment = Statistics.({ + osd_id; + retrieve = t_retrieve; + hit_or_miss; + verify = t_verify; + decrypt = t_decrypt; + decompress = t_decompress; + total = Unix.gettimeofday () -. t0_fragment; + }) in + + Lwt.return (t_fragment, maybe_decompressed) diff --git a/ocaml/src/fragment_cache.ml b/ocaml/src/fragment_cache.ml index c2b09270..bc35af77 100644 --- a/ocaml/src/fragment_cache.ml +++ b/ocaml/src/fragment_cache.ml @@ -23,10 +23,7 @@ module KV = Rocks_key_value_store class type cache = object method clear_all : unit -> unit Lwt.t method add : int32 -> string -> Bytes.t -> unit Lwt.t - method lookup : - int32 -> string - -> (Lwt_unix.file_descr -> len:int -> 'a Lwt.t) - -> 'a option Lwt.t + method lookup : int32 -> string -> bytes option Lwt.t method drop : int32 -> unit Lwt.t method get_count : unit -> int64 @@ -37,7 +34,7 @@ end class no_cache = object(self :# cache) method clear_all () = Lwt.return () method add bid oid blob = Lwt.return () - method lookup bid oid f = Lwt.return None + method lookup bid oid = Lwt.return None method drop bid = Lwt.return () method get_count () = 0L method get_total_size () = 0L @@ -319,7 +316,7 @@ class blob_cache root ~(max_size:int64) ~rocksdb_max_open_files = Lwt.return ()) ) - method lookup bid oid f = + method lookup bid oid = Hashtbl.remove _dropping bid; @@ -383,7 +380,7 @@ class blob_cache root ~(max_size:int64) ~rocksdb_max_open_files = (fun () -> Lwt_mutex.with_lock _mutex - (fun () -> _lookup bid oid f ) + (fun () -> _lookup bid oid read_it) ) (fun exn -> Lwt_log.warning ~exn "the cache exploded. returning None" >>= fun () -> diff --git a/ocaml/src/fragment_cache_test.ml b/ocaml/src/fragment_cache_test.ml index 0685bf8f..275036ea 100644 --- a/ocaml/src/fragment_cache_test.ml +++ b/ocaml/src/fragment_cache_test.ml @@ -64,7 +64,7 @@ let test_1 () = Bytes.set blob 0 'z'; (* _replace_grow *) cache # add bid oid blob >>= fun () -> (* lookup *) - cache # lookup bid oid read_it >>= + cache # lookup bid oid >>= begin function | None -> @@ -116,7 +116,7 @@ let test_3 () = | [] -> Lwt.return () | (k, is_some)::rest -> begin - cache # lookup 0l k read_it >>= fun r -> + cache # lookup 0l k >>= fun r -> match r, is_some with | None , false -> Lwt.return () | Some _, true -> Lwt.return () @@ -250,7 +250,7 @@ let test_long () = begin let bid = Random.int32 10l in let oid = Random.int 100 |> Printf.sprintf "%04x" in - cache # lookup bid oid read_it + cache # lookup bid oid >>= function | None -> loop (found ) (missed + 1) (i-1) | Some _ -> loop (found + 1) missed (i-1) From 0fcf7506aeedda49a4eae29d38bcc2a48425b2ab Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Wed, 23 Sep 2015 17:04:39 +0200 Subject: [PATCH 04/12] move with_timing(_lwt) to prelude --- ocaml/src/alba_base_client.ml | 4 ++-- ocaml/src/alba_client.ml | 7 ++++--- ocaml/src/alba_client_download.ml | 9 +++++---- ocaml/src/alba_client_upload.ml | 8 ++++---- ocaml/src/alba_statistics.ml | 13 ------------- ocaml/src/albamgr_plugin.ml | 4 ++-- ocaml/src/asd_client.ml | 4 ++-- ocaml/src/asd_server.ml | 7 +++---- ocaml/src/asd_statistics.ml | 2 -- ocaml/src/fragment_cache.ml | 4 ++-- ocaml/src/fragment_helper.ml | 1 - ocaml/src/nsm_host_plugin.ml | 2 +- ocaml/src/osd_access.ml | 3 +-- ocaml/src/proxy_server.ml | 3 +-- ocaml/src/tools/prelude.ml | 13 +++++++++++++ ocaml/src/tools/statistics_collection.ml | 2 -- 16 files changed, 40 insertions(+), 46 deletions(-) diff --git a/ocaml/src/alba_base_client.ml b/ocaml/src/alba_base_client.ml index 731701fb..ba71df1d 100644 --- a/ocaml/src/alba_base_client.ml +++ b/ocaml/src/alba_base_client.ml @@ -804,11 +804,11 @@ class client then fragment_size else (object_size - offset) in - Statistics.with_timing_lwt + with_timing_lwt (fun () -> hash2 # update_lwt_bytes_detached fragment 0 fragment_size') >>= fun (t_verify', ()) -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> write_object_data fragment 0 fragment_size') >>= fun (t_write_data', ()) -> Lwt.return (offset + fragment_size', diff --git a/ocaml/src/alba_client.ml b/ocaml/src/alba_client.ml index b25b8e33..22451070 100644 --- a/ocaml/src/alba_client.ml +++ b/ocaml/src/alba_client.ml @@ -14,12 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. *) -open Lwt.Infix +open Prelude open Fragment_cache open Albamgr_access open Alba_base_client open Alba_statistics open Alba_client_errors +open Lwt.Infix class alba_client (base_client : Alba_base_client.client) = @@ -276,7 +277,7 @@ class alba_client (base_client : Alba_base_client.client) = let t0_object = Unix.gettimeofday () in - Statistics.with_timing_lwt + with_timing_lwt (fun () -> self # get_object_manifest' ~namespace_id ~object_name ~consistent_read ~should_cache) @@ -308,7 +309,7 @@ class alba_client (base_client : Alba_base_client.client) | Error.Exn Error.NotEnoughFragments as exn -> (* Download probably failed because of stale manifest *) Lwt_log.info_f ~exn "retrying " >>= fun () -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> Manifest_cache.ManifestCache.remove (base_client # get_manifest_cache) diff --git a/ocaml/src/alba_client_download.ml b/ocaml/src/alba_client_download.ml index 3afc1423..8149ac75 100644 --- a/ocaml/src/alba_client_download.ml +++ b/ocaml/src/alba_client_download.ml @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. *) +open Prelude open Lwt_bytes2 open Slice open Alba_statistics @@ -133,13 +134,13 @@ let download_fragment let hit_or_mis = true in Lwt.return (hit_or_mis, Slice.wrap_string data) in - Statistics.with_timing_lwt (fun () -> retrieve key) + with_timing_lwt (fun () -> retrieve key) >>= fun (t_retrieve, (hit_or_miss, fragment_data)) -> let fragment_data' = Slice.to_bigstring fragment_data in - Statistics.with_timing_lwt + with_timing_lwt (fun () -> Fragment_helper.verify fragment_data' fragment_checksum) >>= fun (t_verify, checksum_valid) -> @@ -155,7 +156,7 @@ let download_fragment Lwt.fail_with "Checksum mismatch" end) >>= fun () -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> Fragment_helper.maybe_decrypt encryption @@ -164,7 +165,7 @@ let download_fragment fragment_data') >>= fun (t_decrypt, maybe_decrypted) -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> decompress ~release_input:true maybe_decrypted) >>= fun (t_decompress, (maybe_decompressed : Lwt_bytes.t)) -> diff --git a/ocaml/src/alba_client_upload.ml b/ocaml/src/alba_client_upload.ml index 544adfc6..9c9f585e 100644 --- a/ocaml/src/alba_client_upload.ml +++ b/ocaml/src/alba_client_upload.ml @@ -149,7 +149,7 @@ let upload_chunk t_hash, checksum)), osd_id_o) -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> match osd_id_o with | None -> Lwt.return () @@ -315,14 +315,14 @@ let upload_object'' let t0_chunk = Unix.gettimeofday () in let chunk_size' = min desired_chunk_size (object_length - total_size) in Lwt_log.debug_f "chunk_size' = %i" chunk_size' >>= fun () -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> object_reader # read chunk_size' chunk) >>= fun (read_data_time, ()) -> let total_size' = total_size + chunk_size' in let has_more = total_size' < object_length in - Statistics.with_timing_lwt + with_timing_lwt (fun () -> object_hash # update_lwt_bytes_detached chunk 0 chunk_size') >>= fun (hash_time', ()) -> @@ -463,7 +463,7 @@ let upload_object'' ~manifest ~gc_epoch in - Statistics.with_timing_lwt + with_timing_lwt (fun () -> Lwt.catch store_manifest diff --git a/ocaml/src/alba_statistics.ml b/ocaml/src/alba_statistics.ml index c7e56d16..bca3d13b 100644 --- a/ocaml/src/alba_statistics.ml +++ b/ocaml/src/alba_statistics.ml @@ -75,19 +75,6 @@ module Statistics = struct total : duration; } [@@deriving show] - let with_timing f = - let t0 = Unix.gettimeofday () in - let res = f () in - let t1 = Unix.gettimeofday () in - t1 -. t0, res - - let with_timing_lwt f = - let t0 = Unix.gettimeofday () in - let open Lwt.Infix in - f () >>= fun res -> - let t1 = Unix.gettimeofday () in - Lwt.return (t1 -. t0, res) - let summed_fragment_hit_misses (object_download:object_download) = List.fold_left (fun (h,m) (chunk_download:chunk_download) -> diff --git a/ocaml/src/albamgr_plugin.ml b/ocaml/src/albamgr_plugin.ml index e3c3f3f1..a1ed4c48 100644 --- a/ocaml/src/albamgr_plugin.ml +++ b/ocaml/src/albamgr_plugin.ml @@ -1757,7 +1757,7 @@ let albamgr_user_hook : HookRegistry.h = fun (ic, oc, _cid) db backend -> Lwt.catch (fun () -> let (delta,(res,res_serializer)) = - Statistics.with_timing + with_timing (fun () -> let req = read_query_i r req_buf in let res = handle_query r req in @@ -1789,7 +1789,7 @@ let albamgr_user_hook : HookRegistry.h = fun (ic, oc, _cid) db backend -> | cnt -> Lwt.catch (fun () -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> handle_update r req >>= fun (res, upds) -> backend # push_update diff --git a/ocaml/src/asd_client.ml b/ocaml/src/asd_client.ml index 6a0a9a4e..36730bab 100644 --- a/ocaml/src/asd_client.ml +++ b/ocaml/src/asd_client.ml @@ -40,7 +40,7 @@ class client fd ic id = Lwt_log.debug_f "asd_client %s: %s" id descr >>= fun () -> - Alba_statistics.Statistics.with_timing_lwt + with_timing_lwt (fun () -> let s = serialize_with_length @@ -61,7 +61,7 @@ class client fd ic id = Lwt_log.debug_f "asd_client %s: %s" id descr >>= fun () -> - Alba_statistics.Statistics.with_timing_lwt + with_timing_lwt (fun () -> let s = serialize_with_length diff --git a/ocaml/src/asd_server.ml b/ocaml/src/asd_server.ml index b3351194..33431fe9 100644 --- a/ocaml/src/asd_server.ml +++ b/ocaml/src/asd_server.ml @@ -21,7 +21,6 @@ open Asd_protocol open Slice open Checksum open Asd_statistics -open Alba_statistics open Lwt_buffer let blob_threshold = 16 * 1024 @@ -193,7 +192,7 @@ module DirectoryInfo = struct let write_blob t fnr blob = Lwt_log.debug_f "writing blob %Li" fnr >>= fun () -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> let dir, _, file_path = get_file_dir_name_path t fnr in ensure_dir_exists t dir >>= fun () -> @@ -859,7 +858,7 @@ let asd_protocol let buf = Llio.make_buffer req_s 0 in let code = Llio.int32_from buf in let command = Protocol.code_to_command code in - Statistics.with_timing_lwt + with_timing_lwt (fun () -> handle_request buf command) >>= fun (delta, ()) -> Statistics_collection.Generic.new_delta stats code delta; @@ -1022,7 +1021,7 @@ let run_server then begin let waiters_len = List.length waiters in Lwt_log.debug_f "Starting syncfs for %i waiters" waiters_len >>= fun () -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> Syncfs.lwt_syncfs fs_fd) >>= fun (t_syncfs, rc) -> assert (rc = 0); let logger = diff --git a/ocaml/src/asd_statistics.ml b/ocaml/src/asd_statistics.ml index 9796d713..a3f7b481 100644 --- a/ocaml/src/asd_statistics.ml +++ b/ocaml/src/asd_statistics.ml @@ -20,8 +20,6 @@ module AsdStatistics = struct module G = Statistics_collection.Generic type t = G.t let section = Alba_statistics.Statistics.section - let with_timing = G.with_timing - let with_timing_lwt = G.with_timing_lwt let new_delta t code delta = G.new_delta t code delta diff --git a/ocaml/src/fragment_cache.ml b/ocaml/src/fragment_cache.ml index bc35af77..bde1290e 100644 --- a/ocaml/src/fragment_cache.ml +++ b/ocaml/src/fragment_cache.ml @@ -225,7 +225,7 @@ class blob_cache root ~(max_size:int64) ~rocksdb_max_open_files = Lwt_extra2.write_all fd_out fragment 0 fragment_l ) in - Alba_statistics.Statistics.with_timing_lwt _inner >>= fun (took,()) -> + with_timing_lwt _inner >>= fun (took,()) -> Lwt_log.debug_f "_write_blob path:%s took:%f" path took in @@ -992,7 +992,7 @@ class blob_cache root ~(max_size:int64) ~rocksdb_max_open_files = ) ) in - Alba_statistics.Statistics.with_timing_lwt + with_timing_lwt (fun () -> Lwt_log.debug_f ~section "add %lx %S" bid oid >>= fun () -> Lwt_mutex.with_lock _mutex _add diff --git a/ocaml/src/fragment_helper.ml b/ocaml/src/fragment_helper.ml index 3bc87db2..0a523b6e 100644 --- a/ocaml/src/fragment_helper.ml +++ b/ocaml/src/fragment_helper.ml @@ -145,7 +145,6 @@ let pack_fragment encryption checksum_algo = - let with_timing_lwt = Alba_statistics.Statistics.with_timing_lwt in with_timing_lwt (fun () -> maybe_compress compression fragment diff --git a/ocaml/src/nsm_host_plugin.ml b/ocaml/src/nsm_host_plugin.ml index a6e7937e..4963616b 100644 --- a/ocaml/src/nsm_host_plugin.ml +++ b/ocaml/src/nsm_host_plugin.ml @@ -407,7 +407,7 @@ let nsm_host_user_hook : HookRegistry.h = fun (ic, oc, _cid) db backend -> Llio.input_string ic >>= fun req_s -> let req_buf = Llio.make_buffer req_s 0 in let tag = Llio.int32_from req_buf in - Protocol.NSMHStatistics.with_timing_lwt + with_timing_lwt (fun () -> do_one tag req_buf) >>= fun (delta,r) -> Protocol.NSMHStatistics.new_delta statistics tag delta; diff --git a/ocaml/src/osd_access.ml b/ocaml/src/osd_access.ml index d93b69c5..39ed37e1 100644 --- a/ocaml/src/osd_access.ml +++ b/ocaml/src/osd_access.ml @@ -16,7 +16,6 @@ limitations under the License. open Prelude open Remotes -open Alba_statistics open Checksum open Slice open Lwt.Infix @@ -135,7 +134,7 @@ class osd_access | Some osd_info -> with_osd_from_pool ~osd_id (fun osd -> - Statistics.with_timing_lwt + with_timing_lwt (fun () -> osd # apply_sequence [] diff --git a/ocaml/src/proxy_server.ml b/ocaml/src/proxy_server.ml index 3f3eb81d..5086bec0 100644 --- a/ocaml/src/proxy_server.ml +++ b/ocaml/src/proxy_server.ml @@ -17,7 +17,6 @@ limitations under the License. open Lwt open Prelude open Proxy_protocol -open Alba_statistics let ini_hash_to_string tbl = let buf = Buffer.create 20 in @@ -357,7 +356,7 @@ let proxy_protocol (alba_client : Alba_client.alba_client) Llio.input_string ic >>= fun req_s -> let buf = Llio.make_buffer req_s 0 in let code = Llio.int_from buf in - Statistics.with_timing_lwt + with_timing_lwt (fun () -> handle_request buf code) >>= fun (time_inner, ()) -> (if time_inner > 0.5 diff --git a/ocaml/src/tools/prelude.ml b/ocaml/src/tools/prelude.ml index b5368c8b..1b06bfc4 100644 --- a/ocaml/src/tools/prelude.ml +++ b/ocaml/src/tools/prelude.ml @@ -650,3 +650,16 @@ module Lwt_list = struct | Not_found -> Lwt.return_none | exn -> Lwt.fail exn) end + +let with_timing f = + let t0 = Unix.gettimeofday () in + let res = f () in + let t1 = Unix.gettimeofday () in + t1 -. t0, res + +let with_timing_lwt f = + let t0 = Unix.gettimeofday () in + let open Lwt.Infix in + f () >>= fun res -> + let t1 = Unix.gettimeofday () in + Lwt.return (t1 -. t0, res) diff --git a/ocaml/src/tools/statistics_collection.ml b/ocaml/src/tools/statistics_collection.ml index 4d8512c3..b0833c54 100644 --- a/ocaml/src/tools/statistics_collection.ml +++ b/ocaml/src/tools/statistics_collection.ml @@ -1,6 +1,4 @@ module Generic = struct - let with_timing_lwt = Alba_statistics.Statistics.with_timing_lwt - let with_timing = Alba_statistics.Statistics.with_timing open Stat include Stat From c0bcf896fc8d1d9e88701acabb5a3d3de1408f0f Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Wed, 23 Sep 2015 17:22:38 +0200 Subject: [PATCH 05/12] refactoring of download_fragment --- ocaml/src/alba_base_client.ml | 14 +++++- ocaml/src/alba_client_download.ml | 80 +++++++++++++------------------ ocaml/src/tools/prelude.ml | 25 ++++++++++ 3 files changed, 71 insertions(+), 48 deletions(-) diff --git a/ocaml/src/alba_base_client.ml b/ocaml/src/alba_base_client.ml index ba71df1d..915b34a2 100644 --- a/ocaml/src/alba_base_client.ml +++ b/ocaml/src/alba_base_client.ml @@ -250,7 +250,19 @@ class client decompress ~encryption fragment_cache - (bad_fragment_callback self) + >>= function + | Prelude.Error.Ok a -> Lwt.return a + | Prelude.Error.Error x -> + bad_fragment_callback + self + ~namespace_id ~object_name ~object_id + ~chunk_id ~fragment_id ~version_id; + match x with + | `AsdError err -> Lwt.fail (Asd_protocol.Protocol.Error.Exn err) + | `AsdExn exn -> Lwt.fail exn + | `NoneOsd -> Lwt.fail_with "can't download fragment from None osd" + | `FragmentMissing -> Lwt.fail_with "missing fragment" + | `ChecksumMismatch -> Lwt.fail_with "checksum mismatch" (* consumers of this method are responsible for freeing diff --git a/ocaml/src/alba_client_download.ml b/ocaml/src/alba_client_download.ml index 8149ac75..fdc33e24 100644 --- a/ocaml/src/alba_client_download.ml +++ b/ocaml/src/alba_client_download.ml @@ -14,7 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. *) -open Prelude open Lwt_bytes2 open Slice open Alba_statistics @@ -56,12 +55,14 @@ let download_fragment decompress ~encryption fragment_cache - bad_fragment_callback = + let module E = Prelude.Error.Lwt in + let (>>==) = E.bind in + (match osd_id_o with - | None -> Lwt.fail_with "can't download fragment from None osd" - | Some osd_id -> Lwt.return osd_id) - >>= fun osd_id -> + | None -> E.fail `NoneOsd + | Some osd_id -> E.return osd_id) + >>== fun osd_id -> let t0_fragment = Unix.gettimeofday () in @@ -87,24 +88,13 @@ let download_fragment osd_access # with_osd ~osd_id (fun device_client -> - device_client # get_option key)) + device_client # get_option key + >>= E.return)) (let open Asd_protocol.Protocol in function - | (Error.Exn err) as exn -> - begin match err with - | Error.Unknown_error _ - | Error.ProtocolVersionMismatch _ -> - bad_fragment_callback - ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id - | Error.Full (* a bit silly as this is not an update *) - | Error.Assert_failed _ - | Error.Unknown_operation -> - () - end; - Lwt.fail exn - | exn -> Lwt.fail exn) - >>= function + | Error.Exn err -> E.fail (`AsdError err) + | exn -> E.fail (`AsdExn exn)) + >>== function | None -> let msg = Printf.sprintf @@ -113,12 +103,7 @@ let download_fragment chunk_id fragment_id version_id in Lwt_log.warning msg >>= fun () -> - bad_fragment_callback - ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id; - (* TODO loopke die queue harvest en nr albamgr duwt *) - (* TODO testje *) - Lwt.fail_with msg + E.fail `FragmentMissing | Some (data:Slice.t) -> osd_access # get_osd_info ~osd_id >>= fun (_, state) -> state.read <- Unix.gettimeofday () :: state.read; @@ -128,46 +113,47 @@ let download_fragment (Slice.get_string_unsafe data) ); let hit_or_mis = false in - Lwt.return (hit_or_mis, data) + E.return (hit_or_mis, data) end | Some data -> let hit_or_mis = true in - Lwt.return (hit_or_mis, Slice.wrap_string data) + E.return (hit_or_mis, Slice.wrap_string data) in - with_timing_lwt (fun () -> retrieve key) - >>= fun (t_retrieve, (hit_or_miss, fragment_data)) -> + E.with_timing (fun () -> retrieve key) + >>== fun (t_retrieve, (hit_or_miss, fragment_data)) -> let fragment_data' = Slice.to_bigstring fragment_data in - with_timing_lwt + E.with_timing (fun () -> - Fragment_helper.verify fragment_data' fragment_checksum) - >>= fun (t_verify, checksum_valid) -> + Fragment_helper.verify fragment_data' fragment_checksum + >>= E.return) + >>== fun (t_verify, checksum_valid) -> (if checksum_valid - then Lwt.return () + then E.return () else begin Lwt_bytes.unsafe_destroy fragment_data'; - bad_fragment_callback - ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id; - Lwt.fail_with "Checksum mismatch" - end) >>= fun () -> + E.fail `ChecksumMismatch + end) >>== fun () -> - with_timing_lwt + E.with_timing (fun () -> Fragment_helper.maybe_decrypt encryption ~object_id ~chunk_id ~fragment_id ~ignore_fragment_id:replication - fragment_data') - >>= fun (t_decrypt, maybe_decrypted) -> + fragment_data' + >>= E.return) + >>== fun (t_decrypt, maybe_decrypted) -> - with_timing_lwt - (fun () -> decompress ~release_input:true maybe_decrypted) - >>= fun (t_decompress, (maybe_decompressed : Lwt_bytes.t)) -> + E.with_timing + (fun () -> + decompress ~release_input:true maybe_decrypted + >>= E.return) + >>== fun (t_decompress, (maybe_decompressed : Lwt_bytes.t)) -> let t_fragment = Statistics.({ osd_id; @@ -179,4 +165,4 @@ let download_fragment total = Unix.gettimeofday () -. t0_fragment; }) in - Lwt.return (t_fragment, maybe_decompressed) + E.return (t_fragment, maybe_decompressed) diff --git a/ocaml/src/tools/prelude.ml b/ocaml/src/tools/prelude.ml index 1b06bfc4..a26a0f3d 100644 --- a/ocaml/src/tools/prelude.ml +++ b/ocaml/src/tools/prelude.ml @@ -663,3 +663,28 @@ let with_timing_lwt f = f () >>= fun res -> let t1 = Unix.gettimeofday () in Lwt.return (t1 -. t0, res) + +module Error = struct + type ('a, 'b) t = + | Ok of 'a + | Error of 'b + + let map f = function + | Ok a -> Ok (f a) + | Error _ as err -> err + + module Lwt = struct + + let return a = Lwt.return (Ok a) + let fail b = Lwt.return (Error b) + + let bind t f = + t >>= function + | Ok a -> f a + | (Error b as res) -> Lwt.return res + + let with_timing f = + with_timing_lwt f >>= fun (delta, res) -> + Lwt.return (map (fun a -> (delta, a)) res) + end +end From 0679eab71ad1f85547f615ebda74f3ba850412d5 Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Thu, 24 Sep 2015 10:37:32 +0200 Subject: [PATCH 06/12] small refactoring + outline of verify function --- ocaml/src/alba_base_client.ml | 21 +++-- ocaml/src/alba_client.ml | 2 +- ocaml/src/alba_client_download.ml | 5 +- ocaml/src/alba_test.ml | 4 +- ocaml/src/maintenance.ml | 18 ++--- ocaml/src/proxy_server.ml | 4 +- ocaml/src/verify.ml | 123 ++++++++++++++++++++++++++++++ 7 files changed, 148 insertions(+), 29 deletions(-) create mode 100644 ocaml/src/verify.ml diff --git a/ocaml/src/alba_base_client.ml b/ocaml/src/alba_base_client.ml index 915b34a2..12a7a50b 100644 --- a/ocaml/src/alba_base_client.ml +++ b/ocaml/src/alba_base_client.ml @@ -66,14 +66,14 @@ class client let bad_fragment_callback self ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id = + ~chunk_id ~fragment_id ~location = Manifest_cache.ManifestCache.remove manifest_cache namespace_id object_name; bad_fragment_callback self ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id + ~chunk_id ~fragment_id ~location in object(self) @@ -229,23 +229,21 @@ class client * the returned fragment bigstring *) method download_fragment - ~osd_id_o + ~location ~namespace_id ~object_id ~object_name ~chunk_id ~fragment_id ~replication - ~version_id ~fragment_checksum decompress ~encryption = Alba_client_download.download_fragment osd_access - ~osd_id_o + ~location ~namespace_id ~object_id ~object_name ~chunk_id ~fragment_id ~replication - ~version_id ~fragment_checksum decompress ~encryption @@ -256,7 +254,7 @@ class client bad_fragment_callback self ~namespace_id ~object_name ~object_id - ~chunk_id ~fragment_id ~version_id; + ~chunk_id ~fragment_id ~location; match x with | `AsdError err -> Lwt.fail (Asd_protocol.Protocol.Error.Exn err) | `AsdExn exn -> Lwt.fail exn @@ -288,19 +286,18 @@ class client let threads : unit Lwt.t list = List.mapi - (fun fragment_id ((osd_id_o, version_id), fragment_checksum) -> + (fun fragment_id (location, fragment_checksum) -> let t = Lwt.catch (fun () -> self # download_fragment ~namespace_id - ~osd_id_o + ~location ~object_id ~object_name ~chunk_id ~fragment_id ~replication:(k=1) - ~version_id ~fragment_checksum decompress ~encryption @@ -573,13 +570,13 @@ class client let download_fragments () = Lwt_list.map_p (fun (fragment_id, _, _) -> - let (osd_id_o, version_id), fragment_checksum = + let location, fragment_checksum = List.nth_exn chunk_locations fragment_id in self # download_fragment ~namespace_id ~object_id ~object_name - ~osd_id_o ~version_id + ~location ~chunk_id ~fragment_id ~replication:(k=1) ~fragment_checksum diff --git a/ocaml/src/alba_client.ml b/ocaml/src/alba_client.ml index 22451070..d16d8f21 100644 --- a/ocaml/src/alba_client.ml +++ b/ocaml/src/alba_client.ml @@ -394,7 +394,7 @@ let with_client albamgr_client_cfg ?(bad_fragment_callback = fun alba_client ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id -> ()) + ~chunk_id ~fragment_id ~location -> ()) ?(albamgr_connection_pool_size = 10) ?(nsm_host_connection_pool_size = 10) ?(osd_connection_pool_size = 10) diff --git a/ocaml/src/alba_client_download.ml b/ocaml/src/alba_client_download.ml index fdc33e24..faa64d3a 100644 --- a/ocaml/src/alba_client_download.ml +++ b/ocaml/src/alba_client_download.ml @@ -45,8 +45,7 @@ let get_object_manifest' *) let download_fragment (osd_access : osd_access) - (* TODO Nsm_model.location instead? *) - ~osd_id_o ~version_id + ~location ~namespace_id ~object_id ~object_name ~chunk_id ~fragment_id @@ -59,6 +58,8 @@ let download_fragment let module E = Prelude.Error.Lwt in let (>>==) = E.bind in + let osd_id_o, version_id = location in + (match osd_id_o with | None -> E.fail `NoneOsd | Some osd_id -> E.return osd_id) diff --git a/ocaml/src/alba_test.ml b/ocaml/src/alba_test.ml index 4d4f8cc0..f6c2a4ff 100644 --- a/ocaml/src/alba_test.ml +++ b/ocaml/src/alba_test.ml @@ -953,7 +953,7 @@ let test_missing_corrupted_fragment () = (alba_client (* : Alba_client.alba_client *)) ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id = + ~chunk_id ~fragment_id ~location = Lwt.ignore_result (Lwt_extra2.ignore_errors (fun () -> @@ -961,7 +961,7 @@ let test_missing_corrupted_fragment () = ~namespace_id ~object_id ~object_name ~chunk_id - ~fragment_id ~version_id)) + ~fragment_id ~version_id:(snd location))) in test_with_alba_client ~bad_fragment_callback diff --git a/ocaml/src/maintenance.ml b/ocaml/src/maintenance.ml index 99f9d719..1bf1bc8b 100644 --- a/ocaml/src/maintenance.ml +++ b/ocaml/src/maintenance.ml @@ -1286,18 +1286,16 @@ class client ?(retry_timeout = 60.) ({ Progress.fragments_detected_missing; fragments_osd_unavailable; fragments_checksum_mismatch } as progress_verify)), - Work.Verify _ -> + Work.Verify { checksum; repair_osd_unavailable; } -> get_next_batch pb >>= fun (((cnt, objs), has_more) as batch) -> - Lwt_list.iter_s - (fun manifest -> - (* TODO - * - get all fragments, check result... - * maybe change return type van download fragment... - * - do inline repair where needed - *) - Lwt.return ()) - objs >>= fun () -> + Lwt_list.map_s + (Verify.verify_and_maybe_repair_object + alba_client + ~namespace_id + ~verify_checksum:checksum + ~repair_osd_unavailable) + objs >>= fun _ -> let new_p = Progress.Verify (get_update_progress_base pb batch, diff --git a/ocaml/src/proxy_server.ml b/ocaml/src/proxy_server.ml index 5086bec0..b79e0bba 100644 --- a/ocaml/src/proxy_server.ml +++ b/ocaml/src/proxy_server.ml @@ -454,7 +454,7 @@ let run_server hosts port (alba_client : Alba_base_client.client) ~namespace_id ~object_id ~object_name - ~chunk_id ~fragment_id ~version_id = + ~chunk_id ~fragment_id ~location = Lwt.ignore_result (Lwt_extra2.ignore_errors (fun () -> @@ -463,7 +463,7 @@ let run_server hosts port ~object_name ~chunk_id ~fragment_id - ~version_id)) in + ~version_id:(snd location))) in Alba_client.with_client albamgr_client_cfg ~cache_dir diff --git a/ocaml/src/verify.ml b/ocaml/src/verify.ml new file mode 100644 index 00000000..44b81433 --- /dev/null +++ b/ocaml/src/verify.ml @@ -0,0 +1,123 @@ +(* +Copyright 2015 Open vStorage NV + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*) + +open Prelude +open Slice +open Lwt.Infix + +let verify_and_maybe_repair_object + (alba_client : Alba_base_client.client) + ~namespace_id + ~verify_checksum + ~repair_osd_unavailable + manifest = + let open Nsm_model in + let open Manifest in + let object_id = manifest.object_id in + let object_name = manifest.name in + + let osd_access = alba_client # osd_access in + + if not verify_checksum + then + begin + Lwt_list.mapi_p + (fun chunk_id chunk_location -> + Lwt_list.mapi_p + (fun fragment_id location -> + match location with + | (None, _) -> Lwt.return `NoneOsd + | (Some osd_id, version_id) -> + + let key_string = + Osd_keys.AlbaInstance.fragment + ~namespace_id + ~object_id ~version_id + ~chunk_id ~fragment_id + in + let key = Slice.wrap_string key_string in + + Lwt.catch + (fun () -> + osd_access # with_osd + ~osd_id + (fun osd -> + osd # multi_exists [ key ]) >>= function + | [ true; ] -> Lwt.return `Ok + | [ false; ] -> Lwt.return `Missing + | _ -> assert false) + (fun exn -> + Lwt.return `Unavailable)) + chunk_location) + manifest.Manifest.fragment_locations >>= fun results -> + + (* TODO per chunk: + * - needs_repair = are there missing (or unavailable) fragments + * - repair can be done by restoring fragments or needs rewrite? + * => be lazy ... try repair, and if it fails do a rewrite + * just like is done in decommission + * + * repairing `NoneOsd while at it would be nice but is not needed + *) + Lwt.return [] + end + else + begin + let es, compression = match manifest.Manifest.storage_scheme with + | Storage_scheme.EncodeCompressEncrypt (es, c) -> es, c in + let enc = manifest.Manifest.encrypt_info in + let decompress = Fragment_helper.maybe_decompress compression in + let k, m, w = match es with + | Encoding_scheme.RSVM (k, m, w) -> k, m, w in + let replication = k = 1 in + + let open Albamgr_protocol.Protocol in + alba_client # nsm_host_access # get_namespace_info ~namespace_id >>= fun (ns_info, _, _) -> + alba_client # get_preset_info ~preset_name:ns_info.Namespace.preset_name >>= fun preset -> + let encryption = Preset.get_encryption preset enc in + + Lwt_list.mapi_p + (fun chunk_id chunk_location -> + Lwt_list.mapi_p + (fun fragment_id location -> + + let fragment_checksum = + Layout.index manifest.fragment_checksums + chunk_id fragment_id + in + + Alba_client_download.download_fragment + (alba_client # osd_access) + ~location + ~namespace_id + ~object_id ~object_name + ~chunk_id ~fragment_id + ~replication + ~fragment_checksum + decompress + ~encryption + (alba_client # get_fragment_cache) + >>= fun _ -> + (* TODO inspect result *) + Lwt.return ()) + chunk_location) + manifest.Manifest.fragment_locations + (* TODO + * - get all fragments, check result... + * maybe change return type van download fragment... + * - do inline repair where needed + *) + end From d7fa0c093eb01059d494f93a97fd2d10de0c702a Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Thu, 24 Sep 2015 14:52:09 +0200 Subject: [PATCH 07/12] extract repair from maintenance.ml --- ocaml/src/maintenance.ml | 178 +++------------------------------------ ocaml/src/repair.ml | 177 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+), 166 deletions(-) create mode 100644 ocaml/src/repair.ml diff --git a/ocaml/src/maintenance.ml b/ocaml/src/maintenance.ml index 1bf1bc8b..9a0c26d5 100644 --- a/ocaml/src/maintenance.ml +++ b/ocaml/src/maintenance.ml @@ -16,7 +16,6 @@ limitations under the License. open Prelude open Slice -open Lwt_bytes2 open Recovery_info open Lwt.Infix @@ -226,161 +225,6 @@ class client ?(retry_timeout = 60.) Lwt.return object_location_movements - method repair_object_generic - ~namespace_id - ~manifest - ~problem_osds - ~problem_fragments - = - let open Nsm_model in - Lwt_log.debug_f "_repair_object_generic ~namespace_id:%li ~manifest:%s ~problem_osds:%s ~problem_fragments:%s" - namespace_id (Manifest.show manifest) - ([%show : int32 list] (Int32Set.elements problem_osds)) - ([%show : (int * int) list] problem_fragments) - >>= fun () -> - - let object_name = manifest.Manifest.name in - let object_id = manifest.Manifest.object_id in - - let locations = manifest.Manifest.fragment_locations in - let fragment_checksums = manifest.Manifest.fragment_checksums in - let fragment_info = - Layout.combine - locations - fragment_checksums - in - - alba_client # get_ns_preset_info ~namespace_id >>= fun preset -> - alba_client # nsm_host_access # get_gc_epoch ~namespace_id >>= fun gc_epoch -> - - let fragment_checksum_algo = - preset.Albamgr_protocol.Protocol.Preset.fragment_checksum_algo in - - let es, compression = match manifest.Manifest.storage_scheme with - | Storage_scheme.EncodeCompressEncrypt (es, c) -> es, c in - let enc = manifest.Manifest.encrypt_info in - let encryption = Albamgr_protocol.Protocol.Preset.get_encryption preset enc in - let decompress = Fragment_helper.maybe_decompress compression in - let k, m, w = match es with - | Encoding_scheme.RSVM (k, m, w) -> k, m, w in - let w' = Encoding_scheme.w_as_int w in - - let version_id = manifest.Manifest.version_id + 1 in - - Lwt_list.map_s - (fun (chunk_id, chunk_location) -> - - alba_client # get_namespace_osds_info_cache ~namespace_id >>= fun osds_info_cache' -> - - let _, ok_fragments, fragments_to_be_repaired = - List.fold_left - (fun (fragment_id, ok_fragments, to_be_repaireds) - ((fragment_osd_id_o, fragment_version_id), fragment_checksum) -> - let ok_fragments', to_be_repaireds' = - if List.mem (chunk_id, fragment_id) problem_fragments || - (match fragment_osd_id_o with - | None -> false - | Some osd_id -> Int32Set.mem osd_id problem_osds) - then - ok_fragments, - (fragment_id, fragment_checksum) :: to_be_repaireds - else - fragment_osd_id_o :: ok_fragments, - to_be_repaireds in - fragment_id + 1, ok_fragments', to_be_repaireds') - (0, [], []) - chunk_location in - if fragments_to_be_repaired = [] - then Lwt.return (chunk_id, []) - else begin - alba_client # download_chunk - ~namespace_id - ~object_id - ~object_name - chunk_location - ~chunk_id - ~encryption - decompress - k m w' - >>= fun (data_fragments, coding_fragments, t_chunk) -> - - let all_fragments = List.append data_fragments coding_fragments in - Lwt.finalize - (fun () -> - Maintenance_helper.upload_missing_fragments - alba_client - osds_info_cache' - ok_fragments - all_fragments - fragments_to_be_repaired - ~namespace_id - manifest - ~chunk_id ~version_id ~gc_epoch - locations - compression encryption - fragment_checksum_algo - ~is_replication:(k=1)) - (fun () -> - List.iter - Lwt_bytes.unsafe_destroy - all_fragments; - Lwt.return ()) - >>= fun updated_locations -> - - Lwt.return (chunk_id, updated_locations) - end) - (List.mapi (fun i lc -> i, lc) fragment_info) - >>= fun updated_locations -> - - let updated_object_locations = - List.fold_left - (fun acc (chunk_id, updated_locations) -> - let updated_chunk_locations = - List.map - (fun (fragment_id, device_id) -> - (chunk_id, fragment_id, device_id)) - updated_locations in - List.rev_append updated_chunk_locations acc) - [] - updated_locations - in - - Lwt.return (updated_object_locations, gc_epoch, version_id) - - method private _repair_object_generic - ~namespace_id - ~manifest - ~problem_osds - ~problem_fragments = - self # repair_object_generic - ~namespace_id - ~manifest - ~problem_osds - ~problem_fragments - >>= fun (updated_object_locations, gc_epoch, version_id) -> - - let open Nsm_model in - let object_name = manifest.Manifest.name in - let object_id = manifest.Manifest.object_id in - - alba_client # with_nsm_client' - ~namespace_id - (fun client -> - client # update_manifest - ~object_name - ~object_id - (List.map - (fun (c,f,o) -> c,f,Some o) - updated_object_locations) - ~gc_epoch ~version_id) - >>= fun () -> - Lwt_log.debug_f - "updated_manifest ~namespace_id:%li ~object_id:%S ~updated_object_locations:%s" - namespace_id object_id - ([%show : (int *int * int32) list] updated_object_locations) - >>= fun () -> - Lwt.return () - method decommission_device ?(deterministic=false) ~namespace_id @@ -406,11 +250,12 @@ class client ?(retry_timeout = 60.) (fun manifest -> Lwt.catch (fun () -> - self # _repair_object_generic - ~namespace_id - ~manifest - ~problem_fragments:[] - ~problem_osds:(Int32Set.of_list [ osd_id ])) + Repair.repair_object_generic_and_update_manifest + alba_client + ~namespace_id + ~manifest + ~problem_fragments:[] + ~problem_osds:(Int32Set.of_list [ osd_id ])) (fun exn -> let open Nsm_model.Manifest in Lwt_log.info_f @@ -785,11 +630,12 @@ class client ?(retry_timeout = 60.) begin alba_client # nsm_host_access # get_gc_epoch ~namespace_id >>= fun gc_epoch -> - self # _repair_object_generic - ~namespace_id - ~manifest - ~problem_fragments - ~problem_osds + Repair.repair_object_generic_and_update_manifest + alba_client + ~namespace_id + ~manifest + ~problem_fragments + ~problem_osds end else self # repair_object_rewrite diff --git a/ocaml/src/repair.ml b/ocaml/src/repair.ml new file mode 100644 index 00000000..4eb8794d --- /dev/null +++ b/ocaml/src/repair.ml @@ -0,0 +1,177 @@ +(* +Copyright 2015 Open vStorage NV + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*) + +open Prelude +open Lwt_bytes2 +open Lwt.Infix + +let repair_object_generic + (alba_client : Alba_base_client.client) + ~namespace_id + ~manifest + ~problem_osds + ~problem_fragments + = + let open Nsm_model in + Lwt_log.debug_f "_repair_object_generic ~namespace_id:%li ~manifest:%s ~problem_osds:%s ~problem_fragments:%s" + namespace_id (Manifest.show manifest) + ([%show : int32 list] (Int32Set.elements problem_osds)) + ([%show : (int * int) list] problem_fragments) + >>= fun () -> + + let object_name = manifest.Manifest.name in + let object_id = manifest.Manifest.object_id in + + let locations = manifest.Manifest.fragment_locations in + let fragment_checksums = manifest.Manifest.fragment_checksums in + let fragment_info = + Layout.combine + locations + fragment_checksums + in + + alba_client # get_ns_preset_info ~namespace_id >>= fun preset -> + alba_client # nsm_host_access # get_gc_epoch ~namespace_id >>= fun gc_epoch -> + + let fragment_checksum_algo = + preset.Albamgr_protocol.Protocol.Preset.fragment_checksum_algo in + + let es, compression = match manifest.Manifest.storage_scheme with + | Storage_scheme.EncodeCompressEncrypt (es, c) -> es, c in + let enc = manifest.Manifest.encrypt_info in + let encryption = Albamgr_protocol.Protocol.Preset.get_encryption preset enc in + let decompress = Fragment_helper.maybe_decompress compression in + let k, m, w = match es with + | Encoding_scheme.RSVM (k, m, w) -> k, m, w in + let w' = Encoding_scheme.w_as_int w in + + let version_id = manifest.Manifest.version_id + 1 in + + Lwt_list.map_s + (fun (chunk_id, chunk_location) -> + + alba_client # get_namespace_osds_info_cache ~namespace_id >>= fun osds_info_cache' -> + + let _, ok_fragments, fragments_to_be_repaired = + List.fold_left + (fun (fragment_id, ok_fragments, to_be_repaireds) + ((fragment_osd_id_o, fragment_version_id), fragment_checksum) -> + let ok_fragments', to_be_repaireds' = + if List.mem (chunk_id, fragment_id) problem_fragments || + (match fragment_osd_id_o with + | None -> false + | Some osd_id -> Int32Set.mem osd_id problem_osds) + then + ok_fragments, + (fragment_id, fragment_checksum) :: to_be_repaireds + else + fragment_osd_id_o :: ok_fragments, + to_be_repaireds in + fragment_id + 1, ok_fragments', to_be_repaireds') + (0, [], []) + chunk_location in + if fragments_to_be_repaired = [] + then Lwt.return (chunk_id, []) + else begin + alba_client # download_chunk + ~namespace_id + ~object_id + ~object_name + chunk_location + ~chunk_id + ~encryption + decompress + k m w' + >>= fun (data_fragments, coding_fragments, t_chunk) -> + + let all_fragments = List.append data_fragments coding_fragments in + Lwt.finalize + (fun () -> + Maintenance_helper.upload_missing_fragments + alba_client + osds_info_cache' + ok_fragments + all_fragments + fragments_to_be_repaired + ~namespace_id + manifest + ~chunk_id ~version_id ~gc_epoch + locations + compression encryption + fragment_checksum_algo + ~is_replication:(k=1)) + (fun () -> + List.iter + Lwt_bytes.unsafe_destroy + all_fragments; + Lwt.return ()) + >>= fun updated_locations -> + + Lwt.return (chunk_id, updated_locations) + end) + (List.mapi (fun i lc -> i, lc) fragment_info) + >>= fun updated_locations -> + + let updated_object_locations = + List.fold_left + (fun acc (chunk_id, updated_locations) -> + let updated_chunk_locations = + List.map + (fun (fragment_id, device_id) -> + (chunk_id, fragment_id, device_id)) + updated_locations in + List.rev_append updated_chunk_locations acc) + [] + updated_locations + in + + Lwt.return (updated_object_locations, gc_epoch, version_id) + +let repair_object_generic_and_update_manifest + alba_client + ~namespace_id + ~manifest + ~problem_osds + ~problem_fragments = + repair_object_generic + alba_client + ~namespace_id + ~manifest + ~problem_osds + ~problem_fragments + >>= fun (updated_object_locations, gc_epoch, version_id) -> + + let open Nsm_model in + let object_name = manifest.Manifest.name in + let object_id = manifest.Manifest.object_id in + + alba_client # with_nsm_client' + ~namespace_id + (fun client -> + client # update_manifest + ~object_name + ~object_id + (List.map + (fun (c,f,o) -> c,f,Some o) + updated_object_locations) + ~gc_epoch ~version_id) + >>= fun () -> + Lwt_log.debug_f + "updated_manifest ~namespace_id:%li ~object_id:%S ~updated_object_locations:%s" + namespace_id object_id + ([%show : (int *int * int32) list] updated_object_locations) + >>= fun () -> + Lwt.return () From fbf854003a8526270758bbf456e6e0b801797faa Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Thu, 24 Sep 2015 15:26:41 +0200 Subject: [PATCH 08/12] test passes again ... but more to do --- ocaml/src/alba_test.ml | 7 +++-- ocaml/src/cli_maintenance.ml | 4 +-- ocaml/src/maintenance.ml | 59 ++++++++---------------------------- ocaml/src/repair.ml | 39 ++++++++++++++++++++++++ ocaml/src/verify.ml | 55 +++++++++++++++++++++++++++------ 5 files changed, 104 insertions(+), 60 deletions(-) diff --git a/ocaml/src/alba_test.ml b/ocaml/src/alba_test.ml index f6c2a4ff..d6fb2fdc 100644 --- a/ocaml/src/alba_test.ml +++ b/ocaml/src/alba_test.ml @@ -1746,9 +1746,10 @@ let test_stale_manifest_download () = ~should_cache:false >>= fun (_, manifest_o) -> let manifest = Option.get_some manifest_o in - maintenance_client # repair_object_rewrite - ~namespace_id - ~manifest >>= fun () -> + Repair.rewrite_object + (alba_client2 # get_base_client) + ~namespace_id + ~manifest >>= fun () -> maintenance_client # clean_obsolete_keys_namespace ~once:true ~namespace_id >>= fun () -> Lwt.return ()) diff --git a/ocaml/src/cli_maintenance.ml b/ocaml/src/cli_maintenance.ml index 32debc6d..77648620 100644 --- a/ocaml/src/cli_maintenance.ml +++ b/ocaml/src/cli_maintenance.ml @@ -274,7 +274,6 @@ let alba_rewrite_object with_alba_client cfg_file (fun client -> - let maintenance_client = new Maintenance.client (client # get_base_client) in client # nsm_host_access # with_namespace_id ~namespace (fun namespace_id -> client # nsm_host_access # get_nsm_by_id ~namespace_id @@ -285,7 +284,8 @@ let alba_rewrite_object Lwt.fail_with "No object with the specified name could be found" | Some manifest -> - maintenance_client # repair_object_rewrite + Repair.rewrite_object + (client # get_base_client) ~namespace_id ~manifest)) in diff --git a/ocaml/src/maintenance.ml b/ocaml/src/maintenance.ml index 9a0c26d5..25edb877 100644 --- a/ocaml/src/maintenance.ml +++ b/ocaml/src/maintenance.ml @@ -265,9 +265,10 @@ class client ?(retry_timeout = 60.) Lwt_extra2.ignore_errors ~logging:true (fun () -> - self # repair_object_rewrite - ~namespace_id - ~manifest)) + Repair.rewrite_object + alba_client + ~namespace_id + ~manifest)) ) manifests >>= fun () -> @@ -638,36 +639,11 @@ class client ?(retry_timeout = 60.) ~problem_osds end else - self # repair_object_rewrite - ~namespace_id - ~manifest - - method repair_object_rewrite - ~namespace_id ~manifest = - let open Nsm_model in - - Lwt_log.debug_f - "Repairing %s in namespace %li" - (Manifest.show manifest) namespace_id - >>= fun () -> - - let object_reader = - new Object_reader2.object_reader - alba_client - namespace_id manifest in - - let object_name = manifest.Manifest.name in - let checksum_o = Some manifest.Manifest.checksum in - let allow_overwrite = PreviousObjectId manifest.Manifest.object_id in - - alba_client # upload_object' - ~namespace_id - ~object_name - ~object_reader - ~checksum_o - ~allow_overwrite >>= fun _ -> + Repair.rewrite_object + alba_client + ~namespace_id + ~manifest - Lwt.return () method rebalance_namespace ?delay @@ -886,7 +862,7 @@ class client ?(retry_timeout = 60.) if cnt > 0 then Lwt_list.iter_s - (fun manifest -> self # repair_object_rewrite ~namespace_id ~manifest) + (fun manifest -> Repair.rewrite_object alba_client ~namespace_id ~manifest) objs >>= fun () -> Lwt.return true else @@ -1108,19 +1084,10 @@ class client ?(retry_timeout = 60.) Lwt_list.iter_s (fun manifest -> - Lwt.catch - (fun () -> - self # repair_object_rewrite - ~namespace_id - ~manifest) - (let open Nsm_model.Err in - function - | Nsm_exn (Overwrite_not_allowed, _) -> - (* ignore this one ... the object was overwritten - * already in the meantime, which is just fine for us - *) - Lwt.return () - | exn -> Lwt.fail exn)) + Repair.rewrite_object + alba_client + ~namespace_id + ~manifest) objs >>= fun () -> let new_p = Progress.Rewrite (get_update_progress_base pb batch) in diff --git a/ocaml/src/repair.ml b/ocaml/src/repair.ml index 4eb8794d..258b99f4 100644 --- a/ocaml/src/repair.ml +++ b/ocaml/src/repair.ml @@ -175,3 +175,42 @@ let repair_object_generic_and_update_manifest ([%show : (int *int * int32) list] updated_object_locations) >>= fun () -> Lwt.return () + +let rewrite_object + alba_client + ~namespace_id + ~manifest = + let open Nsm_model in + + Lwt_log.debug_f + "Repairing %s in namespace %li" + (Manifest.show manifest) namespace_id + >>= fun () -> + + let object_reader = + new Object_reader2.object_reader + alba_client + namespace_id manifest in + + let object_name = manifest.Manifest.name in + let checksum_o = Some manifest.Manifest.checksum in + let allow_overwrite = PreviousObjectId manifest.Manifest.object_id in + + Lwt.catch + (fun () -> + alba_client # upload_object' + ~namespace_id + ~object_name + ~object_reader + ~checksum_o + ~allow_overwrite >>= fun _ -> + Lwt.return ()) + (let open Nsm_model.Err in + function + | Nsm_exn (Overwrite_not_allowed, _) -> + (* ignore this one ... the object was overwritten + * already in the meantime, which is just fine when we're + * trying to rewrite it + *) + Lwt.return () + | exn -> Lwt.fail exn) diff --git a/ocaml/src/verify.ml b/ocaml/src/verify.ml index 44b81433..980f233e 100644 --- a/ocaml/src/verify.ml +++ b/ocaml/src/verify.ml @@ -31,7 +31,7 @@ let verify_and_maybe_repair_object let osd_access = alba_client # osd_access in - if not verify_checksum + if true (* not verify_checksum *) then begin Lwt_list.mapi_p @@ -64,14 +64,51 @@ let verify_and_maybe_repair_object chunk_location) manifest.Manifest.fragment_locations >>= fun results -> - (* TODO per chunk: - * - needs_repair = are there missing (or unavailable) fragments - * - repair can be done by restoring fragments or needs rewrite? - * => be lazy ... try repair, and if it fails do a rewrite - * just like is done in decommission - * - * repairing `NoneOsd while at it would be nice but is not needed - *) + let _, problem_fragments = + List.fold_left + (fun (chunk_id, acc) ls -> + let _, acc' = + List.fold_left + (fun (fragment_id, acc) -> + function + | `NoneOsd + | `Ok -> (fragment_id + 1, acc) + | `Missing + | `Unavailable -> (fragment_id + 1, (chunk_id, fragment_id) :: acc)) + (0, acc) + ls + in + (chunk_id + 1, acc')) + (0, []) + results + in + + (if problem_fragments <> [] + then + begin + Lwt_log.debug_f + "verify results in repairing fragments: %s" + ([%show : (int * int) list] problem_fragments) >>= fun () -> + + Lwt.catch + (fun () -> + (* TODO use maintenance_client # repair_object instead? *) + Repair.repair_object_generic_and_update_manifest + alba_client + ~namespace_id + ~manifest + ~problem_osds:Int32Set.empty + ~problem_fragments) + (fun exn -> + Repair.rewrite_object + alba_client + ~namespace_id + ~manifest) + end + else + Lwt.return ()) >>= fun () -> + + (* TODO report back on what happened... *) Lwt.return [] end else From dff59ad8e1baaec67111fd684300d7cceb27769a Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Thu, 24 Sep 2015 16:43:24 +0200 Subject: [PATCH 09/12] also test for checksum mismatch and make it pass --- ocaml/src/maintenance_test.ml | 37 ++++-- ocaml/src/verify.ml | 216 +++++++++++++++------------------- 2 files changed, 124 insertions(+), 129 deletions(-) diff --git a/ocaml/src/maintenance_test.ml b/ocaml/src/maintenance_test.ml index a4762631..ebf7655f 100644 --- a/ocaml/src/maintenance_test.ml +++ b/ocaml/src/maintenance_test.ml @@ -492,6 +492,28 @@ let test_verify_namespace () = 0 0 >>= fun () -> + (* overwrite a fragment with garbage (to create a checksum mismatch) *) + begin + let osd_id_o, version_id = List.nth_exn locations 1 in + let osd_id = Option.get_some osd_id_o in + alba_client # with_osd + ~osd_id + (fun osd -> + osd # apply_sequence + [] + [ Osd.Update.set_string + (Osd_keys.AlbaInstance.fragment + ~namespace_id + ~object_id ~version_id + ~chunk_id:0 + ~fragment_id:1) + (get_random_string 39) + Checksum.NoChecksum + false; ] >>= function + | Osd.Ok -> Lwt.return () + | _ -> assert false) + end >>= fun () -> + let open Albamgr_protocol.Protocol in let name = "name" in let cnt = 10 in @@ -514,19 +536,20 @@ let test_verify_namespace () = >>= fun mfo -> let mf = Option.get_some mfo in assert (mf.Manifest.version_id = 1); + + (* missing fragment *) assert (mf.Manifest.fragment_locations |> List.hd_exn |> List.hd_exn |> snd = 1); - (* TODO test - - corrupted fragment - - fragment that can't be downloaded (because asd got corrupted) - - fragment on asd that is offline - - assert all those fragments are now ok - *) + (* checksum mismatch fragment *) + assert (mf.Manifest.fragment_locations + |> List.hd_exn + |> fun l -> List.nth_exn l 1 + |> snd + = 1); Lwt.return ()) diff --git a/ocaml/src/verify.ml b/ocaml/src/verify.ml index 980f233e..b46939de 100644 --- a/ocaml/src/verify.ml +++ b/ocaml/src/verify.ml @@ -16,6 +16,7 @@ limitations under the License. open Prelude open Slice +open Lwt_bytes2 open Lwt.Infix let verify_and_maybe_repair_object @@ -27,134 +28,105 @@ let verify_and_maybe_repair_object let open Nsm_model in let open Manifest in let object_id = manifest.object_id in - let object_name = manifest.name in let osd_access = alba_client # osd_access in - if true (* not verify_checksum *) - then - begin - Lwt_list.mapi_p - (fun chunk_id chunk_location -> - Lwt_list.mapi_p - (fun fragment_id location -> - match location with - | (None, _) -> Lwt.return `NoneOsd - | (Some osd_id, version_id) -> - - let key_string = - Osd_keys.AlbaInstance.fragment - ~namespace_id - ~object_id ~version_id - ~chunk_id ~fragment_id - in - let key = Slice.wrap_string key_string in - - Lwt.catch - (fun () -> - osd_access # with_osd - ~osd_id - (fun osd -> - osd # multi_exists [ key ]) >>= function - | [ true; ] -> Lwt.return `Ok - | [ false; ] -> Lwt.return `Missing - | _ -> assert false) - (fun exn -> - Lwt.return `Unavailable)) - chunk_location) - manifest.Manifest.fragment_locations >>= fun results -> - - let _, problem_fragments = - List.fold_left - (fun (chunk_id, acc) ls -> - let _, acc' = - List.fold_left - (fun (fragment_id, acc) -> - function - | `NoneOsd - | `Ok -> (fragment_id + 1, acc) - | `Missing - | `Unavailable -> (fragment_id + 1, (chunk_id, fragment_id) :: acc)) - (0, acc) - ls + Lwt_list.mapi_s + (fun chunk_id chunk_location -> + Lwt_list.mapi_p + (fun fragment_id location -> + match location with + | (None, _) -> Lwt.return `NoneOsd + | (Some osd_id, version_id) -> + + let key_string = + Osd_keys.AlbaInstance.fragment + ~namespace_id + ~object_id ~version_id + ~chunk_id ~fragment_id in - (chunk_id + 1, acc')) - (0, []) - results - in - - (if problem_fragments <> [] - then - begin - Lwt_log.debug_f - "verify results in repairing fragments: %s" - ([%show : (int * int) list] problem_fragments) >>= fun () -> + let key = Slice.wrap_string key_string in Lwt.catch (fun () -> - (* TODO use maintenance_client # repair_object instead? *) - Repair.repair_object_generic_and_update_manifest - alba_client - ~namespace_id - ~manifest - ~problem_osds:Int32Set.empty - ~problem_fragments) + osd_access # with_osd + ~osd_id + (fun osd -> + if verify_checksum + then + osd # multi_get [ key ] >>= function + | [ None ] -> Lwt.return `Missing + | [ Some f ] -> + let checksum = Layout.index manifest.fragment_checksums chunk_id fragment_id in + let fragment_data' = Slice.to_bigstring f in + Fragment_helper.verify fragment_data' checksum + >>= fun checksum_valid -> + Lwt_bytes.unsafe_destroy fragment_data'; + Lwt.return + (if checksum_valid + then `Ok + else `ChecksumMismatch) + | _ -> assert false + else + osd # multi_exists [ key ] >>= function + | [ true; ] -> Lwt.return `Ok + | [ false; ] -> Lwt.return `Missing + | _ -> assert false)) (fun exn -> - Repair.rewrite_object - alba_client - ~namespace_id - ~manifest) - end - else - Lwt.return ()) >>= fun () -> - - (* TODO report back on what happened... *) - Lwt.return [] - end - else - begin - let es, compression = match manifest.Manifest.storage_scheme with - | Storage_scheme.EncodeCompressEncrypt (es, c) -> es, c in - let enc = manifest.Manifest.encrypt_info in - let decompress = Fragment_helper.maybe_decompress compression in - let k, m, w = match es with - | Encoding_scheme.RSVM (k, m, w) -> k, m, w in - let replication = k = 1 in - - let open Albamgr_protocol.Protocol in - alba_client # nsm_host_access # get_namespace_info ~namespace_id >>= fun (ns_info, _, _) -> - alba_client # get_preset_info ~preset_name:ns_info.Namespace.preset_name >>= fun preset -> - let encryption = Preset.get_encryption preset enc in - - Lwt_list.mapi_p - (fun chunk_id chunk_location -> - Lwt_list.mapi_p - (fun fragment_id location -> - - let fragment_checksum = - Layout.index manifest.fragment_checksums - chunk_id fragment_id + Lwt.return `Unavailable)) + chunk_location) + manifest.Manifest.fragment_locations >>= fun results -> + + let _, problem_fragments = + List.fold_left + (fun (chunk_id, acc) ls -> + let _, acc' = + List.fold_left + (fun (fragment_id, acc) status -> + let should_repair = + match status with + | `NoneOsd + | `Ok -> false + | `ChecksumMismatch + | `Missing -> true + | `Unavailable -> repair_osd_unavailable in - - Alba_client_download.download_fragment - (alba_client # osd_access) - ~location - ~namespace_id - ~object_id ~object_name - ~chunk_id ~fragment_id - ~replication - ~fragment_checksum - decompress - ~encryption - (alba_client # get_fragment_cache) - >>= fun _ -> - (* TODO inspect result *) - Lwt.return ()) - chunk_location) - manifest.Manifest.fragment_locations - (* TODO - * - get all fragments, check result... - * maybe change return type van download fragment... - * - do inline repair where needed - *) - end + (fragment_id + 1, + if should_repair + then (chunk_id, fragment_id) :: acc + else acc)) + (0, acc) + ls + in + (chunk_id + 1, acc')) + (0, []) + results + in + + (if problem_fragments <> [] + then + begin + Lwt_log.debug_f + "verify results in repairing fragments: %s" + ([%show : (int * int) list] problem_fragments) >>= fun () -> + + Lwt.catch + (fun () -> + (* TODO use maintenance_client # repair_object instead? *) + Repair.repair_object_generic_and_update_manifest + alba_client + ~namespace_id + ~manifest + ~problem_osds:Int32Set.empty + ~problem_fragments) + (fun exn -> + Repair.rewrite_object + alba_client + ~namespace_id + ~manifest) + end + else + Lwt.return ()) >>= fun () -> + + (* TODO report back on what happened... *) + Lwt.return [] From 7ab119174eb8384f393bbe7d5e835bb460d274fb Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Fri, 25 Sep 2015 11:07:08 +0200 Subject: [PATCH 10/12] track progress of verify job --- ocaml/src/maintenance.ml | 30 +++++++++++++----- ocaml/src/maintenance_test.ml | 45 +++++++++++++++++++++++++-- ocaml/src/verify.ml | 58 +++++++++++++++++++++++++++-------- 3 files changed, 110 insertions(+), 23 deletions(-) diff --git a/ocaml/src/maintenance.ml b/ocaml/src/maintenance.ml index 25edb877..f3bbed41 100644 --- a/ocaml/src/maintenance.ml +++ b/ocaml/src/maintenance.ml @@ -1096,9 +1096,9 @@ class client ?(retry_timeout = 60.) new_p has_more | Progress.Verify (pb, - ({ Progress.fragments_detected_missing; - fragments_osd_unavailable; - fragments_checksum_mismatch } as progress_verify)), + { Progress.fragments_detected_missing; + fragments_osd_unavailable; + fragments_checksum_mismatch }), Work.Verify { checksum; repair_osd_unavailable; } -> get_next_batch pb >>= fun (((cnt, objs), has_more) as batch) -> @@ -1108,14 +1108,28 @@ class client ?(retry_timeout = 60.) ~namespace_id ~verify_checksum:checksum ~repair_osd_unavailable) - objs >>= fun _ -> + objs >>= fun res -> + + let fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch = + List.fold_left + (fun (a,b,c) (a',b',c') -> + let open Int64 in + add a (of_int a'), + add b (of_int b'), + add c (of_int c')) + (fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch) + res + in let new_p = Progress.Verify (get_update_progress_base pb batch, - (* TODO update this based the result - * of iterating (mapping) over all - * manifests *) - progress_verify) in + { Progress.fragments_detected_missing; + fragments_osd_unavailable; + fragments_checksum_mismatch; }) in update_progress_and_maybe_continue new_p diff --git a/ocaml/src/maintenance_test.ml b/ocaml/src/maintenance_test.ml index ebf7655f..a4893b31 100644 --- a/ocaml/src/maintenance_test.ml +++ b/ocaml/src/maintenance_test.ml @@ -33,7 +33,6 @@ let with_nice_error_log f = let test_rebalance_one () = let test_name = "test_rebalance_one" in - (*let preset_name = test_name in*) let namespace = test_name in let object_name = namespace in let open Nsm_model in @@ -463,11 +462,18 @@ let test_rewrite_namespace () = let test_verify_namespace () = let test_name = "test_verify_namespace" in - let namespace = test_name in test_with_alba_client (fun alba_client -> + + let preset_name = test_name in + let preset' = + let open Albamgr_protocol.Protocol in + Preset.({ _DEFAULT with policies = [ (5,4,8,3); ]; }) in + alba_client # mgr_access # create_preset preset_name preset' >>= fun () -> + + let namespace = test_name in alba_client # create_namespace - ~preset_name:None + ~preset_name:(Some preset_name) ~namespace () >>= fun namespace_id -> let open Nsm_model in @@ -551,6 +557,39 @@ let test_verify_namespace () = |> snd = 1); + alba_client # mgr_access # get_progress_for_prefix name >>= fun (cnt', progresses) -> + assert (cnt = cnt'); + let objects_verified, + fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch + = + List.fold_left + (fun (objects_verified, + fragments_detected_missing', + fragments_osd_unavailable', + fragments_checksum_mismatch') + (i, p) -> + let end_key = get_start_key (i+1) cnt in + match p with + | Progress.Verify ({ Progress.count; next; }, + { Progress.fragments_detected_missing; + fragments_osd_unavailable; + fragments_checksum_mismatch })-> + assert (end_key = next); + objects_verified + Int64.to_int count, + fragments_detected_missing' + Int64.to_int fragments_detected_missing, + fragments_osd_unavailable' + Int64.to_int fragments_osd_unavailable, + fragments_checksum_mismatch' + Int64.to_int fragments_checksum_mismatch + | _ -> assert false) + (0, 0, 0, 0) + progresses + in + assert (objects_verified = 1); + assert (fragments_detected_missing = 1); + assert (fragments_osd_unavailable = 0); + assert (fragments_checksum_mismatch = 1); + Lwt.return ()) diff --git a/ocaml/src/verify.ml b/ocaml/src/verify.ml index b46939de..a973e4f2 100644 --- a/ocaml/src/verify.ml +++ b/ocaml/src/verify.ml @@ -77,12 +77,36 @@ let verify_and_maybe_repair_object chunk_location) manifest.Manifest.fragment_locations >>= fun results -> - let _, problem_fragments = + let _, + (fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch, + problem_fragments) = List.fold_left - (fun (chunk_id, acc) ls -> - let _, acc' = + (fun (chunk_id, (fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch, + problem_fragments)) + ls -> + let _, acc = List.fold_left - (fun (fragment_id, acc) status -> + (fun (fragment_id, + (fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch, + problem_fragments)) status -> + let fragments_detected_missing = match status with + | `Missing -> fragments_detected_missing + 1 + | _ -> fragments_detected_missing + in + let fragments_osd_unavailable = match status with + | `Unavailable -> fragments_osd_unavailable + 1 + | _ -> fragments_osd_unavailable + in + let fragments_checksum_mismatch = match status with + | `ChecksumMismatch -> fragments_checksum_mismatch + 1 + | _ -> fragments_checksum_mismatch + in let should_repair = match status with | `NoneOsd @@ -91,15 +115,23 @@ let verify_and_maybe_repair_object | `Missing -> true | `Unavailable -> repair_osd_unavailable in - (fragment_id + 1, + fragment_id + 1, + (fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch, if should_repair - then (chunk_id, fragment_id) :: acc - else acc)) - (0, acc) + then (chunk_id, fragment_id) :: problem_fragments + else problem_fragments)) + (0, + (fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch, + problem_fragments)) ls in - (chunk_id + 1, acc')) - (0, []) + chunk_id + 1, + acc) + (0, (0,0,0,[])) results in @@ -128,5 +160,7 @@ let verify_and_maybe_repair_object else Lwt.return ()) >>= fun () -> - (* TODO report back on what happened... *) - Lwt.return [] + Lwt.return + (fragments_detected_missing, + fragments_osd_unavailable, + fragments_checksum_mismatch) From 6eaad752f9854ea8e06eb13f4e2022a54e927bc5 Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Fri, 25 Sep 2015 11:51:48 +0200 Subject: [PATCH 11/12] alba verify namespace cli support --- ocaml/src/alba.ml | 56 -------------------------------------------- ocaml/src/cli_mgr.ml | 40 ++++++++++++++++++++++++++++--- 2 files changed, 37 insertions(+), 59 deletions(-) diff --git a/ocaml/src/alba.ml b/ocaml/src/alba.ml index 96855b0f..2141066d 100644 --- a/ocaml/src/alba.ml +++ b/ocaml/src/alba.ml @@ -634,60 +634,6 @@ let namespace_recovery_agent_cmd = ), Term.info "namespace-recovery-agent" ~doc:"recover the contents of a namespace from the osds" -let verify_namespace cfg_file namespace = - let t () = - with_alba_client - cfg_file - (fun client -> - client # nsm_host_access # with_nsm_client - ~namespace - (fun client -> - client # list_all_objects ()) >>= fun (cnt, objs) -> - Lwt_list.map_s - (fun object_name -> - Lwt.catch - (fun () -> - client # download_object_generic - ~namespace - ~object_name - ~consistent_read:false - ~should_cache:false - ~write_object_data:(fun size -> - Lwt.return - begin - fun src offset length -> - (* ignore all data - * alba_client will verify checksum. *) - Lwt.return () - end) - >>= fun _ -> - Lwt.return (`Ok object_name)) - (fun exn -> - Lwt.return (`Failed (object_name, exn)))) - objs >>= fun results -> - let ok, faileds = - List.fold_left - (fun (ok, faileds) -> - function - | `Ok object_name -> (ok + 1, faileds) - | `Failed (object_name, exn) -> (ok, (object_name, exn) :: faileds)) - (0, []) - results - in - Lwt_log.info_f "Verified %i objects. %i downloads succeeded." cnt ok >>= fun () -> - Lwt_list.iter_s - (fun (object_name, exn) -> - Lwt_log.info_f ~exn "Downloading object %s failed" object_name) - faileds) - in - lwt_cmd_line false t - -let verify_namespace_cmd = - Term.(pure verify_namespace - $ alba_cfg_file - $ namespace 0), - Term.info "verify-namespace" ~doc:"verifies all objects of a namespace can be downloaded" - let unit_tests produce_xml alba_cfg_file only_test = Albamgr_test.ccfg_ref := Some (Albamgr_protocol.Protocol.Arakoon_config.from_config_file alba_cfg_file); @@ -823,8 +769,6 @@ let () = namespace_recovery_agent_cmd; - verify_namespace_cmd; - unit_tests_cmd; ] in diff --git a/ocaml/src/cli_mgr.ml b/ocaml/src/cli_mgr.ml index f34ca094..276db219 100644 --- a/ocaml/src/cli_mgr.ml +++ b/ocaml/src/cli_mgr.ml @@ -522,7 +522,7 @@ let alba_add_osd_cmd = " Note: this is for development purposes only." ) -let alba_rewrite_namespace cfg_file namespace name factor = +let alba_add_iter_namespace_item cfg_file namespace name factor action = let t () = with_albamgr_client cfg_file ~attempts:1 @@ -534,17 +534,22 @@ let alba_rewrite_namespace cfg_file namespace name factor = let namespace_id = namespace_info.Namespace.id in client # add_work_items [ Work.(IterNamespace - (Rewrite, + (action, namespace_id, name, factor)) ]) in lwt_cmd_line false t +let alba_rewrite_namespace cfg_file namespace name factor = + alba_add_iter_namespace_item + cfg_file namespace name factor + Albamgr_protocol.Protocol.Work.Rewrite + let job_name p = Arg.(required & pos p (some string) None - & info [] ~docv:"the name of the job") + & info [] ~docv:"JOB_NAME") let alba_rewrite_namespace_cmd = Term.(pure alba_rewrite_namespace @@ -558,6 +563,34 @@ let alba_rewrite_namespace_cmd = "rewrite-namespace" ~doc:"rewrite all objects in the specified namespace" + +let alba_verify_namespace + cfg_file namespace name factor + no_verify_checksum no_repair_osd_unavailable = + alba_add_iter_namespace_item + cfg_file namespace name factor + (let open Albamgr_protocol.Protocol.Work in + Verify { checksum = not no_verify_checksum; + repair_osd_unavailable = not no_repair_osd_unavailable; }) + +let alba_verify_namespace_cmd = + Term.(pure alba_verify_namespace + $ alba_cfg_file + $ namespace 0 + $ job_name 1 + $ Arg.(value + & opt int 1 + & info ["factor"] ~docv:"specifies into how many pieces the job should be divided") + $ Arg.(value + & flag + & info ["no-verify-checksum"] ~docv:"flag to specify checksums should not be verified") + $ Arg.(value + & flag + & info ["no-repair-osd-unavailable"] ~docv:"flag to specify that fragments on unavailable osds should not be repaired")), + Term.info + "verify-namespace" + ~doc:"verify all objects in the specified namespace" + let alba_show_job_progress cfg_file name = let t () = with_albamgr_client @@ -630,6 +663,7 @@ let cmds = [ alba_list_work_cmd; alba_rewrite_namespace_cmd; + alba_verify_namespace_cmd; alba_show_job_progress_cmd; alba_clear_job_progress_cmd; ] From 13383e06a5ca94cf74bce497433372affa392ebc Mon Sep 17 00:00:00 2001 From: Jan Doms Date: Tue, 29 Sep 2015 10:03:15 +0200 Subject: [PATCH 12/12] comment --- ocaml/src/verify.ml | 79 ++++++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 36 deletions(-) diff --git a/ocaml/src/verify.ml b/ocaml/src/verify.ml index a973e4f2..cf14bda8 100644 --- a/ocaml/src/verify.ml +++ b/ocaml/src/verify.ml @@ -31,6 +31,48 @@ let verify_and_maybe_repair_object let osd_access = alba_client # osd_access in + let verify = + if verify_checksum + then + fun osd key ~chunk_id ~fragment_id -> + osd # multi_get [ key ] >>= function + | [ None ] -> Lwt.return `Missing + | [ Some f ] -> + let checksum = Layout.index manifest.fragment_checksums chunk_id fragment_id in + let fragment_data' = Slice.to_bigstring f in + Fragment_helper.verify fragment_data' checksum + >>= fun checksum_valid -> + Lwt_bytes.unsafe_destroy fragment_data'; + Lwt.return + (if checksum_valid + then `Ok + else `ChecksumMismatch) + | _ -> assert false + else + fun osd key ~chunk_id ~fragment_id -> + osd # multi_exists [ key ] >>= function + | [ true; ] -> Lwt.return `Ok + | [ false; ] -> Lwt.return `Missing + | _ -> assert false + in + + let verify ~osd_id ~chunk_id ~fragment_id ~version_id = + let key_string = + Osd_keys.AlbaInstance.fragment + ~namespace_id + ~object_id ~version_id + ~chunk_id ~fragment_id + in + let key = Slice.wrap_string key_string in + Lwt.catch + (fun () -> + osd_access # with_osd + ~osd_id + (fun osd -> verify osd key ~chunk_id ~fragment_id)) + (fun exn -> + Lwt.return `Unavailable) + in + Lwt_list.mapi_s (fun chunk_id chunk_location -> Lwt_list.mapi_p @@ -38,42 +80,7 @@ let verify_and_maybe_repair_object match location with | (None, _) -> Lwt.return `NoneOsd | (Some osd_id, version_id) -> - - let key_string = - Osd_keys.AlbaInstance.fragment - ~namespace_id - ~object_id ~version_id - ~chunk_id ~fragment_id - in - let key = Slice.wrap_string key_string in - - Lwt.catch - (fun () -> - osd_access # with_osd - ~osd_id - (fun osd -> - if verify_checksum - then - osd # multi_get [ key ] >>= function - | [ None ] -> Lwt.return `Missing - | [ Some f ] -> - let checksum = Layout.index manifest.fragment_checksums chunk_id fragment_id in - let fragment_data' = Slice.to_bigstring f in - Fragment_helper.verify fragment_data' checksum - >>= fun checksum_valid -> - Lwt_bytes.unsafe_destroy fragment_data'; - Lwt.return - (if checksum_valid - then `Ok - else `ChecksumMismatch) - | _ -> assert false - else - osd # multi_exists [ key ] >>= function - | [ true; ] -> Lwt.return `Ok - | [ false; ] -> Lwt.return `Missing - | _ -> assert false)) - (fun exn -> - Lwt.return `Unavailable)) + verify ~osd_id ~chunk_id ~fragment_id ~version_id) chunk_location) manifest.Manifest.fragment_locations >>= fun results ->