From 3fa1fe2373f75f49b36b4f9aded8cc019b8e7e04 Mon Sep 17 00:00:00 2001
From: Abhinandan Purkait
Date: Wed, 11 Dec 2024 10:46:33 +0000
Subject: [PATCH] feat(csi-driver): add pre-publish hook to clean up stale
 entries

Signed-off-by: Abhinandan Purkait
---
 .../csi-driver/proto/node-service.proto       |  10 ++
 .../src/bin/controller/controller.rs          |  23 ++-
 .../csi-driver/src/bin/node/findmnt.rs        | 145 +++++++++++++++++-
 .../csi-driver/src/bin/node/main_.rs          |  16 +-
 .../csi-driver/src/bin/node/mount.rs          |  50 +++++-
 control-plane/csi-driver/src/bin/node/node.rs |   2 +-
 .../src/bin/node/nodeplugin_grpc.rs           |  93 ++++++++++-
 .../csi-driver/src/bin/node/shutdown_event.rs |  11 +-
 8 files changed, 336 insertions(+), 14 deletions(-)

diff --git a/control-plane/csi-driver/proto/node-service.proto b/control-plane/csi-driver/proto/node-service.proto
index 7cb2b6390..0271b3a65 100644
--- a/control-plane/csi-driver/proto/node-service.proto
+++ b/control-plane/csi-driver/proto/node-service.proto
@@ -20,6 +20,8 @@ service NodePlugin {
   rpc UnfreezeFS (UnfreezeFSRequest) returns (UnfreezeFSReply) {}
   // Find the volume identified by the volume ID, and return volume information.
   rpc FindVolume (FindVolumeRequest) returns (FindVolumeReply) {}
+  // Issue a cleanup for the volume: unmount and disconnect stale entries.
+  rpc Cleanup (CleanupRequest) returns (CleanupReply) {}
 }

 enum VolumeType {
@@ -54,3 +56,11 @@ message FindVolumeReply {
   optional VolumeType volume_type = 1;
   string device_path = 2; // the device path for the volume
 }
+
+// The request message containing the ID of the volume to be cleaned up
+message CleanupRequest {
+  string volume_id = 1;
+}
+// The response message for a volume cleanup
+message CleanupReply {
+}
diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs
index 6ac68a8ec..616470c9d 100644
--- a/control-plane/csi-driver/src/bin/controller/controller.rs
+++ b/control-plane/csi-driver/src/bin/controller/controller.rs
@@ -4,7 +4,9 @@ use crate::{
 };
 use csi_driver::{
     context::{CreateParams, CreateSnapshotParams, PublishParams, QuiesceFsCandidate},
-    node::internal::{node_plugin_client::NodePluginClient, FreezeFsRequest, UnfreezeFsRequest},
+    node::internal::{
+        node_plugin_client::NodePluginClient, CleanupRequest, FreezeFsRequest, UnfreezeFsRequest,
+    },
 };
 use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
 use stor_port::types::v0::openapi::{
@@ -133,6 +135,20 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St
     }
 }

+#[tracing::instrument]
+async fn issue_cleanup(endpoint: String, volume_id: String) -> Result<(), Status> {
+    tracing::info!("Issuing cleanup of stale entries before publish for volume: {volume_id}");
+    let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
+        .await
+        .map_err(|error| Status::failed_precondition(error.to_string()))?;
+    client
+        .cleanup(Request::new(CleanupRequest {
+            volume_id: volume_id.clone(),
+        }))
+        .await
+        .map(|_| ())
+}
+
 /// Get share URI for existing volume object and the node where the volume is published.
 fn get_volume_share_location(volume: &Volume) -> Option<(String, String)> {
     volume
@@ -545,6 +561,11 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
             Ok(_) => Ok(Some(node_id.as_str())),
         }?;

+        // Issue a cleanup RPC to the CSI node to ensure the subsystem doesn't have any stale paths present before publishing.
+        let app_node = RestApiClient::get_client().get_app_node(&args.node_id).await?;
+        tracing::info!("Issuing cleanup to node {}, at endpoint {}", app_node.id, app_node.spec.endpoint);
+        issue_cleanup(app_node.spec.endpoint, volume_id.to_string()).await?;
+
         // Volume is not published.
         let v = RestApiClient::get_client()
             .publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
diff --git a/control-plane/csi-driver/src/bin/node/findmnt.rs b/control-plane/csi-driver/src/bin/node/findmnt.rs
index 70837ac1e..aed1b4d5a 100644
--- a/control-plane/csi-driver/src/bin/node/findmnt.rs
+++ b/control-plane/csi-driver/src/bin/node/findmnt.rs
@@ -13,11 +13,20 @@ const FSTYPE_KEY: &str = "fstype";

 #[derive(Debug)]
 pub(crate) struct DeviceMount {
-    #[allow(dead_code)]
     mount_path: String,
     fstype: FileSystem,
 }

+impl std::fmt::Display for DeviceMount {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "[ Mount Path: {}, File System Type: {} ]",
+            self.mount_path, self.fstype
+        )
+    }
+}
+
 impl DeviceMount {
     /// create new DeviceMount
     pub(crate) fn new(mount_path: String, fstype: FileSystem) -> DeviceMount {
@@ -27,6 +36,10 @@ impl DeviceMount {
     pub(crate) fn fstype(&self) -> FileSystem {
         self.fstype.clone()
     }
+    /// The mount path.
+    pub(crate) fn mount_path(&self) -> String {
+        self.mount_path.clone()
+    }
 }

 #[derive(Debug)]
@@ -213,3 +226,133 @@ pub(crate) fn get_mountpaths(device_path: &str) -> Result<Vec<DeviceMount>, Devi
         Err(e) => Err(e),
     }
 }
+
+const DRIVER_NAME: &str = "driverName";
+const VOLUME_HANDLE: &str = "volumeHandle";
+const MAYASTOR_DRIVER: &str = "io.openebs.csi-mayastor";
+const METADATA_FILE: &str = "vol_data.json";
+const DEVICE_PATTERN: &str = "nvme";
+
+/// This filter is to be used specifically to search for mountpoints in the context of k8s.
+///
+/// # Fields
+/// * `driver` - Name of the CSI driver which provisioned the volume.
+/// * `volume_id` - Name of the volume which is being searched for.
+/// * `file_name` - Name of the file where kubelet stores the metadata.
+/// * `device_pattern` - Pattern to search for a specific type of device, e.g. nvme.
+#[derive(Debug)]
+struct FilterCsiMounts<'a> {
+    driver: &'a str,
+    volume_id: &'a str,
+    file_name: &'a str,
+    device_pattern: &'a str,
+}
+
+/// Finds CSI mount points using `findmnt` and filters based on the given criteria.
+fn find_csi_mount(filter: FilterCsiMounts) -> Result<Vec<HashMap<String, String>>, DeviceError> {
+    let output = Command::new(FIND_MNT).args(FIND_MNT_ARGS).output()?;
+
+    if !output.status.success() {
+        return Err(DeviceError::new(String::from_utf8(output.stderr)?.as_str()));
+    }
+
+    let json_str = String::from_utf8(output.stdout)?;
+    let json: Value = serde_json::from_str(&json_str)?;
+
+    let mut results: Vec<HashMap<String, String>> = Vec::new();
+    filter_csi_findmnt(&json, &filter, &mut results);
+
+    Ok(results)
+}
+
+/// Filters JSON output from `findmnt` for matching CSI mount points as per the filter.
+fn filter_csi_findmnt(
+    json_val: &Value,
+    filter: &FilterCsiMounts,
+    results: &mut Vec<HashMap<String, String>>,
+) {
+    match json_val {
+        Value::Array(json_array) => {
+            for jsonvalue in json_array {
+                filter_csi_findmnt(jsonvalue, filter, results);
+            }
+        }
+        Value::Object(json_map) => {
+            if let Some(source_value) = json_map.get(SOURCE_KEY) {
+                let source_str = key_adjusted_value(SOURCE_KEY, source_value);
+                if source_str.contains(filter.device_pattern) {
+                    if let Some(target_value) = json_map.get(TARGET_KEY) {
+                        let target_str = target_value.as_str().unwrap_or_default();
+
+                        if let Some(parent_path) = std::path::Path::new(target_str).parent() {
+                            let vol_data_path = parent_path.join(filter.file_name);
+                            let vol_data_path_str = vol_data_path.to_string_lossy();
+
+                            if let Ok(vol_data) = read_vol_data_json(&vol_data_path_str) {
+                                if vol_data.get(DRIVER_NAME)
+                                    == Some(&Value::String(filter.driver.to_string()))
+                                    && vol_data.get(VOLUME_HANDLE)
+                                        == Some(&Value::String(filter.volume_id.to_string()))
+                                {
+                                    results.push(jsonmap_to_hashmap(json_map));
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
+            for (_, jsonvalue) in json_map {
+                if jsonvalue.is_array() || jsonvalue.is_object() {
+                    filter_csi_findmnt(jsonvalue, filter, results);
+                }
+            }
+        }
+        _ => (),
+    };
+}
+
+/// Reads and parses a file into a JSON object.
+fn read_vol_data_json(path: &str) -> Result<serde_json::Map<String, Value>, DeviceError> {
+    let file = std::fs::File::open(path)?;
+    let reader = std::io::BufReader::new(file);
+    let json: serde_json::Map<String, Value> = serde_json::from_reader(reader)?;
+    Ok(json)
+}
+
+/// Retrieves mount paths for a given CSI volume ID by parsing the metadata file.
+pub(crate) async fn get_csi_mountpaths(volume_id: &str) -> Result<Vec<DeviceMount>, DeviceError> {
+    let filter = FilterCsiMounts {
+        driver: MAYASTOR_DRIVER,
+        volume_id,
+        file_name: METADATA_FILE,
+        device_pattern: DEVICE_PATTERN,
+    };
+    match find_csi_mount(filter) {
+        Ok(results) => {
+            let mut mountpaths: Vec<DeviceMount> = Vec::new();
+            for entry in results {
+                if let Some(mountpath) = entry.get(TARGET_KEY) {
+                    if let Some(fstype) = entry.get(FSTYPE_KEY) {
+                        mountpaths.push(DeviceMount::new(
+                            mountpath.to_string(),
+                            Fs::from_str(fstype)
+                                .unwrap_or(Fs::Unsupported(fstype.to_string()))
+                                .into(),
+                        ))
+                    } else {
+                        error!("Missing fstype for {}", mountpath);
+                        mountpaths.push(DeviceMount::new(
+                            mountpath.to_string(),
+                            Fs::Unsupported("".to_string()).into(),
+                        ))
+                    }
+                } else {
+                    warn!("Missing target field {:?}", entry);
+                }
+            }
+            Ok(mountpaths)
+        }
+        Err(e) => Err(e),
+    }
+}
diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs
index f4528573f..71071642d 100644
--- a/control-plane/csi-driver/src/bin/node/main_.rs
+++ b/control-plane/csi-driver/src/bin/node/main_.rs
@@ -29,7 +29,7 @@ use std::{
 use tokio::net::UnixListener;
 use tokio_stream::wrappers::UnixListenerStream;
 use tonic::transport::Server;
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, warn};

 const GRPC_PORT: u16 = 50051;

@@ -144,6 +144,13 @@ pub(super) async fn main() -> anyhow::Result<()> {
                 .value_parser(clap::value_parser!(bool))
                 .help("Enable ansi color for logs")
         )
+        .arg(
+            Arg::new("stale-entry-cleanup")
+                .long("stale-entry-cleanup")
+                .action(clap::ArgAction::SetTrue)
+                .value_name("BOOLEAN")
+                .help("Enable cleanup of stale entries before controller publish")
+        )
         .subcommand(
             clap::Command::new("fs-freeze")
                 .arg(
@@ -270,6 +277,11 @@ pub(super) async fn main() -> anyhow::Result<()> {

     let registration_enabled = matches.get_flag("enable-registration");

+    let stale_entry_cleanup = matches.get_flag("stale-entry-cleanup");
+    if !stale_entry_cleanup {
+        warn!("Stale entry cleanup is disabled!");
+    }
+
     // Parse instance and grpc endpoints from the command line arguments and validate.
     let grpc_sock_addr = validate_endpoints(
         matches.get_one::<String>("grpc-endpoint").unwrap(),
@@ -281,7 +293,7 @@ pub(super) async fn main() -> anyhow::Result<()> {
     *crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?;
     let (csi, grpc, registration) = tokio::join!(
         CsiServer::run(csi_socket, &matches)?,
-        NodePluginGrpcServer::run(grpc_sock_addr),
+        NodePluginGrpcServer::run(grpc_sock_addr, stale_entry_cleanup),
         run_registration_loop(
             node_name.clone(),
             grpc_sock_addr.to_string(),
diff --git a/control-plane/csi-driver/src/bin/node/mount.rs b/control-plane/csi-driver/src/bin/node/mount.rs
index d33d58ee8..8585aa88c 100644
--- a/control-plane/csi-driver/src/bin/node/mount.rs
+++ b/control-plane/csi-driver/src/bin/node/mount.rs
@@ -1,10 +1,11 @@
 //! Utility functions for mounting and unmounting filesystems.

-use crate::{filesystem_ops::FileSystem, runtime};
+use crate::{filesystem_ops::FileSystem, findmnt::DeviceMount, runtime};
 use csi_driver::filesystem::FileSystem as Fs;
 use devinfo::mountinfo::{MountInfo, SafeMountIter};
-use std::{collections::HashSet, io::Error};
+use std::{collections::HashSet, io::Error, path::PathBuf};
 use sys_mount::{unmount, FilesystemType, Mount, MountFlags, UnmountFlags};
+use tonic::Status;
 use tracing::{debug, info};
 use uuid::Uuid;
@@ -425,3 +426,48 @@ pub(crate) async fn unmount_on_fs_id_diff(
         )
     })
 }
+
+pub(crate) async fn lazy_unmount_mountpaths(mountpaths: &Vec<DeviceMount>) -> Result<(), Status> {
+    for mountpath in mountpaths {
+        debug!(
+            "Unmounting path: {}, with DETACH flag",
+            mountpath.mount_path()
+        );
+        let target_path = mountpath.mount_path().to_string();
+
+        runtime::spawn_blocking({
+            let target_path = target_path.clone();
+            move || {
+                let mut unmount_flags = UnmountFlags::empty();
+                unmount_flags.insert(UnmountFlags::DETACH);
+                sys_mount::unmount(target_path, unmount_flags)
+                    .map_err(|error| Status::aborted(error.to_string()))
+            }
+        })
+        .await
+        .map_err(|error| Status::aborted(error.to_string()))??;
+
+        if target_path.ends_with("globalmount") {
+            if let Some(parent) = PathBuf::from(&target_path).parent() {
+                if let Err(error) = tokio::fs::remove_dir_all(parent).await {
+                    tracing::error!(
+                        "Failed to remove directory at path {}: {}",
+                        parent.to_string_lossy(),
+                        error
+                    );
+                    return Err(Status::internal(format!(
+                        "Failed to remove directory {}: {}",
+                        parent.to_string_lossy(), error
+                    )));
+                }
+
+                debug!(
+                    "Successfully removed parent directory {} of {}",
+                    parent.to_string_lossy(),
+                    target_path
+                );
+            }
+        }
+    }
+    Ok(())
+}
diff --git a/control-plane/csi-driver/src/bin/node/node.rs b/control-plane/csi-driver/src/bin/node/node.rs
index d184862f1..d9f2156b8 100644
--- a/control-plane/csi-driver/src/bin/node/node.rs
+++ b/control-plane/csi-driver/src/bin/node/node.rs
@@ -134,7 +134,7 @@ fn get_access_type(volume_capability: &Option<VolumeCapability>) -> Result<&Acce

 /// Detach the nexus device from the system, either at volume unstage,
 /// or after failed filesystem mount at volume stage.
-async fn detach(uuid: &Uuid, errheader: String) -> Result<(), Status> {
+pub(crate) async fn detach(uuid: &Uuid, errheader: String) -> Result<(), Status> {
     if let Some(device) = Device::lookup(uuid).await.map_err(|error| {
         failure!(
             Code::Internal,
diff --git a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
index eeb894409..16cdf27dd 100644
--- a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
+++ b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
@@ -3,23 +3,34 @@
 //! node as a IoEngine CSI node plugin, but it is not possible to do so within
 //! the CSI framework. This service must be deployed on all nodes the
 //! IoEngine CSI node plugin is deployed.
+
 use crate::{
+    dev::Device,
+    findmnt,
     fsfreeze::{fsfreeze, FsFreezeOpt},
+    mount::lazy_unmount_mountpaths,
     nodeplugin_svc,
     nodeplugin_svc::{find_mount, lookup_device},
+    runtime,
     shutdown_event::Shutdown,
 };
 use csi_driver::node::internal::{
     node_plugin_server::{NodePlugin, NodePluginServer},
-    FindVolumeReply, FindVolumeRequest, FreezeFsReply, FreezeFsRequest, UnfreezeFsReply,
-    UnfreezeFsRequest, VolumeType,
+    CleanupReply, CleanupRequest, FindVolumeReply, FindVolumeRequest, FreezeFsReply,
+    FreezeFsRequest, UnfreezeFsReply, UnfreezeFsRequest, VolumeType,
 };
 use nodeplugin_svc::TypeOfMount;
+use nvmeadm::{error::NvmeError, nvmf_subsystem::Subsystem};
+use utils::nvme_target_nqn_prefix;
+
 use tonic::{transport::Server, Request, Response, Status};
 use tracing::{debug, error, info};
+use uuid::Uuid;

 #[derive(Debug, Default)]
-pub(crate) struct NodePluginSvc {}
+pub(crate) struct NodePluginSvc {
+    stale_entry_cleanup: bool,
+}

 #[tonic::async_trait]
 impl NodePlugin for NodePluginSvc {
@@ -54,6 +65,73 @@ impl NodePlugin for NodePluginSvc {
             device_path: device.devname(),
         }))
     }
+
+    async fn cleanup(
+        &self,
+        request: Request<CleanupRequest>,
+    ) -> Result<Response<CleanupReply>, Status> {
+        if !self.stale_entry_cleanup {
+            return Ok(Response::new(CleanupReply {}));
+        }
+        let volume_id = request.into_inner().volume_id;
+        debug!("Starting cleanup for volume: {volume_id}");
+
+        let nqn = format!("{}:{volume_id}", nvme_target_nqn_prefix());
+        match Subsystem::try_from_nqn(&nqn) {
+            Ok(subsystem_paths) => {
+                for subsystem_path in subsystem_paths {
+                    debug!(
+                        "Processing subsystem path: addr: {:?}, transport: {}, state: {}",
+                        subsystem_path.address, subsystem_path.transport, subsystem_path.state
+                    );
+                    if !subsystem_path.state.contains("deleting") {
+                        runtime::spawn_blocking(move || {
+                            subsystem_path
+                                .disconnect()
+                                .map_err(|error| Status::aborted(error.to_string()))
+                        })
+                        .await
+                        .map_err(|error| Status::aborted(error.to_string()))??;
+                    }
+                }
+                Err(Status::internal(format!(
+                    "Cleanup initiated for all stale entries of: {volume_id}. Returning error for validation on retry."
+                )))
+            }
+            Err(NvmeError::NqnNotFound { .. }) => {
+                let uuid = Uuid::parse_str(&volume_id).map_err(|error| {
+                    Status::invalid_argument(format!("Invalid volume UUID: {volume_id}, {error}"))
+                })?;
+
+                if let Ok(Some(device)) = Device::lookup(&uuid).await {
+                    let mountpaths = findmnt::get_mountpaths(&device.devname())?;
+                    debug!(
+                        "Device {} found, with mount paths: {}, issuing unmount",
+                        device.devname(),
+                        mountpaths
+                            .iter()
+                            .map(|devmount| devmount.to_string())
+                            .collect::<Vec<String>>()
+                            .join(", ")
+                    );
+                    lazy_unmount_mountpaths(&mountpaths).await?;
+                } else {
+                    let mountpaths = findmnt::get_csi_mountpaths(&volume_id).await?;
+                    debug!(
+                        "Device was not found; detected mount paths: {}, issuing unmount",
+                        mountpaths
+                            .iter()
+                            .map(|devmount| devmount.to_string())
+                            .collect::<Vec<String>>()
+                            .join(", ")
+                    );
+                    lazy_unmount_mountpaths(&mountpaths).await?;
+                }
+                Ok(Response::new(CleanupReply {}))
+            }
+            Err(error) => Err(Status::aborted(error.to_string())),
+        }
+    }
 }

 impl From<TypeOfMount> for VolumeType {
@@ -70,13 +148,18 @@ pub(crate) struct NodePluginGrpcServer {}

 impl NodePluginGrpcServer {
     /// Run `Self` as a tonic server.
-    pub(crate) async fn run(endpoint: std::net::SocketAddr) -> anyhow::Result<()> {
+    pub(crate) async fn run(
+        endpoint: std::net::SocketAddr,
+        stale_entry_cleanup: bool,
+    ) -> anyhow::Result<()> {
         info!(
             "node plugin gRPC server configured at address {:?}",
             endpoint
         );
         Server::builder()
-            .add_service(NodePluginServer::new(NodePluginSvc {}))
+            .add_service(NodePluginServer::new(NodePluginSvc {
+                stale_entry_cleanup,
+            }))
             .serve_with_shutdown(endpoint, Shutdown::wait())
             .await
             .map_err(|error| {
diff --git a/control-plane/csi-driver/src/bin/node/shutdown_event.rs b/control-plane/csi-driver/src/bin/node/shutdown_event.rs
index 5ca9a9e64..766c699d4 100644
--- a/control-plane/csi-driver/src/bin/node/shutdown_event.rs
+++ b/control-plane/csi-driver/src/bin/node/shutdown_event.rs
@@ -13,8 +13,8 @@ mod tests {
     use csi_driver::node::internal::{
         node_plugin_client::NodePluginClient,
         node_plugin_server::{NodePlugin, NodePluginServer},
-        FindVolumeReply, FindVolumeRequest, FreezeFsReply, FreezeFsRequest, UnfreezeFsReply,
-        UnfreezeFsRequest, VolumeType,
+        CleanupReply, CleanupRequest, FindVolumeReply, FindVolumeRequest, FreezeFsReply,
+        FreezeFsRequest, UnfreezeFsReply, UnfreezeFsRequest, VolumeType,
     };
     use std::{
         str::FromStr,
@@ -73,6 +73,13 @@ mod tests {
             device_path: "".to_string(),
         }))
     }
+
+    async fn cleanup(
+        &self,
+        _request: Request<CleanupRequest>,
+    ) -> Result<Response<CleanupReply>, Status> {
+        unimplemented!()
+    }
 }

 /// Tests the shutdown of a tonic service.
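-- 

A minimal sketch of driving the new Cleanup RPC from a standalone client, mirroring
what issue_cleanup() does in the controller before publish. The endpoint address and
volume UUID below are placeholder assumptions; the client and request types are the
tonic bindings generated from node-service.proto:

    use csi_driver::node::internal::{node_plugin_client::NodePluginClient, CleanupRequest};
    use tonic::Request;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // The node plugin listens on GRPC_PORT (50051) by default; substitute a
        // real node endpoint here.
        let mut client = NodePluginClient::connect("http://10.1.0.5:50051").await?;
        // Ask the node to lazily unmount stale mountpoints and disconnect stale
        // NVMe subsystem paths for this (illustrative) volume before publishing.
        client
            .cleanup(Request::new(CleanupRequest {
                volume_id: "ec4e66fd-3b33-4439-b504-d49aba53da26".to_string(),
            }))
            .await?;
        Ok(())
    }

Note that when stale subsystem paths are found and disconnected, the node deliberately
answers with an internal error, so the controller's publish attempt fails and the
retry can validate that the cleanup actually completed.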