Skip to content

Commit

Permalink
feat(csi-driver): add pre-publish hook to cleanup stale entries
Browse files Browse the repository at this point in the history
Signed-off-by: Abhinandan Purkait <purkaitabhinandan@gmail.com>
  • Loading branch information
Abhinandan-Purkait committed Dec 12, 2024
1 parent e5e8342 commit 827986c
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 29 deletions.
10 changes: 10 additions & 0 deletions control-plane/csi-driver/proto/node-service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ service NodePlugin {
rpc UnfreezeFS (UnfreezeFSRequest) returns (UnfreezeFSReply) {}
// Find the volume identified by the volume ID, and return volume information.
rpc FindVolume (FindVolumeRequest) returns (FindVolumeReply) {}
// Force unstage the volume.
rpc ForceUnstageVolume (ForceUnstageVolumeRequest) returns (ForceUnstageVolumeReply) {}
}

enum VolumeType {
Expand Down Expand Up @@ -54,3 +56,11 @@ message FindVolumeReply {
optional VolumeType volume_type = 1;
string device_path = 2; // the device path for the volume
}

// The request message containing ID of the volume to be force unstaged.
message ForceUnstageVolumeRequest {
string volume_id = 1;
}
// Message for response on volume force unstage.
message ForceUnstageVolumeReply {
}
82 changes: 68 additions & 14 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use crate::{
};
use csi_driver::{
context::{CreateParams, CreateSnapshotParams, PublishParams, QuiesceFsCandidate},
node::internal::{node_plugin_client::NodePluginClient, FreezeFsRequest, UnfreezeFsRequest},
node::internal::{
node_plugin_client::NodePluginClient, ForceUnstageVolumeRequest, FreezeFsRequest,
UnfreezeFsRequest,
},
};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
use stor_port::types::v0::openapi::{
Expand All @@ -14,11 +17,12 @@ use stor_port::types::v0::openapi::{
SpecStatus, Volume, VolumeShareProtocol,
},
};
use utils::{dsp_created_by_key, DSP_OPERATOR};
use utils::{dsp_created_by_key, DEFAULT_REQ_TIMEOUT, DSP_OPERATOR};

use regex::Regex;
use std::{collections::HashMap, str::FromStr};
use tonic::{Code, Request, Response, Status};
use std::{collections::HashMap, str::FromStr, time::Duration};
use stor_port::types::v0::openapi::models::AppNode;
use tonic::{transport::Uri, Code, Request, Response, Status};
use tracing::{debug, error, instrument, trace, warn};
use uuid::Uuid;
use volume_capability::AccessType;
Expand Down Expand Up @@ -91,12 +95,30 @@ fn volume_app_node(volume: &Volume) -> Option<String> {
}
}

#[tracing::instrument]
/// Create a new endpoint that connects to the provided Uri.
/// This endpoint has default connect and request timeouts.
fn tonic_endpoint(endpoint: String) -> Result<tonic::transport::Endpoint, Status> {
let uri =
Uri::try_from(endpoint).map_err(|error| Status::invalid_argument(error.to_string()))?;

let timeout = humantime::parse_duration(DEFAULT_REQ_TIMEOUT).unwrap();
Ok(tonic::transport::Endpoint::from(uri)
.connect_timeout(timeout)
.timeout(std::time::Duration::from_secs(30))
.http2_keep_alive_interval(Duration::from_secs(5))
.keep_alive_timeout(Duration::from_secs(10))
.concurrency_limit(utils::DEFAULT_GRPC_CLIENT_CONCURRENCY))
}

