Skip to content

Commit

Permalink
Make Indexes a memory-mapped file
Browse files Browse the repository at this point in the history
  • Loading branch information
hubcio committed Nov 3, 2024
1 parent 5fb550d commit 05fb1a0
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 34 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ futures = "0.3.30"
iggy = { path = "../sdk" }
jsonwebtoken = "9.3.0"
log = "0.4.20"
memmap2 = "0.9.5"
moka = { version = "0.12.5", features = ["future"] }
openssl = { version = "0.10.66", features = ["vendored"] }
opentelemetry = { version = "0.26.0", features = ["trace", "logs"] }
Expand Down
6 changes: 3 additions & 3 deletions server/src/compat/storage_conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::state::State;
use crate::streaming::batching::message_batch::RetainedMessageBatch;
use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
use crate::streaming::persistence::persister::Persister;
use crate::streaming::segments::index::{Index, IndexRange};
use crate::streaming::segments::index::{Index, IndexRange, Indexes};
use crate::streaming::segments::segment::Segment;
use crate::streaming::storage::{
PartitionStorage, SegmentStorage, StreamStorage, SystemInfoStorage, SystemStorage, TopicStorage,
Expand Down Expand Up @@ -246,8 +246,8 @@ impl SegmentStorage for NoopSegmentStorage {
Ok(())
}

async fn load_all_indexes(&self, _segment: &Segment) -> Result<Vec<Index>, IggyError> {
Ok(vec![])
async fn load_all_indexes(&self, _segment: &Segment) -> Result<Indexes, IggyError> {
Ok(Indexes::default())
}

async fn load_index_range(
Expand Down
100 changes: 84 additions & 16 deletions server/src/streaming/segments/index.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,75 @@
use crate::streaming::segments::segment::Segment;
use iggy::error::IggyError;
use iggy::error::IggyError::InvalidOffset;
use memmap2::Mmap;
use std::ops::Deref;
use std::ops::Index as IndexOp;

/// Storage for a segment's index entries.
///
/// Dereferences to `[Index]`, so both variants can be read through the same
/// slice API regardless of where the entries actually live.
#[derive(Debug)]
pub enum Indexes {
/// Entries held in an owned, growable vector; the only variant that accepts `push`.
InMemory(Vec<Index>),
/// Entries read directly from a memory-mapped file; `push` panics on this variant.
MemoryMapped { mmap: Mmap },
}

impl From<Vec<Index>> for Indexes {
fn from(indexes: Vec<Index>) -> Self {
Indexes::InMemory(indexes)
}
}

impl Indexes {
pub fn push(&mut self, index: Index) {
match self {
Indexes::InMemory(vec) => {
vec.push(index);
}
Indexes::MemoryMapped { .. } => {
panic!("Cannot push to memory-mapped indexes");
}
}
}
}

impl Deref for Indexes {
type Target = [Index];

fn deref(&self) -> &Self::Target {
match self {
Indexes::InMemory(vec) => vec.as_slice(),
Indexes::MemoryMapped { mmap, .. } => {
let bytes = &mmap[..];
let len = bytes.len() / std::mem::size_of::<Index>();
let ptr = bytes.as_ptr() as *const Index;
unsafe { std::slice::from_raw_parts(ptr, len) }
}
}
}
}

impl IndexOp<usize> for Indexes {
type Output = Index;

fn index(&self, idx: usize) -> &Self::Output {
&self.deref()[idx]
}
}

impl Default for Indexes {
fn default() -> Self {
Indexes::InMemory(Vec::new())
}
}

impl<'a> IntoIterator for &'a Indexes {
type Item = &'a Index;
type IntoIter = std::slice::Iter<'a, Index>;

fn into_iter(self) -> Self::IntoIter {
self.deref().iter()
}
}

#[repr(C)]
#[derive(Debug, Eq, Clone, Copy, Default)]
pub struct Index {
pub offset: u32,
Expand All @@ -22,12 +90,19 @@ pub struct IndexRange {
}

impl Segment {
/// Returns the segment's index entries as a slice, or an empty slice when
/// `self.indexes` is `None`.
pub fn get_indexes_slice(&self) -> &[Index] {
    self.indexes.as_deref().unwrap_or(&[])
}

pub fn load_highest_lower_bound_index(
&self,
indices: &[Index],
start_offset: u32,
end_offset: u32,
) -> Result<IndexRange, IggyError> {
let indices = self.get_indexes_slice();
let starting_offset_idx = binary_search_index(indices, start_offset);
let ending_offset_idx = binary_search_index(indices, end_offset);

Expand Down Expand Up @@ -143,16 +218,16 @@ mod tests {
timestamp: 5000,
},
];
segment.indexes.as_mut().unwrap().extend(indexes);
if let Some(Indexes::InMemory(vec)) = segment.indexes.as_mut() {
vec.extend(indexes);
}
}

#[test]
fn should_find_both_indices() {
let mut segment = create_segment();
create_test_indices(&mut segment);
let result = segment
.load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 15, 45)
.unwrap();
let result = segment.load_highest_lower_bound_index(15, 45).unwrap();

assert_eq!(result.start.offset, 20);
assert_eq!(result.end.offset, 50);
Expand All @@ -162,16 +237,12 @@ mod tests {
fn start_and_end_index_should_be_equal() {
let mut segment = create_segment();
create_test_indices(&mut segment);
let result_end_range = segment
.load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 65, 100)
.unwrap();
let result_end_range = segment.load_highest_lower_bound_index(65, 100).unwrap();

assert_eq!(result_end_range.start.offset, 65);
assert_eq!(result_end_range.end.offset, 65);

let result_start_range = segment
.load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 0, 5)
.unwrap();
let result_start_range = segment.load_highest_lower_bound_index(0, 5).unwrap();
assert_eq!(result_start_range.start.offset, 5);
assert_eq!(result_start_range.end.offset, 5);
}
Expand All @@ -180,9 +251,7 @@ mod tests {
fn should_clamp_last_index_when_out_of_range() {
let mut segment = create_segment();
create_test_indices(&mut segment);
let result = segment
.load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 5, 100)
.unwrap();
let result = segment.load_highest_lower_bound_index(5, 100).unwrap();

assert_eq!(result.start.offset, 5);
assert_eq!(result.end.offset, 65);
Expand All @@ -193,8 +262,7 @@ mod tests {
let mut segment = create_segment();
create_test_indices(&mut segment);

let result =
segment.load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 100, 200);
let result = segment.load_highest_lower_bound_index(100, 200);
assert!(result.is_err());
}
}
10 changes: 4 additions & 6 deletions server/src/streaming/segments/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,12 @@ impl Segment {
return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect());
}

if let Some(indices) = &self.indexes {
if self.indexes.is_some() {
let relative_start_offset = (start_offset - self.start_offset) as u32;
let relative_end_offset = (end_offset - self.start_offset) as u32;
let index_range = match self.load_highest_lower_bound_index(
indices,
relative_start_offset,
relative_end_offset,
) {
let index_range = match self
.load_highest_lower_bound_index(relative_start_offset, relative_end_offset)
{
Ok(range) => range,
Err(_) => {
trace!(
Expand Down
6 changes: 3 additions & 3 deletions server/src/streaming/segments/segment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::index::Indexes;
use crate::configs::system::SystemConfig;
use crate::streaming::batching::batch_accumulator::BatchAccumulator;
use crate::streaming::segments::index::Index;
use crate::streaming::storage::SystemStorage;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::timestamp::IggyTimestamp;
Expand Down Expand Up @@ -34,7 +34,7 @@ pub struct Segment {
pub(crate) message_expiry: IggyExpiry,
pub(crate) unsaved_messages: Option<BatchAccumulator>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) indexes: Option<Vec<Index>>,
pub(crate) indexes: Option<Indexes>,
pub(crate) storage: Arc<SystemStorage>,
}

Expand Down Expand Up @@ -74,7 +74,7 @@ impl Segment {
_ => message_expiry,
},
indexes: match config.segment.cache_indexes {
true => Some(Vec::new()),
true => Some(Indexes::default()),
false => None,
},
unsaved_messages: None,
Expand Down
13 changes: 10 additions & 3 deletions server/src/streaming/segments/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ use bytes::{BufMut, BytesMut};
use iggy::error::IggyError;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::checksum;
use memmap2::Mmap;
use std::io::SeekFrom;
use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
use tracing::{error, info, trace, warn};

const EMPTY_INDEXES: Vec<Index> = vec![];
use super::index::Indexes;

const EMPTY_INDEXES: Indexes = Indexes::InMemory(Vec::new());
pub const INDEX_SIZE: u32 = 16; // offset: 4 bytes, position: 4 bytes, timestamp: 8 bytes
const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000;

Expand Down Expand Up @@ -72,6 +76,9 @@ impl SegmentStorage for FileSegmentStorage {

if segment.is_full().await {
segment.is_closed = true;
let file = Arc::new(File::open(&segment.index_path).await?);
let mmap = unsafe { Mmap::map(&file) }?;
segment.indexes = Some(Indexes::MemoryMapped { mmap });
}

let messages_count = segment.get_messages_count();
Expand Down Expand Up @@ -267,7 +274,7 @@ impl SegmentStorage for FileSegmentStorage {
Ok(())
}

async fn load_all_indexes(&self, segment: &Segment) -> Result<Vec<Index>, IggyError> {
async fn load_all_indexes(&self, segment: &Segment) -> Result<Indexes, IggyError> {
trace!("Loading indexes from file...");
let file = file::open(&segment.index_path).await?;
let file_size = file.metadata().await?.len() as usize;
Expand Down Expand Up @@ -315,7 +322,7 @@ impl SegmentStorage for FileSegmentStorage {

trace!("Loaded {} indexes from file.", indexes_count);

Ok(indexes)
Ok(Indexes::InMemory(indexes))
}

async fn load_index_range(
Expand Down
7 changes: 4 additions & 3 deletions server/src/streaming/storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::batching::message_batch::RetainedMessageBatch;
use super::segments::index::Indexes;
use crate::configs::system::SystemConfig;
use crate::state::system::{PartitionState, StreamState, TopicState};
use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
Expand Down Expand Up @@ -77,7 +78,7 @@ pub trait SegmentStorage: Send + Sync {
) -> Result<u32, IggyError>;
async fn load_message_ids(&self, segment: &Segment) -> Result<Vec<u128>, IggyError>;
async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>;
async fn load_all_indexes(&self, segment: &Segment) -> Result<Vec<Index>, IggyError>;
async fn load_all_indexes(&self, segment: &Segment) -> Result<Indexes, IggyError>;
async fn load_index_range(
&self,
segment: &Segment,
Expand Down Expand Up @@ -307,8 +308,8 @@ pub(crate) mod tests {
Ok(())
}

async fn load_all_indexes(&self, _segment: &Segment) -> Result<Vec<Index>, IggyError> {
Ok(vec![])
async fn load_all_indexes(&self, _segment: &Segment) -> Result<Indexes, IggyError> {
Ok(Indexes::default())
}

async fn load_index_range(
Expand Down

0 comments on commit 05fb1a0

Please sign in to comment.