diff --git a/components/backup/Cargo.toml b/components/backup/Cargo.toml index effe13c4e08..2a7155db29f 100644 --- a/components/backup/Cargo.toml +++ b/components/backup/Cargo.toml @@ -73,6 +73,7 @@ tokio = { version = "1.5", features = ["rt-multi-thread"] } tokio-stream = "0.1" txn_types = { path = "../txn_types", default-features = false } yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" } +aws = { path = "../cloud/aws" } [dev-dependencies] rand = "0.8" diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 2a68cbb6bd8..cc295cc0f78 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -51,6 +51,8 @@ use crate::{ Error, *, }; +use aws::S3Storage; + const BACKUP_BATCH_LIMIT: usize = 1024; #[derive(Clone)] @@ -293,6 +295,7 @@ impl BackupRange { backup_ts: TimeStamp, begin_ts: TimeStamp, saver: async_channel::Sender<InMemBackupFiles>, + storage_name: &str, ) -> Result<Statistics> { assert!(!self.codec.is_raw_kv); @@ -362,7 +365,7 @@ .start_key .clone() .map_or_else(Vec::new, |k| k.into_raw().unwrap()); - let mut writer = writer_builder.build(next_file_start_key.clone())?; + let mut writer = writer_builder.build(next_file_start_key.clone(), storage_name)?; loop { if let Err(e) = scanner.scan_entries(&mut batch) { error!(?e; "backup scan entries failed"); @@ -396,7 +399,7 @@ send_to_worker_with_metrics(&saver, msg).await?; next_file_start_key = this_end_key; writer = writer_builder - .build(next_file_start_key.clone()) + .build(next_file_start_key.clone(), storage_name) .map_err(|e| { error_unknown!(?e; "backup writer failed"); e @@ -887,7 +890,7 @@ impl Endpoint { let input = brange.codec.decode_backup_key(Some(k)).unwrap_or_default(); file_system::sha256(&input).ok().map(hex::encode) }); - let name = backup_file_name(store_id, &brange.region, key); + let name = backup_file_name(store_id, &brange.region, key, _backend.name()); let ct = 
to_sst_compression_type(request.compression_type); let stat = if is_raw_kv { @@ -923,6 +926,7 @@ impl Endpoint { backup_ts, start_ts, saver_tx.clone(), + _backend.name(), ) .await }; @@ -1065,26 +1069,50 @@ fn get_max_start_key(start_key: Option<&Key>, region: &Region) -> Option<Key> { /// A name consists with five parts: store id, region_id, a epoch version, the hash of range start key and timestamp. /// range start key is used to keep the unique file name for file, to handle different tables exists on the same region. /// local unix timestamp is used to keep the unique file name for file, to handle receive the same request after connection reset. -pub fn backup_file_name(store_id: u64, region: &Region, key: Option<String>) -> String { +pub fn backup_file_name(store_id: u64, region: &Region, key: Option<String>, storage_name: &str) -> String { let start = SystemTime::now(); let since_the_epoch = start .duration_since(UNIX_EPOCH) .expect("Time went backwards"); match key { - Some(k) => format!( - "{}_{}_{}_{}_{}", - store_id, - region.get_id(), - region.get_region_epoch().get_version(), - k, - since_the_epoch.as_millis() - ), - None => format!( - "{}_{}_{}", - store_id, - region.get_id(), - region.get_region_epoch().get_version() - ), + Some(k) => { + if storage_name == S3Storage::name() { + format!( + "{}/{}_{}_{}_{}", + store_id, + region.get_id(), + region.get_region_epoch().get_version(), + k, + since_the_epoch.as_millis() + ) + } else { + format!( + "{}_{}_{}_{}_{}", + store_id, + region.get_id(), + region.get_region_epoch().get_version(), + k, + since_the_epoch.as_millis() + ) + } + }, + None => { + if storage_name == S3Storage::name() { + format!( + "{}/{}_{}", + store_id, + region.get_id(), + region.get_region_epoch().get_version() + ) + } else { + format!( + "{}_{}_{}", + store_id, + region.get_id(), + region.get_region_epoch().get_version() + ) + } + }, } } diff --git a/components/backup/src/writer.rs b/components/backup/src/writer.rs index 8408fb7c002..4c4c6dc5ec7 100644 --- 
a/components/backup/src/writer.rs +++ b/components/backup/src/writer.rs @@ -198,10 +198,10 @@ impl BackupWriterBuilder { } } - pub fn build(&self, start_key: Vec<u8>) -> Result<BackupWriter> { + pub fn build(&self, start_key: Vec<u8>, storage_name: &str) -> Result<BackupWriter> { let key = file_system::sha256(&start_key).ok().map(hex::encode); let store_id = self.store_id; - let name = backup_file_name(store_id, &self.region, key); + let name = backup_file_name(store_id, &self.region, key, storage_name); BackupWriter::new( self.db.clone(), &name, diff --git a/components/cloud/aws/src/s3.rs b/components/cloud/aws/src/s3.rs index b5cacb2266e..f42ddca0066 100644 --- a/components/cloud/aws/src/s3.rs +++ b/components/cloud/aws/src/s3.rs @@ -158,6 +158,10 @@ pub struct S3Storage { } impl S3Storage { + pub fn name() -> &'static str { + STORAGE_NAME + } + pub fn from_input(input: InputConfig) -> io::Result<Self> { Self::new(Config::from_input(input)?) }