#[tracing::instrument(err, skip_all)]
async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Status> {
trace!("Issuing fs freeze");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
let channel = tonic_endpoint(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

match client
.freeze_fs(Request::new(FreezeFsRequest {
volume_id: volume_id.clone(),
Expand All @@ -112,12 +134,15 @@ async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Stat
}
}

#[tracing::instrument]
#[tracing::instrument(err, skip_all)]
async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), Status> {
trace!("Issuing fs unfreeze");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
let channel = tonic_endpoint(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

match client
.unfreeze_fs(Request::new(UnfreezeFsRequest {
volume_id: volume_id.clone(),
Expand All @@ -133,6 +158,28 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St
}
}

#[tracing::instrument(err, skip_all)]
async fn force_unstage(app_node: AppNode, volume_id: String) -> Result<(), Status> {
tracing::info!(
"Issuing cleanup for volume: {} to node {}, to endpoint {}",
volume_id,
app_node.id,
app_node.spec.endpoint
);
let channel = tonic_endpoint(format!("http://{}", app_node.spec.endpoint))?
.connect()
.await
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

client
.force_unstage_volume(Request::new(ForceUnstageVolumeRequest {
volume_id: volume_id.clone(),
}))
.await
.map(|_| ())
}

/// Get share URI for existing volume object and the node where the volume is published.
fn get_volume_share_location(volume: &Volume) -> Option<(String, String)> {
volume
Expand Down Expand Up @@ -518,13 +565,12 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
error!("{}", m);
return Err(Status::internal(m));
}
},
}
_ => {

// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
return spec.cordondrainstate.is_some();
}
false
}
Expand All @@ -538,13 +584,21 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
}
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing
match RestApiClient::get_client().get_app_node(&args.node_id).await {
Ok(app_node) => force_unstage(app_node, volume_id.to_string()).await?,
Err(ApiClientError::ResourceNotExists(..)) => warn!("App node: {}, not found, skipping force unstage volume", args.node_id),
Err(error) => return Err(error.into())
}


// Volume is not published.
let v = RestApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
Expand Down
12 changes: 6 additions & 6 deletions control-plane/csi-driver/src/bin/controller/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ async fn main() -> anyhow::Result<()> {
.help("Formatting style to be used while logging")
)
.arg(
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
.get_matches();

utils::print_package_info!();
Expand Down
145 changes: 144 additions & 1 deletion control-plane/csi-driver/src/bin/node/findmnt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use csi_driver::filesystem::FileSystem as Fs;
use serde_json::Value;
use std::{collections::HashMap, process::Command, str::FromStr, string::String, vec::Vec};
use tracing::{error, warn};
use utils::csi_plugin_name;

// Keys of interest we expect to find in the JSON output generated
// by findmnt.
Expand All @@ -13,11 +14,20 @@ const FSTYPE_KEY: &str = "fstype";

#[derive(Debug)]
pub(crate) struct DeviceMount {
#[allow(dead_code)]
mount_path: String,
fstype: FileSystem,
}

impl std::fmt::Display for DeviceMount {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"MountPath: {}, FileSystem: {}",
self.mount_path, self.fstype
)
}
}

