Skip to content

Commit

Permalink
feat: webhook crud
Browse files Browse the repository at this point in the history
  • Loading branch information
mahatoankitkumar committed Jan 21, 2025
1 parent 27294be commit b82c6e1
Show file tree
Hide file tree
Showing 13 changed files with 409 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/superposition/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ service_utils = { path = "../service_utils" }
superposition_macros = { path = "../superposition_macros" }
superposition_types = { path = "../superposition_types", features = ["diesel_derives"]}
url = { workspace = true }
derive_more = { workspace = true }

[features]
high-performance-mode = [
Expand Down
8 changes: 7 additions & 1 deletion crates/superposition/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod app_state;
mod auth;
mod organisation;
mod webhooks;
mod workspace;

use idgenerator::{IdGeneratorOptions, IdInstance};
Expand Down Expand Up @@ -193,7 +194,12 @@ async fn main() -> Result<()> {
)
.service(workspace::endpoints(scope("/workspaces"))
.wrap(OrgWorkspaceMiddlewareFactory::new(true, false))
.wrap(AppExecutionScopeMiddlewareFactory::new(AppScope::SUPERPOSITION)),
.wrap(AppExecutionScopeMiddlewareFactory::new(AppScope::SUPERPOSITION))
)
.service(
scope("/webhook")
.wrap(AppExecutionScopeMiddlewareFactory::new(AppScope::CAC))
.service(webhooks::endpoints()),
)
/***************************** UI Routes ******************************/
.route("/fxn/{tail:.*}", leptos_actix::handle_server_fns())
Expand Down
3 changes: 3 additions & 0 deletions crates/superposition/src/webhooks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod handlers;
pub mod types;
pub use handlers::endpoints;
238 changes: 238 additions & 0 deletions crates/superposition/src/webhooks/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
use actix_web::{
delete, get, post, put,
web::{self, Json, Query},
HttpResponse, Scope,
};
use chrono::Utc;
use service_utils::service::types::DbConnection;
use superposition_macros::{bad_argument, db_error, not_found, unexpected_error};
use superposition_types::{
custom_query::PaginationParams, result as superposition, PaginatedResponse, User,
};

use super::types::{CreateWebhookRequest, WebhookName};
use diesel::{delete, ExpressionMethods, QueryDsl, RunQueryDsl};
use diesel::{
r2d2::{ConnectionManager, PooledConnection},
PgConnection,
};
use superposition_types::database::models::cac::Webhooks;
use superposition_types::database::schema::webhooks::{self, dsl};

pub fn endpoints() -> Scope {
Scope::new("")
.service(create)
.service(list_webhooks)
.service(get)
.service(update)
.service(delete_webhook)
}

#[post("")]
async fn create(
request: web::Json<CreateWebhookRequest>,
db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let req = request.into_inner();
let events: Vec<String> = req
.events
.into_iter()
.map(|event| event.to_string())
.collect();
validate_events(&events, None, &mut conn)?;
let now = Utc::now().naive_utc();
let webhook = Webhooks {
name: req.name,
description: req.description,
enabled: req.enabled,
url: req.url,
method: req.method,
version: req.version.unwrap_or("v1".to_owned()),
custom_headers: req.custom_headers,
events,
max_retries: 0,
last_triggered_at: None,
created_by: user.email.clone(),
created_at: now,
last_modified_by: user.email,
last_modified_at: now,
};

let insert: Result<Webhooks, diesel::result::Error> =
diesel::insert_into(dsl::webhooks)
.values(&webhook)
.get_result(&mut conn);

match insert {
Ok(res) => Ok(Json(res)),
Err(e) => match e {
diesel::result::Error::DatabaseError(kind, e) => {
log::error!("Function error: {:?}", e);
match kind {
diesel::result::DatabaseErrorKind::UniqueViolation => {
Err(bad_argument!("Webhook already exists."))
}
_ => Err(unexpected_error!(
"Something went wrong, failed to create the webhook"
)),
}
}
_ => {
log::error!("Webhook creation failed with error: {e}");
Err(unexpected_error!(
"An error occured please contact the admin."
))
}
},
}
}

#[get("")]
async fn list_webhooks(
db_conn: DbConnection,
filters: Query<PaginationParams>,
) -> superposition::Result<Json<PaginatedResponse<Webhooks>>> {
let DbConnection(mut conn) = db_conn;

let (total_pages, total_items, data) = match filters.all {
Some(true) => {
let result: Vec<Webhooks> = dsl::webhooks.get_results(&mut conn)?;
(1, result.len() as i64, result)
}
_ => {
let n_functions: i64 = dsl::webhooks.count().get_result(&mut conn)?;
let limit = filters.count.unwrap_or(10);
let mut builder = dsl::webhooks
.into_boxed()
.order(webhooks::last_modified_at.desc())
.limit(limit);
if let Some(page) = filters.page {
let offset = (page - 1) * limit;
builder = builder.offset(offset);
}
let result: Vec<Webhooks> = builder.load(&mut conn)?;
let total_pages = (n_functions as f64 / limit as f64).ceil() as i64;
(total_pages, n_functions, result)
}
};

Ok(Json(PaginatedResponse {
total_pages,
total_items,
data,
}))
}

#[get("/{webhook_name}")]
async fn get(
params: web::Path<WebhookName>,
db_conn: DbConnection,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let w_name: String = params.into_inner().into();
let webhook = fetch_webhook(&w_name, &mut conn)?;
Ok(Json(webhook))
}

pub fn validate_events(
events: &Vec<String>,
exclude_webhook: Option<&String>,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> superposition::Result<()> {
let result: Vec<Webhooks> = dsl::webhooks.get_results(conn)?;
for webhook in result {
if exclude_webhook.map_or(false, |val| &webhook.name == val) {
continue;
}
if let Some(duplicate_event) =
webhook.events.iter().find(|event| events.contains(event))
{
return Err(bad_argument!("Duplicate event found: {}", duplicate_event));
}
}
Ok(())
}

pub fn fetch_webhook(
w_name: &String,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> superposition::Result<Webhooks> {
Ok(dsl::webhooks
.filter(webhooks::name.eq(w_name))
.get_result::<Webhooks>(conn)?)
}

#[put("/{webhook_name}")]
async fn update(
params: web::Path<WebhookName>,
db_conn: DbConnection,
user: User,
request: web::Json<CreateWebhookRequest>,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let req = request.into_inner();
let w_name: String = params.into_inner().into();
let events: Vec<String> = req
.events
.into_iter()
.map(|event| event.to_string())
.collect();

validate_events(&events, Some(&w_name), &mut conn)?;

let update = diesel::update(dsl::webhooks)
.filter(webhooks::name.eq(w_name))
.set((
webhooks::description.eq(req.description),
webhooks::enabled.eq(req.enabled),
webhooks::url.eq(req.url),
webhooks::method.eq(req.method),
webhooks::version.eq(req.version.unwrap_or("v1".to_owned())),
webhooks::custom_headers.eq(req.custom_headers),
webhooks::events.eq(events),
webhooks::last_modified_by.eq(user.email),
webhooks::last_modified_at.eq(Utc::now().naive_utc()),
))
.get_result::<Webhooks>(&mut conn)
.map_err(|err| {
log::error!("failed to insert custom type with error: {}", err);
db_error!(err)
})?;

Ok(Json(update))
}

#[delete("/{webhook_name}")]
async fn delete_webhook(
params: web::Path<WebhookName>,
db_conn: DbConnection,
user: User,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;
let w_name: String = params.into_inner().into();

diesel::update(dsl::webhooks)
.filter(webhooks::name.eq(&w_name))
.set((
webhooks::last_modified_at.eq(Utc::now().naive_utc()),
webhooks::last_modified_by.eq(user.get_email()),
))
.execute(&mut conn)?;
let deleted_row =
delete(dsl::webhooks.filter(webhooks::name.eq(&w_name))).execute(&mut conn);
match deleted_row {
Ok(0) => Err(not_found!("Webhook {} doesn't exists", w_name)),
Ok(_) => {
log::info!("{w_name} Webhook deleted by {}", user.get_email());
Ok(HttpResponse::NoContent().finish())
}
Err(e) => {
log::error!("Webhook delete query failed with error: {e}");
Err(unexpected_error!(
"Something went wrong, failed to delete the Webhook"
))
}
}
}
Empty file.
35 changes: 35 additions & 0 deletions crates/superposition/src/webhooks/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use derive_more::{AsRef, Deref, DerefMut, Into};
use serde::Deserialize;
use serde_json::Value;
use superposition_types::{webhook::WebhookEvent, RegexEnum};

#[derive(Debug, Deserialize)]
pub struct CreateWebhookRequest {
pub name: String,
pub description: String,
pub enabled: bool,
pub url: String,
pub method: String,
pub version: Option<String>,
pub custom_headers: Option<Value>,
pub events: Vec<WebhookEvent>,
}

#[derive(Debug, Deserialize, AsRef, Deref, DerefMut, Into)]
#[serde(try_from = "String")]
pub struct WebhookName(String);
impl WebhookName {
pub fn validate_data(name: String) -> Result<Self, String> {
let name = name.trim();
RegexEnum::FunctionName
.match_regex(name)
.map(|_| Self(name.to_string()))
}
}

impl TryFrom<String> for WebhookName {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
Ok(Self::validate_data(value)?)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This file should undo anything in `up.sql`
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Your SQL goes here
-- Name: webhooks; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.webhooks (
name text PRIMARY KEY,
description text NOT NULL,
enabled boolean NOT NULL DEFAULT true,
url text NOT NULL,
method text NOT NULL DEFAULT 'POST',
version text NOT NULL,
custom_headers json,
events varchar(100)[] NOT NULL CHECK (array_position(events, NULL) IS NULL),
max_retries integer NOT NULL DEFAULT 0,
last_triggered_at timestamp,
created_by text NOT NULL,
created_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_modified_by text NOT NULL,
last_modified_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP
);
--
-- Name: webhooks webhooks_audit; Type: TRIGGER; Schema: public; Owner: -
--
CREATE TRIGGER webhooks_audit AFTER INSERT OR DELETE OR UPDATE ON public.webhooks FOR EACH ROW EXECUTE FUNCTION public.event_logger();
39 changes: 39 additions & 0 deletions crates/superposition_types/src/cac/schema.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
diff --git a/crates/context_aware_config/src/db/schema.rs b/crates/context_aware_config/src/db/schema.rs
index 15c2eee..25c8088 100644
--- a/crates/context_aware_config/src/db/schema.rs
+++ b/crates/context_aware_config/src/db/schema.rs
@@ -2,13 +2,13 @@

diesel::table! {
config_versions (id) {
id -> Int8,
config -> Json,
config_hash -> Text,
- tags -> Nullable<Array<Nullable<Varchar>>>,
+ tags -> Nullable<Array<Varchar>>,
created_at -> Timestamp,
}
}

diesel::table! {
webhooks (name) {
name -> Text,
description -> Text,
enabled -> Bool,
url -> Text,
method -> Text,
version -> Text,
custom_headers -> Nullable<Json>,
- events -> Array<Nullable<Varchar>>,
+ events -> Array<Varchar>,
max_retries -> Int4,
last_triggered_at -> Nullable<Timestamp>,
created_by -> Text,
created_at -> Timestamp,
last_modified_by -> Text,
last_modified_at -> Timestamp,
}
}

diesel::table! {
contexts (id) {
Loading

0 comments on commit b82c6e1

Please sign in to comment.