Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store is_provisioned in cluster marker file and use for logging #2658

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 132 additions & 37 deletions crates/node/src/cluster_marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use restate_types::config::node_filepath;
use semver::Version;
use std::cmp::Ordering;
use std::fs::OpenOptions;
use std::fs::{File, OpenOptions};
use std::path::Path;
use tracing::debug;

Expand Down Expand Up @@ -59,6 +59,8 @@ pub enum ClusterValidationError {
ParsingVersion(#[from] semver::Error),
#[error("failed creating cluster marker file: {0}")]
CreateFile(std::io::Error),
#[error("failed syncing the cluster marker file: {0}")]
SyncFile(std::io::Error),
#[error("failed writing new cluster marker file: {0}")]
RenameFile(std::io::Error),
#[error("failed decoding cluster marker: {0}")]
Expand Down Expand Up @@ -94,27 +96,32 @@ pub struct ClusterMarker {
/// Minimum required version to read data. Optional since it was introduced after 0.9.
/// This field should only be updated when updating the `max_version` field.
min_forward_compatible_version: Option<Version>,
/// True if the cluster is known to be provisioned. Versions < 1.2 don't have this field stored.
is_provisioned: Option<bool>,
}

impl ClusterMarker {
fn new(
cluster_name: String,
current_version: Version,
min_forward_compatible_version: Version,
is_provisioned: bool,
) -> Self {
Self {
cluster_name,
max_version: current_version.clone(),
current_version,
min_forward_compatible_version: Some(min_forward_compatible_version),
is_provisioned: Some(is_provisioned),
}
}
}

/// Validates and updates the cluster marker wrt to the currently used Restate version.
/// Validates and updates the cluster marker wrt to the currently used Restate version. Returns
/// whether the cluster was provisioned before.
pub fn validate_and_update_cluster_marker(
cluster_name: &str,
) -> Result<(), ClusterValidationError> {
) -> Result<bool, ClusterValidationError> {
let this_version = Version::parse(env!("CARGO_PKG_VERSION"))?;
let cluster_marker_filepath = node_filepath(CLUSTER_MARKER_FILE_NAME);

Expand All @@ -131,15 +138,9 @@ fn validate_and_update_cluster_marker_inner(
this_version: Version,
cluster_marker_filepath: &Path,
compatibility_information: &CompatibilityInformation,
) -> Result<(), ClusterValidationError> {
let tmp_cluster_marker_filepath = cluster_marker_filepath
.parent()
.expect("filepath should have parent directory")
.join(TMP_CLUSTER_MARKER_FILE_NAME);
) -> Result<bool, ClusterValidationError> {
let mut cluster_marker = if cluster_marker_filepath.exists() {
let cluster_marker_file = std::fs::File::open(cluster_marker_filepath)
.map_err(ClusterValidationError::CreateFile)?;
serde_json::from_reader(&cluster_marker_file).map_err(ClusterValidationError::Decode)?
read_cluster_marker(cluster_marker_filepath)?
} else {
debug!(
"Did not find existing cluster marker. Creating a new one under '{}'.",
Expand All @@ -151,6 +152,7 @@ fn validate_and_update_cluster_marker_inner(
compatibility_information
.min_forward_compatible_version
.clone(),
false,
)
};

Expand Down Expand Up @@ -202,7 +204,21 @@ fn validate_and_update_cluster_marker_inner(
);
}

// update cluster marker by writing to new file and then rename
write_new_cluster_marker(cluster_marker_filepath, &cluster_marker)?;

Ok(cluster_marker.is_provisioned.unwrap_or_default())
}

fn write_new_cluster_marker(
cluster_marker_filepath: &Path,
new_cluster_marker: &ClusterMarker,
) -> Result<(), ClusterValidationError> {
let tmp_cluster_marker_filepath = cluster_marker_filepath
.parent()
.expect("filepath should have parent directory")
.join(TMP_CLUSTER_MARKER_FILE_NAME);

// update cluster marker by writing to a new file and then rename it
{
// create parent directories if not present
if let Some(parent) = tmp_cluster_marker_filepath.parent() {
Expand All @@ -217,24 +233,62 @@ fn validate_and_update_cluster_marker_inner(
.open(tmp_cluster_marker_filepath.as_path())
.map_err(ClusterValidationError::CreateFile)?;
// using JSON encoding to be human-readable
serde_json::to_writer(&new_cluster_marker_file, &cluster_marker)
serde_json::to_writer(&new_cluster_marker_file, &new_cluster_marker)
.map_err(ClusterValidationError::Encode)?;

// make sure the new cluster marker file is persisted
new_cluster_marker_file
.sync_all()
.map_err(ClusterValidationError::SyncFile)?;
}

// atomically rename the new cluster marker file into the old cluster marker file
std::fs::rename(
tmp_cluster_marker_filepath.as_path(),
cluster_marker_filepath,
)
.map_err(ClusterValidationError::RenameFile)?;

// make sure the rename operation is persisted to disk by flushing the parent directory
let parent = cluster_marker_filepath
.parent()
.expect("cluster marker file to be not the root");
let parent_dir = File::open(parent).expect("to open parent directory");
parent_dir
.sync_all()
.map_err(ClusterValidationError::SyncFile)?;
Ok(())
}

fn read_cluster_marker(
cluster_marker_filepath: &Path,
) -> Result<ClusterMarker, ClusterValidationError> {
let cluster_marker_file =
File::open(cluster_marker_filepath).map_err(ClusterValidationError::CreateFile)?;
serde_json::from_reader::<_, ClusterMarker>(&cluster_marker_file)
.map_err(ClusterValidationError::Decode)
}

/// Marks the cluster as provisioned in the cluster marker
pub fn mark_cluster_as_provisioned() -> Result<(), ClusterValidationError> {
let cluster_marker_filepath = node_filepath(CLUSTER_MARKER_FILE_NAME);
mark_cluster_as_provisioned_inner(cluster_marker_filepath.as_path())
}

fn mark_cluster_as_provisioned_inner(
cluster_marker_filepath: &Path,
) -> Result<(), ClusterValidationError> {
let mut cluster_marker = read_cluster_marker(cluster_marker_filepath)?;
cluster_marker.is_provisioned = Some(true);
write_new_cluster_marker(cluster_marker_filepath, &cluster_marker)
}

#[cfg(test)]
mod tests {
use crate::cluster_marker::{
validate_and_update_cluster_marker_inner, ClusterMarker, ClusterValidationError,
CompatibilityInformation, CLUSTER_MARKER_FILE_NAME, COMPATIBILITY_INFORMATION,
mark_cluster_as_provisioned_inner, validate_and_update_cluster_marker_inner, ClusterMarker,
ClusterValidationError, CompatibilityInformation, CLUSTER_MARKER_FILE_NAME,
COMPATIBILITY_INFORMATION,
};
use semver::Version;
use std::fs;
Expand Down Expand Up @@ -291,7 +345,8 @@ mod tests {
TESTING_COMPATIBILITY_INFORMATION
.min_forward_compatible_version
.clone()
)
),
is_provisioned: Some(false),
}
)
}
Expand All @@ -310,20 +365,19 @@ mod tests {
TESTING_COMPATIBILITY_INFORMATION
.min_forward_compatible_version
.clone(),
true,
),
&file,
)
.unwrap();
)?;

validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
current_version.clone(),
&file,
&TESTING_COMPATIBILITY_INFORMATION,
)
.unwrap();
)?;

let cluster_marker = read_cluster_marker(file).unwrap();
let cluster_marker = read_cluster_marker(file)?;

assert_eq!(
cluster_marker,
Expand All @@ -335,7 +389,8 @@ mod tests {
TESTING_COMPATIBILITY_INFORMATION
.min_forward_compatible_version
.clone()
)
),
is_provisioned: Some(true),
}
);
Ok(())
Expand All @@ -355,20 +410,19 @@ mod tests {
TESTING_COMPATIBILITY_INFORMATION
.min_forward_compatible_version
.clone(),
false,
),
&file,
)
.unwrap();
)?;

validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
current_version.clone(),
&file,
&TESTING_COMPATIBILITY_INFORMATION,
)
.unwrap();
)?;

let cluster_marker = read_cluster_marker(file).unwrap();
let cluster_marker = read_cluster_marker(file)?;

assert_eq!(
cluster_marker,
Expand All @@ -381,6 +435,7 @@ mod tests {
.min_forward_compatible_version
.clone()
),
is_provisioned: Some(false),
}
);
Ok(())
Expand All @@ -400,10 +455,10 @@ mod tests {
TESTING_COMPATIBILITY_INFORMATION
.min_forward_compatible_version
.clone(),
true,
),
&file,
)
.unwrap();
)?;

let result = validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
Expand All @@ -420,7 +475,7 @@ mod tests {

#[test]
fn forward_incompatible_version() -> anyhow::Result<()> {
let dir = tempdir().unwrap();
let dir = tempdir()?;
let file = dir.path().join(CLUSTER_MARKER_FILE_NAME);
let max_version = Version::new(2, 2, 6);
let this_version = Version::new(1, 0, 3);
Expand All @@ -430,10 +485,10 @@ mod tests {
CLUSTER_NAME.to_owned(),
max_version.clone(),
Version::new(2, 0, 0),
true,
),
&file,
)
.unwrap();
)?;

let result = validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
Expand All @@ -459,10 +514,10 @@ mod tests {
CLUSTER_NAME.to_owned(),
max_version.clone(),
Version::new(0, 9, 0),
true,
),
&file,
)
.unwrap();
)?;

let result = validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
Expand All @@ -480,7 +535,7 @@ mod tests {

#[test]
fn compatible_version() -> anyhow::Result<()> {
let dir = tempdir().unwrap();
let dir = tempdir()?;
let file = dir.path().join(CLUSTER_MARKER_FILE_NAME);
let max_version = Version::new(2, 0, 2);
let this_version = Version::new(2, 3, 1);
Expand All @@ -491,10 +546,10 @@ mod tests {
TESTING_COMPATIBILITY_INFORMATION
.min_forward_compatible_version
.clone(),
false,
),
&file,
)
.unwrap();
)?;

let result = validate_and_update_cluster_marker_inner(
CLUSTER_NAME,
Expand All @@ -506,4 +561,44 @@ mod tests {

Ok(())
}

#[test]
fn mark_cluster_marker_as_provisioned() -> anyhow::Result<()> {
let dir = tempdir()?;
let file = dir.path().join(CLUSTER_MARKER_FILE_NAME);
let version = Version::new(2, 2, 6);

write_cluster_marker(
&ClusterMarker::new(
CLUSTER_NAME.to_owned(),
version.clone(),
TESTING_COMPATIBILITY_INFORMATION
.min_forward_compatible_version
.clone(),
false,
),
&file,
)?;

mark_cluster_as_provisioned_inner(&file)?;

let cluster_marker = read_cluster_marker(file)?;

assert_eq!(
cluster_marker,
ClusterMarker {
current_version: version.clone(),
max_version: version,
cluster_name: CLUSTER_NAME.to_owned(),
min_forward_compatible_version: Some(
TESTING_COMPATIBILITY_INFORMATION
.min_forward_compatible_version
.clone()
),
is_provisioned: Some(true),
}
);

Ok(())
}
}
Loading
Loading