Skip to content
This repository was archived by the owner on Sep 4, 2020. It is now read-only.

add k8s event record for rudr controller #473

Merged
merged 2 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ hyper = "0.12"
clap = "~2.33"
regex = "1.0"
lazy_static = "1.4.0"
chrono = { version = "0.4", features = ["serde"] }

[workspace]
members = [
Expand Down
147 changes: 109 additions & 38 deletions src/instigator.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use failure::Error;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as meta;
use kube::{api::Api, api::Object, api::PatchParams, api::RawApi, api::Void, client::APIClient};
use log::{debug, info, warn};
use log::{debug, error, info, warn};
use serde_json::json;
use std::collections::BTreeMap;

use k8s_openapi::api::core::v1::ObjectReference;

use crate::schematic::variable::Variable;
use crate::{
kube_event,
lifecycle::Phase,
schematic::{
component::Component,
Expand Down Expand Up @@ -70,6 +73,7 @@ pub struct Instigator {
client: APIClient,
//cache: Reflector<Component, Status>,
namespace: String,
pub event_handler: kube_event::Event,
}

/// Alias for a Kubernetes wrapper on a component.
Expand All @@ -88,7 +92,11 @@ impl Instigator {
/// An instigator uses the reflector as a cache of Components, and will use the API client
/// for creating and managing the component implementation.
pub fn new(client: kube::client::APIClient, namespace: String) -> Self {
Instigator { client, namespace }
Instigator {
client: client.clone(),
namespace: namespace.clone(),
event_handler: kube_event::Event::new(client, namespace),
}
}

pub fn sync_status(&self, event: OpResource) -> InstigatorResult {
Expand Down Expand Up @@ -350,42 +358,8 @@ impl Instigator {
)?;

let inst_name = component.instance_name.clone();
let new_owner_ref = match phase {
Phase::Add => Some(self.create_component_instance(
component.component_name.clone(),
inst_name.clone(),
owner_ref.clone(),
)?),
Phase::Modify => {
let ownref = self.component_instance_owner_reference(
component.component_name.clone(),
inst_name.clone(),
);
match ownref {
Err(err) => {
let e = err.to_string();
if !e.contains("NotFound") {
// Wrap the error to make it clear where we failed
// During deletion, this might indicate that something else
// remove the component instance.
return Err(format_err!(
"{:?} on {}: {}",
phase.clone(),
inst_name.clone(),
e
));
}
Some(self.create_component_instance(
component.component_name.clone(),
inst_name.clone(),
owner_ref.clone(),
)?)
}
Ok(own) => Some(own),
}
}
_ => None,
};
let new_owner_ref =
self.get_new_own_ref(phase.clone(), component.clone(), owner_ref.clone())?;

// Instantiate components
let workload = self.load_workload_type(
Expand Down Expand Up @@ -419,9 +393,24 @@ impl Instigator {
)?;
workload.add()?;
trait_manager.exec(self.namespace.as_str(), self.client.clone(), Phase::Add)?;
if let Err(err) = self.event_handler.push_event_message(
kube_event::Type::Normal,
kube_event::Info {
action: "created".to_string(),
message: format!(
"component {} created",
component.component_name.clone(),
),
reason: "".to_string(),
},
get_object_ref(event.clone()),
) {
error!("adding event err: {:?}", err)
}
}
Phase::Modify => {
info!("Modifying component {}", component.component_name.clone());

workload.validate()?;
trait_manager.exec(
self.namespace.as_str(),
Expand All @@ -434,6 +423,20 @@ impl Instigator {
self.client.clone(),
Phase::Modify,
)?;
if let Err(err) = self.event_handler.push_event_message(
kube_event::Type::Normal,
kube_event::Info {
action: "updated".to_string(),
message: format!(
"component {} updated",
component.component_name.clone(),
),
reason: "".to_string(),
},
get_object_ref(event.clone()),
) {
error!("adding event err {:?}", err)
}
}
Phase::Delete => {
info!("Deleting component {}", component.component_name.clone());
Expand Down Expand Up @@ -500,6 +503,17 @@ impl Instigator {
scope.remove(component.clone())?;
}
}
if let Err(err) = self.event_handler.push_event_message(
kube_event::Type::Normal,
kube_event::Info {
action: "deleted".to_string(),
message: format!("component {} deleted", component.component_name.clone(),),
reason: "".to_string(),
},
get_object_ref(event.clone()),
) {
error!("adding event err {:?}", err)
}
}
// if no component was updated or this is an delete phase, just return without status change.
if !component_updated || phase == Phase::Delete {
Expand Down Expand Up @@ -592,6 +606,51 @@ impl Instigator {
}
}

fn get_new_own_ref(
&self,
phase: Phase,
component: ComponentConfiguration,
owner_ref: meta::OwnerReference,
) -> Result<Option<Vec<meta::OwnerReference>>, Error> {
let new = match phase {
Phase::Add => Some(self.create_component_instance(
component.component_name.clone(),
component.instance_name.clone(),
owner_ref.clone(),
)?),
Phase::Modify => {
let ownref = self.component_instance_owner_reference(
component.component_name.clone(),
component.instance_name.clone(),
);
match ownref {
Err(err) => {
let e = err.to_string();
if !e.contains("NotFound") {
// Wrap the error to make it clear where we failed
// During deletion, this might indicate that something else
// remove the component instance.
return Err(format_err!(
"{:?} on {}: {}",
phase.clone(),
component.instance_name.clone(),
e
));
}
Some(self.create_component_instance(
component.component_name.clone(),
component.instance_name.clone(),
owner_ref.clone(),
)?)
}
Ok(own) => Some(own),
}
}
_ => None,
};
Ok(new)
}

fn delete_component_instance(
&self,
component_name: String,
Expand Down Expand Up @@ -725,6 +784,18 @@ impl Instigator {
}
}

pub fn get_object_ref(event: OpResource) -> ObjectReference {
ObjectReference {
api_version: event.types.apiVersion.clone(),
kind: event.types.kind.clone(),
name: Some(event.metadata.name.clone()),
field_path: None,
namespace: event.metadata.namespace.clone(),
resource_version: event.metadata.resourceVersion.clone(),
uid: event.metadata.uid.clone(),
}
}

/// combine_name combine component name with instance_name,
/// so we won't afraid different components using same instance_name
pub fn combine_name(component_name: String, instance_name: String) -> String {
Expand Down
132 changes: 132 additions & 0 deletions src/kube_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use failure::Error;
use k8s_openapi::api::core::v1::ObjectReference;
use kube::{api::Api, api::PostParams, client::APIClient};
use std::fmt;

pub enum Type {
Normal,
Warning,
}
impl fmt::Display for Type {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Type::Normal => write!(f, "Normal"),
Type::Warning => write!(f, "Warning"),
}
}
}

#[derive(Clone)]
pub struct Event {
pub client: APIClient,
pub namespace: String,
pub reporting_component: Option<String>, // Name of the controller that emitted this Event,e.g. "oam.dev/rudr"
pub reporting_instance: Option<String>, //ID of the controller instance
pub event_handle: Api<kube::api::v1Event>,
}

pub struct Info {
pub action: String,
pub message: String,
pub reason: String,
}

impl Event {
pub fn new(client: APIClient, namespace: String) -> Self {
Event {
client: client.clone(),
namespace: namespace.clone(),
reporting_component: None,
reporting_instance: None,
event_handle: Api::v1Event(client).within(namespace.as_str()),
}
}
fn make_event(
now: chrono::DateTime<chrono::Utc>,
namespace: String,
type_: Type,
info: Info,
involved_object: ObjectReference,
reporting_component: Option<String>,
reporting_instance: Option<String>,
) -> kube::api::v1Event {
let name = format!(
"{}.{:x}",
involved_object.name.clone().unwrap(),
now.timestamp_nanos(),
);
kube::api::v1Event {
metadata: kube::api::ObjectMeta {
name,
namespace: Some(namespace.clone()),
..Default::default()
},
involvedObject: involved_object,
reportingComponent: reporting_component.unwrap_or_else(|| "".to_string()),
reportingInstance: reporting_instance.unwrap_or_else(|| "".to_string()),
message: info.message,
reason: info.reason,
count: 1,
type_: type_.to_string(),
action: Some(info.action),
eventTime: None,
firstTimestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(now)),
lastTimestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(now)),
related: None,
series: None,
source: None,
}
}
pub fn push_event_message(
&self,
type_: Type,
info: Info,
involved_object: ObjectReference,
) -> Result<(), Error> {
let now = chrono::Utc::now();
let event = Event::make_event(
now,
self.namespace.clone(),
type_,
info,
involved_object,
self.reporting_component.clone(),
self.reporting_instance.clone(),
);
self.event_handle
.create(&PostParams::default(), serde_json::to_vec(&event)?)?;
Ok(())
}
}

#[test]
fn test_make_event() {
let now = chrono::Utc::now();
let ev = Event::make_event(
now,
"default".to_string(),
Type::Normal,
Info {
action: "ac".to_string(),
message: "ms".to_string(),
reason: "re".to_string(),
},
ObjectReference {
api_version: Some("core.oam.dev/v1alpha1".to_string()),
kind: None,
name: Some("test".to_string()),
field_path: None,
namespace: None,
resource_version: None,
uid: None,
},
None,
None,
);
assert_eq!(ev.count, 1);
assert_eq!(
ev.metadata.name,
format!("test.{:x}", now.timestamp_nanos())
);
assert_eq!(ev.action, Some("ac".to_string()))
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern crate lazy_static;
extern crate regex;

pub mod instigator;
pub mod kube_event;
pub mod lifecycle;
pub mod schematic;
mod trait_manager;
Expand Down
Loading