impl DeviceMount {
/// create new DeviceMount
pub(crate) fn new(mount_path: String, fstype: FileSystem) -> DeviceMount {
Expand All @@ -27,6 +37,10 @@ impl DeviceMount {
pub(crate) fn fstype(&self) -> FileSystem {
self.fstype.clone()
}
/// Get the mount path.
pub(crate) fn mount_path(&self) -> String {
self.mount_path.clone()
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -213,3 +227,132 @@ pub(crate) fn get_mountpaths(device_path: &str) -> Result<Vec<DeviceMount>, Devi
Err(e) => Err(e),
}
}

const DRIVER_NAME: &str = "driverName";
const VOLUME_HANDLE: &str = "volumeHandle";
const METADATA_FILE: &str = "vol_data.json";
const DEVICE_PATTERN: &str = "nvme";

/// This filter is to be used specifically search for mountpoints in context of k8s
///
/// # Fields
/// * `driver` - Name of the csi driver which provisioned the volume.
/// * `volume_id` - Name of the volume which is being searched for.
/// * `file_name` - Name of the file where kubelet stores the metadata.
/// * `device_pattern` - Pattern to search for a specific type of device, ex nvme.
#[derive(Debug)]
struct FilterCsiMounts<'a> {
driver: &'a str,
volume_id: &'a str,
file_name: &'a str,
device_pattern: &'a str,
}

/// Finds CSI mount points using `findmnt` and filters based on the given criteria.
fn find_csi_mount(filter: FilterCsiMounts) -> Result<Vec<HashMap<String, String>>, DeviceError> {
let output = Command::new(FIND_MNT).args(FIND_MNT_ARGS).output()?;

if !output.status.success() {
return Err(DeviceError::new(String::from_utf8(output.stderr)?.as_str()));
}

let json_str = String::from_utf8(output.stdout)?;
let json: Value = serde_json::from_str(&json_str)?;

let mut results: Vec<HashMap<String, String>> = Vec::new();
filter_csi_findmnt(&json, &filter, &mut results);

Ok(results)
}

/// Filters JSON output from `findmnt` for matching CSI mount points as per the filter.
fn filter_csi_findmnt(
json_val: &Value,
filter: &FilterCsiMounts,
results: &mut Vec<HashMap<String, String>>,
) {
match json_val {
Value::Array(json_array) => {
for jsonvalue in json_array {
filter_csi_findmnt(jsonvalue, filter, results);
}
}
Value::Object(json_map) => {
if let Some(source_value) = json_map.get(SOURCE_KEY) {
let source_str = key_adjusted_value(SOURCE_KEY, source_value);
if source_str.contains(filter.device_pattern) {
if let Some(target_value) = json_map.get(TARGET_KEY) {
let target_str = target_value.as_str().unwrap_or_default();

if let Some(parent_path) = std::path::Path::new(target_str).parent() {
let vol_data_path = parent_path.join(filter.file_name);
let vol_data_path_str = vol_data_path.to_string_lossy();

if let Ok(vol_data) = read_vol_data_json(&vol_data_path_str) {
if vol_data.get(DRIVER_NAME)
== Some(&Value::String(filter.driver.to_string()))
&& vol_data.get(VOLUME_HANDLE)
== Some(&Value::String(filter.volume_id.to_string()))
{
results.push(jsonmap_to_hashmap(json_map));
}
}
}
}
}
}

for (_, jsonvalue) in json_map {
if jsonvalue.is_array() || jsonvalue.is_object() {
filter_csi_findmnt(jsonvalue, filter, results);
}
}
}
_ => (),
};
}

/// Reads and parses a file into a JSON object.
fn read_vol_data_json(path: &str) -> Result<serde_json::Map<String, Value>, DeviceError> {
let file = std::fs::File::open(path)?;
let reader = std::io::BufReader::new(file);
let json: serde_json::Map<String, Value> = serde_json::from_reader(reader)?;
Ok(json)
}

/// Retrieves mount paths for a given CSI volume ID by parsing the metadata file.
pub(crate) async fn get_csi_mountpaths(volume_id: &str) -> Result<Vec<DeviceMount>, DeviceError> {
let filter = FilterCsiMounts {
driver: &csi_plugin_name(),
volume_id,
file_name: METADATA_FILE,
device_pattern: DEVICE_PATTERN,
};
match find_csi_mount(filter) {
Ok(results) => {
let mut mountpaths: Vec<DeviceMount> = Vec::new();
for entry in results {
if let Some(mountpath) = entry.get(TARGET_KEY) {
if let Some(fstype) = entry.get(FSTYPE_KEY) {
mountpaths.push(DeviceMount::new(
mountpath.to_string(),
Fs::from_str(fstype)
.unwrap_or(Fs::Unsupported(fstype.to_string()))
.into(),
))
} else {
warn!(volume.id=%volume_id, "Missing fstype for {mountpath}");
mountpaths.push(DeviceMount::new(
mountpath.to_string(),
Fs::Unsupported("".to_string()).into(),
))
}
} else {
warn!(volume.id=%volume_id, ?entry, "Missing target field");
}
}
Ok(mountpaths)
}
Err(e) => Err(e),
}
}
Loading

0 comments on commit 827986c

Please sign in to comment.