-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathmod.rs
150 lines (121 loc) · 4.79 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use std::{cmp::min, sync::Arc, time::Duration};
use async_trait::async_trait;
use tokio::{sync::RwLock, time};
use tracing::{debug, info, instrument, Span};
use crate::{block_builder::BlockBuilder, ProvenTransaction, SharedRwVec, COMPONENT};
#[cfg(test)]
mod tests;
pub mod batch;
pub use batch::TransactionBatch;
use miden_node_utils::formatting::{format_array, format_blake3_digest};
use crate::errors::BuildBatchError;
// BATCH BUILDER
// ================================================================================================
/// Abstraction over batch proving of transactions.
///
/// Transactions are aggregated into batches prior to being added to blocks. This trait abstracts
/// over this responsibility. The trait's goal is to be implementation agnostic, allowing for
/// multiple implementations, e.g.:
///
/// - in-process cpu based prover
/// - out-of-process gpu based prover
/// - distributed prover on another machine
#[async_trait]
pub trait BatchBuilder: Send + Sync + 'static {
/// Start proving of a new batch.
async fn build_batch(
&self,
txs: Vec<ProvenTransaction>,
) -> Result<(), BuildBatchError>;
}
// DEFAULT BATCH BUILDER
// ================================================================================================
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DefaultBatchBuilderOptions {
/// The frequency at which blocks are created
pub block_frequency: Duration,
/// Maximum number of batches in any given block
pub max_batches_per_block: usize,
}
pub struct DefaultBatchBuilder<BB> {
/// Batches ready to be included in a block
ready_batches: SharedRwVec<TransactionBatch>,
block_builder: Arc<BB>,
options: DefaultBatchBuilderOptions,
}
impl<BB> DefaultBatchBuilder<BB>
where
BB: BlockBuilder,
{
// CONSTRUCTOR
// --------------------------------------------------------------------------------------------
/// Returns an new [BatchBuilder] instantiated with the provided [BlockBuilder] and the
/// specified options.
pub fn new(
block_builder: Arc<BB>,
options: DefaultBatchBuilderOptions,
) -> Self {
Self {
ready_batches: Arc::new(RwLock::new(Vec::new())),
block_builder,
options,
}
}
// BATCH BUILDER STARTER
// --------------------------------------------------------------------------------------------
pub async fn run(self: Arc<Self>) {
let mut interval = time::interval(self.options.block_frequency);
info!(target: COMPONENT, period_ms = interval.period().as_millis(), "Batch builder started");
loop {
interval.tick().await;
self.try_build_block().await;
}
}
// HELPER METHODS
// --------------------------------------------------------------------------------------------
/// Note that we call `build_block()` regardless of whether the `ready_batches` queue is empty.
/// A call to an empty `build_block()` indicates that an empty block should be created.
#[instrument(target = "miden-block-producer", skip_all)]
async fn try_build_block(&self) {
let mut batches_in_block: Vec<TransactionBatch> = {
let mut locked_ready_batches = self.ready_batches.write().await;
let num_batches_in_block =
min(self.options.max_batches_per_block, locked_ready_batches.len());
locked_ready_batches.drain(..num_batches_in_block).collect()
};
match self.block_builder.build_block(&batches_in_block).await {
Ok(_) => {
// block successfully built, do nothing
},
Err(_) => {
// Block building failed; add back the batches at the end of the queue
self.ready_batches.write().await.append(&mut batches_in_block);
},
}
}
}
#[async_trait]
impl<BB> BatchBuilder for DefaultBatchBuilder<BB>
where
BB: BlockBuilder,
{
#[instrument(target = "miden-block-producer", skip_all, err, fields(batch_id))]
async fn build_batch(
&self,
txs: Vec<ProvenTransaction>,
) -> Result<(), BuildBatchError> {
let num_txs = txs.len();
info!(target: COMPONENT, num_txs, "Building a transaction batch");
debug!(target: COMPONENT, txs = %format_array(txs.iter().map(|tx| tx.id().to_hex())));
let batch = TransactionBatch::new(txs)?;
info!(target: COMPONENT, "Transaction batch built");
Span::current().record("batch_id", format_blake3_digest(batch.id()));
let num_batches = {
let mut write_guard = self.ready_batches.write().await;
write_guard.push(batch);
write_guard.len()
};
info!(target: COMPONENT, num_batches, "Transaction batch added to the batch queue");
Ok(())
}
}