Skip to content


fix: finish implementing verification report
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesPiechota committed Nov 8, 2024
1 parent 5a826be commit a2144ce
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 76 deletions.
14 changes: 14 additions & 0 deletions apps/arweave/include/ar_verify_chunks.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-define(AR_VERIFY_CHUNKS_HRL, true).

-record(verify_report, {
start_time :: non_neg_integer(),
total_error_bytes = 0 :: non_neg_integer(),
total_error_chunks = 0 :: non_neg_integer(),
error_bytes = #{} :: #{atom() => non_neg_integer()},
error_chunks = #{} :: #{atom() => non_neg_integer()},
bytes_processed = 0 :: non_neg_integer(),
progress = 0 :: non_neg_integer()

124 changes: 51 additions & 73 deletions apps/arweave/src/ar_verify_chunks.erl
Original file line number Diff line number Diff line change
@@ -1,35 +1,25 @@
%%% The blob storage optimized for fast reads.


-export([start_link/2, init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).
-export([start_link/2, name/1]).
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).


-record(report, {
total_error_bytes = 0 :: non_neg_integer(),
total_error_chunks = 0 :: non_neg_integer(),
error_bytes = #{} :: #{atom() => non_neg_integer()},
error_chunks = #{} :: #{atom() => non_neg_integer()}

-record(state, {
store_id :: binary(),
packing :: binary(),
start_time :: non_neg_integer(),
start_offset :: non_neg_integer(),
end_offset :: non_neg_integer(),
cursor :: non_neg_integer(),
ready = false :: boolean(),
report = #report{} :: #report{}
verify_report = #verify_report{} :: #verify_report{}


%%% Public interface.
Expand All @@ -38,75 +28,49 @@
start_link(Name, StorageModule) ->
gen_server:start_link({local, Name}, ?MODULE, StorageModule, []).

-spec name(binary()) -> atom().
name(StoreID) ->
list_to_atom("ar_verify_chunks_" ++ ar_storage_module:label_by_id(StoreID)).

%%% Generic server callbacks.

init(StoreID) ->
?LOG_ERROR([{event, verify_chunk_storage_started}, {store_id, StoreID}]),
?LOG_INFO([{event, verify_chunk_storage_started}, {store_id, StoreID}]),
{StartOffset, EndOffset} = ar_storage_module:get_range(StoreID),
gen_server:cast(self(), verify),
ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress),
{ok, #state{
store_id = StoreID,
packing = ar_storage_module:get_packing(StoreID),
start_time = erlang:system_time(millisecond),
start_offset = StartOffset,
end_offset = EndOffset,
cursor = StartOffset,
ready = is_ready(EndOffset)
ready = is_ready(EndOffset),
verify_report = #verify_report{
start_time = erlang:system_time(millisecond)

handle_cast(verify, #state{ready = false, end_offset = EndOffset} = State) ->
?LOG_ERROR([{event, not_ready}]),
ar_util:cast_after(1000, self(), verify),
{noreply, State#state{ready = is_ready(EndOffset)}};
#state{cursor = Cursor, end_offset = EndOffset} = State) when Cursor >= EndOffset ->
{noreply, State};
handle_cast(verify, #state{store_id = StoreID} = State) ->
{noreply, verify(State)};

handle_cast(report_progress, #state{ready = false} = State) ->
ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress),
{noreply, State};
handle_cast(report_progress, #state{cursor = Cursor, end_offset = EndOffset} = State)
when Cursor >= EndOffset ->
{noreply, State};
handle_cast(report_progress, State) ->
start_time = StartTime,
cursor = Cursor,
start_offset = StartOffset,
end_offset = EndOffset,
report = Report,
store_id = StoreID
} = State,
Bytes = Cursor - StartOffset,
MB = Bytes div 1000000,
Progress = Bytes * 100 div (EndOffset - StartOffset),
ElapsedTime = (erlang:system_time(millisecond) - StartTime) div 1000,
Rate = MB div ElapsedTime,
Intersection = ar_sync_record:get_intersection_size(EndOffset, StartOffset, invalid_chunks, StoreID),
ar:console("============== ~s ==============~n", [StoreID]),
ar:console("Verified ~B GB. ~B% done. ~B elapsed. ~B MB/s.~n", [MB div 1000, Progress, ElapsedTime, Rate]),
ar:console("Missing chunks: ~B~n", [Report#report.total_error_chunks]),
ar:console("Missing bytes: ~B MB~n", [Report#report.total_error_bytes div 1000000]),
ar:console("Missing bytes (padded): ~B MB~n", [Report#report.total_error_chunks * ?DATA_CHUNK_SIZE div 1000000]),
ar:console("Report: ~p~n", [Report]),
ar:console("Intervals: ~p~n", [Intersection]),
ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress),
{noreply, State};
handle_cast(verify, State) ->
State2 = verify(State),
State3 = report_progress(State2),
{noreply, State3};

handle_cast(Cast, State) ->
?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
{noreply, State}.

handle_call(Call, From, State) ->
?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {call, Call}, {from, From}]),
{noreply, State}.
{reply, ok, State}.

handle_info(Info, State) ->
?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]),
Expand Down Expand Up @@ -201,23 +165,23 @@ invalidate_chunk(Type, Offset, ChunkSize, Logs, State) ->
log_error(Type, Offset, ChunkSize, Logs, State).

log_error(Type, Offset, ChunkSize, Logs, State) ->
#state{ report = Report, store_id = StoreID, cursor = Cursor, packing = Packing } = State,
#state{ verify_report = Report, store_id = StoreID, cursor = Cursor, packing = Packing } = State,

LogMessage = [{event, verify_chunk_storage_error},
{type, Type}, {store_id, StoreID},
{packing, ar_serialize:encode_packing(Packing, true)},
{offset, Offset}, {cursor, Cursor}, {chunk_size, ChunkSize}] ++ Logs,
NewBytes = maps:get(Type, Report#report.error_bytes, 0) + ChunkSize,
NewChunks = maps:get(Type, Report#report.error_chunks, 0) + 1,

Report2 = Report#report{
total_error_bytes = Report#report.total_error_bytes + ChunkSize,
total_error_chunks = Report#report.total_error_chunks + 1,
error_bytes = maps:put(Type, NewBytes, Report#report.error_bytes),
error_chunks = maps:put(Type, NewChunks, Report#report.error_chunks)
NewBytes = maps:get(Type, Report#verify_report.error_bytes, 0) + ChunkSize,
NewChunks = maps:get(Type, Report#verify_report.error_chunks, 0) + 1,

Report2 = Report#verify_report{
total_error_bytes = Report#verify_report.total_error_bytes + ChunkSize,
total_error_chunks = Report#verify_report.total_error_chunks + 1,
error_bytes = maps:put(Type, NewBytes, Report#verify_report.error_bytes),
error_chunks = maps:put(Type, NewChunks, Report#verify_report.error_chunks)
State#state{ report = Report2 }.
State#state{ verify_report = Report2 }.

query_intervals(State) ->
#state{cursor = Cursor, store_id = StoreID} = State,
Expand Down Expand Up @@ -264,6 +228,20 @@ check_interval({End, Start}) when Start > End ->
check_interval(Interval) ->

report_progress(State) ->
store_id = StoreID, verify_report = Report, cursor = Cursor,
start_offset = StartOffset, end_offset = EndOffset
} = State,
BytesProcessed = Cursor - StartOffset,
Progress = BytesProcessed * 100 div (EndOffset - StartOffset),
Report2 = Report#verify_report{
bytes_processed = BytesProcessed,
progress = Progress
ar_verify_chunks_reporter:update(StoreID, Report2),
State#state{ verify_report = Report2 }.

%% ar_chunk_storage does not store small chunks before strict_split_data_threshold
%% (before 30607159107830 = partitions 0-7 and a half of 8
Expand Down Expand Up @@ -313,8 +291,8 @@ verify_chunk_test_() ->
fun test_verify_chunk/0

test_align_intervals() ->
{not_found, not_found},
Expand Down Expand Up @@ -402,7 +380,7 @@ test_verify_chunk_storage_should_store() ->
Addr = crypto:strong_rand_bytes(32),
ExpectedState = #state{
packing = unpacked,
report = #report{
verify_report = #verify_report{
total_error_bytes = ?DATA_CHUNK_SIZE,
total_error_chunks = 1,
error_bytes = #{chunk_storage_gap => ?DATA_CHUNK_SIZE},
Expand All @@ -426,7 +404,7 @@ test_verify_chunk_storage_should_store() ->
packing = {composite, Addr, 1},
report = #report{
verify_report = #verify_report{
total_error_bytes = ?DATA_CHUNK_SIZE div 2,
total_error_chunks = 1,
error_bytes = #{chunk_storage_gap => ?DATA_CHUNK_SIZE div 2},
Expand Down Expand Up @@ -463,7 +441,7 @@ test_verify_chunk_storage_should_not_store() ->
test_verify_proof_no_datapath() ->
ExpectedState1 = #state{
packing = unpacked,
report = #report{
verify_report = #verify_report{
total_error_bytes = ?DATA_CHUNK_SIZE,
total_error_chunks = 1,
error_bytes = #{read_data_path_error => ?DATA_CHUNK_SIZE},
Expand All @@ -472,7 +450,7 @@ test_verify_proof_no_datapath() ->
ExpectedState2 = #state{
packing = unpacked,
report = #report{
verify_report = #verify_report{
total_error_bytes = ?DATA_CHUNK_SIZE div 2,
total_error_chunks = 1,
error_bytes = #{read_data_path_error => ?DATA_CHUNK_SIZE div 2},
Expand Down Expand Up @@ -502,7 +480,7 @@ test_verify_proof_valid_paths() ->
test_verify_proof_invalid_paths() ->
ExpectedState1 = #state{
packing = unpacked,
report = #report{
verify_report = #verify_report{
total_error_bytes = ?DATA_CHUNK_SIZE,
total_error_chunks = 1,
error_bytes = #{validate_paths_error => ?DATA_CHUNK_SIZE},
Expand All @@ -511,7 +489,7 @@ test_verify_proof_invalid_paths() ->
ExpectedState2 = #state{
packing = unpacked,
report = #report{
verify_report = #verify_report{
total_error_bytes = ?DATA_CHUNK_SIZE div 2,
total_error_chunks = 1,
error_bytes = #{validate_paths_error => ?DATA_CHUNK_SIZE div 2},
Expand Down Expand Up @@ -550,7 +528,7 @@ test_verify_chunk() ->
ExpectedState = #state{
packing = unpacked,
report = #report{
verify_report = #verify_report{
total_error_bytes = ?DATA_CHUNK_SIZE,
total_error_chunks = 1,
error_bytes = #{get_chunk_error => ?DATA_CHUNK_SIZE},
Expand Down
103 changes: 103 additions & 0 deletions apps/arweave/src/ar_verify_chunks_reporter.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
%%% The blob storage optimized for fast reads.


-export([start_link/0, update/2]).
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).


-record(state, {
reports = #{} :: #{binary() => #verify_report{}}


%%% Public interface.

%% @doc Start the server.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec update(binary(), #verify_report{}) -> ok.
update(StoreID, Report) ->
gen_server:cast(?MODULE, {update, StoreID, Report}).

%%% Generic server callbacks.

init([]) ->
ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress),
{ok, #state{}}.

handle_cast({update, StoreID, Report}, State) ->
{noreply, State#state{ reports = maps:put(StoreID, Report, State#state.reports) }};

handle_cast(report_progress, State) ->
reports = Reports
} = State,

ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress),
{noreply, State};

handle_cast(Cast, State) ->
?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
{noreply, State}.

handle_call(Call, From, State) ->
?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {call, Call}, {from, From}]),
{reply, ok, State}.

handle_info(Info, State) ->
?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]),
{noreply, State}.

terminate(_Reason, _State) ->

print_reports(Reports) when map_size(Reports) == 0 ->
print_reports(Reports) ->
fun(StoreID, Report) ->
print_report(StoreID, Report)

print_header() ->
ar:console("Verification Report~n", []),
ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n", []),
ar:console("| Storage Module | Processed | % | Errors | Verify Rate |~n", []),
ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n", []).

print_footer() ->
ar:console("+-------------------------------------------------------------------+-----------+------+----------+-------------+~n~n", []).

print_report(StoreID, Report) ->
total_error_bytes = TotalErrorBytes,
bytes_processed = BytesProcessed,
progress = Progress,
start_time = StartTime
} = Report,
Duration = erlang:system_time(millisecond) - StartTime,
Rate = 1000 * BytesProcessed / Duration,
ar:console("| ~65s | ~4B GB | ~3B% | ~5.1f GB | ~6.1f MB/s |~n",
StoreID, BytesProcessed div 1000000000, Progress,
TotalErrorBytes / 1000000000, Rate / 1000000
6 changes: 3 additions & 3 deletions apps/arweave/src/ar_verify_chunks_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ init([]) ->
Workers = lists:map(
fun(StorageModule) ->
StoreID = ar_storage_module:id(StorageModule),
Label = ar_storage_module:label(StorageModule),
Name = list_to_atom("ar_verify_chunks_" ++ Label),
Name = ar_verify_chunks:name(StoreID),
?CHILD_WITH_ARGS(ar_verify_chunks, worker, Name, [Name, StoreID])
{ok, {{one_for_one, 5, 10}, Workers}}
Reporter = ?CHILD(ar_verify_chunks_reporter, worker),
{ok, {{one_for_one, 5, 10}, [Reporter | Workers]}}

0 comments on commit a2144ce

Please sign in to comment.