-
Notifications
You must be signed in to change notification settings - Fork 419
/
Copy pathmod.rs
346 lines (308 loc) · 12.2 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
use std::{
collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration,
};
use axum::async_trait;
use cursors::*;
use derive_new::new;
use hyperlane_core::{
utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore,
HyperlaneSequenceAwareIndexerStore, HyperlaneWatermarkedLogStore, Indexer,
SequenceAwareIndexer,
};
use hyperlane_core::{Indexed, LogMeta, H512};
pub use metrics::ContractSyncMetrics;
use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::broadcast::{Receiver as BroadcastReceiver, Sender as BroadcastSender};
use tokio::time::sleep;
use tracing::{debug, info, instrument, trace, warn};
use crate::settings::IndexSettings;
pub(crate) mod cursors;
mod eta_calculator;
mod metrics;
use cursors::ForwardBackwardSequenceAwareSyncCursor;
/// Back-off applied after a failed sync step (error getting the next cursor
/// action, fetching logs in a range, or updating the cursor) before retrying.
const SLEEP_DURATION: Duration = Duration::from_secs(5);
/// Entity that drives the syncing of an agent's db with on-chain data.
/// Extracts chain-specific data (emitted checkpoints, messages, etc) from an
/// `indexer` and fills the agent's db with this data.
///
/// Type parameters:
/// - `T`: the kind of event being indexed.
/// - `D`: the log store that indexed events are written to.
/// - `I`: the indexer that events are fetched from.
#[derive(Debug)]
pub struct ContractSync<T: Indexable, D: HyperlaneLogStore<T>, I: Indexer<T>> {
// Domain this syncer runs on; also used as the chain label on metrics.
domain: HyperlaneDomain,
// Store that fetched logs are written to.
db: D,
// Source of logs, queried either by index range or by transaction hash.
indexer: I,
// Metric families (`indexed_height`, `stored_events`) reported while syncing.
metrics: ContractSyncMetrics,
// Present only when `T::broadcast_channel_size()` returns a size; used to
// publish the transaction ids of logs found by range queries.
broadcast_sender: Option<BroadcastSender<H512>>,
// Ties the otherwise unused event type `T` to this struct.
_phantom: PhantomData<T>,
}
impl<T: Indexable, D: HyperlaneLogStore<T>, I: Indexer<T>> ContractSync<T, D, I> {
    /// Builds a `ContractSync` for `domain` that reads events from `indexer`,
    /// writes them to `db` and reports progress through `metrics`.
    ///
    /// A broadcast sender for transaction ids is created only when the event
    /// type `T` declares a broadcast channel size.
    pub fn new(domain: HyperlaneDomain, db: D, indexer: I, metrics: ContractSyncMetrics) -> Self {
        // `None` from `broadcast_channel_size` means this event type does not broadcast.
        let broadcast_sender = T::broadcast_channel_size().map(BroadcastSender::new);
        Self {
            domain,
            db,
            indexer,
            metrics,
            broadcast_sender,
            _phantom: PhantomData,
        }
    }
}
impl<T, D, I> ContractSync<T, D, I>
where
T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static,
D: HyperlaneLogStore<T>,
I: Indexer<T> + 'static,
{
/// The domain that this ContractSync is running on.
///
/// Returns a borrow of the domain supplied at construction time.
pub fn domain(&self) -> &HyperlaneDomain {
&self.domain
}
/// Returns a clone of the transaction-id broadcast sender, or `None` when
/// this syncer was created without one.
fn get_broadcaster(&self) -> Option<BroadcastSender<H512>> {
self.broadcast_sender.clone()
}
/// Sync logs and write them to the LogStore
#[instrument(name = "ContractSync", fields(domain=self.domain().name()), skip(self, opts))]
pub async fn sync(&self, label: &'static str, mut opts: SyncOptions<T>) {
let chain_name = self.domain.as_ref();
let indexed_height_metric = self
.metrics
.indexed_height
.with_label_values(&[label, chain_name]);
let stored_logs_metric = self
.metrics
.stored_events
.with_label_values(&[label, chain_name]);
loop {
if let Some(rx) = opts.tx_id_receiver.as_mut() {
self.fetch_logs_from_receiver(rx, &stored_logs_metric).await;
}
if let Some(cursor) = opts.cursor.as_mut() {
self.fetch_logs_with_cursor(cursor, &stored_logs_metric, &indexed_height_metric)
.await;
}
}
}
/// Drains every transaction id currently waiting on `recv`, fetching the logs
/// emitted by each transaction from the indexer and storing them.
///
/// Never waits for new ids: returns as soon as the channel reports that it is
/// empty or returns any other receive error. A transaction whose logs cannot
/// be fetched is logged and skipped.
#[instrument(fields(domain=self.domain().name()), skip(self, recv, stored_logs_metric))]
async fn fetch_logs_from_receiver(
    &self,
    recv: &mut BroadcastReceiver<H512>,
    stored_logs_metric: &GenericCounter<AtomicU64>,
) {
    loop {
        // Non-blocking poll; any receive failure ends this drain pass.
        let tx_id = match recv.try_recv() {
            Ok(received) => received,
            Err(TryRecvError::Empty) => {
                trace!("No txid received");
                return;
            }
            Err(err) => {
                warn!(?err, "Error receiving txid from channel");
                return;
            }
        };

        let fetched = match self.indexer.fetch_logs_by_tx_hash(tx_id).await {
            Ok(fetched) => fetched,
            Err(err) => {
                warn!(?err, ?tx_id, "Error fetching logs for tx id");
                continue;
            }
        };

        let logs = self.dedupe_and_store_logs(fetched, stored_logs_metric).await;
        let num_logs = logs.len() as u64;
        info!(
            num_logs,
            ?tx_id,
            sequences = ?logs.iter().map(|(log, _)| log.sequence).collect::<Vec<_>>(),
            "Found log(s) for tx id"
        );
    }
}
#[instrument(fields(domain=self.domain().name()), skip(self, stored_logs_metric, indexed_height_metric))]
async fn fetch_logs_with_cursor(
&self,
cursor: &mut Box<dyn ContractSyncCursor<T>>,
stored_logs_metric: &GenericCounter<AtomicU64>,
indexed_height_metric: &GenericGauge<AtomicI64>,
) {
indexed_height_metric.set(cursor.latest_queried_block() as i64);
let (action, eta) = match cursor.next_action().await {
Ok((action, eta)) => (action, eta),
Err(err) => {
warn!(?err, "Error getting next action");
sleep(SLEEP_DURATION).await;
return;
}
};
let sleep_duration = match action {
// Use `loop` but always break - this allows for returning a value
// from the loop (the sleep duration)
#[allow(clippy::never_loop)]
CursorAction::Query(range) => loop {
debug!(?range, "Looking for events in index range");
let logs = match self.indexer.fetch_logs_in_range(range.clone()).await {
Ok(logs) => logs,
Err(err) => {
warn!(?err, ?range, "Error fetching logs in range");
break SLEEP_DURATION;
}
};
let logs = self.dedupe_and_store_logs(logs, stored_logs_metric).await;
let logs_found = logs.len() as u64;
info!(
?range,
num_logs = logs_found,
estimated_time_to_sync = fmt_sync_time(eta),
sequences = ?logs.iter().map(|(log, _)| log.sequence).collect::<Vec<_>>(),
cursor = ?cursor,
"Found log(s) in index range"
);
if let Some(tx) = self.broadcast_sender.as_ref() {
logs.iter().for_each(|(_, meta)| {
if let Err(err) = tx.send(meta.transaction_id) {
trace!(?err, "Error sending txid to receiver");
}
});
}
// Update cursor
if let Err(err) = cursor.update(logs, range).await {
warn!(?err, "Error updating cursor");
break SLEEP_DURATION;
};
break Default::default();
},
CursorAction::Sleep(duration) => duration,
};
sleep(sleep_duration).await
}
async fn dedupe_and_store_logs(
&self,
logs: Vec<(Indexed<T>, LogMeta)>,
stored_logs_metric: &GenericCounter<AtomicU64>,
) -> Vec<(Indexed<T>, LogMeta)> {
let deduped_logs = HashSet::<_>::from_iter(logs);
let logs = Vec::from_iter(deduped_logs);
// Store deliveries
let stored = match self.db.store_logs(&logs).await {
Ok(stored) => stored,
Err(err) => {
warn!(?err, "Error storing logs in db");
Default::default()
}
};
if stored > 0 {
debug!(
domain = self.domain.as_ref(),
count = stored,
sequences = ?logs.iter().map(|(log, _)| log.sequence).collect::<Vec<_>>(),
"Stored logs in db",
);
}
// Report amount of deliveries stored into db
stored_logs_metric.inc_by(stored as u64);
logs
}
}
/// A `ContractSync` whose indexer is a shared, dynamically dispatched
/// `SequenceAwareIndexer`; `U` is the log store type.
pub type SequenceAwareContractSync<T, U> = ContractSync<T, U, Arc<dyn SequenceAwareIndexer<T>>>;
/// Shared log store used by the watermark cursor
pub type WatermarkLogStore<T> = Arc<dyn HyperlaneWatermarkedLogStore<T>>;
/// A `ContractSync` backed by a watermarked log store, for syncing events
/// using a `RateLimitedContractSyncCursor`
pub type WatermarkContractSync<T> = SequenceAwareContractSync<T, WatermarkLogStore<T>>;
/// Abstraction over a contract syncer that can create its own cursor and run
/// the sync loop for events of type `T`.
#[async_trait]
pub trait ContractSyncer<T>: Send + Sync {
/// Returns a new cursor to be used for syncing events from the indexer,
/// configured from `index_settings`
async fn cursor(&self, index_settings: IndexSettings) -> Box<dyn ContractSyncCursor<T>>;
/// Syncs events from the indexer using the cursor and/or transaction-id
/// receiver carried by `opts`; `label` identifies this sync in metrics
async fn sync(&self, label: &'static str, opts: SyncOptions<T>);
/// The domain of this syncer
fn domain(&self) -> &HyperlaneDomain;
/// If this syncer is also a broadcaster, return the sender side of the
/// channel on which it publishes txids; `None` otherwise
fn get_broadcaster(&self) -> Option<BroadcastSender<H512>>;
}
/// Options for syncing events: which sources the sync loop should pull from.
#[derive(new)]
pub struct SyncOptions<T> {
// Keep as optional fields for now to run them simultaneously.
// Might want to refactor into an enum later, where we either index with a cursor or rely on receiving
// txids from a channel to other indexing tasks
/// Cursor deciding which index range to query next; when absent, no range
/// queries are made.
cursor: Option<Box<dyn ContractSyncCursor<T>>>,
/// Receiver of transaction ids whose logs should be fetched by transaction
/// hash; when absent, no such lookups are made.
tx_id_receiver: Option<BroadcastReceiver<H512>>,
}
impl<T> From<Box<dyn ContractSyncCursor<T>>> for SyncOptions<T> {
    /// Wraps `cursor` into options that sync with the cursor only, without a
    /// transaction-id receiver.
    fn from(cursor: Box<dyn ContractSyncCursor<T>>) -> Self {
        let cursor = Some(cursor);
        let tx_id_receiver = None;
        Self {
            cursor,
            tx_id_receiver,
        }
    }
}
#[async_trait]
impl<T> ContractSyncer<T> for WatermarkContractSync<T>
where
    T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static,
{
    /// Returns a new rate-limited cursor that starts from the high watermark
    /// saved in the db, or from `index_settings.from` when none is saved.
    ///
    /// # Panics
    ///
    /// Panics if the watermark cannot be read from the db or if the cursor
    /// cannot be created.
    async fn cursor(&self, index_settings: IndexSettings) -> Box<dyn ContractSyncCursor<T>> {
        // A saved watermark takes precedence over the configured start.
        let start_from = self
            .db
            .retrieve_high_watermark()
            .await
            .unwrap()
            .unwrap_or(index_settings.from);

        let rate_limited_cursor = RateLimitedContractSyncCursor::new(
            Arc::new(self.indexer.clone()),
            self.db.clone(),
            index_settings.chunk_size,
            start_from,
        )
        .await
        .unwrap();

        Box::new(rate_limited_cursor)
    }

    /// Runs the inherent `ContractSync::sync` loop with the given options.
    async fn sync(&self, label: &'static str, opts: SyncOptions<T>) {
        ContractSync::sync(self, label, opts).await
    }

    /// The domain this syncer runs on.
    fn domain(&self) -> &HyperlaneDomain {
        ContractSync::domain(self)
    }

    /// The transaction-id broadcast sender of the underlying `ContractSync`, if any.
    fn get_broadcaster(&self) -> Option<BroadcastSender<H512>> {
        ContractSync::get_broadcaster(self)
    }
}
/// Shared log store used by sequence aware cursors
pub type SequenceAwareLogStore<T> = Arc<dyn HyperlaneSequenceAwareIndexerStore<T>>;
/// A `ContractSync` backed by a sequence aware log store, for syncing
/// messages using a `SequenceSyncCursor`
pub type SequencedDataContractSync<T> = SequenceAwareContractSync<T, SequenceAwareLogStore<T>>;
#[async_trait]
impl<T> ContractSyncer<T> for SequencedDataContractSync<T>
where
    T: Indexable + Send + Sync + Debug + Clone + Eq + Hash + 'static,
{
    /// Returns a new forward-backward sequence aware cursor built from this
    /// syncer's indexer and db, using the chunk size and mode from
    /// `index_settings`.
    ///
    /// # Panics
    ///
    /// Panics if the cursor cannot be created.
    async fn cursor(&self, index_settings: IndexSettings) -> Box<dyn ContractSyncCursor<T>> {
        let forward_backward_cursor = ForwardBackwardSequenceAwareSyncCursor::new(
            self.indexer.clone(),
            Arc::new(self.db.clone()),
            index_settings.chunk_size,
            index_settings.mode,
        )
        .await
        .unwrap();

        Box::new(forward_backward_cursor)
    }

    /// Runs the inherent `ContractSync::sync` loop with the given options.
    async fn sync(&self, label: &'static str, opts: SyncOptions<T>) {
        ContractSync::sync(self, label, opts).await;
    }

    /// The domain this syncer runs on.
    fn domain(&self) -> &HyperlaneDomain {
        ContractSync::domain(self)
    }

    /// The transaction-id broadcast sender of the underlying `ContractSync`, if any.
    fn get_broadcaster(&self) -> Option<BroadcastSender<H512>> {
        ContractSync::get_broadcaster(self)
    }
}