Skip to content

Commit

Permalink
start redoing dispatch sstuff
Browse files Browse the repository at this point in the history
thanks @smklein
  • Loading branch information
hawkw committed Feb 4, 2025
1 parent e6dc20b commit d88cde7
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 181 deletions.
1 change: 1 addition & 0 deletions nexus/db-model/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2198,6 +2198,7 @@ table! {
id -> Uuid,
event_id -> Uuid,
rx_id -> Uuid,
is_redelivery -> Bool,
payload -> Jsonb,
attempts -> Int2,
time_created -> Timestamptz,
Expand Down
11 changes: 10 additions & 1 deletion nexus/db-model/src/webhook_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub struct WebhookDelivery {
/// `webhook_rx`).
pub rx_id: DbTypedUuid<WebhookReceiverKind>,

/// True if this is an explicitly triggered re-delivery attempt, false if
/// this is an initial dispatch of the event.
pub is_redelivery: bool,

/// The data payload as sent to this receiver.
pub payload: serde_json::Value,

Expand All @@ -83,12 +87,17 @@ pub struct WebhookDelivery {
}

impl WebhookDelivery {
pub fn new(event: &WebhookEvent, rx_id: &WebhookReceiverUuid) -> Self {
pub fn new(
event: &WebhookEvent,
rx_id: &WebhookReceiverUuid,
is_redelivery: bool,
) -> Self {
Self {
// N.B.: perhaps we ought to use timestamp-based UUIDs for these?
id: WebhookDeliveryUuid::new_v4().into(),
event_id: event.id,
rx_id: (*rx_id).into(),
is_redelivery,
payload: event.event.clone(),
attempts: SqlU8::new(0),
time_created: Utc::now(),
Expand Down
110 changes: 55 additions & 55 deletions nexus/db-queries/src/db/datastore/webhook_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,10 @@ impl DataStore {
let conn = self.pool_connection_authorized(&opctx).await?;
self.transaction_retry_wrapper("webhook_event_dispatch_next")
.transaction(&conn, |conn| async move {
// Select the next webhook event in need of dispatching.
//
// This performs a `SELECT ... FOR UPDATE SKIP LOCKED` on the
// `webhook_event` table, returning the oldest webhook event which has not
// yet been dispatched to receivers and which is not actively being
// dispatched in another transaction.
// NOTE: it would be kinda nice if this query could also select the
// webhook receivers subscribed to this event, but this requires
// a `FOR UPDATE OF webhook_event` clause to indicate that we only wish
// to lock the `webhook_event` row and not the receiver.
// Unfortunately, I don't believe Diesel supports this at present.
let Some(event) = event_dsl::webhook_event
.filter(event_dsl::time_dispatched.is_null())
.order_by(event_dsl::time_created.asc())
.limit(1)
.for_update()
// TODO(eliza): AGH SKIP LOCKED IS NOT IMPLEMENTED IN CRDB...
// .skip_locked()
.select(WebhookEvent::as_select())
.get_result_async(&conn)
.await
Expand Down Expand Up @@ -124,7 +110,7 @@ impl DataStore {
);
match self
.webhook_event_insert_delivery_on_conn(
&opctx.log, &event, &rx_id, &conn,
&event, &rx_id, false, &conn
)
.await
{
Expand Down Expand Up @@ -165,49 +151,63 @@ impl DataStore {

async fn webhook_event_insert_delivery_on_conn(
&self,
log: &slog::Logger,
event: &WebhookEvent,
rx_id: &WebhookReceiverUuid,
is_redelivery: bool,
conn: &async_bb8_diesel::Connection<DbConnection>,
) -> Result<WebhookDelivery, AsyncInsertError> {
loop {
let delivery: Option<WebhookDelivery> =
WebhookReceiver::insert_resource(
rx_id.into_untyped_uuid(),
diesel::insert_into(delivery_dsl::webhook_delivery)
.values(WebhookDelivery::new(&event, rx_id))
.on_conflict(delivery_dsl::id)
.do_nothing(),
)
.insert_and_get_optional_result_async(conn)
.await?;
match delivery {
Some(delivery) => {
// XXX(eliza): is `Debug` too noisy for this?
slog::debug!(
log,
"dispatched webhook event to receiver";
"event_id" => ?event.id,
"event_class" => %event.event_class,
"receiver_id" => ?rx_id,
);
return Ok(delivery);
}
// The `ON CONFLICT (id) DO NOTHING` clause triggers if there's
// already a delivery entry with this UUID --- indicating a UUID
// collision. With 128 bits of random UUID, the chances of this
// happening are incredibly unlikely, but let's handle it
// gracefully nonetheless by trying again with a new UUID...
None => {
slog::warn!(
&log,
"webhook delivery UUID collision, retrying...";
"event_id" => ?event.id,
"event_class" => %event.event_class,
"receiver_id" => ?rx_id,
);
}
}
}
let delivery: WebhookDelivery = WebhookReceiver::insert_resource(
rx_id.into_untyped_uuid(),
diesel::insert_into(delivery_dsl::webhook_delivery)
.values(WebhookDelivery::new(&event, rx_id, is_redelivery)),
)
.insert_and_get_result_async(conn)
.await?;
Ok(delivery)
}
}

#[cfg(test)]
mod test {
use super::*;

use crate::db::pub_test_utils::TestDatabase;
use omicron_test_utils::dev;

#[tokio::test]
async fn test_dispatched_deliveries_are_unique_per_rx() {
// Test setup
let logctx =
dev::test_setup_log("test_dispatched_deliveries_are_unique_per_rx");
let db = TestDatabase::new_with_datastore(&logctx.log).await;
let (opctx, datastore) = (db.opctx(), db.datastore());
let conn = db.pool_connection_for_tests().await;

let rx_id = WebhookReceiverUuid::new_v4();
let event_id = WebhookEventUuid::new_v4();
let event = datastore
.webhook_event_create(
&opctx,
event_id,
WebhookEventClass::TestFoo,
serde_json::json!({
"answer": 42,
}),
)
.await
.expect("can't create ye event");

let delivery1 = datastore
.webhook_event_insert_delivery_on_conn(&event, &rx_id, false, conn)
.await
.expect("delivery 1 should insert");

let delivery2 = datastore
.webhook_event_insert_delivery_on_conn(&event, &rx_id, false, conn)
.await;
dbg!(delivery2).expect_err("unique constraint should be violated");

db.terminate().await;
logctx.cleanup_successful();
}
}
1 change: 1 addition & 0 deletions nexus/db-queries/src/db/datastore/webhook_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ fn async_insert_error_to_txn(
AsyncInsertError::DatabaseError(e) => TransactionError::Database(e),
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion nexus/db-queries/src/db/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub mod region_allocation;
pub mod virtual_provisioning_collection_update;
pub mod vpc;
pub mod vpc_subnet;
pub mod webhook_delivery;

/// SQL used to enable full table scans for the duration of the current
/// transaction.
Expand Down
124 changes: 0 additions & 124 deletions nexus/db-queries/src/db/queries/webhook_delivery.rs

This file was deleted.

15 changes: 15 additions & 0 deletions schema/crdb/dbinit.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4971,6 +4971,10 @@ CREATE TABLE IF NOT EXISTS omicron.public.webhook_delivery (
-- UUID of the webhook receiver (foreign key into
-- `omicron.public.webhook_rx`)
rx_id UUID NOT NULL,
-- true if this delivery attempt was triggered by a call to the resend API,
-- false if this is the initial delivery attempt.
is_redelivery BOOL NOT NULL,

payload JSONB NOT NULL,

--- Delivery attempt count. Starts at 0.
Expand All @@ -4993,6 +4997,17 @@ CREATE TABLE IF NOT EXISTS omicron.public.webhook_delivery (
)
);

-- Ensure that initial delivery attempts (nexus-dispatched) are unique to avoid
-- duplicate work when an event is dispatched. For deliveries created by calls
-- to the webhook event resend API, we don't enforce this constraint, to allow
-- re-delivery to be triggered multiple times.
CREATE UNIQUE INDEX IF NOT EXISTS one_webhook_event_dispatch_per_rx
ON omicron.public.webhook_delivery (
event_id, rx_id
)
WHERE
is_redelivery = FALSE;

-- Index for looking up all webhook messages dispatched to a receiver ID
CREATE INDEX IF NOT EXISTS lookup_webhook_dispatched_to_rx
ON omicron.public.webhook_delivery (
Expand Down

0 comments on commit d88cde7

Please sign in to comment.