Skip to content

Commit 41ef9cd

Browse files
committed
[bitcoind_rpc] Initial work on RPC example
1 parent 492b7a4 commit 41ef9cd

File tree

6 files changed

+310
-5
lines changed

6 files changed

+310
-5
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ members = [
88
"crates/bitcoind_rpc",
99
"example-crates/example_cli",
1010
"example-crates/example_electrum",
11+
"example-crates/example_rpc",
1112
"example-crates/wallet_electrum",
1213
"example-crates/wallet_esplora",
1314
"example-crates/wallet_esplora_async",

crates/bitcoind_rpc/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "bitcoind_rpc"
2+
name = "bdk_bitcoind_rpc"
33
version = "0.1.0"
44
edition = "2021"
55

crates/bitcoind_rpc/src/lib.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use bdk_chain::{
66
local_chain::CheckPoint,
77
BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph,
88
};
9+
pub use bitcoincore_rpc;
910
use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi};
1011

1112
#[derive(Debug, Clone)]
@@ -51,6 +52,10 @@ pub fn confirmation_time_anchor(
5152
}
5253

5354
impl BitcoindRpcItem {
55+
pub fn is_mempool(&self) -> bool {
56+
matches!(self, Self::Mempool { .. })
57+
}
58+
5459
pub fn into_update<K, A, F>(self, anchor: F) -> LocalUpdate<K, A>
5560
where
5661
A: Clone + Ord + PartialOrd,
@@ -145,15 +150,17 @@ impl<'a> BitcoindRpcIter<'a> {
145150
// block is not in the main chain
146151
continue 'cp_loop;
147152
}
153+
148154
// agreement
149-
// next loop
150155
*last_cp = Some(cp);
151156
*last_info = Some(info);
157+
continue 'main_loop;
152158
}
153159

154160
// no point of agreement found
155161
// next loop will emit block @ fallback height
156162
*last_cp = None;
163+
*last_info = None;
157164
}
158165
(Some(last_cp), last_info @ Some(_)) => {
159166
// find next block
@@ -212,7 +219,7 @@ impl<'a> BitcoindRpcIter<'a> {
212219
}
213220
}
214221
}
215-
(None, Some(_)) => unreachable!(),
222+
(None, Some(info)) => unreachable!("got info with no checkpoint? info={:#?}", info),
216223
}
217224
}
218225
}

example-crates/example_electrum/src/main.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ use example_cli::{
2222
};
2323

2424
const DB_MAGIC: &[u8] = b"bdk_example_electrum";
25-
const DB_PATH: &str = ".bdk_electrum_example.db";
26-
// const ASSUME_FINAL_DEPTH: usize = 10;
25+
const DB_PATH: &str = ".bdk_example_electrum.db";
2726

