Skip to content

Commit

Permalink
feat(csi-driver): add pre-publish hook to cleanup stale entries
Browse files Browse the repository at this point in the history
Signed-off-by: Abhinandan Purkait <purkaitabhinandan@gmail.com>
  • Loading branch information
Abhinandan-Purkait committed Dec 11, 2024
1 parent d54a84d commit 3fa1fe2
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 14 deletions.
10 changes: 10 additions & 0 deletions control-plane/csi-driver/proto/node-service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ service NodePlugin {
rpc UnfreezeFS (UnfreezeFSRequest) returns (UnfreezeFSReply) {}
// Find the volume identified by the volume ID, and return volume information.
rpc FindVolume (FindVolumeRequest) returns (FindVolumeReply) {}
// Issue Cleanup for the volume, unmount and disconnect.
rpc Cleanup (CleanupRequest) returns (CleanupReply) {}
}

enum VolumeType {
Expand Down Expand Up @@ -54,3 +56,11 @@ message FindVolumeReply {
optional VolumeType volume_type = 1;
string device_path = 2; // the device path for the volume
}

// The request message containing ID of the volume path to be cleaned up
message CleanupRequest {
string volume_id = 1;
}
// Message for response on a volume cleanup
message CleanupReply {
}
23 changes: 22 additions & 1 deletion control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::{
};
use csi_driver::{
context::{CreateParams, CreateSnapshotParams, PublishParams, QuiesceFsCandidate},
node::internal::{node_plugin_client::NodePluginClient, FreezeFsRequest, UnfreezeFsRequest},
node::internal::{
node_plugin_client::NodePluginClient, CleanupRequest, FreezeFsRequest, UnfreezeFsRequest,
},
};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
use stor_port::types::v0::openapi::{
Expand Down Expand Up @@ -133,6 +135,20 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St
}
}

#[tracing::instrument]
async fn issue_cleanup(endpoint: String, volume_id: String) -> Result<(), Status> {
tracing::info!("Issuing cleanup of stale entries before publish for volume: {volume_id}");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
client
.cleanup(Request::new(CleanupRequest {
volume_id: volume_id.clone(),
}))
.await
.map(|_| ())
}

/// Get share URI for existing volume object and the node where the volume is published.
fn get_volume_share_location(volume: &Volume) -> Option<(String, String)> {
volume
Expand Down Expand Up @@ -545,6 +561,11 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing
let app_node = RestApiClient::get_client().get_app_node(&args.node_id).await?;
tracing::info!("Issuing clean up to node {}, to endpoint {}", app_node.id, app_node.spec.endpoint);
issue_cleanup(app_node.spec.endpoint, volume_id.to_string()).await?;

// Volume is not published.
let v = RestApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
Expand Down
145 changes: 144 additions & 1 deletion control-plane/csi-driver/src/bin/node/findmnt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ const FSTYPE_KEY: &str = "fstype";

#[derive(Debug)]
pub(crate) struct DeviceMount {
#[allow(dead_code)]
mount_path: String,
fstype: FileSystem,
}

impl std::fmt::Display for DeviceMount {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"[ Mount Path: {}, File System Type: {} ]",
self.mount_path, self.fstype
)
}
}

