Skip to content

Commit

Permalink
fix: only remove from mailbox after queued event (#541)
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 authored May 6, 2024
1 parent 9d3da2e commit 3c0cded
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 40 deletions.
80 changes: 42 additions & 38 deletions src/services/public_http_server/handlers/relay_webhook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,48 +128,52 @@ pub async fn handler(

let event = claims.evt;

state
.relay_mailbox_clearer_tx
.send(Receipt {
topic: event.topic.clone(),
message_id: event.message_id,
})
.await
.expect("Batch receive channel should not be closed");

// Check these after the mailbox cleaner because these
// messages would actually be in the mailbox becuase
// the client ID (sub) matches, meaning we are the one
// that subscribed. However, aud and whu are not valid,
// that's a relay error. We should still clear the mailbox
// TODO check sub
info!("aud: {}", claims.basic.aud);
// TODO check whu
info!("whu: {}", claims.whu);

let incoming_message = RelayIncomingMessage {
topic: event.topic,
message_id: get_message_id(&event.message).into(),
message: event.message,
tag: event.tag,
received_at: Utc::now(),
};
// If the message was queued to the mailbox, remove it from the mailbox
// The message was already handle by the accepted event, so don't handle a second time here
if event.status == WatchStatus::Queued {
state
.relay_mailbox_clearer_tx
.send(Receipt {
topic: event.topic.clone(),
message_id: event.message_id,
})
.await
.expect("Batch receive channel should not be closed");
} else {
// Check these after the mailbox cleaner because these
// messages would actually be in the mailbox becuase
// the client ID (sub) matches, meaning we are the one
// that subscribed. However, aud and whu are not valid,
// that's a relay error. We should still clear the mailbox
// TODO check sub
info!("aud: {}", claims.basic.aud);
// TODO check whu
info!("whu: {}", claims.whu);

let incoming_message = RelayIncomingMessage {
topic: event.topic,
message_id: get_message_id(&event.message).into(),
message: event.message,
tag: event.tag,
received_at: Utc::now(),
};

if claims.act != WatchAction::WatchEvent {
return Err(Error::Client(ClientError::WrongWatchAction(claims.act)));
}
if claims.typ != WatchType::Subscriber {
return Err(Error::Client(ClientError::WrongWatchType(claims.typ)));
}

if claims.act != WatchAction::WatchEvent {
return Err(Error::Client(ClientError::WrongWatchAction(claims.act)));
}
if claims.typ != WatchType::Subscriber {
return Err(Error::Client(ClientError::WrongWatchType(claims.typ)));
}
if event.status != WatchStatus::Accepted {
return Err(Error::Client(ClientError::WrongWatchStatus(event.status)));
}

if event.status != WatchStatus::Queued && event.status != WatchStatus::Accepted {
return Err(Error::Client(ClientError::WrongWatchStatus(event.status)));
handle_msg(incoming_message, &state)
.await
.map_err(Error::Server)?;
}

handle_msg(incoming_message, &state)
.await
.map_err(Error::Server)?;

Ok(StatusCode::NO_CONTENT.into_response())
}

Expand Down
5 changes: 3 additions & 2 deletions src/services/relay_renewal_job/register_webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub async fn run(
tags: INCOMING_TAGS.to_vec(),
// Alternatively we could not care about the tag, as an incoming message is an incoming message
// tags: (4000..4100).collect(),
statuses: vec![WatchStatus::Accepted],
// Accepted webhook to handle the message, Queued webhook to remove message from mailbox
statuses: vec![WatchStatus::Accepted, WatchStatus::Queued],
ttl: Duration::from_secs(60 * 60 * 24 * 30),
},
keypair,
Expand Down Expand Up @@ -111,7 +112,7 @@ mod tests {
);
assert_eq!(claims.typ, WatchType::Subscriber);
assert_eq!(claims.act, WatchAction::Register);
assert_eq!(claims.sts, vec![WatchStatus::Accepted]);
assert_eq!(claims.sts, vec![WatchStatus::Accepted, WatchStatus::Queued]);
const LEEWAY: i64 = 2;
let expected_iat = Utc::now().timestamp();
assert!(claims.basic.iat <= expected_iat);
Expand Down

0 comments on commit 3c0cded

Please sign in to comment.