Skip to content

Commit

Permalink
Add:
Browse files Browse the repository at this point in the history
     * "recreate" argument for build command
     *  upgrade via build command if state exists
     * fixed panic when upgrading
  • Loading branch information
Alexandr Sorokin committed Apr 18, 2024
1 parent ef58810 commit e6706df
Show file tree
Hide file tree
Showing 11 changed files with 515 additions and 170 deletions.
35 changes: 29 additions & 6 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,35 @@ pub fn run_v2() -> Result<(), Box<dyn Error>> {
.write(args)?;
}
Some(("build", args)) => {
Cluster::try_from(args)?
.use_failure_domain_as_zone_for_instances(args)
.print(args)
.write_build_state(args)?
.to_inventory()?
.write(args)?;
if args.get_flag("recreate") {
State::recreate(args)?;
}

match State::from_latest(args) {
Ok(state) => {
let mut old: Cluster = state.into();
old.hosts.clear_view();

let mut new =
Cluster::try_from(args)?.use_failure_domain_as_zone_for_instances(args);

let hosts_diff = old.merge(&mut new, args.get_flag("idiomatic-merge"))?;

old.use_failure_domain_as_zone_for_instances(args)
.print(args)
.write_upgrade_state(args, hosts_diff)?
.to_inventory()?
.write(args)?;
}
_ => {
Cluster::try_from(args)?
.use_failure_domain_as_zone_for_instances(args)
.print(args)
.write_build_state(args)?
.to_inventory()?
.write(args)?;
}
}
}
Some(("inspect", args)) => {
println!("{}", Cluster::try_from(args)?);
Expand Down
33 changes: 19 additions & 14 deletions src/task/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,20 @@ pub(super) fn read() -> ArgMatches {
.short('q')
.action(ArgAction::SetTrue)
.help("do not print table and cluster yaml"),
Arg::new("export-state")
.long("export-state")
Arg::new("state-dir")
.long("state-dir")
.env("GENIN_STATE_DIR")
.action(ArgAction::Set)
.help("export the build state"),
.help("override .geninstate directory location"),
Arg::new("recreate")
.long("recreate")
.action(ArgAction::SetTrue)
.help("Delete the existing state before building"),
Arg::new("idiomatic-merge")
.long("idiomatic-merge")
.short('I')
.action(ArgAction::SetTrue)
.help("merge replicasets with similar names like router-1 and router-1-1"),
]),
Command::new("init")
.about("Init genin and create cluster.genin.yml configuration")
Expand Down Expand Up @@ -201,17 +211,14 @@ pub(super) fn read() -> ArgMatches {
]),
Command::new("upgrade")
.about(
"Using the genin configuration and the inventory to be \
"[Deprecated] Using the genin configuration and the inventory to be \
modified creates a new inventory",
)
.args(&[
Arg::new("old")
.long("old")
.action(ArgAction::Set)
.help(
"Absolute or relative path of the file with \
Arg::new("old").long("old").action(ArgAction::Set).help(
"Absolute or relative path of the file with \
the description of the cluster to be generated",
),
),
Arg::new("new")
.long("new")
.action(ArgAction::Set)
Expand Down Expand Up @@ -287,12 +294,10 @@ pub(super) fn read() -> ArgMatches {
.short('I')
.action(ArgAction::SetTrue)
.help("merge replicasets with similar names like router-1 and router-1-1"),
fd_as_zone_arg()
fd_as_zone_arg(),
]),
Command::new("list-state")
.about(
"Print last 10 genin states",
)
.about("Print last 10 genin states")
.args(&[
Arg::new("export-state")
.long("export-state")
Expand Down
152 changes: 79 additions & 73 deletions src/task/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fmt::{Debug, Display};
use std::fs::File;
use std::io::{self, Write};
use std::net::IpAddr;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use thiserror::Error;

use crate::task::cluster::hst::v1::Host;
Expand Down Expand Up @@ -149,6 +149,19 @@ impl Default for Cluster {
/// ip: 10.99.3.100
/// ```
fn default() -> Self {
let failover = Failover {
mode: Mode::Stateful,
state_provider: StateProvider::Stateboard,
failover_variants: super::flv::FailoverVariants::StateboardVariant(StateboardParams {
uri: super::flv::Uri {
address: hst::v2::Address::Ip(IpAddr::from([192, 168, 16, 11])),
port: 4401,
},
password: String::from("password"),
}),
..Default::default()
};

Self {
topology: Topology::default(),
hosts: HostV2::from("cluster")
Expand All @@ -165,21 +178,8 @@ impl Default for Cluster {
])
.with_config(HostV2Config::from((8081, 3031)))])
.with_config(HostV2Config::from((8081, 3031))),
failover: Failover {
mode: Mode::Stateful,
state_provider: StateProvider::Stateboard,
failover_variants: super::flv::FailoverVariants::StateboardVariant(
StateboardParams {
uri: super::flv::Uri {
address: hst::v2::Address::Ip(IpAddr::from([192, 168, 16, 11])),
port: 4401,
},
password: String::from("password"),
},
),
..Default::default()
},
vars: Default::default(),
failover: failover.clone(),
vars: Vars::default().with_failover(failover),
metadata: ClusterMetadata {
paths: vec![PathBuf::from("cluster.genin.yml")],
},
Expand All @@ -204,32 +204,20 @@ impl<'a> TryFrom<&'a ArgMatches> for Cluster {

fn try_from(args: &'a ArgMatches) -> Result<Self, Self::Error> {
match args.try_get_one::<String>("source") {
Ok(Some(path)) if path.ends_with(".gz") => {
debug!("Restoring the cluster distribution from the state file {path}");
Ok(Cluster {
metadata: ClusterMetadata {
paths: vec![path.into()],
},
..Cluster::from(State::try_from(&PathBuf::from(path))?)
})
}
Ok(Some(path)) => {
Ok(path) => {
let source = String::from("cluster.genin.yml");
let path = path.unwrap_or(&source);

let file = File::open(path)?;
Ok(Cluster {
let mut cluster = Cluster {
metadata: ClusterMetadata {
paths: vec![path.into()],
},
..serde_yaml::from_reader(file)?
})
}
Ok(None) => {
let file = File::open("cluster.genin.yml")?;
Ok(Cluster {
metadata: ClusterMetadata {
paths: vec!["cluster.genin.yml".into()],
},
..serde_yaml::from_reader(file)?
})
};

cluster.vars = cluster.vars.with_failover(cluster.failover.clone());
Ok(cluster)
}
Err(_) => {
debug!(
Expand Down Expand Up @@ -404,6 +392,7 @@ impl From<State> for Cluster {
hosts,
vars: state.vars.with_failover(state.failover.clone()),
failover: state.failover,
topology: state.topology,
metadata: ClusterMetadata {
paths: vec![PathBuf::from(state.path)],
},
Expand Down Expand Up @@ -560,23 +549,25 @@ pub fn check_placeholders(slice: &[u8]) -> Result<String, serde_yaml::Error> {
impl Cluster {
pub fn spread(self) -> Self {
let instances = Instances::from(&self.topology);
let mut hosts = self
.hosts
let mut hosts = self.hosts.with_instances(instances);
hosts.with_stateboard(&self.failover);
let mut hosts = hosts
.clone()
.with_add_queue(
instances
hosts
.instances
.iter()
.map(|instance| (instance.name.clone(), instance.clone()))
.collect(),
)
.with_delete_queue(
instances
hosts
.instances
.iter()
.map(|instance| (instance.name.clone(), instance.clone()))
.collect(),
)
.with_instances(instances);
hosts.spread();
hosts.with_stateboard(&self.failover);
);

hosts.spread();
Self { hosts, ..self }
}
Expand Down Expand Up @@ -792,17 +783,26 @@ impl Cluster {
}

pub fn write_build_state(self, args: &ArgMatches) -> Result<Self, ClusterError> {
if let Ok(Some(path)) = args.try_get_one::<String>("export-state") {
State::builder()
.uid(self.metadata.paths.clone())?
.make_build_state()
.path(path)
.hosts(&self.hosts)
.vars(&self.vars)
.failover(&self.failover)
.build()?
.dump_by_path(path)?;
}
let state_dir = args
.get_one::<String>("state-dir")
.cloned()
.unwrap_or(".geninstate".into());
let latest_path = Path::new(&state_dir).join("latest.gz");
let latest_path = latest_path.to_str().unwrap();

let mut state = State::builder()
.uid(self.metadata.paths.clone())?
.make_build_state()
.path(latest_path)
.hosts(&self.hosts)
.vars(&self.vars)
.failover(&self.failover)
.topology(&self.topology)
.build()?;

state.dump_by_uid(&state_dir)?;
state.dump_by_path(latest_path)?;

Ok(self)
}

Expand All @@ -811,6 +811,27 @@ impl Cluster {
args: &ArgMatches,
hosts_diff: Vec<Change>,
) -> Result<Self, ClusterError> {
let instances_diff: Vec<Change> = self
.hosts
.add_queue
.iter()
.map(|(name, _)| Change::Added(name.to_string()))
.chain(
self.hosts
.delete_queue
.iter()
.map(|(name, _)| Change::Removed(name.to_string())),
)
.collect();

if hosts_diff.is_empty() && instances_diff.is_empty() {
return Ok(self);
}

// if args != export-state -> try open latest
// if .geninstate not exists -> create dir
// if latest not exists -> create latest
// if write state
let state_dir = args
.get_one::<String>("state-dir")
.cloned()
Expand All @@ -822,31 +843,16 @@ impl Cluster {
format!("{state_dir}/latest.gz")
};

// if args != export-state -> try open latest
// if .geninstate not exists -> create dir
// if latest not exists -> create latest
// if write state
let mut state = State::builder()
.uid(self.metadata.paths.clone())?
.make_upgrade_state()
.path(&path)
.instances_changes(
self.hosts
.add_queue
.iter()
.map(|(name, _)| Change::Added(name.to_string()))
.chain(
self.hosts
.delete_queue
.iter()
.map(|(name, _)| Change::Removed(name.to_string())),
)
.collect(),
)
.instances_changes(instances_diff)
.hosts_changes(hosts_diff)
.hosts(&self.hosts)
.vars(&self.vars)
.failover(&self.failover)
.topology(&self.topology)
.build()?;

state.dump_by_uid(&state_dir)?;
Expand Down
12 changes: 12 additions & 0 deletions src/task/cluster/hst/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,18 @@ impl HostV2 {
}
}

/// used only when restoring from state
pub fn finalize_failure_domains(&mut self) {
for instance in self.instances.iter_mut() {
if instance.failure_domains.in_progress() {
instance.failure_domains = FailureDomains::Finished(self.name.to_string())
}
}
for sub_host in self.hosts.iter_mut() {
sub_host.finalize_failure_domains()
}
}

pub fn clear_instances(&mut self) {
self.instances.clear();
if !self.hosts.is_empty() {
Expand Down
8 changes: 6 additions & 2 deletions src/task/cluster/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ vars:

hosts_v2_model.spread();

let cluster_v1: Cluster = serde_yaml::from_str(&cluster_v1_str).unwrap();
let mut cluster_v1: Cluster = serde_yaml::from_str(&cluster_v1_str).unwrap();
cluster_v1.hosts.add_queue = IndexMap::default();
cluster_v1.hosts.delete_queue = IndexMap::default();

println!(
"stateboard 1: {:?}",
Expand Down Expand Up @@ -213,7 +215,9 @@ vars:
"#
.into();

let cluster_v2: Cluster = serde_yaml::from_str(&cluster_v2_str).unwrap();
let mut cluster_v2: Cluster = serde_yaml::from_str(&cluster_v2_str).unwrap();
cluster_v2.hosts.add_queue = IndexMap::default();
cluster_v2.hosts.delete_queue = IndexMap::default();

println!(
"stateboard 3: {:?}",
Expand Down
4 changes: 2 additions & 2 deletions src/task/cluster/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::{
TopologyMemberV1,
};

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct Topology(Vec<TopologySet>);

impl TryFrom<Instances> for Topology {
Expand Down Expand Up @@ -307,7 +307,7 @@ impl Default for Topology {
}
}

#[derive(Serialize, Debug, PartialEq, Eq)]
#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
struct TopologySet {
name: Name,
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down
Loading

0 comments on commit e6706df

Please sign in to comment.