Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[WIP] Refactor LevelHandler for different TableAccessor #112

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ proto = { path = "proto" }
skiplist = { path = "skiplist" }
memmap = "0.7"
farmhash = "1.1"
prost = "0.7"
prost = "0.8"
enum_dispatch = "0.3"
crossbeam-channel = "0.5"
crc32fast = "1.2"
Expand Down
4 changes: 2 additions & 2 deletions proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2018"

[dependencies]
bytes = "1.0"
prost = "0.7"
prost = "0.8"

[build-dependencies]
prost-build = { version = "0.6" }
prost-build = { version = "0.8" }
5 changes: 0 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,14 +626,11 @@ impl Core {
vlog.write(&mut requests)?;
}

let mut cnt = 0;

// writing to LSM
for req in requests {
if req.entries.is_empty() {
continue;
}
cnt += req.entries.len();

while let Err(_) = self.ensure_room_for_write() {
std::thread::sleep(std::time::Duration::from_millis(10));
Expand All @@ -643,8 +640,6 @@ impl Core {
self.write_to_lsm(req)?;
}

// eprintln!("{} entries written", cnt);

Ok(())
};

Expand Down
145 changes: 47 additions & 98 deletions src/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ mod handler;
#[cfg(test)]
pub(crate) mod tests;

use crate::table::VecTableAccessor;
use compaction::{
get_key_range, get_key_range_single, CompactDef, CompactStatus, CompactionPriority, KeyRange,
LevelCompactStatus, Targets,
get_key_range, CompactDef, CompactStatus, CompactionPriority, LevelCompactStatus, Targets,
};
use handler::HandlerBaseLevel;
use handler::LevelHandler;

use proto::meta::ManifestChangeSet;
Expand All @@ -16,7 +17,7 @@ use crate::manifest::{new_create_change, new_delete_change, ManifestFile};
use crate::ops::oracle::Oracle;
use crate::opt::build_table_options;
use crate::table::{MergeIterator, TableIterators};
use crate::util::{has_any_prefixes, same_key, KeyComparator, COMPARATOR};
use crate::util::{has_any_prefixes, same_key, KeyComparator, KeyRange, COMPARATOR};
use crate::value::{Value, ValuePointer};
use crate::AgateIterator;
use crate::TableBuilder;
Expand All @@ -33,14 +34,15 @@ use std::sync::atomic::AtomicU64;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use crate::levels::handler::HandlerLevel0;
use bytes::{BufMut, Bytes, BytesMut};
use crossbeam_channel::{select, tick, unbounded};
use yatp::task::callback::Handle;

pub(crate) struct Core {
next_file_id: AtomicU64,
// `levels[i].level == i` should be ensured
pub(crate) levels: Vec<Arc<RwLock<LevelHandler>>>,
pub(crate) levels: Vec<Arc<RwLock<Box<dyn LevelHandler>>>>,
opts: AgateOptions,
// TODO: agate oracle, manifest should be added here
cpt_status: RwLock<CompactStatus>,
Expand Down Expand Up @@ -85,8 +87,15 @@ impl Core {
eprintln!("{} tables opened", num_opened);

for (i, tables) in tables.into_iter().enumerate() {
let mut level = LevelHandler::new(opts.clone(), i);
level.init_tables(tables);
let level: Box<dyn LevelHandler> = if i == 0 {
Box::new(HandlerLevel0::new(tables, opts.clone(), i))
} else {
Box::new(HandlerBaseLevel::<VecTableAccessor>::new(
tables,
opts.clone(),
i,
))
};
levels.push(Arc::new(RwLock::new(level)));

cpt_status_levels.push(LevelCompactStatus::default());
Expand Down Expand Up @@ -122,7 +131,10 @@ impl Core {

let start = std::time::Instant::now();
let mut last_log = std::time::Instant::now();
while !self.levels[0].write()?.try_add_l0_table(table.clone()) {
while !self.levels[0]
.write()?
.replace_tables(&[], &[table.clone()])
{
let current = std::time::Instant::now();
let duration = current.duration_since(start);
if duration.as_millis() > 1000 {
Expand Down Expand Up @@ -184,30 +196,17 @@ impl Core {
));
}

if this_level.tables.is_empty() {
if this_level.num_tables() == 0 {
return Err(Error::CustomError("not table in this level".to_string()));
}

if compact_def.drop_prefixes.is_empty() {
let mut out = vec![];
let mut kr = KeyRange::default();
for table in this_level.tables.iter() {
let dkr = get_key_range_single(table);
if kr.overlaps_with(&dkr) {
out.push(table.clone());
kr.extend(&dkr);
} else {
break;
}
}
compact_def.top = out;
compact_def.top = this_level.overlapping_tables(&KeyRange::default());
}

compact_def.this_range = get_key_range(&compact_def.top);

let (left, right) = next_level.overlapping_tables(&compact_def.this_range);

compact_def.bot = next_level.tables[left..right].to_vec();
compact_def.bot = next_level.overlapping_tables(&compact_def.this_range);

if compact_def.bot.is_empty() {
compact_def.next_range = compact_def.this_range.clone();
Expand Down Expand Up @@ -240,20 +239,8 @@ impl Core {
// TODO: don't hold cpt_status through this function
let mut cpt_status = self.cpt_status.write().unwrap();

let mut out = vec![];
// let now = std::time::Instant::now();

for table in this_level.tables.iter() {
if table.size() > 2 * compact_def.targets.file_size[0] {
// file already big, don't include it
continue;
}
// TODO: created at logic
if cpt_status.tables.contains(&table.id()) {
continue;
}
out.push(table.clone());
}
let out =
this_level.pick_all_tables(2 * compact_def.targets.file_size[0], &cpt_status.tables);

if out.len() < 4 {
return Err(Error::CustomError("not enough table to merge".to_string()));
Expand All @@ -262,12 +249,16 @@ impl Core {
compact_def.this_range = KeyRange::inf();
compact_def.top = out;

cpt_status.levels[this_level.level]
cpt_status.levels[this_level.level()]
.ranges
.push(KeyRange::inf());

for table in compact_def.top.iter() {
assert!(cpt_status.tables.insert(table.id()), false);
assert!(
cpt_status.tables.insert(table.id()),
"insert to compaction table must success, but get {}",
false
);
}

compact_def.targets.file_size[0] = std::u32::MAX as u64;
Expand All @@ -283,62 +274,20 @@ impl Core {
}

fn fill_tables(&self, compact_def: &mut CompactDef) -> Result<()> {
let this_level = compact_def.this_level.read().unwrap();
let next_level = compact_def.next_level.read().unwrap();

let tables = &this_level.tables;
let this_level = compact_def.this_level.clone();
let next_level = compact_def.next_level.clone();
let this_guard = this_level.read().unwrap();
let next_guard = next_level.read().unwrap();

if tables.is_empty() {
if this_guard.num_tables() == 0 {
return Err(Error::CustomError("no tables to compact".to_string()));
}

// TODO: sort tables by heuristic

// TODO: don't hold cpt_status write lock for long time
let mut cpt_status = self.cpt_status.write().unwrap();

for table in tables {
compact_def.this_size = table.size();
compact_def.this_range = get_key_range_single(table);
// if we're already compacting this range, don't do anything
if cpt_status.overlaps_with(compact_def.this_level_id, &compact_def.this_range) {
continue;
}
compact_def.top = vec![table.clone()];
let (left, right) = next_level.overlapping_tables(&compact_def.this_range);

if right < left {
eprintln!("right {} is less than left {} in overlapping_tables for current level {}, next level {}, key_range {:?}",
right, left, compact_def.this_level_id,
compact_def.next_level_id, compact_def.this_range);
continue;
}

compact_def.bot = next_level.tables[left..right].to_vec();

if compact_def.bot.is_empty() {
compact_def.bot = vec![];
compact_def.next_range = compact_def.this_range.clone();
if let Err(_) = cpt_status.compare_and_add(compact_def) {
continue;
}
return Ok(());
}

compact_def.next_range = get_key_range(&compact_def.bot);

if cpt_status.overlaps_with(compact_def.next_level_id, &compact_def.next_range) {
continue;
}

if let Err(_) = cpt_status.compare_and_add(compact_def) {
continue;
}

return Ok(());
}

Err(Error::CustomError("no table to fill".to_string()))
this_guard.select_table_range(next_guard.as_ref(), compact_def, &mut *cpt_status)
}

fn run_compact_def(
Expand Down Expand Up @@ -377,11 +326,11 @@ impl Core {
let mut this_level = this_level.write().unwrap();
let mut next_level = next_level.write().unwrap();
this_level.delete_tables(&compact_def.top)?;
next_level.replace_tables(&compact_def.bot, &new_tables)?;
next_level.replace_tables(&compact_def.bot, &new_tables);
} else {
let mut this_level = this_level.write().unwrap();
this_level.delete_tables(&compact_def.top)?;
this_level.replace_tables(&compact_def.bot, &new_tables)?;
this_level.replace_tables(&compact_def.bot, &new_tables);
}

// TODO: logging
Expand Down Expand Up @@ -463,7 +412,7 @@ impl Core {
}

// TODO: sync dir
new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.biggest(), y.biggest()));
new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.largest(), y.largest()));

Ok(new_tables)
}
Expand Down Expand Up @@ -648,7 +597,7 @@ impl Core {
}

if i % N == N - 1 {
let biggest = table.biggest();
let biggest = table.largest();
let mut buf = BytesMut::with_capacity(biggest.len() + 8);
buf.put(user_key(&biggest));
let right = key_with_ts(buf, std::u64::MAX);
Expand Down Expand Up @@ -680,7 +629,7 @@ impl Core {
let next_level = self.levels[base_level].clone();
compact_def = CompactDef::new(
idx,
self.levels[level].clone(),
self.levels[0].clone(),
level,
next_level,
base_level,
Expand Down Expand Up @@ -733,7 +682,7 @@ impl Core {
file_size: vec![0; self.levels.len()],
};

let mut db_size = self.last_level().read().unwrap().total_size;
let mut db_size = self.last_level().read().unwrap().total_size();

for i in (1..self.levels.len()).rev() {
let ltarget = adjust(db_size);
Expand All @@ -760,7 +709,7 @@ impl Core {
targets
}

fn last_level(&self) -> &Arc<RwLock<LevelHandler>> {
fn last_level(&self) -> &Arc<RwLock<Box<dyn LevelHandler>>> {
self.levels.last().unwrap()
}

Expand Down Expand Up @@ -796,8 +745,8 @@ impl Core {
// We may safely ignore this situation.
// TODO: check if we could make it more stable
let size;
if del_size <= level.total_size {
size = level.total_size - del_size;
if del_size <= level.total_size() {
size = level.total_size() - del_size;
} else {
size = 0;
}
Expand All @@ -822,9 +771,9 @@ impl Core {
continue;
}
let lvl = lh.read().unwrap();
let (left, right) = lvl.overlapping_tables(&kr);
let ret = lvl.overlapping_tables(&kr);
drop(lvl);
if right - left > 0 {
if !ret.is_empty() {
return true;
}
}
Expand Down
Loading