Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 1ff36db

Browse files
authored
Merge pull request paritytech#152 from subspace/archive-reconstructor
Add archive reconstructor for archived segments
2 parents 5cab601 + e5d8fde commit 1ff36db

File tree

7 files changed

+633
-9
lines changed

7 files changed

+633
-9
lines changed

crates/subspace-archiving/src/archiver.rs

+2-8
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// limitations under the License.
1515

1616
use crate::merkle_tree::{MerkleTree, Witness};
17+
use crate::utils;
1718
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
1819
use reed_solomon_erasure::galois_16::ReedSolomon;
1920
use serde::{Deserialize, Serialize};
@@ -598,16 +599,9 @@ impl Archiver {
598599
segment
599600
};
600601

601-
fn slice_to_arrays(slice: &[u8]) -> Vec<[u8; 2]> {
602-
slice
603-
.chunks_exact(2)
604-
.map(|s| s.try_into().unwrap())
605-
.collect()
606-
}
607-
608602
let data_shards: Vec<Vec<[u8; 2]>> = segment
609603
.chunks_exact(self.record_size)
610-
.map(slice_to_arrays)
604+
.map(utils::slice_to_arrays)
611605
.collect();
612606

613607
drop(segment);

crates/subspace-archiving/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@
2222
pub mod archiver;
2323
#[cfg(feature = "std")]
2424
pub mod merkle_tree;
25+
#[cfg(feature = "std")]
26+
pub mod reconstructor;
27+
#[cfg(feature = "std")]
28+
mod utils;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
use crate::archiver::{Segment, SegmentItem};
2+
use crate::utils;
3+
use parity_scale_codec::Decode;
4+
use reed_solomon_erasure::galois_16::ReedSolomon;
5+
use std::mem;
6+
use subspace_core_primitives::{
7+
ArchivedBlockProgress, LastArchivedBlock, Piece, RootBlock, PIECE_SIZE, SHA256_HASH_SIZE,
8+
};
9+
use thiserror::Error;
10+
11+
/// Reconstructor-related instantiation error.
12+
#[derive(Debug, Error, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
13+
pub enum ReconstructorInstantiationError {
14+
/// Segment size is not bigger than record size
15+
#[error("Segment size is not bigger than record size")]
16+
SegmentSizeTooSmall,
17+
/// Segment size is not a multiple of record size
18+
#[error("Segment size is not a multiple of record size")]
19+
SegmentSizesNotMultipleOfRecordSize,
20+
/// Wrong record and segment size, it will not be possible to produce pieces
21+
#[error("Wrong record and segment size, it will not be possible to produce pieces")]
22+
WrongRecordAndSegmentCombination,
23+
}
24+
25+
/// Reconstructor-related error that can occur when adding a segment
26+
#[derive(Debug, Error, Clone, PartialEq)]
27+
pub enum ReconstructorError {
28+
/// Error during data shards reconstruction
29+
#[error("Error during data shards reconstruction: {0}")]
30+
DataShardsReconstruction(reed_solomon_erasure::Error),
31+
/// Error during segment decoding
32+
#[error("Error during segment decoding: {0}")]
33+
SegmentDecoding(parity_scale_codec::Error),
34+
/// Incorrect segment order, each next segment must have monotonically increasing segment index
35+
#[error("Incorrect segment order, expected index {expected_segment_index}, actual {actual_segment_index}")]
36+
IncorrectSegmentOrder {
37+
expected_segment_index: u64,
38+
actual_segment_index: u64,
39+
},
40+
}
41+
42+
/// Data structure that contains information reconstructed from given segment (potentially using
43+
/// information from segments that were added previously)
44+
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
45+
pub struct ReconstructedContents {
46+
/// Root block stored in a segment
47+
pub root_block: Option<RootBlock>,
48+
/// Reconstructed encoded blocks with their block numbers
49+
pub blocks: Vec<(u32, Vec<u8>)>,
50+
}
51+
52+
/// Reconstructor helps to retrieve blocks from archived pieces.
53+
#[derive(Debug)]
54+
pub struct Reconstructor {
55+
/// Configuration parameter defining the size of one record (data in one piece excluding witness
56+
/// size)
57+
record_size: usize,
58+
/// Configuration parameter defining the size of one recorded history segment
59+
segment_size: usize,
60+
/// Erasure coding data structure
61+
reed_solomon: ReedSolomon,
62+
/// Index of last segment added to reconstructor
63+
last_segment_index: Option<u64>,
64+
/// Partially reconstructed block waiting for more data
65+
partial_block: Option<Vec<u8>>,
66+
}
67+
68+
impl Reconstructor {
69+
pub fn new(
70+
record_size: usize,
71+
segment_size: usize,
72+
) -> Result<Self, ReconstructorInstantiationError> {
73+
if segment_size <= record_size {
74+
return Err(ReconstructorInstantiationError::SegmentSizeTooSmall);
75+
}
76+
if segment_size % record_size != 0 {
77+
return Err(ReconstructorInstantiationError::SegmentSizesNotMultipleOfRecordSize);
78+
}
79+
80+
// We take N data records and will create the same number of parity records, hence `*2`
81+
let merkle_num_leaves = segment_size / record_size * 2;
82+
let witness_size = SHA256_HASH_SIZE * merkle_num_leaves.log2() as usize;
83+
if record_size + witness_size != PIECE_SIZE {
84+
return Err(ReconstructorInstantiationError::WrongRecordAndSegmentCombination);
85+
}
86+
87+
let shards = segment_size / record_size;
88+
let reed_solomon = ReedSolomon::new(shards, shards)
89+
.expect("ReedSolomon should always be correctly instantiated");
90+
91+
Ok(Self {
92+
record_size,
93+
segment_size,
94+
reed_solomon,
95+
last_segment_index: None,
96+
partial_block: None,
97+
})
98+
}
99+
100+
/// Given a set of pieces of a segment of the archived history (any half of all pieces are
101+
/// required to be present, the rest will be recovered automatically due to use of erasure
102+
/// coding if needed), reconstructs and returns root block and a list of encoded blocks with
103+
/// corresponding block numbers.
104+
///
105+
/// It is possible to start with any segment, but when the next segment is pushed, it needs to
106+
/// follow the previous one or else an error will be returned.
107+
pub fn add_segment(
108+
&mut self,
109+
segment_pieces: &[Option<Piece>],
110+
) -> Result<ReconstructedContents, ReconstructorError> {
111+
let mut segment_data = Vec::with_capacity(self.segment_size);
112+
if !segment_pieces
113+
.iter()
114+
.take(self.reed_solomon.data_shard_count())
115+
.all(|maybe_piece| {
116+
if let Some(piece) = maybe_piece {
117+
segment_data.extend_from_slice(&piece[..self.record_size]);
118+
true
119+
} else {
120+
false
121+
}
122+
})
123+
{
124+
// If not all data pieces are available, need to reconstruct data shards using erasure
125+
// coding.
126+
let mut shards = segment_pieces
127+
.iter()
128+
.map(|maybe_piece| maybe_piece.as_ref().map(utils::slice_to_arrays))
129+
.collect::<Vec<_>>();
130+
131+
self.reed_solomon
132+
.reconstruct_data(&mut shards)
133+
.map_err(ReconstructorError::DataShardsReconstruction)?;
134+
135+
segment_data.clear();
136+
shards
137+
.into_iter()
138+
.take(self.reed_solomon.data_shard_count())
139+
.for_each(|maybe_piece| {
140+
let piece = maybe_piece.expect(
141+
"All data shards are available after successful reconstruction; qed",
142+
);
143+
144+
for chunk in piece.iter().take(self.record_size / 2) {
145+
segment_data.extend_from_slice(chunk.as_ref());
146+
}
147+
});
148+
}
149+
150+
let Segment::V0 { items } = Segment::decode(&mut segment_data.as_ref())
151+
.map_err(ReconstructorError::SegmentDecoding)?;
152+
153+
let mut reconstructed_contents = ReconstructedContents::default();
154+
let mut next_block_number = 0;
155+
let mut partial_block = self.partial_block.take().unwrap_or_default();
156+
157+
for segment_item in items {
158+
match segment_item {
159+
SegmentItem::Block { bytes, .. } => {
160+
if !partial_block.is_empty() {
161+
reconstructed_contents
162+
.blocks
163+
.push((next_block_number, mem::take(&mut partial_block)));
164+
165+
next_block_number += 1;
166+
}
167+
168+
reconstructed_contents
169+
.blocks
170+
.push((next_block_number, bytes));
171+
172+
next_block_number += 1;
173+
}
174+
SegmentItem::BlockStart { bytes, .. } => {
175+
if !partial_block.is_empty() {
176+
reconstructed_contents
177+
.blocks
178+
.push((next_block_number, mem::take(&mut partial_block)));
179+
180+
next_block_number += 1;
181+
}
182+
183+
partial_block = bytes;
184+
}
185+
SegmentItem::BlockContinuation { bytes, .. } => {
186+
if partial_block.is_empty() {
187+
// This is continuation from previous segment, we don't have the beginning
188+
// of the block to continue.
189+
continue;
190+
}
191+
192+
partial_block.extend_from_slice(&bytes);
193+
}
194+
SegmentItem::RootBlock(root_block) => {
195+
let segment_index = root_block.segment_index();
196+
197+
if let Some(last_segment_index) = self.last_segment_index {
198+
if last_segment_index != segment_index {
199+
return Err(ReconstructorError::IncorrectSegmentOrder {
200+
expected_segment_index: last_segment_index + 1,
201+
actual_segment_index: segment_index + 1,
202+
});
203+
}
204+
}
205+
206+
self.last_segment_index.replace(segment_index + 1);
207+
208+
let LastArchivedBlock {
209+
number,
210+
archived_progress,
211+
} = root_block.last_archived_block();
212+
213+
reconstructed_contents.root_block.replace(root_block);
214+
215+
match archived_progress {
216+
ArchivedBlockProgress::Complete => {
217+
reconstructed_contents
218+
.blocks
219+
.push((next_block_number, mem::take(&mut partial_block)));
220+
221+
next_block_number = number + 1;
222+
}
223+
ArchivedBlockProgress::Partial(_bytes) => {
224+
next_block_number = number;
225+
226+
if partial_block.is_empty() {
227+
// Will not be able to recover full block, bump right away.
228+
next_block_number += 1;
229+
}
230+
}
231+
}
232+
}
233+
}
234+
}
235+
236+
if !partial_block.is_empty() {
237+
self.partial_block.replace(partial_block);
238+
}
239+
240+
if self.last_segment_index.is_none() {
241+
self.last_segment_index.replace(0);
242+
}
243+
244+
Ok(reconstructed_contents)
245+
}
246+
}
+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use reed_solomon_erasure::galois_16::Field as Galois16Field;
2+
use reed_solomon_erasure::Field;
3+
use std::mem;
4+
5+
type Elem = <Galois16Field as Field>::Elem;
6+
const ELEM_BYTES: usize = mem::size_of::<Elem>();
7+
8+
/// Convert slice to a vector of arrays for `reed_solomon_erasure` library.
9+
pub(crate) fn slice_to_arrays<S: AsRef<[u8]> + ?Sized>(slice: &S) -> Vec<Elem> {
10+
slice
11+
.as_ref()
12+
.chunks_exact(ELEM_BYTES)
13+
.map(|s| s.try_into().unwrap())
14+
.collect()
15+
}

crates/subspace-archiving/tests/integration/archiver.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ fn archiver() {
282282
}
283283

284284
#[test]
285-
fn archiver_invalid_usage() {
285+
fn invalid_usage() {
286286
assert_matches!(
287287
Archiver::new(5, SEGMENT_SIZE),
288288
Err(ArchiverInstantiationError::RecordSizeTooSmall),

crates/subspace-archiving/tests/integration/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33

44
mod archiver;
55
mod merkle_tree;
6+
mod reconstructor;

0 commit comments

Comments
 (0)