Feature: Ingest External SST #188

Open · wants to merge 12 commits into base: master
1 change: 1 addition & 0 deletions proto/src/proto/meta.proto
@@ -44,6 +44,7 @@ message ManifestChange {
     uint64 key_id = 4;
     EncryptionAlgo encryption_algo = 5;
     uint32 compression = 6; // Only used for CREATE Op.
+    uint64 global_version = 7; // Only used for file ingest, 0 means no global_version
Contributor:

Is it necessary to add global_version just for the ingest external file scenario?

Contributor Author:

Yes. For ingested files, the version of every key is invalid. Do you have a better idea for assigning versions to the keys in ingested files? Iterating over every key and issuing random writes is not a good idea.

Contributor:

> Yes. For ingested files, the version of every key is invalid. Do you have a better idea for assigning versions to the keys in ingested files? Iterating over every key and issuing random writes is not a good idea.

How about rewriting the files to be ingested? That way, we could also handle large values that cannot be stored in the value log.

Contributor Author (@wangnengjie, Sep 6, 2022):

> How about rewriting the files to be ingested? That way, we could also handle large values that cannot be stored in the value log.

I think rewriting is really costly; bandwidth on cloud devices is precious. Large values can be split out to the value log when compacting ingested files (do we implement this?). Also, while rewriting, we cannot be sure the files will ultimately be ingested successfully, so splitting large values out to the value log needs more thought.

Contributor:

Currently we only write the value log when writing entries to the LSM tree.

Contributor Author:

> Currently we only write the value log when writing entries to the LSM tree.

Splitting during compaction is nice when the threshold changes dynamically or when the db is reopened with a different config. Well, we can discuss this point tomorrow or offline.

Contributor Author:

> How about rewriting the files to be ingested? That way, we could also handle large values that cannot be stored in the value log.

I think this would block reads. We need to get a commit_ts (version) before rewriting, and due to SSI protection, every read_ts issued after that has to wait for this txn to finish (watermark).

 }
 
 message BlockOffset {
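To make the version-override idea from the thread above concrete, here is a minimal sketch (not code from this PR) of how a non-zero global_version could stand in for the invalid per-key versions of an ingested file at read time. TableMeta, stored_ts, and effective_ts are hypothetical names; only the u64::MAX - ts big-endian key-suffix encoding is taken from src/format.rs below.

```rust
/// Hypothetical per-table metadata carrying the new ManifestChange field.
struct TableMeta {
    /// 0 means the file carries real per-key versions (normal tables).
    global_version: u64,
}

/// Decode the version stored in the last 8 bytes of an internal key;
/// versions are encoded as `u64::MAX - ts` big-endian so newer sorts first.
fn stored_ts(internal_key: &[u8]) -> u64 {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&internal_key[internal_key.len() - 8..]);
    u64::MAX - u64::from_be_bytes(buf)
}

/// The version a reader should see: for ingested files every stored
/// version is invalid, so the global_version assigned at ingest wins.
fn effective_ts(meta: &TableMeta, internal_key: &[u8]) -> u64 {
    if meta.global_version != 0 {
        meta.global_version
    } else {
        stored_ts(internal_key)
    }
}
```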
10 changes: 5 additions & 5 deletions src/batch.rs
@@ -1,9 +1,8 @@
-use crate::Agate;
-use crate::{entry::Entry, ops::transaction::Transaction};
-use crate::{Error, Result};
-use std::sync::Arc;
+use std::sync::Arc;
 
 use bytes::Bytes;
+
+use crate::{entry::Entry, ops::transaction::Transaction, Agate, Error, Result};
 
 /// WriteBatch helps write multiple entries to database
 pub struct WriteBatch {
@@ -127,11 +126,12 @@ impl WriteBatch {

 #[cfg(test)]
 mod tests {
+    use bytes::Bytes;
+
     use crate::{
         db::tests::{generate_test_agate_options, run_agate_test},
         AgateOptions,
     };
-    use bytes::Bytes;
 
     fn test_with_options(opts: AgateOptions) {
         let key = |i| Bytes::from(format!("{:10}", i));
18 changes: 18 additions & 0 deletions src/db.rs
@@ -22,6 +22,7 @@ use crate::{
     closer::Closer,
     entry::Entry,
     get_ts,
+    ingest::{IngestExternalFileOptions, IngestExternalFileTask},
     levels::LevelsController,
     manifest::ManifestFile,
     memtable::{MemTable, MemTables},
@@ -142,6 +143,14 @@ impl Agate {
     pub fn write_requests(&self, request: Vec<Request>) -> Result<()> {
         self.core.write_requests(request)
     }
+
+    pub fn ingest_external_files(
+        &self,
+        files: &[&str],
+        opts: &IngestExternalFileOptions,
+    ) -> Result<()> {
+        self.core.ingest_external_files(files, opts)
+    }
 }
 
 impl Drop for Agate {
@@ -655,6 +664,15 @@ impl Core {

         unreachable!()
     }
+
+    pub fn ingest_external_files(
+        self: &Arc<Self>,
+        files: &[&str],
+        opts: &IngestExternalFileOptions,
+    ) -> Result<()> {
+        let mut task = IngestExternalFileTask::new(self.clone(), files, *opts);
+        task.run()
+    }
 }
 
 #[cfg(test)]
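For reference, a hypothetical caller of the new public API. Only the ingest_external_files signature comes from the diff above; AgateOptions::open, the Default impl for IngestExternalFileOptions, and the paths are assumptions for illustration.

```rust
use agatedb::{AgateOptions, IngestExternalFileOptions};

fn main() -> agatedb::Result<()> {
    // Assumption: AgateOptions::open and IngestExternalFileOptions::default()
    // exist as sketched; only ingest_external_files is taken from the diff.
    let mut opts = AgateOptions::default();
    let agate = opts.open("/tmp/agate-ingest-demo")?;

    // Hand externally built SSTs to the ingest task, which assigns them a
    // single global_version when the manifest change is committed.
    let files = ["/tmp/ext-0001.sst", "/tmp/ext-0002.sst"];
    agate.ingest_external_files(&files, &IngestExternalFileOptions::default())?;
    Ok(())
}
```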
14 changes: 2 additions & 12 deletions src/format.rs
@@ -18,18 +18,8 @@ pub fn key_with_ts_last(key: impl Into<BytesMut>) -> Bytes {
     key_with_ts(key, 0)
 }
 
-pub fn append_ts(key: &mut BytesMut, ts: u64) {
-    key.reserve(8);
-    let res = (u64::MAX - ts).to_be();
-    let buf = key.chunk_mut();
-    unsafe {
-        ptr::copy_nonoverlapping(
-            &res as *const u64 as *const u8,
-            buf.as_mut_ptr() as *mut _,
-            8,
-        );
-        key.advance_mut(8);
-    }
-}
+pub fn append_ts(mut key: impl BufMut, ts: u64) {
+    key.put_u64(u64::MAX - ts);
+}
 
 pub fn get_ts(key: &[u8]) -> u64 {
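The rewritten append_ts leans on BufMut::put_u64, which writes big-endian and so matches the byte layout the removed to_be()/copy_nonoverlapping code produced. A small round-trip check of that claim; get_ts here is shaped after the function of the same name in src/format.rs, not copied from the PR.

```rust
use bytes::{BufMut, BytesMut};

// The new implementation from the diff: put_u64 is big-endian, matching
// the old manual `(u64::MAX - ts).to_be()` byte copy.
pub fn append_ts(mut key: impl BufMut, ts: u64) {
    key.put_u64(u64::MAX - ts);
}

// Assumed inverse, shaped after get_ts in src/format.rs: read the last
// 8 bytes as big-endian and undo the u64::MAX - ts encoding.
pub fn get_ts(key: &[u8]) -> u64 {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&key[key.len() - 8..]);
    u64::MAX - u64::from_be_bytes(buf)
}

#[test]
fn append_ts_roundtrips() {
    let mut key = BytesMut::from(&b"hello"[..]);
    append_ts(&mut key, 42);
    assert_eq!(key.len(), 5 + 8); // user key + 8-byte version suffix
    assert_eq!(get_ts(&key), 42);
}
```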