Skip to content

Commit

Permalink
f add CreateChannelRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
jbesraa committed Feb 26, 2024
1 parent 264c827 commit 5e0bbc6
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 71 deletions.
29 changes: 28 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10;
// The length in bytes of our wallets' keys seed.
const WALLET_KEYS_SEED_LEN: usize = 64;

// Payjoin server port
const PAYJOIN_HTTP_SERVER_PORT: u16 = 3227;

#[derive(Debug, Clone)]
/// Represents the configuration of an [`Node`] instance.
///
Expand Down Expand Up @@ -581,8 +584,32 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
});
}

use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use tokio::net::TcpListener;

// Start the HTTP server for payjoin
let wallet = Arc::clone(&self.wallet);
runtime.spawn(async move {
pj::Receiver::start().await.unwrap();
let addr = SocketAddr::from(([127, 0, 0, 1], PAYJOIN_HTTP_SERVER_PORT));
let listener = TcpListener::bind(addr).await.unwrap();
dbg!("Started HTTP server on http://{}", addr);
// let our_pubkey= wallet.get_new_address().unwrap().script_pubkey().into_bytes();
let create_channel_request = pj::CreateChannelRequest::init(wallet);
loop {
let (stream, _) = listener.accept().await.unwrap();
let io = TokioIo::new(stream);
let clone_ccr = create_channel_request.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, clone_ccr)
.await
{
println!("Failed to serve connection: {:?}", err);
}
});
}
});

// Regularly reconnect to channel peers.
Expand Down
205 changes: 135 additions & 70 deletions src/pj.rs
Original file line number Diff line number Diff line change
@@ -1,88 +1,153 @@
use bitcoin::address::NetworkChecked;
use hyper::HeaderMap;
use payjoin::bitcoin::{self, Amount};
use payjoin::Uri;
use crate::types::Wallet;
use bitcoin::psbt::Psbt;
use bitcoin::secp256k1::PublicKey;
use bitcoin::TxOut;
use bitcoincore_rpc::jsonrpc::serde_json;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming as IncomingBody;
use hyper::{service::Service, Request};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::{future::Future, pin::Pin};

struct Headers(HeaderMap);

impl payjoin::receive::Headers for Headers {
fn get_header(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|v| v.to_str().ok())
}
#[derive(Clone)]
pub struct CreateChannelRequest {
wallet: Arc<Wallet>,
channel_amount_sats: Arc<Mutex<Option<u64>>>,
push_msat: Arc<Mutex<Option<u64>>>,
announce_channel: Arc<Mutex<Option<bool>>>,
peer_node_id: Arc<Mutex<Option<PublicKey>>>,
}