2827
#[derive(Subcommand, Debug, Clone)]
2928
enum ElectrumCommands {

example-crates/example_rpc/Cargo.toml

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "example_rpc"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
bdk_chain = { path = "../../crates/chain", features = ["serde"] }
10+
bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" }
11+
example_cli = { path = "../example_cli" }
12+
ctrlc = { version = "3.2.4" }
+286
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
use std::{
2+
path::PathBuf,
3+
sync::{
4+
atomic::{AtomicBool, Ordering},
5+
mpsc::sync_channel,
6+
Arc, Mutex,
7+
},
8+
time::{Duration, Instant, SystemTime},
9+
};
10+
11+
use bdk_bitcoind_rpc::{
12+
bitcoincore_rpc::{Auth, Client, RpcApi},
13+
confirmation_time_anchor, BitcoindRpcItem, BitcoindRpcIter,
14+
};
15+
use bdk_chain::{
16+
bitcoin::{Address, Transaction},
17+
indexed_tx_graph::IndexedAdditions,
18+
keychain::{LocalChangeSet, LocalUpdate},
19+
local_chain::LocalChain,
20+
Append, ConfirmationTimeAnchor, IndexedTxGraph,
21+
};
22+
use example_cli::{
23+
anyhow,
24+
clap::{self, Args, Subcommand},
25+
CoinSelectionAlgo, Keychain,
26+
};
27+
28+
const DB_MAGIC: &[u8] = b"bdk_example_rpc";
29+
const DB_PATH: &str = ".bdk_example_rpc.db";
30+
const CHANNEL_BOUND: usize = 10;
31+
const LIVE_POLL_DUR_SECS: Duration = Duration::from_secs(15);
32+
33+
type ChangeSet = LocalChangeSet<Keychain, ConfirmationTimeAnchor>;
34+
35+
#[derive(Args, Debug, Clone)]
36+
struct RpcArgs {
37+
/// RPC URL
38+
#[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")]
39+
url: String,
40+
/// RPC auth cookie file
41+
#[clap(env = "RPC_COOKIE", long)]
42+
rpc_cookie: Option<PathBuf>,
43+
/// RPC auth username
44+
#[clap(env = "RPC_USER", long)]
45+
rpc_user: Option<String>,
46+
/// RPC auth password
47+
#[clap(env = "RPC_PASS", long)]
48+
rpc_password: Option<String>,
49+
}
50+
51+
impl From<RpcArgs> for Auth {
52+
fn from(args: RpcArgs) -> Self {
53+
match (args.rpc_cookie, args.rpc_user, args.rpc_password) {
54+
(None, None, None) => Self::None,
55+
(Some(path), _, _) => Self::CookieFile(path),
56+
(_, Some(user), Some(pass)) => Self::UserPass(user, pass),
57+
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
58+
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
59+
}
60+
}
61+
}
62+
63+
#[derive(Subcommand, Debug, Clone)]
64+
enum RpcCommands {
65+
/// Scans blocks via RPC (starting from last point of agreement) and stores/indexes relevant
66+
/// transactions
67+
Scan {
68+
/// Starting block height to fallback to if no point of agreement if found
69+
#[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")]
70+
fallback_height: u32,
71+
/// The unused-scripts lookahead will be kept at this size
72+
#[clap(long, default_value = "10")]
73+
lookahead: u32,
74+
/// Whether to be live!
75+
#[clap(long, default_value = "false")]
76+
live: bool,
77+
#[clap(flatten)]
78+
rpc_args: RpcArgs,
79+
},
80+
/// Create and broadcast a transaction.
81+
Tx {
82+
value: u64,
83+
address: Address,
84+
#[clap(short, default_value = "bnb")]
85+
coin_select: CoinSelectionAlgo,
86+
#[clap(flatten)]
87+
rpc_args: RpcArgs,
88+
},
89+
}
90+
91+
impl RpcCommands {
92+
fn rpc_args(&self) -> &RpcArgs {
93+
match self {
94+
RpcCommands::Scan { rpc_args, .. } => rpc_args,
95+
RpcCommands::Tx { rpc_args, .. } => rpc_args,
96+
}
97+
}
98+
}
99+
100+
fn main() -> anyhow::Result<()> {
101+
let sigterm_flag = start_ctrlc_handler();
102+
103+
let (args, keymap, index, db, init_changeset) =
104+
example_cli::init::<RpcCommands, ChangeSet>(DB_MAGIC, DB_PATH)?;
105+
106+
let graph = Mutex::new({
107+
let mut graph = IndexedTxGraph::new(index);
108+
graph.apply_additions(init_changeset.indexed_additions);
109+
graph
110+
});
111+
112+
let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain_changeset));
113+
114+
let rpc_cmd = match args.command {
115+
example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
116+
general_cmd => {
117+
let res = example_cli::handle_commands(
118+
&graph,
119+
&db,
120+
&chain,
121+
&keymap,
122+
args.network,
123+
|_| Err(anyhow::anyhow!("use `tx` instead")),
124+
general_cmd,
125+
);
126+
db.lock().unwrap().commit()?;
127+
return res;
128+
}
129+
};
130+
131+
let rpc_client = {
132+
let a = rpc_cmd.rpc_args();
133+
Client::new(
134+
&a.url,
135+
match (&a.rpc_cookie, &a.rpc_user, &a.rpc_password) {
136+
(None, None, None) => Auth::None,
137+
(Some(path), _, _) => Auth::CookieFile(path.clone()),
138+
(_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()),
139+
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
140+
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
141+
},
142+
)?
143+
};
144+
145+
match rpc_cmd {
146+
RpcCommands::Scan {
147+
fallback_height,
148+
lookahead,
149+
live,
150+
..
151+
} => {
152+
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
153+
154+
let (chan, recv) = sync_channel::<(BitcoindRpcItem, u32)>(CHANNEL_BOUND);
155+
let prev_cp = chain.lock().unwrap().tip();
156+
157+
let join_handle = std::thread::spawn(move || -> anyhow::Result<()> {
158+
let mut tip_height = Option::<u32>::None;
159+
160+
for item in BitcoindRpcIter::new(&rpc_client, fallback_height, prev_cp) {
161+
let item = item?;
162+
let is_block = !item.is_mempool();
163+
let is_mempool = item.is_mempool();
164+
165+
if tip_height.is_none() || !is_block {
166+
tip_height = Some(rpc_client.get_block_count()? as u32);
167+
}
168+
chan.send((item, tip_height.expect("must have tip height")))?;
169+
170+
if sigterm_flag.load(Ordering::Acquire) {
171+
return Ok(());
172+
}
173+
if is_mempool {
174+
if !live {
175+
return Ok(());
176+
}
177+
if await_flag(&sigterm_flag, LIVE_POLL_DUR_SECS) {
178+
return Ok(());
179+
}
180+
}
181+
}
182+
unreachable!()
183+
});
184+
185+
let mut start = Instant::now();
186+
187+
for (item, tip_height) in recv {
188+
let is_mempool = item.is_mempool();
189+
let update: LocalUpdate<Keychain, ConfirmationTimeAnchor> =
190+
item.into_update(confirmation_time_anchor);
191+
192+
let current_height = update.tip.height();
193+
194+
if start.elapsed() >= Duration::from_secs(3) {
195+
start = Instant::now();
196+
println!(
197+
"* scanned_to: {} / {} tip",
198+
if is_mempool {
199+
"mempool".to_string()
200+
} else {
201+
current_height.to_string()
202+
},
203+
tip_height,
204+
);
205+
}
206+
207+
let db_changeset = {
208+
let mut chain = chain.lock().unwrap();
209+
let mut graph = graph.lock().unwrap();
210+
211+
let chain_changeset = chain.apply_update(update.tip)?;
212+
213+
let mut indexed_additions =
214+
IndexedAdditions::<ConfirmationTimeAnchor, _>::default();
215+
let (_, index_additions) = graph.index.reveal_to_target_multi(&update.keychain);
216+
indexed_additions.append(index_additions.into());
217+
indexed_additions.append(graph.prune_and_apply_update(update.graph));
218+
219+
ChangeSet {
220+
indexed_additions,
221+
chain_changeset,
222+
}
223+
};
224+
225+
let mut db = db.lock().unwrap();
226+
db.stage(db_changeset);
227+
}
228+
229+
db.lock().unwrap().commit()?;
230+
println!("commited to database!");
231+
232+
join_handle
233+
.join()
234+
.expect("failed to join chain source thread")
235+
}
236+
RpcCommands::Tx {
237+
value,
238+
address,
239+
coin_select,
240+
..
241+
} => {
242+
let chain = chain.lock().unwrap();
243+
let broadcast = move |tx: &Transaction| -> anyhow::Result<()> {
244+
rpc_client.send_raw_transaction(tx)?;
245+
Ok(())
246+
};
247+
example_cli::run_send_cmd(
248+
&graph,
249+
&db,
250+
&*chain,
251+
&keymap,
252+
coin_select,
253+
address,
254+
value,
255+
broadcast,
256+
)
257+
}
258+
}
259+
}
260+
261+
fn start_ctrlc_handler() -> Arc<AtomicBool> {
262+
let flag = Arc::new(AtomicBool::new(false));
263+
let cloned_flag = flag.clone();
264+
265+
ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release))
266+
.expect("failed to set Ctrl+C handler");
267+
268+
flag
269+
}
270+
271+
fn await_flag(flag: &AtomicBool, duration: Duration) -> bool {
272+
let start = SystemTime::now();
273+
loop {
274+
if flag.load(Ordering::Acquire) {
275+
return true;
276+
}
277+
if SystemTime::now()
278+
.duration_since(start)
279+
.expect("should succeed")
280+
>= duration
281+
{
282+
return false;
283+
}
284+
std::thread::sleep(Duration::from_secs(1));
285+
}
286+
}

0 commit comments

Comments
 (0)