Skip to content

Commit

Permalink
feat(csi-driver): add pre-publish hook to cleanup stale entries
Browse files Browse the repository at this point in the history
Signed-off-by: Abhinandan Purkait <purkaitabhinandan@gmail.com>
  • Loading branch information
Abhinandan-Purkait committed Dec 12, 2024
1 parent e5e8342 commit ac33da7
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 30 deletions.
10 changes: 10 additions & 0 deletions control-plane/csi-driver/proto/node-service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ service NodePlugin {
rpc UnfreezeFS (UnfreezeFSRequest) returns (UnfreezeFSReply) {}
// Find the volume identified by the volume ID, and return volume information.
rpc FindVolume (FindVolumeRequest) returns (FindVolumeReply) {}
// Force unstage the volume.
rpc ForceUnstageVolume (ForceUnstageVolumeRequest) returns (ForceUnstageVolumeReply) {}
}

enum VolumeType {
Expand Down Expand Up @@ -54,3 +56,11 @@ message FindVolumeReply {
optional VolumeType volume_type = 1;
string device_path = 2; // the device path for the volume
}

// The request message containing ID of the volume to be force unstaged.
message ForceUnstageVolumeRequest {
string volume_id = 1;
}
// Message for response on volume force unstage.
message ForceUnstageVolumeReply {
}
14 changes: 14 additions & 0 deletions control-plane/csi-driver/src/bin/controller/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) struct CsiControllerConfig {
node_selector: HashMap<String, String>,
/// Max Outstanding Create Volume Requests.
create_volume_limit: usize,
/// Force unstage volume.
force_unstage_volume: bool,
}

impl CsiControllerConfig {
Expand Down Expand Up @@ -43,11 +45,19 @@ impl CsiControllerConfig {
.map(|s| s.map(|s| s.as_str())),
)?;

let force_unstage_volume = args.get_flag("force-unstage-volume");
if !force_unstage_volume {
tracing::warn!(
"Force unstage volume is disabled, can trigger potential data corruption!"
);
}

CONFIG.get_or_init(|| Self {
rest_endpoint: rest_endpoint.into(),
io_timeout: io_timeout.into(),
node_selector,
create_volume_limit,
force_unstage_volume,
});
Ok(())
}
Expand Down Expand Up @@ -78,4 +88,8 @@ impl CsiControllerConfig {
pub(crate) fn node_selector_segment(&self) -> HashMap<String, String> {
self.node_selector.clone()
}
/// Force unstage volume.
pub(crate) fn force_unstage_volume(&self) -> bool {
self.force_unstage_volume
}
}
82 changes: 68 additions & 14 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use crate::{
};
use csi_driver::{
context::{CreateParams, CreateSnapshotParams, PublishParams, QuiesceFsCandidate},
node::internal::{node_plugin_client::NodePluginClient, FreezeFsRequest, UnfreezeFsRequest},
node::internal::{
node_plugin_client::NodePluginClient, ForceUnstageVolumeRequest, FreezeFsRequest,
UnfreezeFsRequest,
},
};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
use stor_port::types::v0::openapi::{
Expand All @@ -14,11 +17,12 @@ use stor_port::types::v0::openapi::{
SpecStatus, Volume, VolumeShareProtocol,
},
};
use utils::{dsp_created_by_key, DSP_OPERATOR};
use utils::{dsp_created_by_key, DEFAULT_REQ_TIMEOUT, DSP_OPERATOR};