pub struct Receiver;
impl CreateChannelRequest {
pub fn init(wallet: Arc<Wallet>) -> Self {
Self {
wallet,
channel_amount_sats: Arc::new(Mutex::new(None)),
push_msat: Arc::new(Mutex::new(None)),
announce_channel: Arc::new(Mutex::new(None)),
peer_node_id: Arc::new(Mutex::new(None)),
}
}

impl Receiver {
pub async fn start() -> Result<(), Box<dyn std::error::Error>> {
http_server::start().await.unwrap();
Ok(())
pub fn set(
&self, channel_amount_sats: u64, push_msat: Option<u64>,
announce_channel: bool, peer_node_id: PublicKey,
) {
*self.channel_amount_sats.lock().unwrap() = Some(channel_amount_sats);
*self.push_msat.lock().unwrap() = push_msat;
*self.announce_channel.lock().unwrap() = Some(announce_channel);
*self.peer_node_id.lock().unwrap() = Some(peer_node_id);
}

fn _build_pj_uri(
address: bitcoin::Address, amount: Amount, pj: &'static str,
) -> Result<Uri<'static, NetworkChecked>, Box<dyn std::error::Error>> {
let pj_uri_string = format!("{}?amount={}&pj={}", address.to_qr_uri(), amount.to_btc(), pj);
let pj_uri = Uri::from_str(&pj_uri_string).map_err(|e| e.to_string())?;
Ok(pj_uri.assume_checked())
pub fn get(&self) -> (u64, Option<u64>, bool, Option<PublicKey>, Arc<Wallet>) {
let channel_amount_sats = self.channel_amount_sats.lock().unwrap().unwrap_or(0);
let push_msat = *self.push_msat.lock().unwrap();
let announce_channel = self.announce_channel.lock().unwrap().unwrap_or(false);
let peer_node_id = if let Some(peer_node_id) = *self.peer_node_id.lock().unwrap() {
Some(peer_node_id)
} else {
None
};
let wallet = self.wallet.clone();

(
channel_amount_sats,
push_msat,
announce_channel,
peer_node_id,
wallet
)
}
}

mod http_server {
use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use tokio::net::TcpListener;
fn make_http_response(s: String) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
Ok(hyper::Response::builder().body(Full::new(bytes::Bytes::from(s))).unwrap())
}

pub async fn start() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 3227));
let listener = TcpListener::bind(addr).await?;
println!("Listening on http://{}", addr);
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
async fn payjoin_handler(
http_request: Request<hyper::body::Incoming>, chan_req: Arc<Mutex<CreateChannelRequest>>,
) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
match (http_request.method(), http_request.uri().path()) {
(&hyper::Method::POST, "/payjoin") => {
let headers = http_request.headers().clone();
println!("Received payjoin request with headers: {:?}", headers);
let body = http_request.into_body().collect().await?;
let body = String::from_utf8(body.to_bytes().to_vec()).unwrap();
println!("Received payjoin request with body: {:?}", &body);
let psbt = Psbt::from_str(&body).unwrap();
let (channel_amount_sats, push_msat, announce_channel, peer_node_id, wallet) = chan_req.lock().unwrap().get();
let our_output: &TxOut = psbt
.unsigned_tx
.output
.iter()
.find(|out| wallet.is_mine(&out.script_pubkey).unwrap())
.unwrap();
assert!(our_output.value == 1000);

tokio::task::spawn(async move {
if let Err(err) =
http1::Builder::new().serve_connection(io, service_fn(request_handler)).await
{
println!("Error serving connection: {:?}", err);
}
});
}
assert_eq!(channel_amount_sats, 0);
assert_eq!(push_msat, None);
assert_eq!(announce_channel, false);
assert_eq!(peer_node_id, None);
//TODO 1: Validations
//
//TODO 2: Construct + Send OpenChannel MSG
//
//TODO 3: Await for AcceptChannel MSG
//
//TODO 4: Construct + Send FundingCreated MSG (Funding will be created using the above PSBT)
//
//TODO 5: Await for FundingSigned MSG
//
//TODO 6: Construct Final PJ PSBT and respond to sender.
make_http_response(body.into())
},
(&hyper::Method::GET, "/channel_request") => {
let (channel_amount_sats, push_msat, announce_channel, peer_node_id, _) =
chan_req.lock().unwrap().get();
let peer_node_id = if let Some(peer_node_id) = peer_node_id {
peer_node_id.to_string()
} else {
"".to_string()
};
let res = format!(
"{{\"channel_amount_sats\":{},\"push_msat\":{},\"announce_channel\":{},\"peer_node_id\":\"{}\"}}",
channel_amount_sats, push_msat.unwrap_or(0), announce_channel, peer_node_id
);
make_http_response(res)
},
(&hyper::Method::POST, "/channel_request") => {
let body = http_request.into_body().collect().await?;
let body = String::from_utf8(body.to_bytes().to_vec()).unwrap();
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
let channel_amount_sats = json["channel_amount_sats"].as_u64().unwrap();
let push_msat = json["push_msat"].as_u64();
let announce_channel = json["announce_channel"].as_bool().unwrap();
let peer_node_id = PublicKey::from_str(json["peer_node_id"].as_str().unwrap()).unwrap();
chan_req.lock().unwrap().set(
channel_amount_sats,
push_msat,
announce_channel,
peer_node_id
);
make_http_response("{}".into())
},
_ => make_http_response("404".into()),
}
}

async fn request_handler(
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match (req.method(), req.uri().path()) {
// Serve some instructions at /
(&Method::GET, "/") => Ok(Response::new(full(
"Try POSTing data to /request_handler such as: `curl localhost:3000/request_handler -XPOST -d \"PAYJOIN ME\"`",
))),

// Simply echo the body back to the client.
(&Method::POST, "/payjoin") => Ok(Response::new(req.into_body().boxed())),
impl Service<Request<IncomingBody>> for CreateChannelRequest{
type Response = hyper::Response<Full<bytes::Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

// Return the 404 Not Found for other routes.
_ => {
let mut not_found = Response::new(
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
);
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}
fn call(&self, http_request: Request<IncomingBody>) -> Self::Future {
let state = Arc::new(Mutex::new(self.clone()));
let res = match (http_request.method(), http_request.uri().path()) {
_ => payjoin_handler(http_request, state),
};

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into()).map_err(|never| match never {}).boxed()
Box::pin(async { res.await })
}
}

0 comments on commit 5e0bbc6

Please sign in to comment.