From d88cde70f436cbe8c8f4469ffaad049cb7e865c0 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 4 Feb 2025 14:27:01 -0800 Subject: [PATCH] start redoing dispatch stuff thanks @smklein --- nexus/db-model/src/schema.rs | 1 + nexus/db-model/src/webhook_delivery.rs | 11 +- .../src/db/datastore/webhook_event.rs | 110 ++++++++-------- .../db-queries/src/db/datastore/webhook_rx.rs | 1 + nexus/db-queries/src/db/queries/mod.rs | 1 - .../src/db/queries/webhook_delivery.rs | 124 ------------------ schema/crdb/dbinit.sql | 15 +++ 7 files changed, 82 insertions(+), 181 deletions(-) delete mode 100644 nexus/db-queries/src/db/queries/webhook_delivery.rs diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index cb9eb4ca38e..aed8cf253ae 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -2198,6 +2198,7 @@ table! { id -> Uuid, event_id -> Uuid, rx_id -> Uuid, + is_redelivery -> Bool, payload -> Jsonb, attempts -> Int2, time_created -> Timestamptz, diff --git a/nexus/db-model/src/webhook_delivery.rs b/nexus/db-model/src/webhook_delivery.rs index 7a37b776f6e..7bcf3937a9a 100644 --- a/nexus/db-model/src/webhook_delivery.rs +++ b/nexus/db-model/src/webhook_delivery.rs @@ -65,6 +65,10 @@ pub struct WebhookDelivery { /// `webhook_rx`). pub rx_id: DbTypedUuid, + /// True if this is an explicitly triggered re-delivery attempt, false if + /// this is an initial dispatch of the event. + pub is_redelivery: bool, + /// The data payload as sent to this receiver. pub payload: serde_json::Value, @@ -83,12 +87,17 @@ pub struct WebhookDelivery { } impl WebhookDelivery { - pub fn new(event: &WebhookEvent, rx_id: &WebhookReceiverUuid) -> Self { + pub fn new( + event: &WebhookEvent, + rx_id: &WebhookReceiverUuid, + is_redelivery: bool, + ) -> Self { Self { // N.B.: perhaps we ought to use timestamp-based UUIDs for these? 
id: WebhookDeliveryUuid::new_v4().into(), event_id: event.id, rx_id: (*rx_id).into(), + is_redelivery, payload: event.event.clone(), attempts: SqlU8::new(0), time_created: Utc::now(), diff --git a/nexus/db-queries/src/db/datastore/webhook_event.rs b/nexus/db-queries/src/db/datastore/webhook_event.rs index 280502a98c8..115f06c85db 100644 --- a/nexus/db-queries/src/db/datastore/webhook_event.rs +++ b/nexus/db-queries/src/db/datastore/webhook_event.rs @@ -57,24 +57,10 @@ impl DataStore { let conn = self.pool_connection_authorized(&opctx).await?; self.transaction_retry_wrapper("webhook_event_dispatch_next") .transaction(&conn, |conn| async move { - // Select the next webhook event in need of dispatching. - // - // This performs a `SELECT ... FOR UPDATE SKIP LOCKED` on the - // `webhook_event` table, returning the oldest webhook event which has not - // yet been dispatched to receivers and which is not actively being - // dispatched in another transaction. - // NOTE: it would be kinda nice if this query could also select the - // webhook receivers subscribed to this event, but this requires - // a `FOR UPDATE OF webhook_event` clause to indicate that we only wish - // to lock the `webhook_event` row and not the receiver. - // Unfortunately, I don't believe Diesel supports this at present. let Some(event) = event_dsl::webhook_event .filter(event_dsl::time_dispatched.is_null()) .order_by(event_dsl::time_created.asc()) .limit(1) - .for_update() - // TODO(eliza): AGH SKIP LOCKED IS NOT IMPLEMENTED IN CRDB... 
- // .skip_locked() .select(WebhookEvent::as_select()) .get_result_async(&conn) .await @@ -124,7 +110,7 @@ impl DataStore { ); match self .webhook_event_insert_delivery_on_conn( - &opctx.log, &event, &rx_id, &conn, + &event, &rx_id, false, &conn ) .await { @@ -165,49 +151,63 @@ impl DataStore { async fn webhook_event_insert_delivery_on_conn( &self, - log: &slog::Logger, event: &WebhookEvent, rx_id: &WebhookReceiverUuid, + is_redelivery: bool, conn: &async_bb8_diesel::Connection, ) -> Result { - loop { - let delivery: Option = - WebhookReceiver::insert_resource( - rx_id.into_untyped_uuid(), - diesel::insert_into(delivery_dsl::webhook_delivery) - .values(WebhookDelivery::new(&event, rx_id)) - .on_conflict(delivery_dsl::id) - .do_nothing(), - ) - .insert_and_get_optional_result_async(conn) - .await?; - match delivery { - Some(delivery) => { - // XXX(eliza): is `Debug` too noisy for this? - slog::debug!( - log, - "dispatched webhook event to receiver"; - "event_id" => ?event.id, - "event_class" => %event.event_class, - "receiver_id" => ?rx_id, - ); - return Ok(delivery); - } - // The `ON CONFLICT (id) DO NOTHING` clause triggers if there's - // already a delivery entry with this UUID --- indicating a UUID - // collision. With 128 bits of random UUID, the chances of this - // happening are incredibly unlikely, but let's handle it - // gracefully nonetheless by trying again with a new UUID... 
- None => { - slog::warn!( - &log, - "webhook delivery UUID collision, retrying..."; - "event_id" => ?event.id, - "event_class" => %event.event_class, - "receiver_id" => ?rx_id, - ); - } - } - } + let delivery: WebhookDelivery = WebhookReceiver::insert_resource( + rx_id.into_untyped_uuid(), + diesel::insert_into(delivery_dsl::webhook_delivery) + .values(WebhookDelivery::new(&event, rx_id, is_redelivery)), + ) + .insert_and_get_result_async(conn) + .await?; + Ok(delivery) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::db::pub_test_utils::TestDatabase; + use omicron_test_utils::dev; + + #[tokio::test] + async fn test_dispatched_deliveries_are_unique_per_rx() { + // Test setup + let logctx = + dev::test_setup_log("test_dispatched_deliveries_are_unique_per_rx"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + let conn = db.pool_connection_for_tests().await; + + let rx_id = WebhookReceiverUuid::new_v4(); + let event_id = WebhookEventUuid::new_v4(); + let event = datastore + .webhook_event_create( + &opctx, + event_id, + WebhookEventClass::TestFoo, + serde_json::json!({ + "answer": 42, + }), + ) + .await + .expect("can't create ye event"); + + let delivery1 = datastore + .webhook_event_insert_delivery_on_conn(&event, &rx_id, false, conn) + .await + .expect("delivery 1 should insert"); + + let delivery2 = datastore + .webhook_event_insert_delivery_on_conn(&event, &rx_id, false, conn) + .await; + dbg!(delivery2).expect_err("unique constraint should be violated"); + + db.terminate().await; + logctx.cleanup_successful(); } } diff --git a/nexus/db-queries/src/db/datastore/webhook_rx.rs b/nexus/db-queries/src/db/datastore/webhook_rx.rs index ad83de42e1a..755cfdc3a21 100644 --- a/nexus/db-queries/src/db/datastore/webhook_rx.rs +++ b/nexus/db-queries/src/db/datastore/webhook_rx.rs @@ -456,6 +456,7 @@ fn async_insert_error_to_txn( AsyncInsertError::DatabaseError(e) => 
TransactionError::Database(e), } } + #[cfg(test)] mod test { use super::*; diff --git a/nexus/db-queries/src/db/queries/mod.rs b/nexus/db-queries/src/db/queries/mod.rs index 755bc5acd9d..5f34c7cfb3d 100644 --- a/nexus/db-queries/src/db/queries/mod.rs +++ b/nexus/db-queries/src/db/queries/mod.rs @@ -16,7 +16,6 @@ pub mod region_allocation; pub mod virtual_provisioning_collection_update; pub mod vpc; pub mod vpc_subnet; -pub mod webhook_delivery; /// SQL used to enable full table scans for the duration of the current /// transaction. diff --git a/nexus/db-queries/src/db/queries/webhook_delivery.rs b/nexus/db-queries/src/db/queries/webhook_delivery.rs deleted file mode 100644 index db9ede1188d..00000000000 --- a/nexus/db-queries/src/db/queries/webhook_delivery.rs +++ /dev/null @@ -1,124 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -//! 
Implementation of queries for webhook deliveries - -use crate::db::model::schema::webhook_delivery::{self, dsl}; -use crate::db::model::WebhookDelivery; -use crate::db::raw_query_builder::{QueryBuilder, TypedSqlQuery}; -use diesel::pg::Pg; -use diesel::prelude::QueryResult; -use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId}; -use diesel::sql_types; -use diesel::Column; -use diesel::QuerySource; -use omicron_uuid_kinds::GenericUuid; -use omicron_uuid_kinds::WebhookEventUuid; -use omicron_uuid_kinds::WebhookReceiverUuid; - -pub struct WebhookDeliveryDispatch { - rx_id: WebhookReceiverUuid, - event_id: WebhookEventUuid, - insert: Box + Send>, -} - -impl WebhookDeliveryDispatch { - pub fn new(delivery: WebhookDelivery) -> Self { - let rx_id = delivery.rx_id.into(); - let event_id = delivery.event_id.into(); - let insert = Box::new( - diesel::insert_into(dsl::webhook_delivery) - .values(delivery) - .on_conflict(dsl::id) - .do_nothing(), // .returning(WebhookDelivery::as_returning()) - ); - Self { rx_id, event_id, insert } - } -} - -impl QueryFragment for WebhookDeliveryDispatch { - fn walk_ast(&self, mut out: AstPass) -> QueryResult<()> { - self.insert.walk_ast(out.reborrow())?; - // WHERE NOT EXISTS ( - // SELECT 1 FROM omicron.public.webhook_delivery - // WHERE rx_id = $1 AND event_id = $2 - // ) - out.push_sql(" WHERE NOT EXISTS ( SELECT 1 "); - dsl::webhook_delivery.from_clause().walk_ast(out.reborrow())?; - out.push_sql(" WHERE "); - out.push_identifier(dsl::rx_id::NAME); - out.push_sql(" = "); - out.push_bind_param::( - self.rx_id.as_untyped_uuid(), - )?; - out.push_sql(" AND "); - out.push_identifier(dsl::event_id::NAME); - out.push_sql(" = "); - out.push_bind_param::( - self.event_id.as_untyped_uuid(), - )?; - out.push_sql(" )"); - Ok(()) - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::db::explain::ExplainableAsync; - use crate::db::model; - use crate::db::model::WebhookEvent; - use crate::db::model::WebhookEventClass; - use 
crate::db::pub_test_utils::TestDatabase; - use crate::db::raw_query_builder::expectorate_query_contents; - - use anyhow::Context; - use async_bb8_diesel::AsyncRunQueryDsl; - use nexus_types::external_api::params; - use nexus_types::identity::Resource; - use omicron_common::api::external; - use omicron_test_utils::dev; - use uuid::Uuid; - - fn test_dispatch_query() -> WebhookDeliveryDispatch { - let event = WebhookEvent { - id: WebhookEventUuid::nil().into(), - time_created: Utc::now(), - time_dispatched: None, - event_class: WebhookEventClass::Test, - event: serde_json::json!({ "test": "data" }), - }; - let delivery = WebhookDelivery::new(&event, WebhookReceiverId::nil()); - WebhookDeliveryDispatch::new(delivery) - } - - #[tokio::test] - async fn expectorate_delivery_dispatch_query() { - expectorate_query_contents( - &test_dispatch_query(), - "tests/output/webhook_delivery_dispatch_query.sql", - ) - .await; - } - - #[tokio::test] - async fn explain_delivery_dispatch_query() { - let logctx = - dev::test_setup_log("explain_webhook_delivery_dispatch_query"); - let db = TestDatabase::new_with_pool(&logctx.log).await; - let pool = db.pool(); - let conn = pool.claim().await.unwrap(); - - let query = test_dispatch_query(); - let explanation = query - .explain_async(&conn) - .await - .expect("Failed to explain query - is it valid SQL?"); - - eprintln!("{explanation}"); - - db.terminate().await; - logctx.cleanup_successful(); - } -} diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index a002d90cac7..eb904b82401 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -4971,6 +4971,10 @@ CREATE TABLE IF NOT EXISTS omicron.public.webhook_delivery ( -- UUID of the webhook receiver (foreign key into -- `omicron.public.webhook_rx`) rx_id UUID NOT NULL, + -- true if this delivery attempt was triggered by a call to the resend API, + -- false if this is the initial delivery attempt. 
+ is_redelivery BOOL NOT NULL, + payload JSONB NOT NULL, --- Delivery attempt count. Starts at 0. @@ -4993,6 +4997,17 @@ CREATE TABLE IF NOT EXISTS omicron.public.webhook_delivery ( ) ); +-- Ensure that initial delivery attempts (nexus-dispatched) are unique to avoid +-- duplicate work when an event is dispatched. For deliveries created by calls +-- to the webhook event resend API, we don't enforce this constraint, to allow +-- re-delivery to be triggered multiple times. +CREATE UNIQUE INDEX IF NOT EXISTS one_webhook_event_dispatch_per_rx +ON omicron.public.webhook_delivery ( + event_id, rx_id +) +WHERE + is_redelivery = FALSE; + -- Index for looking up all webhook messages dispatched to a receiver ID CREATE INDEX IF NOT EXISTS lookup_webhook_dispatched_to_rx ON omicron.public.webhook_delivery (