impl DeviceMount {
/// create new DeviceMount
pub(crate) fn new(mount_path: String, fstype: FileSystem) -> DeviceMount {
Expand All @@ -27,6 +36,10 @@ impl DeviceMount {
pub(crate) fn fstype(&self) -> FileSystem {
self.fstype.clone()
}
/// Mount path
pub(crate) fn mount_path(&self) -> String {
self.mount_path.clone()
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -213,3 +226,133 @@ pub(crate) fn get_mountpaths(device_path: &str) -> Result<Vec<DeviceMount>, Devi
Err(e) => Err(e),
}
}

const DRIVER_NAME: &str = "driverName";
const VOLUME_HANDLE: &str = "volumeHandle";
const MAYASTOR_DRIVER: &str = "io.openebs.csi-mayastor";
const METADATA_FILE: &str = "vol_data.json";
const DEVICE_PATTERN: &str = "nvme";

/// This filter is to be used specifically search for mountpoints in context of k8s
///
/// # Fields
/// * `driver` - Name of the csi driver which provisioned the volume.
/// * `volume_id` - Name of the volume which is being searched for.
/// * `file_name` - Name of the file where kubelet stores the metadata.
/// * `device_pattern` - Pattern to search for a specific type of device, ex nvme.
#[derive(Debug)]
struct FilterCsiMounts<'a> {
driver: &'a str,
volume_id: &'a str,
file_name: &'a str,
device_pattern: &'a str,
}

/// Finds CSI mount points using `findmnt` and filters based on the given criteria.
fn find_csi_mount(filter: FilterCsiMounts) -> Result<Vec<HashMap<String, String>>, DeviceError> {
let output = Command::new(FIND_MNT).args(FIND_MNT_ARGS).output()?;

if !output.status.success() {
return Err(DeviceError::new(String::from_utf8(output.stderr)?.as_str()));
}

let json_str = String::from_utf8(output.stdout)?;
let json: Value = serde_json::from_str(&json_str)?;

let mut results: Vec<HashMap<String, String>> = Vec::new();
filter_csi_findmnt(&json, &filter, &mut results);

Ok(results)
}

/// Filters JSON output from `findmnt` for matching CSI mount points as per the filter.
fn filter_csi_findmnt(
json_val: &Value,
filter: &FilterCsiMounts,
results: &mut Vec<HashMap<String, String>>,
) {
match json_val {
Value::Array(json_array) => {
for jsonvalue in json_array {
filter_csi_findmnt(jsonvalue, filter, results);
}
}
Value::Object(json_map) => {
if let Some(source_value) = json_map.get(SOURCE_KEY) {
let source_str = key_adjusted_value(SOURCE_KEY, source_value);
if source_str.contains(filter.device_pattern) {
if let Some(target_value) = json_map.get(TARGET_KEY) {
let target_str = target_value.as_str().unwrap_or_default();

if let Some(parent_path) = std::path::Path::new(target_str).parent() {
let vol_data_path = parent_path.join(filter.file_name);
let vol_data_path_str = vol_data_path.to_string_lossy();

if let Ok(vol_data) = read_vol_data_json(&vol_data_path_str) {
if vol_data.get(DRIVER_NAME)
== Some(&Value::String(filter.driver.to_string()))
&& vol_data.get(VOLUME_HANDLE)
== Some(&Value::String(filter.volume_id.to_string()))
{
results.push(jsonmap_to_hashmap(json_map));
}
}
}
}
}
}

for (_, jsonvalue) in json_map {
if jsonvalue.is_array() || jsonvalue.is_object() {
filter_csi_findmnt(jsonvalue, filter, results);
}
}
}
_ => (),
};
}

/// Reads and parses a file into a JSON object.
fn read_vol_data_json(path: &str) -> Result<serde_json::Map<String, Value>, DeviceError> {
let file = std::fs::File::open(path)?;
let reader = std::io::BufReader::new(file);
let json: serde_json::Map<String, Value> = serde_json::from_reader(reader)?;
Ok(json)
}

/// Retrieves mount paths for a given CSI volume ID by parsing the metadata file.
pub(crate) async fn get_csi_mountpaths(volume_id: &str) -> Result<Vec<DeviceMount>, DeviceError> {
let filter = FilterCsiMounts {
driver: MAYASTOR_DRIVER,
volume_id,
file_name: METADATA_FILE,
device_pattern: DEVICE_PATTERN,
};
match find_csi_mount(filter) {
Ok(results) => {
let mut mountpaths: Vec<DeviceMount> = Vec::new();
for entry in results {
if let Some(mountpath) = entry.get(TARGET_KEY) {
if let Some(fstype) = entry.get(FSTYPE_KEY) {
mountpaths.push(DeviceMount::new(
mountpath.to_string(),
Fs::from_str(fstype)
.unwrap_or(Fs::Unsupported(fstype.to_string()))
.into(),
))
} else {
error!("Missing fstype for {}", mountpath);
mountpaths.push(DeviceMount::new(
mountpath.to_string(),
Fs::Unsupported("".to_string()).into(),
))
}
} else {
warn!("missing target field {:?}", entry);
}
}
Ok(mountpaths)
}
Err(e) => Err(e),
}
}
16 changes: 14 additions & 2 deletions control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

const GRPC_PORT: u16 = 50051;

Expand Down Expand Up @@ -144,6 +144,13 @@ pub(super) async fn main() -> anyhow::Result<()> {
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
.arg(
Arg::new("stale-entry-cleanup")
.long("stale-entry-cleanup")
.action(clap::ArgAction::SetTrue)
.value_name("BOOLEAN")
.help("Enable cleanup of the stale entries before controller publish")
)
.subcommand(
clap::Command::new("fs-freeze")
.arg(
Expand Down Expand Up @@ -270,6 +277,11 @@ pub(super) async fn main() -> anyhow::Result<()> {

let registration_enabled = matches.get_flag("enable-registration");

let stale_entry_cleanup = matches.get_flag("stale-entry-cleanup");
if !stale_entry_cleanup {
warn!("Stale entry cleanup is disabled!")
}

// Parse instance and grpc endpoints from the command line arguments and validate.
let grpc_sock_addr = validate_endpoints(
matches.get_one::<String>("grpc-endpoint").unwrap(),
Expand All @@ -281,7 +293,7 @@ pub(super) async fn main() -> anyhow::Result<()> {
*crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?;
let (csi, grpc, registration) = tokio::join!(
CsiServer::run(csi_socket, &matches)?,
NodePluginGrpcServer::run(grpc_sock_addr),
NodePluginGrpcServer::run(grpc_sock_addr, stale_entry_cleanup),
run_registration_loop(
node_name.clone(),
grpc_sock_addr.to_string(),
Expand Down
50 changes: 48 additions & 2 deletions control-plane/csi-driver/src/bin/node/mount.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Utility functions for mounting and unmounting filesystems.
use crate::{filesystem_ops::FileSystem, runtime};
use crate::{filesystem_ops::FileSystem, findmnt::DeviceMount, runtime};
use csi_driver::filesystem::FileSystem as Fs;
use devinfo::mountinfo::{MountInfo, SafeMountIter};

use std::{collections::HashSet, io::Error};
use std::{collections::HashSet, io::Error, path::PathBuf};
use sys_mount::{unmount, FilesystemType, Mount, MountFlags, UnmountFlags};
use tonic::Status;
use tracing::{debug, info};
use uuid::Uuid;

Expand Down Expand Up @@ -425,3 +426,48 @@ pub(crate) async fn unmount_on_fs_id_diff(
)
})
}

pub(crate) async fn lazy_unmount_mountpaths(mountpaths: &Vec<DeviceMount>) -> Result<(), Status> {
for mountpath in mountpaths {
debug!(
"Unmounting path: {}, with DETACH flag",
mountpath.mount_path()
);
let target_path = mountpath.mount_path().to_string();

runtime::spawn_blocking({
let target_path = target_path.clone();
move || {
let mut unmount_flags = UnmountFlags::empty();
unmount_flags.insert(UnmountFlags::DETACH);
sys_mount::unmount(target_path, unmount_flags)
.map_err(|error| Status::aborted(error.to_string()))
}
})
.await
.map_err(|error| Status::aborted(error.to_string()))??;

if target_path.ends_with("globalmount") {
if let Some(parent) = PathBuf::from(&target_path).parent() {
if let Err(error) = tokio::fs::remove_dir_all(parent).await {
tracing::error!(
"Failed to remove directory at path {}: {}",
target_path,
error
);
return Err(Status::internal(format!(
"Failed to remove directory {}: {}",
target_path, error
)));
}

debug!(
"Successfully removed parent directory {} of : {} ",
parent.to_string_lossy(),
target_path
);
}
}
}
Ok(())
}
2 changes: 1 addition & 1 deletion control-plane/csi-driver/src/bin/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ fn get_access_type(volume_capability: &Option<VolumeCapability>) -> Result<&Acce

/// Detach the nexus device from the system, either at volume unstage,
/// or after failed filesystem mount at volume stage.
async fn detach(uuid: &Uuid, errheader: String) -> Result<(), Status> {
pub(crate) async fn detach(uuid: &Uuid, errheader: String) -> Result<(), Status> {
if let Some(device) = Device::lookup(uuid).await.map_err(|error| {
failure!(
Code::Internal,
Expand Down
Loading

0 comments on commit 3fa1fe2

Please sign in to comment.