k8s-orchestrator: create a statefulset per process
This commit changes the k8s orchestrator to create a separate
statefulset for each process requested, instead of using a single
statefulset with multiple replicas.

This is to allow us to supply different configurations to different
cluster processes. In particular, we want to be able to provide each
process with its index through a command-line argument.
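
As a rough, self-contained sketch of that idea (not the code in this commit), the snippet below builds one single-replica StatefulSet description per process and threads the process index through as an argument. The `StatefulSetSketch` struct, the `per_process_stateful_sets` helper, and the `--process-index` flag are illustrative assumptions; the real change is in `src/orchestrator-kubernetes/src/lib.rs` below.

```rust
// Minimal sketch of the approach, not the actual orchestrator code: instead of
// one StatefulSet with `replicas = scale`, build `scale` StatefulSets with one
// replica each, so every process can be handed its own configuration.

#[derive(Debug)]
struct StatefulSetSketch {
    name: String,
    replicas: u16,
    args: Vec<String>,
}

fn per_process_stateful_sets(service_name: &str, scale: u16) -> Vec<StatefulSetSketch> {
    (0..scale)
        .map(|i| StatefulSetSketch {
            // The process name doubles as the StatefulSet name,
            // e.g. "cluster-u1-replica-u1-gen-0-2".
            name: format!("{service_name}-{i}"),
            // Scaling out now means creating more StatefulSets, not more replicas.
            replicas: 1,
            // Per-process configuration, such as the process index
            // (the flag name here is hypothetical).
            args: vec![format!("--process-index={i}")],
        })
        .collect()
}

fn main() {
    for ss in per_process_stateful_sets("cluster-u1-replica-u1-gen-0", 3) {
        // A single-replica StatefulSet names its pod with the "-0" ordinal,
        // which is where pod names like "cluster-u1-replica-u1-gen-0-2-0" come from.
        println!(
            "{} (replicas = {}) -> pod {}-0, args {:?}",
            ss.name, ss.replicas, ss.name, ss.args
        );
    }
}
```

With one StatefulSet per process, each pod keeps the ordinal suffix `-0`, which is why the expected pod names in the cloudtest helpers and tests below gain an extra `-0`.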
teskje committed Feb 26, 2025
1 parent 586528d commit 9457444
Showing 5 changed files with 100 additions and 74 deletions.
2 changes: 1 addition & 1 deletion doc/developer/cloudtest.md
@@ -153,7 +153,7 @@ is instantiated once per `pytest` invocation
```
from materialize.cloudtest.util.wait import wait
wait(condition="condition=Ready", resource="pod/compute-cluster-u1-replica-u1-0")
wait(condition="condition=Ready", resource="pod/cluster-u1-replica-u1-gen-0-0")
```

`wait` uses `kubectl wait` behind the scenes. Here is what the `kubectl wait`
2 changes: 1 addition & 1 deletion misc/python/materialize/cloudtest/util/cluster.py
@@ -9,7 +9,7 @@


def cluster_pod_name(cluster_id: str, replica_id: str, process: int = 0) -> str:
return f"pod/cluster-{cluster_id}-replica-{replica_id}-gen-0-{process}"
return f"pod/cluster-{cluster_id}-replica-{replica_id}-gen-0-{process}-0"


def cluster_service_name(cluster_id: str, replica_id: str) -> str:
162 changes: 94 additions & 68 deletions src/orchestrator-kubernetes/src/lib.rs
@@ -35,7 +35,7 @@ use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
LabelSelector, LabelSelectorRequirement, OwnerReference,
};
use kube::api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams};
use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
use kube::client::Client;
use kube::error::Error as K8sError;
use kube::runtime::{watcher, WatchStreamExt};
@@ -247,6 +247,7 @@ enum WorkerCommand {
},
DropService {
name: String,
match_labels: BTreeMap<String, String>,
},
ListServices {
namespace: String,
@@ -262,10 +263,8 @@ enum WorkerCommand {
/// A description of a service to be created by an [`OrchestratorWorker`].
#[derive(Debug, Clone)]
struct ServiceDescription {
name: String,
scale: u16,
service: K8sService,
stateful_set: StatefulSet,
stateful_sets: Vec<StatefulSet>,
pod_template_hash: String,
}

@@ -397,6 +396,15 @@ impl NamespacedKubernetesOrchestrator {
)
}

/// The minimal set of labels that uniquely identifies the k8s resources (service,
/// statefulsets, pods) in the service with the given id.
fn service_match_labels(&self, id: &str) -> BTreeMap<String, String> {
btreemap! {
"environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
"environmentd.materialize.cloud/service-id".into() => id.into(),
}
}

/// Return a `watcher::Config` instance that limits results to the namespace
/// assigned to this orchestrator.
fn watch_pod_params(&self) -> watcher::Config {
@@ -617,16 +625,10 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
user_requested_disk && !size_disables_disk
};

let name = self.service_name(id);
// The match labels should be the minimal set of labels that uniquely
// identify the pods in the stateful set. Changing these after the
// `StatefulSet` is created is not permitted by Kubernetes, and we're
// not yet smart enough to handle deleting and recreating the
// `StatefulSet`.
let match_labels = btreemap! {
"environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
"environmentd.materialize.cloud/service-id".into() => id.into(),
};
let service_name = self.service_name(id);
let process_names: Vec<_> = (0..scale).map(|i| format!("{service_name}-{i}")).collect();

let match_labels = self.service_match_labels(id);
let mut labels = match_labels.clone();
for (key, value) in labels_in {
labels.insert(self.make_label_key(&key), value);
@@ -658,7 +660,8 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
}
let service = K8sService {
metadata: ObjectMeta {
name: Some(name.clone()),
name: Some(service_name.clone()),
labels: Some(match_labels.clone()),
..Default::default()
},
spec: Some(ServiceSpec {
@@ -679,10 +682,11 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
status: None,
};

let hosts = (0..scale)
.map(|i| {
let hosts = process_names
.iter()
.map(|process_name| {
format!(
"{name}-{i}.{name}.{}.svc.cluster.local",
"{process_name}-0.{service_name}.{}.svc.cluster.local",
self.kubernetes_namespace
)
})
@@ -1193,32 +1197,32 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
pod_template_hash.clone(),
);

let stateful_set = StatefulSet {
metadata: ObjectMeta {
name: Some(name.clone()),
..Default::default()
},
spec: Some(StatefulSetSpec {
selector: LabelSelector {
match_labels: Some(match_labels),
let stateful_sets = process_names
.iter()
.map(|process_name| StatefulSet {
metadata: ObjectMeta {
name: Some(process_name.clone()),
labels: Some(match_labels.clone()),
..Default::default()
},
service_name: name.clone(),
replicas: Some(scale.into()),
template: pod_template_spec,
pod_management_policy: Some("Parallel".to_string()),
volume_claim_templates,
..Default::default()
}),
status: None,
};
spec: Some(StatefulSetSpec {
selector: LabelSelector {
match_labels: Some(match_labels.clone()),
..Default::default()
},
service_name: service_name.clone(),
template: pod_template_spec.clone(),
volume_claim_templates: volume_claim_templates.clone(),
..Default::default()
}),
status: None,
})
.collect();

self.send_command(WorkerCommand::EnsureService {
desc: ServiceDescription {
name,
scale,
service,
stateful_set,
stateful_sets,
pod_template_hash,
},
});
@@ -1242,6 +1246,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {

self.send_command(WorkerCommand::DropService {
name: self.service_name(id),
match_labels: self.service_match_labels(id),
});

Ok(())
@@ -1399,7 +1404,9 @@ impl OrchestratorWorker {
EnsureService { desc } => {
retry(|| self.ensure_service(desc.clone()), "EnsureService").await
}
DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
DropService { name, match_labels } => {
retry(|| self.drop_service(&name, &match_labels), "DropService").await
}
ListServices {
namespace,
result_tx,
@@ -1441,7 +1448,7 @@ impl OrchestratorWorker {
disk: bool,
disk_limit: Option<DiskLimit>,
) -> ServiceProcessMetrics {
let name = format!("{service_name}-{i}");
let pod_name = format!("{service_name}-{i}-0");

let disk_usage_fut = async {
if disk {
@@ -1454,7 +1461,7 @@
// with `{pod name}-scratch` as the name. It also provides
// the metrics, and does so under the `persistentvolumeclaims`
// resource type, instead of `pods`.
&format!("{name}-scratch"),
&format!("{pod_name}-scratch"),
)
.await,
)
@@ -1469,7 +1476,7 @@
.custom_metrics_api
.get_subresource(
"kubelet_volume_stats_capacity_bytes",
&format!("{name}-scratch"),
&format!("{pod_name}-scratch"),
)
.await,
)
@@ -1478,7 +1485,7 @@
}
};
let (metrics, disk_usage, disk_capacity) = match futures::future::join3(
self_.metrics_api.get(&name),
self_.metrics_api.get(&pod_name),
disk_usage_fut,
disk_capacity_fut,
)
@@ -1493,7 +1500,7 @@
Some(Ok(disk_usage)) => Some(disk_usage),
Some(Err(e)) if !matches!(&e, K8sError::Api(e) if e.code == 404) => {
warn!(
"Failed to fetch `kubelet_volume_stats_used_bytes` for {name}: {e}"
"Failed to fetch `kubelet_volume_stats_used_bytes` for {pod_name}: {e}"
);
None
}
@@ -1503,7 +1510,7 @@
let disk_capacity = match disk_capacity {
Some(Ok(disk_capacity)) => Some(disk_capacity),
Some(Err(e)) if !matches!(&e, K8sError::Api(e) if e.code == 404) => {
warn!("Failed to fetch `kubelet_volume_stats_capacity_bytes` for {name}: {e}");
warn!("Failed to fetch `kubelet_volume_stats_capacity_bytes` for {pod_name}: {e}");
None
}
_ => None,
@@ -1512,7 +1519,7 @@
(metrics, disk_usage, disk_capacity)
}
(Err(e), _, _) => {
warn!("Failed to get metrics for {name}: {e}");
warn!("Failed to get metrics for {pod_name}: {e}");
return ServiceProcessMetrics::default();
}
};
@@ -1525,7 +1532,7 @@
..
}) = metrics.containers.get(0)
else {
warn!("metrics result contained no containers for {name}");
warn!("metrics result contained no containers for {pod_name}");
return ServiceProcessMetrics::default();
};

@@ -1676,33 +1683,37 @@ impl OrchestratorWorker {
.owner_references
.get_or_insert(vec![])
.extend(self.owner_references.iter().cloned());
desc.stateful_set
.metadata
.owner_references
.get_or_insert(vec![])
.extend(self.owner_references.iter().cloned());
for ss in &mut desc.stateful_sets {
ss.metadata
.owner_references
.get_or_insert(vec![])
.extend(self.owner_references.iter().cloned());
}

self.service_api
.patch(
&desc.name,
&PatchParams::apply(FIELD_MANAGER).force(),
&Patch::Apply(desc.service),
)
.await?;
self.stateful_set_api
.patch(
&desc.name,
desc.service.metadata.name.as_ref().unwrap(),
&PatchParams::apply(FIELD_MANAGER).force(),
&Patch::Apply(desc.stateful_set),
&Patch::Apply(&desc.service),
)
.await?;
for ss in &desc.stateful_sets {
self.stateful_set_api
.patch(
ss.metadata.name.as_ref().unwrap(),
&PatchParams::apply(FIELD_MANAGER).force(),
&Patch::Apply(&ss),
)
.await?;
}

// Explicitly delete any pods in the stateful set that don't match the
// Explicitly delete any pods in the stateful sets that don't match the
// template. In theory, Kubernetes would do this automatically, but
// in practice we have observed that it does not.
// See: https://github.com/kubernetes/kubernetes/issues/67250
for pod_id in 0..desc.scale {
let pod_name = format!("{}-{pod_id}", desc.name);
for ss in &desc.stateful_sets {
let process_name = ss.metadata.name.as_ref().unwrap();
let pod_name = format!("{process_name}-0");
let pod = match self.pod_api.get(&pod_name).await {
Ok(pod) => pod,
// Pod already doesn't exist.
Expand All @@ -1727,10 +1738,25 @@ impl OrchestratorWorker {
Ok(())
}

async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
async fn drop_service(
&self,
name: &str,
match_labels: &BTreeMap<String, String>,
) -> Result<(), K8sError> {
let label_selectors: Vec<_> = match_labels
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect();
let label_selector = label_selectors.join(",");
let res = self
.stateful_set_api
.delete(name, &DeleteParams::default())
.delete_collection(
&DeleteParams::default(),
&ListParams {
label_selector: Some(label_selector),
..Default::default()
},
)
.await;
match res {
Ok(_) => (),
Expand All @@ -1750,9 +1776,9 @@ impl OrchestratorWorker {
}

async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
let services = self.service_api.list(&Default::default()).await?;
let name_prefix = format!("{}{namespace}-", self.name_prefix);
Ok(stateful_sets
Ok(services
.into_iter()
.filter_map(|ss| {
ss.metadata
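
Since a service is now backed by several StatefulSets rather than one, `drop_service` above deletes them as a collection selected by the service's match labels instead of deleting a single object by name. Below is a self-contained sketch of how those labels become a selector string, with illustrative values for the namespace and service id; in the real code the result is passed as `ListParams::label_selector` to `delete_collection`.

```rust
use std::collections::BTreeMap;

/// Join match labels into a Kubernetes label selector string such as
/// "environmentd.materialize.cloud/namespace=default,environmentd.materialize.cloud/service-id=u1".
fn label_selector(match_labels: &BTreeMap<String, String>) -> String {
    match_labels
        .iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect::<Vec<_>>()
        .join(",")
}

fn main() {
    // Illustrative label values; the orchestrator derives these from its
    // namespace and the service id via `service_match_labels`.
    let labels = BTreeMap::from([
        (
            "environmentd.materialize.cloud/namespace".to_string(),
            "default".to_string(),
        ),
        (
            "environmentd.materialize.cloud/service-id".to_string(),
            "u1".to_string(),
        ),
    ]);
    // This selector would match every per-process StatefulSet of the service,
    // so one delete_collection call removes them all.
    println!("{}", label_selector(&labels));
}
```

Deleting by label selector also means the worker does not need to know how many per-process StatefulSets a service currently has when dropping it.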
4 changes: 2 additions & 2 deletions test/cloudtest/test_crash.py
@@ -114,8 +114,8 @@ def restarts(p: V1Pod) -> int:

def get_replica() -> tuple[V1Pod, V1StatefulSet]:
"""Find the stateful set for the replica of the default cluster"""
compute_pod_name = "cluster-u1-replica-u1-gen-0-0"
ss_name = "cluster-u1-replica-u1-gen-0"
compute_pod_name = "cluster-u1-replica-u1-gen-0-0-0"
ss_name = "cluster-u1-replica-u1-gen-0-0"
compute_pod = mz.environmentd.api().read_namespaced_pod(
compute_pod_name, mz.environmentd.namespace()
)
4 changes: 2 additions & 2 deletions test/cloudtest/test_system_clusters.py
@@ -31,5 +31,5 @@ def test_system_clusters(mz: MaterializeApplication) -> None:
"""Confirm that the system clusters have the expected labels and selectors"""
mz.wait_replicas()

assert_pod_properties(mz, "cluster-s1-replica-s1-gen-0-0")
assert_pod_properties(mz, "cluster-s2-replica-s2-gen-0-0")
assert_pod_properties(mz, "cluster-s1-replica-s1-gen-0-0-0")
assert_pod_properties(mz, "cluster-s2-replica-s2-gen-0-0-0")
