Skip to content

Commit

Permalink
max_version support (#190)
Browse files Browse the repository at this point in the history
* fix: max_version support

Signed-off-by: wangnengjie <751614701@qq.com>
  • Loading branch information
wangnengjie authored Sep 1, 2022
1 parent d8fbdef commit 070ff80
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 22 deletions.
11 changes: 8 additions & 3 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub use opt::AgateOptions;
use skiplist::Skiplist;
use yatp::task::callback::Handle;

use crate::value::ValuePointer;
use crate::{
closer::Closer,
entry::Entry,
Expand All @@ -29,7 +28,7 @@ use crate::{
ops::oracle::Oracle,
opt::build_table_options,
util::{has_any_prefixes, make_comparator},
value::{self, Request, Value},
value::{self, Request, Value, ValuePointer},
value_log::ValueLog,
wal::Wal,
Error, Result, Table, TableBuilder, TableOptions,
Expand Down Expand Up @@ -190,11 +189,17 @@ impl Core {
orc,
};

core.orc.init_next_ts(core.max_version());

// TODO: Initialize other structures.
core.orc.increment_next_ts();
Ok(core)
}

pub fn max_version(&self) -> u64 {
let v = self.mts.read().unwrap().max_version();
v.max(self.lvctl.max_version())
}

fn memtable_file_path(opts: &AgateOptions, file_id: usize) -> PathBuf {
opts.dir
.join(format!("{:05}{}", file_id, MEMTABLE_FILE_EXT))
Expand Down
17 changes: 17 additions & 0 deletions src/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,23 @@ impl LevelsController {
level.read().unwrap().append_iterators(iters, opts);
}
}

pub fn max_version(&self) -> u64 {
let mut max = 0;
for level in &self.inner.levels {
max = max.max(
level
.read()
.unwrap()
.tables
.iter()
.map(|tbl| tbl.max_version())
.max()
.unwrap_or(0),
);
}
max
}
}

fn build_change_set(compact_def: &CompactDef, new_tables: &[Table]) -> ManifestChangeSet {
Expand Down
80 changes: 70 additions & 10 deletions src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ impl MemTable {
self.skl.put(key, value);

// update max version
inner.max_version = ts;
// Although we guarantee that the order of all updates pushed to write_ch is
// the same as getting commit_ts(version), user may set it non-monotonically
// in managed mode. So we should compare and update.
inner.max_version = inner.max_version.max(ts);

Ok(())
}
Expand Down Expand Up @@ -140,6 +143,10 @@ impl MemTable {
pub fn id(&self) -> usize {
self.id
}

pub fn max_version(&self) -> u64 {
self.inner.read().unwrap().max_version
}
}

impl Drop for MemTable {
Expand Down Expand Up @@ -204,6 +211,18 @@ impl MemTables {
pub fn pop_imm(&mut self) {
self.immutable.pop_front().unwrap();
}

pub fn max_version(&self) -> u64 {
let mut v = self.mutable.max_version();
v = v.max(
self.immutable
.iter()
.map(|imm| imm.max_version())
.max()
.unwrap_or(0),
);
v
}
}

impl Drop for MemTables {
Expand All @@ -217,10 +236,8 @@ impl Drop for MemTables {

#[cfg(test)]
mod tests {
use bytes::BytesMut;

use super::*;
use crate::{format::append_ts, util::make_comparator};
use crate::{key_with_ts, util::make_comparator};

fn get_memtable(data: Vec<(Bytes, Value)>) -> Arc<MemTable> {
let skl = Skiplist::with_capacity(make_comparator(), 4 * 1024 * 1024, true);
Expand All @@ -237,9 +254,7 @@ mod tests {
fn test_memtable_put() {
let mut data = vec![];
for i in 0..1000 {
let mut v = BytesMut::from(i.to_string().as_bytes());
append_ts(&mut v, i);
let v = v.freeze();
let v = key_with_ts(i.to_string().as_str(), i);
data.push((v.clone(), Value::new(v)));
}
let (d1, dx) = data.split_at(250);
Expand All @@ -256,12 +271,15 @@ mod tests {
.collect::<Vec<Arc<MemTable>>>(),
),
};
assert_eq!(mem_tables.mutable.max_version(), 249);
assert_eq!(mem_tables.immutable[0].max_version(), 499);
assert_eq!(mem_tables.immutable[1].max_version(), 749);
assert_eq!(mem_tables.immutable[2].max_version(), 999);
assert_eq!(mem_tables.max_version(), 999);
let view = mem_tables.view();
for k in 0..4 {
for i in k * 250..(k + 1) * 250 {
let mut v = BytesMut::from(i.to_string().as_str());
append_ts(&mut v, i);
let v = v.freeze();
let v = key_with_ts(i.to_string().as_str(), i);

// get value from skiplist
let value = view.tables()[k as usize].get(&v).unwrap();
Expand All @@ -270,4 +288,46 @@ mod tests {
}
}
}

#[test]
fn max_version() {
let skl = Skiplist::with_capacity(make_comparator(), 4 * 1024 * 1024, true);
let mem = MemTable::new(0, skl, None, AgateOptions::default());
for i in 200..300 {
let v = key_with_ts(i.to_string().as_str(), i);
mem.put(v.clone(), Value::new(v)).unwrap();
}
assert_eq!(mem.max_version(), 299);
for i in 300..310 {
let v = key_with_ts(i.to_string().as_str(), i);
mem.put(v.clone(), Value::new(v)).unwrap();
}
assert_eq!(mem.max_version(), 309);
for i in 295..305 {
let v = key_with_ts((i * 100).to_string().as_str(), i);
mem.put(v.clone(), Value::new(v)).unwrap();
}
assert_eq!(mem.max_version(), 309);

let mut data = vec![];
for i in 100..200 {
let v = key_with_ts(i.to_string().as_str(), i);
data.push((v.clone(), Value::new(v)));
}
let (d1, d2) = data.split_at(50);

let mem_tables = MemTables {
mutable: Arc::new(mem),
immutable: VecDeque::from(
[d1, d2]
.iter()
.map(|x| get_memtable(x.to_vec()))
.collect::<Vec<Arc<MemTable>>>(),
),
};
assert_eq!(mem_tables.mutable.max_version(), 309);
assert_eq!(mem_tables.immutable[0].max_version(), 149);
assert_eq!(mem_tables.immutable[1].max_version(), 199);
assert_eq!(mem_tables.max_version(), 309);
}
}
8 changes: 8 additions & 0 deletions src/ops/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ impl Oracle {
commit_info.next_txn_ts
}

/// for init Oracle only
pub(crate) fn init_next_ts(&self, max_ts: u64) {
self.commit_info.lock().unwrap().next_txn_ts = max_ts;
self.txn_mark.done(max_ts);
self.read_mark.done(max_ts);
self.increment_next_ts();
}

pub(crate) fn increment_next_ts(&self) {
let mut commit_info = self.commit_info.lock().unwrap();
commit_info.next_txn_ts += 1
Expand Down
3 changes: 1 addition & 2 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,7 @@ impl TableInner {
}

fn max_version(&self) -> u64 {
unimplemented!()
// self.fetch_index()?.max_version()
self.fetch_index().max_version
}

fn drop_no_fail(&mut self) -> Result<()> {
Expand Down
8 changes: 3 additions & 5 deletions src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use prost::Message;
use proto::meta::{checksum::Algorithm as ChecksumAlg, BlockOffset, Checksum, TableIndex};

use crate::{bloom::Bloom, checksum, format::user_key, opt::Options, util, value::Value};
use crate::{bloom::Bloom, checksum, format::user_key, get_ts, opt::Options, util, value::Value};

/// Entry header stores the difference between current key and block base key.
/// `overlap` is the common prefix of key and base key, and diff is the length
Expand Down Expand Up @@ -37,7 +37,6 @@ pub struct Builder {
table_index: TableIndex,
key_hashes: Vec<u32>,
options: Options,
max_version: u64,
}

impl Builder {
Expand All @@ -52,7 +51,6 @@ impl Builder {
base_offset: 0,
entry_offsets: vec![],
options,
max_version: 0,
}
}

Expand Down Expand Up @@ -91,6 +89,7 @@ impl Builder {

let sst_size = v.encoded_size() + diff_key.len() + 4;
self.table_index.estimated_size += sst_size as u32 + vlog_len;
self.table_index.max_version = self.table_index.max_version.max(get_ts(key));
}

fn finish_block(&mut self) {
Expand Down Expand Up @@ -254,8 +253,7 @@ mod tests {
assert_eq!(block_first_key, &idx.offsets[i].key);
}

// TODO: support max_version
// assert_eq!(TEST_KEYS_COUNT, table.max_version());
assert_eq!(TEST_KEYS_COUNT as u64, table.max_version());
}

fn test_with_bloom_filter(with_blooms: bool) {
Expand Down
3 changes: 1 addition & 2 deletions src/table/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,7 @@ fn test_table_big_values() {

assert!(!it.valid());
assert_eq!(n, count);
// TODO: support max_version in table
// assert_eq!(n, table.max_version());
assert_eq!(n as u64, table.max_version());
}

#[test]
Expand Down

0 comments on commit 070ff80

Please sign in to comment.