Skip to content

Commit

Permalink
Always load user/database list from api server to avoid local caching…
Browse files Browse the repository at this point in the history
… issues
  • Loading branch information
zlepper committed Oct 17, 2023
1 parent 690ebaa commit bac494d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 36 deletions.
33 changes: 1 addition & 32 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ use kube::client::Client;
use kube::{Api, CustomResourceExt, Resource};
use kube_runtime::controller::{Action};
use kube_runtime::watcher::Config;
use kube_runtime::{Controller, reflector, watcher, WatchStreamExt};
use kube_runtime::{Controller};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::fs::File;
use std::hash::Hash;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;
use futures::future;
use kube_runtime::reflector::{Store};
use tokio::task::JoinSet;
use crate::types::{HasPgBouncerReference, PgBouncer, PgBouncerDatabase, PgBouncerUser, PostgresAdminConnection, PostgresRole, PostgresSchema};

Expand Down Expand Up @@ -60,23 +57,11 @@ async fn main() -> anyhow::Result<()> {
let services_api: Api<Service> = Api::all(kubernetes_client.clone());
let config_map_api: Api<ConfigMap> = Api::all(kubernetes_client.clone());




let (pg_bouncer_database_store, pg_bouncer_database_store_task) = create_store(kubernetes_client.clone());
let (pg_bouncer_user_store, pg_bouncer_user_store_task) = create_store(kubernetes_client.clone());


let context = Arc::new(ContextData {
kubernetes_client: kubernetes_client.clone(),
pg_bouncer_databases_store: pg_bouncer_database_store,
pg_bouncer_users_store: pg_bouncer_user_store,
});

let mut tasks = JoinSet::new();
tasks.spawn(pg_bouncer_database_store_task);
tasks.spawn(pg_bouncer_user_store_task);


tasks.spawn(Controller::new(pg_bouncer_api.clone(), Config::default())
.watches(related_pg_bouncer_databases_api, Config::default(), |o| o.get_pg_bouncer_object_ref())
Expand Down Expand Up @@ -121,20 +106,6 @@ async fn main() -> anyhow::Result<()> {
}



fn create_store<K>(kubernetes_client: Client) -> (Store<K>, impl futures::Future<Output = ()>)
where
K: Resource + Clone + 'static + DeserializeOwned + Debug + Send,
K::DynamicType: Eq + Hash + Clone + Default,
{
let (reader, writer) = reflector::store();
let api = Api::all(kubernetes_client);
let rf = reflector::reflector(writer, watcher(api, Config::default()));
let task = rf.applied_objects().for_each(|_| future::ready(()));
(reader, task)
}


fn write_crds() -> anyhow::Result<()> {
let file_path = "charts/postgres-topology-operator/templates/crds.yaml";

Expand Down Expand Up @@ -162,8 +133,6 @@ fn write_crd<TResource: CustomResourceExt>(mut file: &mut File) -> anyhow::Resul

pub struct ContextData {
kubernetes_client: Client,
pg_bouncer_databases_store: Store<PgBouncerDatabase>,
pg_bouncer_users_store: Store<PgBouncerUser>,
}

/// All errors possible to occur during reconciliation
Expand Down
15 changes: 11 additions & 4 deletions src/reconcilers/pg_bouncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use k8s_openapi::api::core::v1::{ConfigMap, ConfigMapVolumeSource, Container, Po
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use kube::{Api, Resource, ResourceExt};
use kube::api::{Patch, PatchParams};
use kube::api::{ListParams, Patch, PatchParams};
use kube_runtime::controller::Action;
use sha2::{Digest};
use crate::{ContextData, Error};
use crate::helpers::ini_builder;
use crate::types::{HasPgBouncerReference, PgBouncer, PgBouncerDatabaseSpec, PgBouncerSpec, PgBouncerUserSpec};
use crate::types::{HasPgBouncerReference, PgBouncer, PgBouncerDatabase, PgBouncerDatabaseSpec, PgBouncerSpec, PgBouncerUser, PgBouncerUserSpec};

const PG_BOUNCER_INI_FILE_NAME: &str = "pgbouncer.ini";
const USERLIST_TXT_FILE_NAME: &str = "userlist.txt";
Expand All @@ -37,13 +37,20 @@ async fn run_reconciler(resource: Arc<PgBouncer>, context: Arc<ContextData>) ->
let resource_name = resource.name_any();


let databases = context.pg_bouncer_databases_store.state();



let databases = Api::<PgBouncerDatabase>::all(context.kubernetes_client.clone())
.list(&ListParams::default())
.await?;
let databases = databases
.iter()
.filter(|db| db.is_for(&resource))
.map(|db| &db.spec);

let users = context.pg_bouncer_users_store.state();
let users = Api::<PgBouncerUser>::all(context.kubernetes_client.clone())
.list(&ListParams::default())
.await?;
let users = users
.iter()
.filter(|u| u.is_for(&resource))
Expand Down

0 comments on commit bac494d

Please sign in to comment.