k8s-orchestrator: create a statefulset per process
teskje committed Feb 26, 2025
1 parent 586528d commit b9814bf
Showing 1 changed file with 76 additions and 51 deletions.
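Before this change the orchestrator created a single `StatefulSet` with `replicas = scale`; after it, each process gets its own single-pod `StatefulSet`. A minimal sketch of the resulting naming scheme, using a hypothetical service name, namespace, and scale (none of these values appear in the diff):

```rust
fn main() {
    let service_name = "cluster-u1-replica-u1"; // hypothetical
    let kubernetes_namespace = "materialize"; // hypothetical
    let scale = 2u16;

    // One StatefulSet (and therefore one pod, with ordinal 0) per process.
    let process_names: Vec<_> = (0..scale)
        .map(|i| format!("{service_name}-{i}"))
        .collect();

    // Hostnames are computed per process, mirroring the hunk below.
    let hosts: Vec<_> = process_names
        .iter()
        .map(|p| format!("{p}.{service_name}.{kubernetes_namespace}.svc.cluster.local"))
        .collect();

    assert_eq!(
        hosts[0],
        "cluster-u1-replica-u1-0.cluster-u1-replica-u1.materialize.svc.cluster.local"
    );
}
```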
127 changes: 76 additions & 51 deletions src/orchestrator-kubernetes/src/lib.rs
@@ -35,7 +35,7 @@ use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
LabelSelector, LabelSelectorRequirement, OwnerReference,
};
use kube::api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams};
use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
use kube::client::Client;
use kube::error::Error as K8sError;
use kube::runtime::{watcher, WatchStreamExt};
@@ -247,6 +247,7 @@ enum WorkerCommand {
},
DropService {
name: String,
match_labels: BTreeMap<String, String>,
},
ListServices {
namespace: String,
@@ -263,9 +264,8 @@ enum WorkerCommand
#[derive(Debug, Clone)]
struct ServiceDescription {
name: String,
scale: u16,
service: K8sService,
stateful_set: StatefulSet,
stateful_sets: Vec<StatefulSet>,
pod_template_hash: String,
}

@@ -397,6 +397,14 @@ impl NamespacedKubernetesOrchestrator {
)
}

/// The minimal set of labels that uniquely identify the pods in the service with the given id.
fn service_match_labels(&self, id: &str) -> BTreeMap<String, String> {
btreemap! {
"environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
"environmentd.materialize.cloud/service-id".into() => id.into(),
}
}

/// Return a `watcher::Config` instance that limits results to the namespace
/// assigned to this orchestrator.
fn watch_pod_params(&self) -> watcher::Config {
@@ -617,16 +625,10 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
user_requested_disk && !size_disables_disk
};

let name = self.service_name(id);
// The match labels should be the minimal set of labels that uniquely
// identify the pods in the stateful set. Changing these after the
// `StatefulSet` is created is not permitted by Kubernetes, and we're
// not yet smart enough to handle deleting and recreating the
// `StatefulSet`.
let match_labels = btreemap! {
"environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
"environmentd.materialize.cloud/service-id".into() => id.into(),
};
let service_name = self.service_name(id);
let process_names: Vec<_> = (0..scale).map(|i| format!("{service_name}-{i}")).collect();

let match_labels = self.service_match_labels(id);
let mut labels = match_labels.clone();
for (key, value) in labels_in {
labels.insert(self.make_label_key(&key), value);
@@ -658,7 +660,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
}
let service = K8sService {
metadata: ObjectMeta {
name: Some(name.clone()),
name: Some(service_name.clone()),
..Default::default()
},
spec: Some(ServiceSpec {
@@ -679,10 +681,11 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
status: None,
};

let hosts = (0..scale)
.map(|i| {
let hosts = process_names
.iter()
.map(|process_name| {
format!(
"{name}-{i}.{name}.{}.svc.cluster.local",
"{process_name}.{service_name}.{}.svc.cluster.local",
self.kubernetes_namespace
)
})
@@ -1193,32 +1196,32 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
pod_template_hash.clone(),
);

let stateful_set = StatefulSet {
metadata: ObjectMeta {
name: Some(name.clone()),
..Default::default()
},
spec: Some(StatefulSetSpec {
selector: LabelSelector {
match_labels: Some(match_labels),
let stateful_sets = process_names
.iter()
.map(|process_name| StatefulSet {
metadata: ObjectMeta {
name: Some(process_name.clone()),
..Default::default()
},
service_name: name.clone(),
replicas: Some(scale.into()),
template: pod_template_spec,
pod_management_policy: Some("Parallel".to_string()),
volume_claim_templates,
..Default::default()
}),
status: None,
};
spec: Some(StatefulSetSpec {
selector: LabelSelector {
match_labels: Some(match_labels.clone()),
..Default::default()
},
service_name: service_name.clone(),
template: pod_template_spec.clone(),
volume_claim_templates: volume_claim_templates.clone(),
..Default::default()
}),
status: None,
})
.collect();

self.send_command(WorkerCommand::EnsureService {
desc: ServiceDescription {
name,
scale,
name: service_name,
service,
stateful_set,
stateful_sets,
pod_template_hash,
},
});
@@ -1242,6 +1245,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {

self.send_command(WorkerCommand::DropService {
name: self.service_name(id),
match_labels: self.service_match_labels(id),
});

Ok(())
@@ -1399,7 +1403,9 @@ impl OrchestratorWorker {
EnsureService { desc } => {
retry(|| self.ensure_service(desc.clone()), "EnsureService").await
}
DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
DropService { name, match_labels } => {
retry(|| self.drop_service(&name, &match_labels), "DropService").await
}
ListServices {
namespace,
result_tx,
@@ -1676,11 +1682,12 @@ impl OrchestratorWorker {
.owner_references
.get_or_insert(vec![])
.extend(self.owner_references.iter().cloned());
desc.stateful_set
.metadata
.owner_references
.get_or_insert(vec![])
.extend(self.owner_references.iter().cloned());
for ss in &mut desc.stateful_sets {
ss.metadata
.owner_references
.get_or_insert(vec![])
.extend(self.owner_references.iter().cloned());
}

self.service_api
.patch(
@@ -1693,16 +1700,17 @@ impl OrchestratorWorker {
.patch(
&desc.name,
&PatchParams::apply(FIELD_MANAGER).force(),
&Patch::Apply(desc.stateful_set),
&Patch::Apply(&desc.stateful_sets),
)
.await?;

// Explicitly delete any pods in the stateful set that don't match the
// Explicitly delete any pods in the stateful sets that don't match the
// template. In theory, Kubernetes would do this automatically, but
// in practice we have observed that it does not.
// See: https://github.com/kubernetes/kubernetes/issues/67250
for pod_id in 0..desc.scale {
let pod_name = format!("{}-{pod_id}", desc.name);
for ss in &desc.stateful_sets {
let process_name = ss.metadata.name.as_ref().unwrap();
let pod_name = format!("{process_name}-0");
let pod = match self.pod_api.get(&pod_name).await {
Ok(pod) => pod,
// Pod already doesn't exist.
@@ -1724,13 +1732,30 @@ impl OrchestratorWorker {
}
}

// FIXME: also handle dropping additional stateful sets after a scale decrease

Ok(())
}
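The `FIXME` above notes that `StatefulSet`s left over from a scale decrease are not yet dropped. A hypothetical sketch of one way this could be done, reusing the label-selector approach that `drop_service` below uses; the helper name and the idea of passing the desired process names are assumptions, not part of this commit:

```rust
use std::collections::{BTreeMap, BTreeSet};

use k8s_openapi::api::apps::v1::StatefulSet;
use kube::api::{Api, DeleteParams, ListParams};
use kube::error::Error as K8sError;

/// Delete StatefulSets that carry the service's match labels but are not in
/// the desired set of process names (e.g. after a scale decrease).
async fn drop_orphaned_stateful_sets(
    stateful_set_api: &Api<StatefulSet>,
    match_labels: &BTreeMap<String, String>,
    desired_process_names: &BTreeSet<String>,
) -> Result<(), K8sError> {
    let label_selector = match_labels
        .iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect::<Vec<_>>()
        .join(",");
    let existing = stateful_set_api
        .list(&ListParams {
            label_selector: Some(label_selector),
            ..Default::default()
        })
        .await?;
    for ss in existing {
        let name = ss.metadata.name.as_deref().unwrap_or_default();
        if !desired_process_names.contains(name) {
            stateful_set_api.delete(name, &DeleteParams::default()).await?;
        }
    }
    Ok(())
}
```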

async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
async fn drop_service(
&self,
name: &str,
match_labels: &BTreeMap<String, String>,
) -> Result<(), K8sError> {
let label_selectors: Vec<_> = match_labels
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect();
let label_selector = label_selectors.join(",");
let res = self
.stateful_set_api
.delete(name, &DeleteParams::default())
.delete_collection(
&DeleteParams::default(),
&ListParams {
label_selector: Some(label_selector),
..Default::default()
},
)
.await;
match res {
Ok(_) => (),
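The label selector passed to `delete_collection` is just the service's match labels rendered as comma-separated `key=value` pairs. A standalone sketch of that construction, with made-up label values (`mz`, `u42`); the keys are the ones `service_match_labels` uses:

```rust
use std::collections::BTreeMap;

fn main() {
    // "mz" and "u42" are made-up values for illustration.
    let match_labels = BTreeMap::from([
        (
            "environmentd.materialize.cloud/namespace".to_string(),
            "mz".to_string(),
        ),
        (
            "environmentd.materialize.cloud/service-id".to_string(),
            "u42".to_string(),
        ),
    ]);

    // Same construction as in `drop_service` above: one `key=value` pair per
    // label, joined with commas into a Kubernetes label selector.
    let label_selector = match_labels
        .iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect::<Vec<_>>()
        .join(",");

    assert_eq!(
        label_selector,
        "environmentd.materialize.cloud/namespace=mz,environmentd.materialize.cloud/service-id=u42"
    );
}
```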
@@ -1750,9 +1775,9 @@ impl OrchestratorWorker {
}

async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
let services = self.service_api.list(&Default::default()).await?;
let name_prefix = format!("{}{namespace}-", self.name_prefix);
Ok(stateful_sets
Ok(services
.into_iter()
.filter_map(|ss| {
ss.metadata