Skip to content

Commit

Permalink
Merge pull request #36 from openvstorage/verify-namespace
Browse files Browse the repository at this point in the history
Verify namespace
  • Loading branch information
domsj committed Sep 29, 2015
2 parents d29b6a3 + 13383e0 commit 3449a8c
Show file tree
Hide file tree
Showing 26 changed files with 985 additions and 517 deletions.
56 changes: 0 additions & 56 deletions ocaml/src/alba.ml
Original file line number Diff line number Diff line change
Expand Up @@ -634,60 +634,6 @@ let namespace_recovery_agent_cmd =
),
Term.info "namespace-recovery-agent" ~doc:"recover the contents of a namespace from the osds"

(* [verify_namespace cfg_file namespace] checks that every object listed for
 * [namespace] can be downloaded.
 *
 * The object names are fetched through the nsm client, after which the
 * objects are downloaded one after the other (not concurrently). The
 * downloaded bytes are thrown away; a download counts as succeeded when it
 * completes without raising. Afterwards a summary line is logged, followed
 * by one line (with the exception attached) per object whose download
 * failed. The whole thing is run through [lwt_cmd_line]. *)
let verify_namespace cfg_file namespace =
  (* Data sink handed to the download: whatever it is given, it drops.
   * ignore all data
   * alba_client will verify checksum. *)
  let discard_data _size =
    Lwt.return (fun _src _offset _length -> Lwt.return ())
  in
  (* Fold step: count the successes, collect the failures. Failures are
   * consed, so they end up in reverse download order. *)
  let tally (ok_count, failures) = function
    | `Ok _ -> (ok_count + 1, failures)
    | `Failed failure -> (ok_count, failure :: failures)
  in
  let t () =
    with_alba_client
      cfg_file
      (fun client ->
        (* Download a single object, turning any exception into a
         * [`Failed] outcome so one bad object does not stop the run. *)
        let verify_object object_name =
          Lwt.catch
            (fun () ->
              client # download_object_generic
                ~namespace
                ~object_name
                ~consistent_read:false
                ~should_cache:false
                ~write_object_data:discard_data
              >>= fun _ ->
              Lwt.return (`Ok object_name))
            (fun exn -> Lwt.return (`Failed (object_name, exn)))
        in
        let log_failure (object_name, exn) =
          Lwt_log.info_f ~exn "Downloading object %s failed" object_name
        in
        client # nsm_host_access # with_nsm_client
          ~namespace
          (fun nsm_client -> nsm_client # list_all_objects ())
        >>= fun (cnt, objs) ->
        Lwt_list.map_s verify_object objs >>= fun results ->
        let ok, faileds = List.fold_left tally (0, []) results in
        Lwt_log.info_f "Verified %i objects. %i downloads succeeded." cnt ok
        >>= fun () ->
        Lwt_list.iter_s log_failure faileds)
  in
  lwt_cmd_line false t

(* Command-line entry for [verify_namespace]: a (term, info) pair registered
 * under the name "verify-namespace". The term feeds [verify_namespace] the
 * alba config file and the namespace argument ([namespace 0] -- presumably
 * the first positional argument; confirm against the definition of
 * [namespace]). *)
let verify_namespace_cmd =
  let term =
    Term.(pure verify_namespace
          $ alba_cfg_file
          $ namespace 0)
  in
  let info =
    Term.info
      "verify-namespace"
      ~doc:"verifies all objects of a namespace can be downloaded"
  in
  term, info

let unit_tests produce_xml alba_cfg_file only_test =
Albamgr_test.ccfg_ref :=
Some (Albamgr_protocol.Protocol.Arakoon_config.from_config_file alba_cfg_file);
Expand Down Expand Up @@ -823,8 +769,6 @@ let () =

namespace_recovery_agent_cmd;

verify_namespace_cmd;

unit_tests_cmd;

] in
Expand Down
172 changes: 34 additions & 138 deletions ocaml/src/alba_base_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
open Prelude
open Lwt
open Checksum
open Slice
open Lwt_bytes2
open Alba_statistics
open Fragment_cache
Expand Down Expand Up @@ -55,7 +54,6 @@ class client
new osd_access mgr_access ~osd_connection_pool_size ~osd_timeout
in
let with_osd_from_pool ~osd_id f = osd_access # with_osd ~osd_id f in
let get_osd_info = osd_access # get_osd_info in

let get_namespace_osds_info_cache ~namespace_id =
nsm_host_access # get_namespace_info ~namespace_id >>= fun (_, osds, _) ->
Expand All @@ -68,14 +66,14 @@ class client
let bad_fragment_callback
self
~namespace_id ~object_id ~object_name
~chunk_id ~fragment_id ~version_id =
~chunk_id ~fragment_id ~location =
Manifest_cache.ManifestCache.remove
manifest_cache
namespace_id object_name;
bad_fragment_callback
self
~namespace_id ~object_id ~object_name
~chunk_id ~fragment_id ~version_id
~chunk_id ~fragment_id ~location
in
object(self)

Expand Down Expand Up @@ -226,145 +224,44 @@ class client
~checksum_o
~allow_overwrite


(* consumers of this method are responsible for freeing
* the returned fragment bigstring
*)
method download_fragment
~osd_id_o
~location
~namespace_id
~object_id ~object_name
~chunk_id ~fragment_id
~replication
~version_id
~fragment_checksum
decompress
~encryption =
Alba_client_download.download_fragment
osd_access
~location
~namespace_id
~object_id ~object_name
~chunk_id ~fragment_id
~replication
~fragment_checksum
decompress
~encryption
fragment_cache
>>= function
| Prelude.Error.Ok a -> Lwt.return a
| Prelude.Error.Error x ->
bad_fragment_callback
self
~namespace_id ~object_name ~object_id
~chunk_id ~fragment_id ~location;
match x with
| `AsdError err -> Lwt.fail (Asd_protocol.Protocol.Error.Exn err)
| `AsdExn exn -> Lwt.fail exn
| `NoneOsd -> Lwt.fail_with "can't download fragment from None osd"
| `FragmentMissing -> Lwt.fail_with "missing fragment"
| `ChecksumMismatch -> Lwt.fail_with "checksum mismatch"

(match osd_id_o with
| None -> Lwt.fail_with "can't download fragment from None osd"
| Some osd_id -> Lwt.return osd_id)
>>= fun osd_id ->

let t0_fragment = Unix.gettimeofday () in

let key_string =
Osd_keys.AlbaInstance.fragment
~namespace_id
~object_id ~version_id
~chunk_id ~fragment_id
in
let key = Slice.wrap_string key_string in

let retrieve key =
fragment_cache # lookup
namespace_id key_string
read_it
>>= function
| None ->
begin
Lwt_log.debug_f "fragment not in cache, trying osd:%li" osd_id
>>= fun () ->

Lwt.catch
(fun () ->
with_osd_from_pool
~osd_id
(fun device_client ->
device_client # get_option key))
(let open Asd_protocol.Protocol in
function
| (Error.Exn err) as exn -> begin match err with
| Error.Unknown_error _
| Error.ProtocolVersionMismatch _ ->
bad_fragment_callback
self
~namespace_id ~object_id ~object_name
~chunk_id ~fragment_id ~version_id
| Error.Full (* a bit silly as this is not an update *)
| Error.Assert_failed _
| Error.Unknown_operation ->
()
end;
Lwt.fail exn
| exn -> Lwt.fail exn)
>>= function
| None ->
let msg =
Printf.sprintf
"Detected missing fragment namespace_id=%li object_id=%S osd_id=%li (chunk,fragment,version)=(%i,%i,%i)"
namespace_id object_id osd_id
chunk_id fragment_id version_id
in
Lwt_log.warning msg >>= fun () ->
bad_fragment_callback
self
~namespace_id ~object_id ~object_name
~chunk_id ~fragment_id ~version_id;
(* TODO loopke die queue harvest en nr albamgr duwt *)
(* TODO testje *)
Lwt.fail_with msg
| Some (data:Slice.t) ->
get_osd_info ~osd_id >>= fun (_, state) ->
state.read <- Unix.gettimeofday () :: state.read;
Lwt.ignore_result
(fragment_cache # add
namespace_id key_string
(Slice.get_string_unsafe data)
);
let hit_or_mis = false in
Lwt.return (hit_or_mis, data)
end
| Some data ->
let hit_or_mis = true in
Lwt.return (hit_or_mis, Slice.wrap_string data)
in
Statistics.with_timing_lwt (fun () -> retrieve key)

>>= fun (t_retrieve, (hit_or_miss, fragment_data)) ->

let fragment_data' = Slice.to_bigstring fragment_data in

Statistics.with_timing_lwt
(fun () ->
Fragment_helper.verify fragment_data' fragment_checksum)
>>= fun (t_verify, checksum_valid) ->

(if checksum_valid
then Lwt.return ()
else
begin
Lwt_bytes.unsafe_destroy fragment_data';
bad_fragment_callback
self
~namespace_id ~object_id ~object_name
~chunk_id ~fragment_id ~version_id;
Lwt.fail_with "Checksum mismatch"
end) >>= fun () ->

Statistics.with_timing_lwt
(fun () ->
Fragment_helper.maybe_decrypt
encryption
~object_id ~chunk_id ~fragment_id
~ignore_fragment_id:replication
fragment_data')
>>= fun (t_decrypt, maybe_decrypted) ->

Statistics.with_timing_lwt
(fun () -> decompress ~release_input:true maybe_decrypted)
>>= fun (t_decompress, (maybe_decompressed : Lwt_bytes.t)) ->

let t_fragment = Statistics.({
osd_id;
retrieve = t_retrieve;
hit_or_miss;
verify = t_verify;
decrypt = t_decrypt;
decompress = t_decompress;
total = Unix.gettimeofday () -. t0_fragment;
}) in

Lwt.return (t_fragment, maybe_decompressed)

(* consumers of this method are responsible for freeing
* the returned fragment bigstrings
Expand All @@ -389,19 +286,18 @@ class client

let threads : unit Lwt.t list =
List.mapi
(fun fragment_id ((osd_id_o, version_id), fragment_checksum) ->
(fun fragment_id (location, fragment_checksum) ->
let t =
Lwt.catch
(fun () ->
self # download_fragment
~namespace_id
~osd_id_o
~location
~object_id
~object_name
~chunk_id
~fragment_id
~replication:(k=1)
~version_id
~fragment_checksum
decompress
~encryption
Expand Down Expand Up @@ -674,13 +570,13 @@ class client
let download_fragments () =
Lwt_list.map_p
(fun (fragment_id, _, _) ->
let (osd_id_o, version_id), fragment_checksum =
let location, fragment_checksum =
List.nth_exn chunk_locations fragment_id
in
self # download_fragment
~namespace_id
~object_id ~object_name
~osd_id_o ~version_id
~location
~chunk_id ~fragment_id
~replication:(k=1)
~fragment_checksum
Expand Down Expand Up @@ -917,11 +813,11 @@ class client
then fragment_size
else (object_size - offset)
in
Statistics.with_timing_lwt
with_timing_lwt
(fun () ->
hash2 # update_lwt_bytes_detached fragment 0 fragment_size')
>>= fun (t_verify', ()) ->
Statistics.with_timing_lwt
with_timing_lwt
(fun () ->
write_object_data fragment 0 fragment_size') >>= fun (t_write_data', ()) ->
Lwt.return (offset + fragment_size',
Expand Down
9 changes: 5 additions & 4 deletions ocaml/src/alba_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
limitations under the License.
*)

open Lwt.Infix
open Prelude
open Fragment_cache
open Albamgr_access
open Alba_base_client
open Alba_statistics
open Alba_client_errors
open Lwt.Infix

class alba_client (base_client : Alba_base_client.client)
=
Expand Down Expand Up @@ -276,7 +277,7 @@ class alba_client (base_client : Alba_base_client.client)

=
let t0_object = Unix.gettimeofday () in
Statistics.with_timing_lwt
with_timing_lwt
(fun () -> self # get_object_manifest'
~namespace_id ~object_name
~consistent_read ~should_cache)
Expand Down Expand Up @@ -308,7 +309,7 @@ class alba_client (base_client : Alba_base_client.client)
| Error.Exn Error.NotEnoughFragments as exn ->
(* Download probably failed because of stale manifest *)
Lwt_log.info_f ~exn "retrying " >>= fun () ->
Statistics.with_timing_lwt
with_timing_lwt
(fun () ->
Manifest_cache.ManifestCache.remove
(base_client # get_manifest_cache)
Expand Down Expand Up @@ -393,7 +394,7 @@ let with_client albamgr_client_cfg
?(bad_fragment_callback = fun
alba_client
~namespace_id ~object_id ~object_name
~chunk_id ~fragment_id ~version_id -> ())
~chunk_id ~fragment_id ~location -> ())
?(albamgr_connection_pool_size = 10)
?(nsm_host_connection_pool_size = 10)
?(osd_connection_pool_size = 10)
Expand Down
Loading

0 comments on commit 3449a8c

Please sign in to comment.