use regex::Regex;
use std::{collections::HashMap, str::FromStr};
use tonic::{Code, Request, Response, Status};
use std::{collections::HashMap, str::FromStr, time::Duration};
use stor_port::types::v0::openapi::models::AppNode;
use tonic::{transport::Uri, Code, Request, Response, Status};
use tracing::{debug, error, instrument, trace, warn};
use uuid::Uuid;
use volume_capability::AccessType;
Expand All @@ -31,13 +35,15 @@ const SNAPSHOT_NAME_PATTERN: &str =
#[derive(Debug)]
pub(crate) struct CsiControllerSvc {
create_volume_limiter: std::sync::Arc<tokio::sync::Semaphore>,
force_unstage_volume: bool,
}
impl CsiControllerSvc {
pub(crate) fn new(cfg: &CsiControllerConfig) -> Self {
Self {
create_volume_limiter: std::sync::Arc::new(tokio::sync::Semaphore::new(
cfg.create_volume_limit(),
)),
force_unstage_volume: cfg.force_unstage_volume(),
}
}
async fn create_volume_permit(&self) -> Result<tokio::sync::SemaphorePermit, tonic::Status> {
Expand Down Expand Up @@ -91,12 +97,30 @@ fn volume_app_node(volume: &Volume) -> Option<String> {
}
}

#[tracing::instrument]
/// Create a new endpoint that connects to the provided Uri.
/// This endpoint has default connect and request timeouts.
fn tonic_endpoint(endpoint: String) -> Result<tonic::transport::Endpoint, Status> {
let uri =
Uri::try_from(endpoint).map_err(|error| Status::invalid_argument(error.to_string()))?;

let timeout = humantime::parse_duration(DEFAULT_REQ_TIMEOUT).unwrap();
Ok(tonic::transport::Endpoint::from(uri)
.connect_timeout(timeout)
.timeout(std::time::Duration::from_secs(30))
.http2_keep_alive_interval(Duration::from_secs(5))
.keep_alive_timeout(Duration::from_secs(10))
.concurrency_limit(utils::DEFAULT_GRPC_CLIENT_CONCURRENCY))
}

#[tracing::instrument(err, skip_all)]
async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Status> {
trace!("Issuing fs freeze");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
let channel = tonic_endpoint(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

match client
.freeze_fs(Request::new(FreezeFsRequest {
volume_id: volume_id.clone(),
Expand All @@ -112,12 +136,15 @@ async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Stat
}
}

#[tracing::instrument]
#[tracing::instrument(err, skip_all)]
async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), Status> {
trace!("Issuing fs unfreeze");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
let channel = tonic_endpoint(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

match client
.unfreeze_fs(Request::new(UnfreezeFsRequest {
volume_id: volume_id.clone(),
Expand All @@ -133,6 +160,28 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St
}
}

#[tracing::instrument(err, skip_all)]
async fn force_unstage(app_node: AppNode, volume_id: String) -> Result<(), Status> {
tracing::info!(
"Issuing cleanup for volume: {} to node {}, to endpoint {}",
volume_id,
app_node.id,
app_node.spec.endpoint
);
let channel = tonic_endpoint(format!("http://{}", app_node.spec.endpoint))?
.connect()
.await
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

client
.force_unstage_volume(Request::new(ForceUnstageVolumeRequest {
volume_id: volume_id.clone(),
}))
.await
.map(|_| ())
}

/// Get share URI for existing volume object and the node where the volume is published.
fn get_volume_share_location(volume: &Volume) -> Option<(String, String)> {
volume
Expand Down Expand Up @@ -518,13 +567,12 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
error!("{}", m);
return Err(Status::internal(m));
}
},
}
_ => {

// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
return spec.cordondrainstate.is_some();
}
false
}
Expand All @@ -538,13 +586,19 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
}
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing
if self.force_unstage_volume {
let app_node = RestApiClient::get_client().get_app_node(&args.node_id).await?;
force_unstage(app_node, volume_id.to_string()).await?;
}

// Volume is not published.
let v = RestApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
Expand Down
19 changes: 13 additions & 6 deletions control-plane/csi-driver/src/bin/controller/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,19 @@ async fn main() -> anyhow::Result<()> {
.help("Formatting style to be used while logging")
)
.arg(
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
.arg(
Arg::new("force-unstage-volume")
.long("force-unstage-volume")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable force unstage volume feature")
)
.get_matches();

utils::print_package_info!();
Expand Down
Loading

0 comments on commit ac33da7

Please sign in to comment.