Skip to content

Commit

Permalink
feat: webhook crud
Browse files Browse the repository at this point in the history
  • Loading branch information
mahatoankitkumar committed Dec 17, 2024
1 parent feb1555 commit 2ef5451
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ CREATE TABLE public.webhooks (
method text NOT NULL DEFAULT 'POST',
version text NOT NULL,
custom_headers json,
events varchar(100)[] CHECK (array_position(events, NULL) IS NULL),
events varchar(100)[] NOT NULL CHECK (array_position(events, NULL) IS NULL),
max_retries integer NOT NULL DEFAULT 0,
last_triggered_at timestamp,
created_by text NOT NULL,
created_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP
last_modified_by text NOT NULL,
last_modified_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP
);
--
-- Name: webhooks webhooks_audit; Type: TRIGGER; Schema: public; Owner: -
Expand Down
191 changes: 178 additions & 13 deletions crates/context_aware_config/src/api/webhooks/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,48 @@
use actix_web::{
post,
web::{self, Data, Json},
delete, get, post, put,
web::{self, Json, Query},
HttpResponse, Scope,
};
use chrono::Utc;
use service_utils::service::types::{AppState, DbConnection};
use superposition_macros::{bad_argument, unexpected_error};
use superposition_types::{result as superposition, webhook, User};
use service_utils::service::types::DbConnection;
use superposition_macros::{bad_argument, db_error, not_found, unexpected_error};
use superposition_types::{
cac::schema, custom_query::PaginationParams, result as superposition,
PaginatedResponse, User,
};

use super::types::CreateWebhookRequest;
use super::types::{CreateWebhookRequest, WebhookName};
use diesel::{delete, ExpressionMethods, QueryDsl, RunQueryDsl};
use superposition_types::cac::models::{self as models, Webhooks};
use diesel::{
r2d2::{ConnectionManager, PooledConnection},
PgConnection,
};
use superposition_types::cac::models::Webhooks;
use superposition_types::cac::schema::webhooks::dsl::webhooks;

pub fn endpoints() -> Scope {
Scope::new("").service(create)
Scope::new("")
.service(create)
.service(list_webhooks)
.service(get)
.service(update)
.service(delete_webhook)
}

#[post("")]
async fn create(
state: Data<AppState>,
request: web::Json<CreateWebhookRequest>,
db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let req = request.into_inner();

let events: Vec<String> = req
.events
.into_iter()
.map(|event| event.to_string())
.collect();
validate_events(&events, None, &mut conn)?;
let now = Utc::now().naive_utc();
let webhook = Webhooks {
name: req.name,
Expand All @@ -36,12 +52,13 @@ async fn create(
method: req.method,
version: req.version.unwrap_or("v1".to_owned()),
custom_headers: req.custom_headers,
events: req.events,
events,
max_retries: 0,
last_triggered_at: None,
created_by: user.email,
created_by: user.email.clone(),
created_at: now,
updated_at: now,
last_modified_by: user.email,
last_modified_at: now,
};

let insert: Result<Webhooks, diesel::result::Error> = diesel::insert_into(webhooks)
Expand Down Expand Up @@ -71,3 +88,151 @@ async fn create(
},
}
}

#[get("")]
async fn list_webhooks(
db_conn: DbConnection,
filters: Query<PaginationParams>,
) -> superposition::Result<Json<PaginatedResponse<Webhooks>>> {
let DbConnection(mut conn) = db_conn;

let (total_pages, total_items, data) = match filters.all {
Some(true) => {
let result: Vec<Webhooks> = webhooks.get_results(&mut conn)?;
(1, result.len() as i64, result)
}
_ => {
let n_functions: i64 = webhooks.count().get_result(&mut conn)?;
let limit = filters.count.unwrap_or(10);
let mut builder = webhooks
.into_boxed()
.order(schema::webhooks::last_modified_at.desc())
.limit(limit);
if let Some(page) = filters.page {
let offset = (page - 1) * limit;
builder = builder.offset(offset);
}
let result: Vec<Webhooks> = builder.load(&mut conn)?;
let total_pages = (n_functions as f64 / limit as f64).ceil() as i64;
(total_pages, n_functions, result)
}
};

Ok(Json(PaginatedResponse {
total_pages,
total_items,
data,
}))
}

#[get("/{webhook_name}")]
async fn get(
params: web::Path<WebhookName>,
db_conn: DbConnection,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let w_name: String = params.into_inner().into();
let webhook = fetch_webhook(&w_name, &mut conn)?;
Ok(Json(webhook))
}

