diff --git a/components/raftstore/src/engine_store_ffi/mod.rs b/components/raftstore/src/engine_store_ffi/mod.rs index 48c2e7bfcab..bac75bccda1 100644 --- a/components/raftstore/src/engine_store_ffi/mod.rs +++ b/components/raftstore/src/engine_store_ffi/mod.rs @@ -56,7 +56,7 @@ impl UnwrapExternCFunc for std::option::Option { pub struct RaftStoreProxy { status: AtomicU8, key_manager: Option>, - read_index_client: Box, + read_index_client: Option>, kv_engine: std::sync::RwLock>, } @@ -72,7 +72,7 @@ impl RaftStoreProxy { pub fn new( status: AtomicU8, key_manager: Option>, - read_index_client: Box, + read_index_client: Option>, kv_engine: std::sync::RwLock>, ) -> Self { RaftStoreProxy { @@ -206,6 +206,14 @@ pub extern "C" fn ffi_batch_read_index( fn_insert_batch_read_index_resp: Option, ) { assert!(!proxy_ptr.is_null()); + unsafe { + match proxy_ptr.as_ref().read_index_client { + Option::None => { + return; + } + _ => {} + } + } debug_assert!(fn_insert_batch_read_index_resp.is_some()); if view.len != 0 { assert_ne!(view.view, std::ptr::null()); @@ -223,6 +231,8 @@ pub extern "C" fn ffi_batch_read_index( let resp = proxy_ptr .as_ref() .read_index_client + .as_ref() + .unwrap() .batch_read_index(req_vec, time::Duration::from_millis(timeout_ms)); assert_ne!(res, std::ptr::null_mut()); for (r, region_id) in &resp { @@ -295,12 +305,22 @@ pub extern "C" fn ffi_make_read_index_task( req_view: BaseBuffView, ) -> RawRustPtr { assert!(!proxy_ptr.is_null()); + unsafe { + match proxy_ptr.as_ref().read_index_client { + Option::None => { + return RawRustPtr::default(); + } + _ => {} + } + } let mut req = kvrpcpb::ReadIndexRequest::default(); req.merge_from_bytes(req_view.to_slice()).unwrap(); let task = unsafe { proxy_ptr .as_ref() .read_index_client + .as_ref() + .unwrap() .make_read_index_task(req) }; return match task { @@ -346,6 +366,14 @@ pub extern "C" fn ffi_poll_read_index_task( waker: RawVoidPtr, ) -> u8 { assert!(!proxy_ptr.is_null()); + unsafe { + match proxy_ptr.as_ref().read_index_client { + Option::None => { + return 0; + } + _ => {} + } + } let task = unsafe { &mut *(task_ptr as *mut crate::engine_store_ffi::read_index_helper::ReadIndexTask) }; @@ -358,6 +386,8 @@ pub extern "C" fn ffi_poll_read_index_task( proxy_ptr .as_ref() .read_index_client + .as_ref() + .unwrap() .poll_read_index_task(task, waker) } { get_engine_store_server_helper().set_read_index_resp(resp_data, &res); diff --git a/components/server/src/proxy.rs b/components/server/src/proxy.rs index b3fa2ce2133..6c167b45d9f 100644 --- a/components/server/src/proxy.rs +++ b/components/server/src/proxy.rs @@ -197,6 +197,11 @@ pub unsafe fn run_proxy( .required(true) .takes_value(true), ) + .arg( + Arg::with_name("only-decryption") + .long("only-decryption") + .help("Only do decryption in Proxy"), + ) .get_matches_from(args); if matches.is_present("print-sample-config") { @@ -241,7 +246,11 @@ pub unsafe fn run_proxy( } config.raft_store.engine_store_server_helper = engine_store_server_helper as *const _ as isize; - crate::server::run_tikv(config, engine_store_server_helper); + if matches.is_present("only-decryption") { + crate::server::run_tikv_only_decryption(config, engine_store_server_helper); + } else { + crate::server::run_tikv(config, engine_store_server_helper); + } } fn check_engine_label(matches: &clap::ArgMatches<'_>) { diff --git a/components/server/src/server.rs b/components/server/src/server.rs index c9d1a5c6004..d32d26a3782 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -146,10 +146,10 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt let mut proxy = RaftStoreProxy::new( AtomicU8::new(RaftProxyStatus::Idle as u8), tikv.encryption_key_manager.clone(), - Box::new(ReadIndexClient::new( + Some(Box::new(ReadIndexClient::new( tikv.router.clone(), SysQuota::cpu_cores_quota() as usize * 2, - )), + ))), std::sync::RwLock::new(None), ); @@ -239,6 +239,119 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt } } +/// Run a TiKV server only for decryption. Returns when the server is shutdown by the user, in which +/// case the server will be properly stopped. +pub unsafe fn run_tikv_only_decryption( + config: TiKvConfig, + engine_store_server_helper: &EngineStoreServerHelper, +) { + // Sets the global logger ASAP. + // It is okay to use the config w/o `validate()`, + // because `initial_logger()` handles various conditions. + initial_logger(&config); + + // Print version information. + crate::log_proxy_info(); + + // Print resource quota. + SysQuota::log_quota(); + CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota()); + + // Do some prepare works before start. + pre_start(); + + let _m = Monitor::default(); + + macro_rules! run_impl { + ($ER: ty) => {{ + let encryption_key_manager = + data_key_manager_from_config(&config.security.encryption, &config.storage.data_dir) + .map_err(|e| { + panic!( + "Encryption failed to initialize: {}. code: {}", + e, + e.error_code() + ) + }) + .unwrap() + .map(Arc::new); + + let mut proxy = RaftStoreProxy::new( + AtomicU8::new(RaftProxyStatus::Idle as u8), + encryption_key_manager.clone(), + Option::None, + std::sync::RwLock::new(None), + ); + + let proxy_helper = { + let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy); + proxy_helper.fn_server_info = Some(ffi_server_info); + proxy_helper + }; + + info!("set raft-store proxy helper"); + + engine_store_server_helper.handle_set_proxy(&proxy_helper); + + info!("wait for engine-store server to start"); + while engine_store_server_helper.handle_get_engine_store_server_status() + == EngineStoreServerStatus::Idle + { + thread::sleep(Duration::from_millis(200)); + } + + if engine_store_server_helper.handle_get_engine_store_server_status() + != EngineStoreServerStatus::Running + { + info!("engine-store server is not running, make proxy exit"); + return; + } + + info!("engine-store server is started"); + + proxy.set_status(RaftProxyStatus::Running); + + { + debug_assert!( + engine_store_server_helper.handle_get_engine_store_server_status() + == EngineStoreServerStatus::Running + ); + loop { + if engine_store_server_helper.handle_get_engine_store_server_status() + != EngineStoreServerStatus::Running + { + break; + } + thread::sleep(Duration::from_millis(200)); + } + } + + info!( + "found engine-store server status is {:?}, start to stop all services", + engine_store_server_helper.handle_get_engine_store_server_status() + ); + + proxy.set_status(RaftProxyStatus::Stopped); + + info!("all services in raft-store proxy are stopped"); + + info!("wait for engine-store server to stop"); + while engine_store_server_helper.handle_get_engine_store_server_status() + != EngineStoreServerStatus::Terminated + { + thread::sleep(Duration::from_millis(200)); + } + info!("engine-store server is stopped"); + }}; + } + + if !config.raft_engine.enable { + run_impl!(RocksEngine) + } else { + run_impl!(RaftLogEngine) + } +} + const RESERVED_OPEN_FDS: u64 = 1000; const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000); diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs index 833920be724..95d69980e23 100644 --- a/components/test_raftstore/src/cluster.rs +++ b/components/test_raftstore/src/cluster.rs @@ -297,10 +297,10 @@ impl Cluster { let proxy = Box::new(raftstore::engine_store_ffi::RaftStoreProxy::new( AtomicU8::new(raftstore::engine_store_ffi::RaftProxyStatus::Idle as u8), key_mgr.clone(), - Box::new(raftstore::engine_store_ffi::ReadIndexClient::new( + Some(Box::new(raftstore::engine_store_ffi::ReadIndexClient::new( router.clone(), SysQuota::cpu_cores_quota() as usize * 2, - )), + ))), std::sync::RwLock::new(Some(engines.kv.clone())), ));