Skip to content

Commit

Permalink
feat(relayer): ListOperations api endpoint (#4089)
Browse files Browse the repository at this point in the history
### Description

Adds a `list_operations` API endpoint to the relayer, which returns all
operations in its prepare queue.

Example output from
`GET http://0.0.0.0:9092/list_operations?destination_domain=13371`:

```
[
  {
    "app_context": null,
    "message": {
      "body": [
        18,
        52
      ],
      "destination": 13371,
      "nonce": 1,
      "origin": 13373,
      "recipient": "0x000000000000000000000000927b167526babb9be047421db732c663a0b77b11",
      "sender": "0x000000000000000000000000927b167526babb9be047421db732c663a0b77b11",
      "version": 3
    },
    "num_retries": 13,
    "status": {
      "Retry": "CouldNotFetchMetadata"
    },
    "submitted": false,
    "type": "PendingMessage"
  },
  {
    "app_context": null,
    "message": {
      "body": [
        18,
        52
      ],
      "destination": 13371,
      "nonce": 2,
      "origin": 13372,
      "recipient": "0x000000000000000000000000927b167526babb9be047421db732c663a0b77b11",
      "sender": "0x000000000000000000000000927b167526babb9be047421db732c663a0b77b11",
      "version": 3
    },
    "num_retries": 13,
    "status": {
      "Retry": "CouldNotFetchMetadata"
    },
    "submitted": false,
    "type": "PendingMessage"
  }
]
```

### Drive-by changes

Makes `PendingOperation` serializable, so switching the output from JSON
to another format such as CSV (for easy filtering and aggregation in
Excel) should be straightforward.

### Related issues

<!--
- Fixes #[issue number here]
-->

### Backward compatibility

Yes

### Testing

Manual and Unit Tests
  • Loading branch information
daniel-savu authored Jul 4, 2024
1 parent 1022e38 commit 8c8f39a
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 84 deletions.
48 changes: 48 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ tracing-error = "0.2"
tracing-futures = "0.2"
tracing-subscriber = { version = "0.3", default-features = false }
tracing-test = "0.2.2"
typetag = "0.2"
uint = "0.9.5"
ureq = { version = "2.4", default-features = false }
url = "2.3"
Expand Down
1 change: 1 addition & 0 deletions rust/agents/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { workspace = true, features = ["rt", "macros", "parking_lot", "rt-multi
tokio-metrics.workspace = true
tracing-futures.workspace = true
tracing.workspace = true
typetag.workspace = true

hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "async"] }
hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] }
Expand Down
16 changes: 10 additions & 6 deletions rust/agents/relayer/src/msg/op_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use tracing::{debug, info, instrument};

use crate::server::MessageRetryRequest;

/// Shared handle to a priority queue of operations: a `BinaryHeap` of
/// `Reverse`-wrapped `QueueOperation`s behind an `Arc<Mutex<..>>`.
///
/// Cloning the alias clones the `Arc`, so every clone refers to the same
/// underlying heap.
// NOTE(review): with the std `BinaryHeap`, the `Reverse` wrapper makes the
// smallest `QueueOperation` pop first; confirm against `QueueOperation`'s `Ord`.
pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>>;

/// Queue of generic operations that can be submitted to a destination chain.
/// Includes logic for maintaining queue metrics by the destination and `app_context` of an operation
#[derive(Debug, Clone, new)]
Expand All @@ -16,7 +18,7 @@ pub struct OpQueue {
queue_metrics_label: String,
retry_rx: Arc<Mutex<Receiver<MessageRetryRequest>>>,
#[new(default)]
queue: Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>>,
pub queue: OperationPriorityQueue,
}