pub fn validate_events(
events: &Vec<String>,
exclude_webhook: Option<&String>,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> superposition::Result<()> {
let result: Vec<Webhooks> = webhooks.get_results(conn)?;
for webhook in result {
if exclude_webhook.map_or(false, |val| &webhook.name == val) {
continue;
}
if let Some(duplicate_event) =
webhook.events.iter().find(|event| events.contains(event))
{
return Err(bad_argument!("Duplicate event found: {}", duplicate_event));
}
}
Ok(())
}

pub fn fetch_webhook(
w_name: &String,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> superposition::Result<Webhooks> {
Ok(webhooks
.filter(schema::webhooks::name.eq(w_name))
.get_result::<Webhooks>(conn)?)
}

#[put("/{webhook_name}")]
async fn update(
params: web::Path<WebhookName>,
db_conn: DbConnection,
user: User,
request: web::Json<CreateWebhookRequest>,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let req = request.into_inner();
let w_name: String = params.into_inner().into();
let events: Vec<String> = req
.events
.into_iter()
.map(|event| event.to_string())
.collect();

validate_events(&events, Some(&w_name), &mut conn)?;

let update = diesel::update(webhooks)
.filter(schema::webhooks::name.eq(w_name))
.set((
schema::webhooks::description.eq(req.description),
schema::webhooks::enabled.eq(req.enabled),
schema::webhooks::url.eq(req.url),
schema::webhooks::method.eq(req.method),
schema::webhooks::version.eq(req.version.unwrap_or("v1".to_owned())),
schema::webhooks::custom_headers.eq(req.custom_headers),
schema::webhooks::events.eq(events),
schema::webhooks::last_modified_by.eq(user.email),
schema::webhooks::last_modified_at.eq(Utc::now().naive_utc()),
))
.get_result::<Webhooks>(&mut conn)
.map_err(|err| {
log::error!("failed to insert custom type with error: {}", err);
db_error!(err)
})?;

Ok(Json(update))
}

#[delete("/{webhook_name}")]
async fn delete_webhook(
params: web::Path<WebhookName>,
db_conn: DbConnection,
user: User,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;
let w_name: String = params.into_inner().into();

diesel::update(webhooks)
.filter(schema::webhooks::name.eq(&w_name))
.set((
schema::webhooks::last_modified_at.eq(Utc::now().naive_utc()),
schema::webhooks::last_modified_by.eq(user.get_email()),
))
.execute(&mut conn)?;
let deleted_row =
delete(webhooks.filter(schema::webhooks::name.eq(&w_name))).execute(&mut conn);
match deleted_row {
Ok(0) => Err(not_found!("Webhook {} doesn't exists", w_name)),
Ok(_) => {
log::info!("{w_name} Webhook deleted by {}", user.get_email());
Ok(HttpResponse::NoContent().finish())
}
Err(e) => {
log::error!("Webhook delete query failed with error: {e}");
Err(unexpected_error!(
"Something went wrong, failed to delete the Webhook"
))
}
}
}
23 changes: 22 additions & 1 deletion crates/context_aware_config/src/api/webhooks/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use derive_more::{AsRef, Deref, DerefMut, Into};
use serde::Deserialize;
use serde_json::Value;
use superposition_types::{webhook::WebhookEvent, RegexEnum};

#[derive(Debug, Deserialize)]
pub struct CreateWebhookRequest {
Expand All @@ -10,5 +12,24 @@ pub struct CreateWebhookRequest {
pub method: String,
pub version: Option<String>,
pub custom_headers: Option<Value>,
pub events: Option<Vec<Option<String>>>,
pub events: Vec<WebhookEvent>,
}

#[derive(Debug, Deserialize, AsRef, Deref, DerefMut, Into)]
#[serde(try_from = "String")]
pub struct WebhookName(String);
impl WebhookName {
pub fn validate_data(name: String) -> Result<Self, String> {
let name = name.trim();
RegexEnum::FunctionName
.match_regex(name)
.map(|_| Self(name.to_string()))
}
}

impl TryFrom<String> for WebhookName {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
Ok(Self::validate_data(value)?)
}
}
5 changes: 3 additions & 2 deletions crates/superposition_types/src/cac/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ pub struct Webhooks {
pub method: String,
pub version: String,
pub custom_headers: Option<Value>,
pub events: Option<Vec<Option<String>>>,
pub events: Vec<String>,
pub max_retries: i32,
pub last_triggered_at: Option<NaiveDateTime>,
pub created_by: String,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub last_modified_by: String,
pub last_modified_at: NaiveDateTime,
}
22 changes: 21 additions & 1 deletion crates/superposition_types/src/cac/schema.patch
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,25 @@ index 15c2eee..25c8088 100644
}
}

diesel::table! {
diesel::table! {
webhooks (name) {
name -> Text,
description -> Text,
enabled -> Bool,
url -> Text,
method -> Text,
version -> Text,
custom_headers -> Nullable<Json>,
- events -> Array<Nullable<Varchar>>,
+ events -> Array<Varchar>,
max_retries -> Int4,
last_triggered_at -> Nullable<Timestamp>,
created_by -> Text,
created_at -> Timestamp,
last_modified_by -> Text,
last_modified_at -> Timestamp,
}
}

diesel::table! {
contexts (id) {
5 changes: 3 additions & 2 deletions crates/superposition_types/src/cac/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,12 +674,13 @@ diesel::table! {
method -> Text,
version -> Text,
custom_headers -> Nullable<Json>,
events -> Nullable<Array<Nullable<Varchar>>>,
events -> Array<Varchar>,
max_retries -> Int4,
last_triggered_at -> Nullable<Timestamp>,
created_by -> Text,
created_at -> Timestamp,
updated_at -> Timestamp,
last_modified_by -> Text,
last_modified_at -> Timestamp,
}
}

Expand Down
12 changes: 12 additions & 0 deletions crates/superposition_types/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ pub enum WebhookEvent {
ExperimentConcluded,
}

impl fmt::Display for WebhookEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WebhookEvent::ExperimentCreated => write!(f, "ExperimentCreated"),
WebhookEvent::ExperimentStarted => write!(f, "ExperimentStarted"),
WebhookEvent::ExperimentInprogress => write!(f, "ExperimentInprogress"),
WebhookEvent::ExperimentUpdated => write!(f, "ExperimentUpdated"),
WebhookEvent::ExperimentConcluded => write!(f, "ExperimentConcluded"),
}
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct WebhookEventInfo {
pub webhook_event: WebhookEvent,
Expand Down

0 comments on commit 2ef5451

Please sign in to comment.