-
Notifications
You must be signed in to change notification settings - Fork 62
/
runloops.rs
146 lines (136 loc) · 5.47 KB
/
runloops.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use std::sync::mpsc::Sender;
use chainhook_sdk::{
chainhooks::types::{
BitcoinChainhookSpecification, ChainhookSpecification, StacksChainhookSpecification,
},
observer::ObserverCommand,
utils::Context,
};
use threadpool::ThreadPool;
use crate::{
config::{Config, PredicatesApi},
scan::{
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,
},
service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
},
storage::open_readonly_stacks_db_conn,
};
pub fn start_stacks_scan_runloop(
config: &Config,
stacks_scan_op_rx: crossbeam_channel::Receiver<StacksChainhookSpecification>,
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
let stacks_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_stacks_scans);
while let Ok(predicate_spec) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx.clone();
stacks_scan_pool.execute(move || {
let stacks_db_conn =
match open_readonly_stacks_db_conn(&moved_config.expected_cache_path(), &moved_ctx)
{
Ok(db_conn) => db_conn,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"unable to store stacks block: {}",
e.to_string()
);
unimplemented!()
}
};
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
&stacks_db_conn,
&moved_config,
&moved_ctx,
);
let res = hiro_system_kit::nestable_block_on(op);
let last_block_scanned = match res {
Ok(last_block_scanned) => last_block_scanned,
Err(e) => {
warn!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Stacks chainstate: {e}",
);
// Update predicate status in redis
if let PredicatesApi::On(ref api_config) = moved_config.http_api {
let status = PredicateStatus::Interrupted(format!(
"Unable to evaluate predicate on Stacks chainstate: {e}"
));
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &moved_ctx);
update_predicate_status(
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&moved_ctx,
);
}
return;
}
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", last_block_scanned.index
);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
));
});
}
let res = stacks_scan_pool.join();
res
}
pub fn start_bitcoin_scan_runloop(
config: &Config,
bitcoin_scan_op_rx: crossbeam_channel::Receiver<BitcoinChainhookSpecification>,
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans);
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx.clone();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
&moved_config,
&moved_ctx,
);
match hiro_system_kit::nestable_block_on(op) {
Ok(_) => {}
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
// Update predicate status in redis
if let PredicatesApi::On(ref api_config) = moved_config.http_api {
let status = PredicateStatus::Interrupted(format!(
"Unable to evaluate predicate on Bitcoin chainstate: {e}"
));
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &moved_ctx);
update_predicate_status(
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&moved_ctx,
);
}
return;
}
};
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Bitcoin(predicate_spec),
));
});
}
let _ = bitcoin_scan_pool.join();
}