impl OpQueue {
Expand Down Expand Up @@ -115,27 +117,28 @@ impl OpQueue {
}

#[cfg(test)]
mod test {
pub mod test {
use super::*;
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult,
TryBatchAs, TxOutcome, H256, U256,
};
use serde::Serialize;
use std::{
collections::VecDeque,
time::{Duration, Instant},
};
use tokio::sync;

#[derive(Debug, Clone)]
struct MockPendingOperation {
#[derive(Debug, Clone, Serialize)]
pub struct MockPendingOperation {
id: H256,
seconds_to_next_attempt: u64,
destination_domain: HyperlaneDomain,
}

impl MockPendingOperation {
fn new(seconds_to_next_attempt: u64, destination_domain: HyperlaneDomain) -> Self {
pub fn new(seconds_to_next_attempt: u64, destination_domain: HyperlaneDomain) -> Self {
Self {
id: H256::random(),
seconds_to_next_attempt,
Expand All @@ -147,6 +150,7 @@ mod test {
impl TryBatchAs<HyperlaneMessage> for MockPendingOperation {}

#[async_trait::async_trait]
#[typetag::serialize]
impl PendingOperation for MockPendingOperation {
fn id(&self) -> H256 {
self.id
Expand Down Expand Up @@ -236,7 +240,7 @@ mod test {
}
}

fn dummy_metrics_and_label() -> (IntGaugeVec, String) {
pub fn dummy_metrics_and_label() -> (IntGaugeVec, String) {
(
IntGaugeVec::new(
prometheus::Opts::new("op_queue", "OpQueue metrics"),
Expand Down
67 changes: 48 additions & 19 deletions rust/agents/relayer/src/msg/op_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::msg::pending_message::CONFIRM_DELAY;
use crate::server::MessageRetryRequest;

use super::op_queue::OpQueue;
use super::op_queue::OperationPriorityQueue;

/// SerialSubmitter accepts operations over a channel. It is responsible for
/// executing the right strategy to deliver those messages to the destination
Expand Down Expand Up @@ -75,23 +76,64 @@ use super::op_queue::OpQueue;
/// eligible for submission, we should be working on it within reason. This
/// must be balanced with the cost of making RPCs that will almost certainly
/// fail and potentially block new messages from being sent immediately.
#[derive(Debug, new)]
#[derive(Debug)]
pub struct SerialSubmitter {
/// Domain this submitter delivers to.
domain: HyperlaneDomain,
/// Receiver for new messages to submit.
rx: mpsc::UnboundedReceiver<QueueOperation>,
/// Receiver for retry requests.
retry_tx: Sender<MessageRetryRequest>,
/// Metrics for serial submitter.
metrics: SerialSubmitterMetrics,
/// Max batch size for submitting messages
max_batch_size: u32,
/// tokio task monitor
task_monitor: TaskMonitor,
prepare_queue: OpQueue,
submit_queue: OpQueue,
confirm_queue: OpQueue,
}

impl SerialSubmitter {
/// Builds a `SerialSubmitter` together with its three operation queues.
///
/// The prepare, submit and confirm queues are created here, each from a
/// clone of `metrics.submitter_queue_length`, its own label, and its own
/// subscription obtained from `retry_op_transmitter`. The remaining
/// arguments are stored on the submitter unchanged.
pub fn new(
    domain: HyperlaneDomain,
    rx: mpsc::UnboundedReceiver<QueueOperation>,
    retry_op_transmitter: Sender<MessageRetryRequest>,
    metrics: SerialSubmitterMetrics,
    max_batch_size: u32,
    task_monitor: TaskMonitor,
) -> Self {
    // All queues are built the same way; only the metrics label differs.
    let build_queue = |label: &str| {
        OpQueue::new(
            metrics.submitter_queue_length.clone(),
            label.to_string(),
            Arc::new(Mutex::new(retry_op_transmitter.subscribe())),
        )
    };

    // Same construction order as before: prepare, then submit, then confirm.
    let prepare_queue = build_queue("prepare_queue");
    let submit_queue = build_queue("submit_queue");
    let confirm_queue = build_queue("confirm_queue");

    Self {
        domain,
        rx,
        metrics,
        max_batch_size,
        task_monitor,
        prepare_queue,
        submit_queue,
        confirm_queue,
    }
}

pub async fn prepare_queue(&self) -> OperationPriorityQueue {
self.prepare_queue.queue.clone()
}

pub fn spawn(self) -> Instrumented<JoinHandle<()>> {
let span = info_span!("SerialSubmitter", destination=%self.domain);
let task_monitor = self.task_monitor.clone();
Expand All @@ -106,25 +148,12 @@ impl SerialSubmitter {
domain,
metrics,
rx: rx_prepare,
retry_tx,
max_batch_size,
task_monitor,
prepare_queue,
submit_queue,
confirm_queue,
} = self;
let prepare_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
"prepare_queue".to_string(),
Arc::new(Mutex::new(retry_tx.subscribe())),
);
let submit_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
"submit_queue".to_string(),
Arc::new(Mutex::new(retry_tx.subscribe())),
);
let confirm_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
"confirm_queue".to_string(),
Arc::new(Mutex::new(retry_tx.subscribe())),
);

let tasks = [
tokio::spawn(TaskMonitor::instrument(
Expand Down
13 changes: 10 additions & 3 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use hyperlane_core::{
TxOutcome, H256, U256,
};
use prometheus::{IntCounter, IntGauge};
use serde::Serialize;
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};

use super::{
Expand Down Expand Up @@ -50,23 +51,28 @@ pub struct MessageContext {
}

/// A message that the submitter can and should try to submit.
#[derive(new)]
#[derive(new, Serialize)]
pub struct PendingMessage {
pub message: HyperlaneMessage,
#[serde(skip_serializing)]
ctx: Arc<MessageContext>,
status: PendingOperationStatus,
app_context: Option<String>,
#[new(default)]
submitted: bool,
#[new(default)]
#[serde(skip_serializing)]
submission_data: Option<Box<MessageSubmissionData>>,
#[new(default)]
num_retries: u32,
#[new(value = "Instant::now()")]
#[serde(skip_serializing)]
last_attempted_at: Instant,
#[new(default)]
#[serde(skip_serializing)]
next_attempt_after: Option<Instant>,
#[new(default)]
#[serde(skip_serializing)]
submission_outcome: Option<TxOutcome>,
}

Expand All @@ -85,8 +91,8 @@ impl Debug for PendingMessage {
}
})
.unwrap_or(0);
write!(f, "PendingMessage {{ num_retries: {}, since_last_attempt_s: {last_attempt}, next_attempt_after_s: {next_attempt}, message: {:?} }}",
self.num_retries, self.message)
write!(f, "PendingMessage {{ num_retries: {}, since_last_attempt_s: {last_attempt}, next_attempt_after_s: {next_attempt}, message: {:?}, status: {:?}, app_context: {:?} }}",
self.num_retries, self.message, self.status, self.app_context)
}
}

Expand Down Expand Up @@ -117,6 +123,7 @@ impl TryBatchAs<HyperlaneMessage> for PendingMessage {
}

#[async_trait]
#[typetag::serialize]
impl PendingOperation for PendingMessage {
fn id(&self) -> H256 {
self.message.id()
Expand Down
Loading

0 comments on commit 8c8f39a

Please sign in to comment.