Skip to content

Commit

Permalink
fix: large files broke prefetch
Browse files Browse the repository at this point in the history
Files larger than 4G lead to a prefetch panic, because the maximum blob IO
range is smaller than 4G. This PR changes the blob IO max size from u32 to
u64.

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
  • Loading branch information
泰友 committed Jul 5, 2023
1 parent 7d5cb1a commit b5f54b2
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 11 deletions.
6 changes: 5 additions & 1 deletion rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,11 @@ impl FileSystem for Rafs {
let r = self.device.read_to(w, desc)?;
result += r;
recorder.mark_success(r);
if r as u32 != desc.size() {

// There are two ways a blob io can become large: a large `offset`, or read
// amplification. Both rarely exceed 4G. Fix device.read_to later.
// FIXME: device.read_to() on blob io larger than 4G.
if r as u64 != desc.size() {
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions smoke/tests/texture/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer {
layer.CreateFile(t, "chunk-dict-file-5", []byte("dir-1/file-2"))
layer.CreateFile(t, "chunk-dict-file-6", []byte("This is poetry"))
layer.CreateFile(t, "chunk-dict-file-7", []byte("My name is long"))
layer.CreateLargeFile(t, "chunk-dict-file-8", 13)
layer.CreateLargeFile(t, "chunk-dict-file-8", 5)
layer.CreateHoledFile(t, "chunk-dict-file-9", []byte("hello world"), 1024, 1024*1024)
layer.CreateFile(t, "chunk-dict-file-10", []byte(""))

Expand Down Expand Up @@ -67,7 +67,7 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer {
layer.CreateSymlink(t, "dir-1/file-deleted-symlink", "dir-1/file-deleted")

// Create large file
layer.CreateLargeFile(t, "large-blob.bin", 13)
layer.CreateLargeFile(t, "large-blob.bin", 5)

// Create holed file
layer.CreateHoledFile(t, "file-hole-1", []byte("hello world"), 1024, 1024*1024)
Expand Down
6 changes: 3 additions & 3 deletions smoke/tests/tool/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func (l *Layer) CreateFile(t *testing.T, name string, data []byte) {
require.NoError(t, err)
}

func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeMB int) {
func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeGB int) {
f, err := os.Create(filepath.Join(l.workDir, name))
require.NoError(t, err)
defer func() {
f.Close()
}()

for b := 1; b <= sizeMB; b++ {
_, err := f.Write(bytes.Repeat([]byte{byte(b)}, 1024*1024))
for b := 1; b <= sizeGB; b++ {
_, err := f.Write(bytes.Repeat([]byte{byte(b)}, 1024*1024*1024))
require.NoError(t, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl AsyncWorkerMgr {
}

/// Consume network bandwidth budget for prefetching.
pub fn consume_prefetch_budget(&self, size: u32) {
pub fn consume_prefetch_budget(&self, size: u64) {
if self.prefetch_inflight.load(Ordering::Relaxed) > 0 {
self.prefetch_consumed
.fetch_add(size as usize, Ordering::AcqRel);
Expand Down
55 changes: 51 additions & 4 deletions storage/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ pub struct BlobIoVec {
/// The blob associated with the IO operation.
bi_blob: Arc<BlobInfo>,
/// Total size of blob IOs to be performed.
bi_size: u32,
bi_size: u64,
/// Array of blob IOs, these IOs should executed sequentially.
pub(crate) bi_vec: Vec<BlobIoDesc>,
}
Expand All @@ -762,8 +762,8 @@ impl BlobIoVec {
pub fn push(&mut self, desc: BlobIoDesc) {
assert_eq!(self.bi_blob.blob_index(), desc.blob.blob_index());
assert_eq!(self.bi_blob.blob_id(), desc.blob.blob_id());
assert!(self.bi_size.checked_add(desc.size).is_some());
self.bi_size += desc.size;
assert!(self.bi_size.checked_add(desc.size as u64).is_some());
self.bi_size += desc.size as u64;
self.bi_vec.push(desc);
}

Expand Down Expand Up @@ -793,7 +793,7 @@ impl BlobIoVec {
}

/// Get size of pending IO data.
pub fn size(&self) -> u32 {
pub fn size(&self) -> u64 {
self.bi_size
}

Expand Down Expand Up @@ -1481,4 +1481,51 @@ mod tests {
assert!(desc2.is_continuous(&desc3, 0x800));
assert!(desc2.is_continuous(&desc3, 0x1000));
}

#[test]
fn test_extend_large_blob_io_vec() {
    // Model an 8G blob made of 1M chunks so that total IO size exceeds u32::MAX.
    let blob_size = 0x2_0000_0000; // 8G
    let chunk_size = 0x10_0000; // 1M
    let chunk_count = (blob_size / chunk_size as u64) as u32;
    let blob = Arc::new(BlobInfo::new(
        0,
        "blob_id".to_owned(),
        blob_size,
        blob_size,
        chunk_size,
        chunk_count,
        BlobFeatures::default(),
    ));

    let mut first_half = BlobIoVec::new(blob.clone());
    let mut second_half = BlobIoVec::new(blob.clone());

    // Queue one IO per chunk, splitting them between the two vectors:
    // the lower half of the blob goes to `first_half`, the rest to `second_half`.
    for idx in 0..chunk_count {
        let chunk = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: blob.blob_index,
            flags: BlobChunkFlags::empty(),
            compress_size: chunk_size,
            compress_offset: idx as u64 * chunk_size as u64,
            uncompress_size: 2 * chunk_size,
            uncompress_offset: 2 * idx as u64 * chunk_size as u64,
            file_offset: 2 * idx as u64 * chunk_size as u64,
            index: idx as u32,
            reserved: 0,
        }) as Arc<dyn BlobChunkInfo>;
        let desc = BlobIoDesc::new(blob.clone(), BlobIoChunk(chunk), 0, chunk_size, true);
        if idx < chunk_count / 2 {
            first_half.push(desc);
        } else {
            second_half.push(desc)
        }
    }

    // Merge the upper half in; the accumulated size must cover the full 8G
    // without overflowing (this is what the u32 -> u64 change enables).
    first_half.append(second_half);

    assert_eq!(blob_size, first_half.size());
    assert_eq!(chunk_count, first_half.len() as u32);
}
}

0 comments on commit b5f54b2

Please sign in to comment.