Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/webhook cruds #313

Open
wants to merge 1 commit into
base: saas
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/superposition/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ service_utils = { path = "../service_utils" }
superposition_macros = { path = "../superposition_macros" }
superposition_types = { path = "../superposition_types", features = ["diesel_derives"]}
url = { workspace = true }
derive_more = { workspace = true }

[features]
high-performance-mode = [
Expand Down
8 changes: 7 additions & 1 deletion crates/superposition/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod app_state;
mod auth;
mod organisation;
mod webhooks;
mod workspace;

use idgenerator::{IdGeneratorOptions, IdInstance};
Expand Down Expand Up @@ -193,7 +194,12 @@ async fn main() -> Result<()> {
)
.service(workspace::endpoints(scope("/workspaces"))
.wrap(OrgWorkspaceMiddlewareFactory::new(true, false))
.wrap(AppExecutionScopeMiddlewareFactory::new(AppScope::SUPERPOSITION)),
.wrap(AppExecutionScopeMiddlewareFactory::new(AppScope::SUPERPOSITION))
)
.service(
scope("/webhook")
.wrap(AppExecutionScopeMiddlewareFactory::new(AppScope::CAC))
.service(webhooks::endpoints()),
)
/***************************** UI Routes ******************************/
.route("/fxn/{tail:.*}", leptos_actix::handle_server_fns())
Expand Down
3 changes: 3 additions & 0 deletions crates/superposition/src/webhooks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod handlers;
pub mod types;
pub use handlers::endpoints;
238 changes: 238 additions & 0 deletions crates/superposition/src/webhooks/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
use actix_web::{
delete, get, post, put,
web::{self, Json, Query},
HttpResponse, Scope,
};
use chrono::Utc;
use service_utils::service::types::DbConnection;
use superposition_macros::{bad_argument, db_error, not_found, unexpected_error};
use superposition_types::{
custom_query::PaginationParams, result as superposition, PaginatedResponse, User,
};

use super::types::{CreateWebhookRequest, WebhookName};
use diesel::{delete, ExpressionMethods, QueryDsl, RunQueryDsl};
use diesel::{
r2d2::{ConnectionManager, PooledConnection},
PgConnection,
};
use superposition_types::database::models::cac::Webhooks;
use superposition_types::database::schema::webhooks::{self, dsl};

pub fn endpoints() -> Scope {
Scope::new("")
.service(create)
.service(list_webhooks)
.service(get)
.service(update)
.service(delete_webhook)
}

#[post("")]
async fn create(
request: web::Json<CreateWebhookRequest>,
db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let req = request.into_inner();
let events: Vec<String> = req
.events
.into_iter()
.map(|event| event.to_string())
.collect();
validate_events(&events, None, &mut conn)?;
let now = Utc::now().naive_utc();
let webhook = Webhooks {
name: req.name,
description: req.description,
enabled: req.enabled,
url: req.url,
method: req.method,
version: req.version.unwrap_or("v1".to_owned()),
custom_headers: req.custom_headers,
events,
max_retries: 0,
last_triggered_at: None,
created_by: user.email.clone(),
created_at: now,
last_modified_by: user.email,
last_modified_at: now,
};

let insert: Result<Webhooks, diesel::result::Error> =
diesel::insert_into(dsl::webhooks)
.values(&webhook)
.get_result(&mut conn);

match insert {
Ok(res) => Ok(Json(res)),
Err(e) => match e {
diesel::result::Error::DatabaseError(kind, e) => {
log::error!("Function error: {:?}", e);
match kind {
diesel::result::DatabaseErrorKind::UniqueViolation => {
Err(bad_argument!("Webhook already exists."))
}
_ => Err(unexpected_error!(
"Something went wrong, failed to create the webhook"
)),
}
}
_ => {
log::error!("Webhook creation failed with error: {e}");
Err(unexpected_error!(
"An error occured please contact the admin."
))
}
},
}
}

#[get("")]
async fn list_webhooks(
db_conn: DbConnection,
filters: Query<PaginationParams>,
) -> superposition::Result<Json<PaginatedResponse<Webhooks>>> {
let DbConnection(mut conn) = db_conn;

let (total_pages, total_items, data) = match filters.all {
Some(true) => {
let result: Vec<Webhooks> = dsl::webhooks.get_results(&mut conn)?;
(1, result.len() as i64, result)
}
_ => {
let n_functions: i64 = dsl::webhooks.count().get_result(&mut conn)?;
let limit = filters.count.unwrap_or(10);
let mut builder = dsl::webhooks
.into_boxed()
.order(webhooks::last_modified_at.desc())
.limit(limit);
if let Some(page) = filters.page {
let offset = (page - 1) * limit;
builder = builder.offset(offset);
}
let result: Vec<Webhooks> = builder.load(&mut conn)?;
let total_pages = (n_functions as f64 / limit as f64).ceil() as i64;
(total_pages, n_functions, result)
}
};

Ok(Json(PaginatedResponse {
total_pages,
total_items,
data,
}))
}

#[get("/{webhook_name}")]
async fn get(
params: web::Path<WebhookName>,
db_conn: DbConnection,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let w_name: String = params.into_inner().into();
let webhook = fetch_webhook(&w_name, &mut conn)?;
Ok(Json(webhook))
}

pub fn validate_events(
events: &Vec<String>,
exclude_webhook: Option<&String>,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> superposition::Result<()> {
let result: Vec<Webhooks> = dsl::webhooks.get_results(conn)?;
for webhook in result {
if exclude_webhook.map_or(false, |val| &webhook.name == val) {
continue;
}
if let Some(duplicate_event) =
webhook.events.iter().find(|event| events.contains(event))
{
return Err(bad_argument!("Duplicate event found: {}", duplicate_event));
}
}
Ok(())
}

pub fn fetch_webhook(
w_name: &String,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> superposition::Result<Webhooks> {
Ok(dsl::webhooks
.filter(webhooks::name.eq(w_name))
.get_result::<Webhooks>(conn)?)
}

#[put("/{webhook_name}")]
async fn update(
params: web::Path<WebhookName>,
db_conn: DbConnection,
user: User,
request: web::Json<CreateWebhookRequest>,
) -> superposition::Result<Json<Webhooks>> {
let DbConnection(mut conn) = db_conn;
let req = request.into_inner();
let w_name: String = params.into_inner().into();
let events: Vec<String> = req
.events
.into_iter()
.map(|event| event.to_string())
.collect();

validate_events(&events, Some(&w_name), &mut conn)?;

let update = diesel::update(dsl::webhooks)
.filter(webhooks::name.eq(w_name))
.set((
webhooks::description.eq(req.description),
webhooks::enabled.eq(req.enabled),
webhooks::url.eq(req.url),
webhooks::method.eq(req.method),
webhooks::version.eq(req.version.unwrap_or("v1".to_owned())),
webhooks::custom_headers.eq(req.custom_headers),
webhooks::events.eq(events),
webhooks::last_modified_by.eq(user.email),
webhooks::last_modified_at.eq(Utc::now().naive_utc()),
))
.get_result::<Webhooks>(&mut conn)
.map_err(|err| {
log::error!("failed to insert custom type with error: {}", err);
db_error!(err)
})?;

Ok(Json(update))
}

#[delete("/{webhook_name}")]
async fn delete_webhook(
params: web::Path<WebhookName>,
db_conn: DbConnection,
user: User,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;
let w_name: String = params.into_inner().into();

diesel::update(dsl::webhooks)
.filter(webhooks::name.eq(&w_name))
.set((
webhooks::last_modified_at.eq(Utc::now().naive_utc()),
webhooks::last_modified_by.eq(user.get_email()),
))
.execute(&mut conn)?;
let deleted_row =
delete(dsl::webhooks.filter(webhooks::name.eq(&w_name))).execute(&mut conn);
match deleted_row {
Ok(0) => Err(not_found!("Webhook {} doesn't exists", w_name)),
Ok(_) => {
log::info!("{w_name} Webhook deleted by {}", user.get_email());
Ok(HttpResponse::NoContent().finish())
}
Err(e) => {
log::error!("Webhook delete query failed with error: {e}");
Err(unexpected_error!(
"Something went wrong, failed to delete the Webhook"
))
}
}
}
Empty file.
35 changes: 35 additions & 0 deletions crates/superposition/src/webhooks/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use derive_more::{AsRef, Deref, DerefMut, Into};
use serde::Deserialize;
use serde_json::Value;
use superposition_types::{webhook::WebhookEvent, RegexEnum};

#[derive(Debug, Deserialize)]
pub struct CreateWebhookRequest {
pub name: String,
pub description: String,
pub enabled: bool,
pub url: String,
pub method: String,
pub version: Option<String>,
pub custom_headers: Option<Value>,
pub events: Vec<WebhookEvent>,
}

#[derive(Debug, Deserialize, AsRef, Deref, DerefMut, Into)]
#[serde(try_from = "String")]
pub struct WebhookName(String);
impl WebhookName {
pub fn validate_data(name: String) -> Result<Self, String> {
let name = name.trim();
RegexEnum::FunctionName
.match_regex(name)
.map(|_| Self(name.to_string()))
}
}

impl TryFrom<String> for WebhookName {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
Ok(Self::validate_data(value)?)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This file should undo anything in `up.sql`
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Your SQL goes here
-- Name: webhooks; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.webhooks (
name text PRIMARY KEY,
description text NOT NULL,
enabled boolean NOT NULL DEFAULT true,
url text NOT NULL,
method text NOT NULL DEFAULT 'POST',
version text NOT NULL,
custom_headers json,
events varchar(100)[] NOT NULL CHECK (array_position(events, NULL) IS NULL),
max_retries integer NOT NULL DEFAULT 0,
last_triggered_at timestamp,
created_by text NOT NULL,
created_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_modified_by text NOT NULL,
last_modified_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP
);
--
-- Name: webhooks webhooks_audit; Type: TRIGGER; Schema: public; Owner: -
--
CREATE TRIGGER webhooks_audit AFTER INSERT OR DELETE OR UPDATE ON public.webhooks FOR EACH ROW EXECUTE FUNCTION public.event_logger();
39 changes: 39 additions & 0 deletions crates/superposition_types/src/cac/schema.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
diff --git a/crates/context_aware_config/src/db/schema.rs b/crates/context_aware_config/src/db/schema.rs
index 15c2eee..25c8088 100644
--- a/crates/context_aware_config/src/db/schema.rs
+++ b/crates/context_aware_config/src/db/schema.rs
@@ -2,13 +2,13 @@

diesel::table! {
config_versions (id) {
id -> Int8,
config -> Json,
config_hash -> Text,
- tags -> Nullable<Array<Nullable<Varchar>>>,
+ tags -> Nullable<Array<Varchar>>,
created_at -> Timestamp,
}
}

diesel::table! {
webhooks (name) {
name -> Text,
description -> Text,
enabled -> Bool,
url -> Text,
method -> Text,
version -> Text,
custom_headers -> Nullable<Json>,
- events -> Array<Nullable<Varchar>>,
+ events -> Array<Varchar>,
max_retries -> Int4,
last_triggered_at -> Nullable<Timestamp>,
created_by -> Text,
created_at -> Timestamp,
last_modified_by -> Text,
last_modified_at -> Timestamp,
}
}

diesel::table! {
contexts (id) {
Loading
Loading