Skip to content

Commit

Permalink
Adjust the backup organization structure (pingcap/tidb#30087)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaoming <zhanggaoming028@gmail.com>
  • Loading branch information
MoCuishle28 committed Jul 5, 2022
1 parent d356be1 commit 562d17f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
1 change: 1 addition & 0 deletions components/backup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ tokio = { version = "1.5", features = ["rt-multi-thread"] }
tokio-stream = "0.1"
txn_types = { path = "../txn_types", default-features = false }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }
aws = { path = "../cloud/aws" }

[dev-dependencies]
rand = "0.8"
Expand Down
64 changes: 46 additions & 18 deletions components/backup/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use crate::{
Error, *,
};

use aws::S3Storage;

const BACKUP_BATCH_LIMIT: usize = 1024;

#[derive(Clone)]
Expand Down Expand Up @@ -293,6 +295,7 @@ impl BackupRange {
backup_ts: TimeStamp,
begin_ts: TimeStamp,
saver: async_channel::Sender<InMemBackupFiles>,
storage_name: &str,
) -> Result<Statistics> {
assert!(!self.codec.is_raw_kv);

Expand Down Expand Up @@ -362,7 +365,7 @@ impl BackupRange {
.start_key
.clone()
.map_or_else(Vec::new, |k| k.into_raw().unwrap());
let mut writer = writer_builder.build(next_file_start_key.clone())?;
let mut writer = writer_builder.build(next_file_start_key.clone(), storage_name)?;
loop {
if let Err(e) = scanner.scan_entries(&mut batch) {
error!(?e; "backup scan entries failed");
Expand Down Expand Up @@ -396,7 +399,7 @@ impl BackupRange {
send_to_worker_with_metrics(&saver, msg).await?;
next_file_start_key = this_end_key;
writer = writer_builder
.build(next_file_start_key.clone())
.build(next_file_start_key.clone(), storage_name)
.map_err(|e| {
error_unknown!(?e; "backup writer failed");
e
Expand Down Expand Up @@ -887,7 +890,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
let input = brange.codec.decode_backup_key(Some(k)).unwrap_or_default();
file_system::sha256(&input).ok().map(hex::encode)
});
let name = backup_file_name(store_id, &brange.region, key);
let name = backup_file_name(store_id, &brange.region, key, _backend.name());
let ct = to_sst_compression_type(request.compression_type);

let stat = if is_raw_kv {
Expand Down Expand Up @@ -923,6 +926,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
backup_ts,
start_ts,
saver_tx.clone(),
_backend.name(),
)
.await
};
Expand Down Expand Up @@ -1065,26 +1069,50 @@ fn get_max_start_key(start_key: Option<&Key>, region: &Region) -> Option<Key> {
/// A name consists with five parts: store id, region_id, a epoch version, the hash of range start key and timestamp.
/// range start key is used to keep the unique file name for file, to handle different tables exists on the same region.
/// local unix timestamp is used to keep the unique file name for file, to handle receive the same request after connection reset.
pub fn backup_file_name(store_id: u64, region: &Region, key: Option<String>) -> String {
pub fn backup_file_name(store_id: u64, region: &Region, key: Option<String>, storage_name: &str) -> String {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
match key {
Some(k) => format!(
"{}_{}_{}_{}_{}",
store_id,
region.get_id(),
region.get_region_epoch().get_version(),
k,
since_the_epoch.as_millis()
),
None => format!(
"{}_{}_{}",
store_id,
region.get_id(),
region.get_region_epoch().get_version()
),
Some(k) => {
if storage_name == S3Storage::name() {
format!(
"{}/{}_{}_{}_{}",
store_id,
region.get_id(),
region.get_region_epoch().get_version(),
k,
since_the_epoch.as_millis()
)
} else {
format!(
"{}_{}_{}_{}_{}",
store_id,
region.get_id(),
region.get_region_epoch().get_version(),
k,
since_the_epoch.as_millis()
)
}
},
None => {
if storage_name == S3Storage::name() {
format!(
"{}/{}_{}",
store_id,
region.get_id(),
region.get_region_epoch().get_version()
)
} else {
format!(
"{}_{}_{}",
store_id,
region.get_id(),
region.get_region_epoch().get_version()
)
}
},
}
}

Expand Down
4 changes: 2 additions & 2 deletions components/backup/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ impl BackupWriterBuilder {
}
}

pub fn build(&self, start_key: Vec<u8>) -> Result<BackupWriter> {
pub fn build(&self, start_key: Vec<u8>, storage_name: &str) -> Result<BackupWriter> {
let key = file_system::sha256(&start_key).ok().map(hex::encode);
let store_id = self.store_id;
let name = backup_file_name(store_id, &self.region, key);
let name = backup_file_name(store_id, &self.region, key, storage_name);
BackupWriter::new(
self.db.clone(),
&name,
Expand Down
4 changes: 4 additions & 0 deletions components/cloud/aws/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ pub struct S3Storage {
}

impl S3Storage {
pub fn name() -> &'static str {
STORAGE_NAME
}

pub fn from_input(input: InputConfig) -> io::Result<Self> {
Self::new(Config::from_input(input)?)
}
Expand Down

0 comments on commit 562d17f

Please sign in to comment.