diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index 45a26e3fc..f0e34a4d9 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -1,9 +1,9 @@ -%%% The blob storage optimized for fast reads. +%% The blob storage optimized for fast reads. -module(ar_chunk_storage). -behaviour(gen_server). --export([start_link/2, put/2, put/3, +-export([start_link/2, name/1, is_storage_supported/3, put/2, put/3, open_files/1, get/1, get/2, get/5, read_chunk2/5, get_range/2, get_range/3, close_file/2, close_files/1, cut/2, delete/1, delete/2, list_files/2, run_defragmentation/0]). @@ -36,6 +36,34 @@ start_link(Name, StoreID) -> gen_server:start_link({local, Name}, ?MODULE, StoreID, []). +%% @doc Return the name of the server serving the given StoreID. +name(StoreID) -> + list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)). + +%% @doc Return true if we can accept the chunk for storage. +%% 256 KiB chunks are stored in the blob storage optimized for read speed. +%% Unpacked chunks smaller than 256 KiB cannot be stored here currently, +%% because the module does not keep track of the chunk sizes - all chunks +%% are assumed to be 256 KiB. +-spec is_storage_supported( + Offset :: non_neg_integer(), + ChunkSize :: non_neg_integer(), + Packing :: term() +) -> true | false. + +is_storage_supported(Offset, ChunkSize, Packing) -> + case Offset > ?STRICT_DATA_SPLIT_THRESHOLD of + true -> + %% All chunks above ?STRICT_DATA_SPLIT_THRESHOLD are placed in 256 KiB buckets + %% so technically can be stored in ar_chunk_storage. However, to avoid + %% managing padding in ar_chunk_storage for unpacked chunks smaller than 256 KiB + %% (we do not need fast random access to unpacked chunks after + %% ?STRICT_DATA_SPLIT_THRESHOLD anyways), we put them to RocksDB. + Packing /= unpacked orelse ChunkSize == (?DATA_CHUNK_SIZE); + false -> + ChunkSize == (?DATA_CHUNK_SIZE) + end. + %% @doc Store the chunk under the given end offset, %% bytes Offset - ?DATA_CHUNK_SIZE, Offset - ?DATA_CHUNK_SIZE + 1, .., Offset - 1. put(PaddedOffset, Chunk) -> @@ -287,8 +315,8 @@ handle_cast({repack, Start, End, NextCursor, RightBound, Packing}, spawn(fun() -> repack(Start, End, NextCursor, RightBound, Packing, StoreID) end), {noreply, State}; -handle_cast({register_packing_ref, Ref, Offset}, #state{ packing_map = Map } = State) -> - {noreply, State#state{ packing_map = maps:put(Ref, Offset, Map) }}; +handle_cast({register_packing_ref, Ref, Args}, #state{ packing_map = Map } = State) -> + {noreply, State#state{ packing_map = maps:put(Ref, Args, Map) }}; handle_cast({expire_repack_request, Ref}, #state{ packing_map = Map } = State) -> {noreply, State#state{ packing_map = maps:remove(Ref, Map) }}; @@ -343,33 +371,69 @@ handle_info({chunk, {packed, Ref, ChunkArgs}}, case maps:get(Ref, Map, not_found) of not_found -> {noreply, State}; - Offset -> + Args -> State2 = State#state{ packing_map = maps:remove(Ref, Map) }, - {Packing, Chunk, _, _, _} = ChunkArgs, - case ar_sync_record:delete(Offset, Offset - ?DATA_CHUNK_SIZE, - ar_data_sync, StoreID) of + {Packing, Chunk, Offset, _, ChunkSize} = ChunkArgs, + StartOffset = Offset - ?DATA_CHUNK_SIZE, + RemoveFromSyncRecordResult = ar_sync_record:delete(Offset, + StartOffset, ar_data_sync, StoreID), + IsStorageSupported = + case RemoveFromSyncRecordResult of + ok -> + is_storage_supported(Offset, ChunkSize, Packing); + Error -> + Error + end, + RemoveFromChunkStorageSyncRecordResult = + case IsStorageSupported of + true -> + store; + false -> + %% Based on the new packing we do not want to + %% store the chunk in the chunk storage anymore so + %% we also remove the record from the + %% chunk-storage specific sync record and + %% send the chunk to the corresponding ar_data_sync + %% module to store it in RocksDB. + ar_sync_record:delete(Offset, StartOffset, + ?MODULE, StoreID); + Error2 -> + Error2 + end, + case RemoveFromChunkStorageSyncRecordResult of ok -> + DataSyncServer = ar_data_sync:name(StoreID), + gen_server:cast(DataSyncServer, + {store_chunk, ChunkArgs, Args}), + {noreply, State2#state{ repack_cursor = Offset, + prev_repack_cursor = PrevCursor }}; + store -> case handle_store_chunk(Offset, Chunk, FileIndex, StoreID) of {ok, FileIndex2} -> ar_sync_record:add_async(repacked_chunk, - Offset, Offset - ?DATA_CHUNK_SIZE, + Offset, StartOffset, Packing, ar_data_sync, StoreID), {noreply, State2#state{ file_index = FileIndex2, - repack_cursor = Offset, prev_repack_cursor = PrevCursor }}; - Error2 -> + repack_cursor = Offset, + prev_repack_cursor = PrevCursor }}; + Error3 -> + PackingStr = ar_serialize:encode_packing(Packing, true), ?LOG_ERROR([{event, failed_to_store_repacked_chunk}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, Offset}, - {packing, ar_serialize:encode_packing(Packing, true)}, - {error, io_lib:format("~p", [Error2])}]), + {packing, PackingStr}, + {error, io_lib:format("~p", [Error3])}]), {noreply, State2} end; - Error3 -> - ?LOG_ERROR([{event, failed_to_remove_repacked_chunk_from_sync_record}, + Error4 -> + PackingStr = ar_serialize:encode_packing(Packing, true), + ?LOG_ERROR([{event, failed_to_store_repacked_chunk}, + {type, repack_in_place}, {storage_module, StoreID}, {offset, Offset}, - {packing, ar_serialize:encode_packing(Packing, true)}, - {error, io_lib:format("~p", [Error3])}]), + {packing, PackingStr}, + {error, io_lib:format("~p", [Error4])}]), {noreply, State2} end end; @@ -397,7 +461,7 @@ get_chunk_group_size() -> Config#config.chunk_storage_file_size. read_repack_cursor(StoreID, TargetPacking) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), case file:read_file(Filepath) of {ok, Bin} -> case catch binary_to_term(Bin) of @@ -411,7 +475,7 @@ read_repack_cursor(StoreID, TargetPacking) -> end. remove_repack_cursor(StoreID) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), case file:delete(Filepath) of ok -> ok; @@ -424,7 +488,7 @@ remove_repack_cursor(StoreID) -> store_repack_cursor(0, _StoreID, _TargetPacking) -> ok; store_repack_cursor(Cursor, StoreID, TargetPacking) -> - Filepath = get_filepath("repack_in_place_cursor", StoreID), + Filepath = get_filepath("repack_in_place_cursor2", StoreID), file:write_file(Filepath, term_to_binary({Cursor, TargetPacking})). get_filepath(Name, StoreID) -> @@ -758,23 +822,26 @@ chunk_offset_list_to_map(ChunkOffsets) -> chunk_offset_list_to_map(ChunkOffsets, infinity, 0, #{}). repack(Cursor, RightBound, Packing, StoreID) -> - case ar_sync_record:get_next_synced_interval(Cursor, RightBound, ?MODULE, StoreID) of + case ar_sync_record:get_next_synced_interval(Cursor, RightBound, + ar_data_sync, StoreID) of not_found -> ar:console("~n~nRepacking of ~s is complete! " "We suggest you stop the node, rename " - "the storage module folder to reflect the new packing, and start the " + "the storage module folder to reflect " + "the new packing, and start the " "node with the new storage module.~n", [StoreID]), ?LOG_INFO([{event, repacking_complete}, {storage_module, StoreID}, - {target_packing, ar_serialize:encode_packing(Packing, true)}]), + {target_packing, + ar_serialize:encode_packing(Packing, true)}]), Server = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), gen_server:cast(Server, repacking_complete), ok; {End, Start} -> Start2 = max(Cursor, Start), - case ar_sync_record:get_next_synced_interval(Start2, End, Packing, ar_data_sync, - StoreID) of + case ar_sync_record:get_next_synced_interval(Start2, End, + Packing, ar_data_sync, StoreID) of not_found -> repack(Start2, End, End, RightBound, Packing, StoreID); {End3, Start3} when Start3 > Start2 -> @@ -789,12 +856,16 @@ repack(Start, End, NextCursor, RightBound, Packing, StoreID) when Start >= End - repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> {ok, Config} = application:get_env(arweave, config), RepackIntervalSize = ?DATA_CHUNK_SIZE * Config#config.repack_batch_size, - Server = list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)), + Server = name(StoreID), + Start2 = Start + RepackIntervalSize, + RepackFurtherArgs = {repack, Start2, End, NextCursor, RightBound, + RequiredPacking}, CheckPackingBuffer = case ar_packing_server:is_buffer_full() of true -> ar_util:cast_after(200, Server, - {repack, Start, End, NextCursor, RightBound, RequiredPacking}), + {repack, Start, End, + NextCursor, RightBound, RequiredPacking}), continue; false -> ok @@ -804,104 +875,158 @@ repack(Start, End, NextCursor, RightBound, RequiredPacking, StoreID) -> continue -> continue; ok -> - case catch get_range(Start, RepackIntervalSize, StoreID) of - [] -> - Start2 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, - RequiredPacking}), - continue; - {'EXIT', _Exc} -> - ?LOG_ERROR([{event, failed_to_read_chunk_range}, - {storage_module, StoreID}, - {start, Start}, - {size, RepackIntervalSize}, - {store_id, StoreID}]), - Start2 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start2, End, NextCursor, RightBound, - RequiredPacking}), - continue; - Range -> - {ok, Range} - end + repack_read_chunk_range(Start, RepackIntervalSize, + StoreID, RepackFurtherArgs) end, ReadMetadataRange = case ReadRange of continue -> continue; {ok, Range2} -> - {Min, Max, Map} = chunk_offset_list_to_map(Range2), - case ar_data_sync:get_chunk_metadata_range(Min, min(Max, End), StoreID) of - {ok, MetadataMap} -> - {ok, Map, MetadataMap}; - {error, Error} -> - ?LOG_ERROR([{event, failed_to_read_chunk_metadata_range}, - {storage_module, StoreID}, - {error, io_lib:format("~p", [Error])}, - {left, Min}, - {right, Max}]), - Start3 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start3, End, NextCursor, RightBound, - RequiredPacking}), - continue - end + repack_read_chunk_metadata_range(Start, RepackIntervalSize, End, + Range2, StoreID, RepackFurtherArgs) end, case ReadMetadataRange of continue -> ok; {ok, Map2, MetadataMap2} -> - Start4 = Start + RepackIntervalSize, - gen_server:cast(Server, {repack, Start4, End, NextCursor, RightBound, - RequiredPacking}), - maps:fold( - fun (AbsoluteOffset, {_, _TXRoot, _, _, _, ChunkSize}, ok) - when ChunkSize /= ?DATA_CHUNK_SIZE, - AbsoluteOffset =< ?STRICT_DATA_SPLIT_THRESHOLD -> - ok; - (AbsoluteOffset, {_, TXRoot, _, _, _, ChunkSize}, ok) -> - PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), - case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync, StoreID) of - {true, RequiredPacking} -> - ?LOG_WARNING([{event, - repacking_process_chunk_already_repacked}, - {storage_module, StoreID}, - {packing, - ar_serialize:encode_packing(RequiredPacking,true)}, - {offset, AbsoluteOffset}]), - ok; - {true, Packing} -> - case maps:get(PaddedOffset, Map2, not_found) of - not_found -> - ?LOG_WARNING([{event, - chunk_not_found_in_chunk_storage}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok; - Chunk -> - Ref = make_ref(), - gen_server:cast(Server, - {register_packing_ref, Ref, PaddedOffset}), - ar_util:cast_after(300000, Server, - {expire_repack_request, Ref}), - ar_packing_server:request_repack(Ref, whereis(Server), - {RequiredPacking, Packing, Chunk, - AbsoluteOffset, TXRoot, ChunkSize}), - ok - end; - true -> - ?LOG_WARNING([{event, no_packing_information_for_the_chunk}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok; + gen_server:cast(Server, {repack, Start2, End, NextCursor, + RightBound, RequiredPacking}), + Args = {StoreID, RequiredPacking, Map2}, + repack_send_chunks_for_repacking(MetadataMap2, Args) + end. + +repack_read_chunk_range(Start, Size, StoreID, RepackFurtherArgs) -> + Server = name(StoreID), + case catch get_range(Start, Size, StoreID) of + [] -> + gen_server:cast(Server, RepackFurtherArgs), + continue; + {'EXIT', _Exc} -> + ?LOG_ERROR([{event, failed_to_read_chunk_range}, + {storage_module, StoreID}, + {start, Start}, + {size, Size}]), + gen_server:cast(Server, RepackFurtherArgs), + continue; + Range -> + {ok, Range} + end. + +repack_read_chunk_metadata_range(Start, Size, End, + Range, StoreID, RepackFurtherArgs) -> + Server = name(StoreID), + End2 = min(Start + Size, End), + {_, _, Map} = chunk_offset_list_to_map(Range), + case ar_data_sync:get_chunk_metadata_range(Start, End2, StoreID) of + {ok, MetadataMap} -> + {ok, Map, MetadataMap}; + {error, Error} -> + ?LOG_ERROR([{event, failed_to_read_chunk_metadata_range}, + {storage_module, StoreID}, + {error, io_lib:format("~p", [Error])}]), + gen_server:cast(Server, RepackFurtherArgs), + continue + end. + +repack_send_chunks_for_repacking(MetadataMap, Args) -> + maps:fold(repack_send_chunks_for_repacking(Args), ok, MetadataMap). + +repack_send_chunks_for_repacking(Args) -> + fun (AbsoluteOffset, {_, _TXRoot, _, _, _, ChunkSize}, ok) + when ChunkSize /= ?DATA_CHUNK_SIZE, + AbsoluteOffset =< ?STRICT_DATA_SPLIT_THRESHOLD -> + ok; + (AbsoluteOffset, ChunkMeta, ok) -> + repack_send_chunk_for_repacking(AbsoluteOffset, ChunkMeta, Args) + end. + +repack_send_chunk_for_repacking(AbsoluteOffset, ChunkMeta, Args) -> + {StoreID, RequiredPacking, ChunkMap} = Args, + Server = name(StoreID), + PaddedOffset = ar_data_sync:get_chunk_padded_offset(AbsoluteOffset), + {ChunkDataKey, TXRoot, DataRoot, TXPath, + RelativeOffset, ChunkSize} = ChunkMeta, + case ar_sync_record:is_recorded(PaddedOffset, ar_data_sync, StoreID) of + {true, RequiredPacking} -> + ?LOG_WARNING([{event, repacking_process_chunk_already_repacked}, + {storage_module, StoreID}, + {packing, + ar_serialize:encode_packing(RequiredPacking, true)}, + {offset, AbsoluteOffset}]), + ok; + {true, Packing} -> + ChunkMaybeDataPath = + case maps:get(PaddedOffset, ChunkMap, not_found) of + not_found -> + repack_read_chunk_and_data_path(StoreID, + ChunkDataKey, AbsoluteOffset, no_chunk); + Chunk3 -> + case is_storage_supported(AbsoluteOffset, + ChunkSize, Packing) of false -> - ?LOG_WARNING([{event, chunk_not_found_in_sync_record}, - {storage_module, StoreID}, - {offset, PaddedOffset}]), - ok + %% We are going to move this chunk to + %% RocksDB after repacking so we read + %% its DataPath here to pass it later on + %% to store_chunk. + repack_read_chunk_and_data_path(StoreID, + ChunkDataKey, AbsoluteOffset, Chunk3); + true -> + %% We are going to repack the chunk and keep it + %% in the chunk storage - no need to make an + %% extra disk access to read the data path. + {Chunk3, none} end end, - ok, - MetadataMap2 - ) + case ChunkMaybeDataPath of + not_found -> + ok; + {Chunk, MaybeDataPath} -> + Ref = make_ref(), + RepackArgs = {Packing, MaybeDataPath, RelativeOffset, + DataRoot, TXPath, none, none}, + gen_server:cast(Server, + {register_packing_ref, Ref, RepackArgs}), + ar_util:cast_after(300000, Server, + {expire_repack_request, Ref}), + ar_packing_server:request_repack(Ref, whereis(Server), + {RequiredPacking, Packing, Chunk, + AbsoluteOffset, TXRoot, ChunkSize}) + end; + true -> + ?LOG_WARNING([{event, no_packing_information_for_the_chunk}, + {storage_module, StoreID}, + {offset, PaddedOffset}]), + ok; + false -> + ?LOG_WARNING([{event, chunk_not_found_in_sync_record}, + {storage_module, StoreID}, + {offset, PaddedOffset}]), + ok + end. + +repack_read_chunk_and_data_path(StoreID, ChunkDataKey, AbsoluteOffset, + MaybeChunk) -> + case ar_kv:get({chunk_data_db, StoreID}, ChunkDataKey) of + not_found -> + ?LOG_WARNING([{event, chunk_not_found}, + {type, repack_in_place}, + {storage_module, StoreID}, + {offset, AbsoluteOffset}]), + not_found; + {ok, V} -> + case binary_to_term(V) of + {Chunk, DataPath} -> + {Chunk, DataPath}; + DataPath when MaybeChunk /= no_chunk -> + {MaybeChunk, DataPath}; + _ -> + ?LOG_WARNING([{event, chunk_not_found2}, + {type, repack_in_place}, + {storage_module, StoreID}, + {offset, AbsoluteOffset}]), + not_found + end end. chunk_offset_list_to_map([], Min, Max, Map) -> diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 9d3a0e5c4..c6d1c6983 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -12,7 +12,7 @@ get_chunk_by_byte/2, get_chunk_seek_offset/1, read_chunk/4, read_data_path/2, increment_chunk_cache_size/0, decrement_chunk_cache_size/0, get_chunk_padded_offset/1, get_chunk_metadata_range/3, - get_merkle_rebase_threshold/0, should_store_in_chunk_storage/3]). + get_merkle_rebase_threshold/0]). -export([debug_get_disk_pool_chunks/0]). @@ -867,6 +867,16 @@ handle_cast({pack_and_store_chunk, Args} = Cast, {noreply, State} end; +handle_cast({store_chunk, ChunkArgs, Args} = Cast, + #sync_data_state{ store_id = StoreID } = State) -> + case is_disk_space_sufficient(StoreID) of + true -> + {noreply, store_chunk(ChunkArgs, Args, State)}; + _ -> + ar_util:cast_after(30000, self(), Cast), + {noreply, State} + end; + %% Schedule syncing of the unsynced intervals. Choose a peer for each of the intervals. %% There are two message payloads: %% 1. collect_peer_intervals @@ -2736,7 +2746,7 @@ write_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Packing, State) -> #sync_data_state{ chunk_data_db = ChunkDataDB, store_id = StoreID } = State, - ShouldStoreInChunkStorage = should_store_in_chunk_storage(Offset, ChunkSize, Packing), + ShouldStoreInChunkStorage = ar_chunk_storage:is_storage_supported(Offset, ChunkSize, Packing), Result = case ShouldStoreInChunkStorage of true -> @@ -2757,21 +2767,6 @@ write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Pa Result end. -%% @doc 256 KiB chunks are stored in the blob storage optimized for read speed. -%% Return true if we want to place the chunk there. -should_store_in_chunk_storage(Offset, ChunkSize, Packing) -> - case Offset > ?STRICT_DATA_SPLIT_THRESHOLD of - true -> - %% All chunks above ?STRICT_DATA_SPLIT_THRESHOLD are placed in 256 KiB buckets - %% so technically can be stored in ar_chunk_storage. However, to avoid - %% managing padding in ar_chunk_storage for unpacked chunks smaller than 256 KiB - %% (we do not need fast random access to unpacked chunks after - %% ?STRICT_DATA_SPLIT_THRESHOLD anyways), we put them to RocksDB. - Packing /= unpacked orelse ChunkSize == (?DATA_CHUNK_SIZE); - false -> - ChunkSize == (?DATA_CHUNK_SIZE) - end. - update_chunks_index(Args, State) -> AbsoluteChunkOffset = element(1, Args), case ar_tx_blacklist:is_byte_blacklisted(AbsoluteChunkOffset) of diff --git a/apps/arweave/src/ar_verify_chunks.erl b/apps/arweave/src/ar_verify_chunks.erl index f9ca015a4..5b4449057 100644 --- a/apps/arweave/src/ar_verify_chunks.erl +++ b/apps/arweave/src/ar_verify_chunks.erl @@ -150,7 +150,7 @@ verify_chunk_storage(PaddedOffset, _ChunkSize, {End, Start}, State) State; verify_chunk_storage(PaddedOffset, ChunkSize, _Interval, State) -> #state{ packing = Packing } = State, - case ar_data_sync:should_store_in_chunk_storage(PaddedOffset, ChunkSize, Packing) of + case ar_chunk_storage:is_storage_supported(PaddedOffset, ChunkSize, Packing) of true -> invalidate_chunk(chunk_storage_gap, PaddedOffset, ChunkSize, State); false ->