Skip to content

Commit

Permalink
Add VSS Store Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
G8XSU committed Sep 26, 2023
1 parent 422514e commit d10bf2e
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ jobs:
if: matrix.msrv
run: |
cargo update -p hashlink --precise "0.8.2" --verbose # hashlink 0.8.3 requires hashbrown 0.14, requiring 1.64.0
cargo update -p petgraph --precise "0.6.3" --verbose # petgraph v0.6.4, requires rustc 1.64 or newer
- name: Build on Rust ${{ matrix.toolchain }}
run: cargo build --verbose --color always
- name: Build with UniFFI support on Rust ${{ matrix.toolchain }}
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ panic = 'abort' # Abort on panic

[features]
default = []
vss = []
vss-test = []

[dependencies]
#lightning = { version = "0.0.116", features = ["max_level_trace", "std"] }
Expand Down Expand Up @@ -78,6 +80,7 @@ tokio = { version = "1", default-features = false, features = [ "rt-multi-thread
esplora-client = { version = "0.4", default-features = false }
libc = "0.2"
uniffi = { version = "0.23.0", features = ["build"], optional = true }
vss-client = "0.1"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winbase"] }
Expand Down
12 changes: 12 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ use bip39::Mnemonic;

use bitcoin::BlockHash;

#[cfg(feature = "vss")]
use crate::io::vss_store::VssStore;
use std::convert::TryInto;
use std::default::Default;
use std::fmt;
Expand Down Expand Up @@ -264,6 +266,16 @@ impl NodeBuilder {
self.build_with_store(kv_store)
}

/// Builds a [`Node`] instance with a [`VssStore`] backend and according to the options
/// previously configured.
#[cfg(feature = "vss")]
pub fn build_with_vss_store(
&self, url: &str, store_id: String,
) -> Result<Node<VssStore>, BuildError> {
let vss = Arc::new(VssStore::new(url, store_id));
self.build_with_store(vss)
}

/// Builds a [`Node`] instance according to the options previously configured.
pub fn build_with_store<K: KVStore + Sync + Send + 'static>(
&self, kv_store: Arc<K>,
Expand Down
1 change: 1 addition & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod sqlite_store;
#[cfg(test)]
pub(crate) mod test_utils;
pub(crate) mod utils;
pub(crate) mod vss_store;

/// The event queue will be persisted under this key.
pub(crate) const EVENT_QUEUE_PERSISTENCE_NAMESPACE: &str = "";
Expand Down
183 changes: 183 additions & 0 deletions src/io/vss_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use io::Error;
use std::io;
use std::io::ErrorKind;
#[cfg(test)]
use std::panic::RefUnwindSafe;

use crate::io::utils::check_namespace_key_validity;
use lightning::util::persist::KVStore;
use tokio::runtime::Runtime;
use vss_client::client::VssClient;
use vss_client::error::VssError;
use vss_client::types::{
DeleteObjectRequest, GetObjectRequest, KeyValue, ListKeyVersionsRequest, PutObjectRequest,
};

/// A [`KVStore`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
pub struct VssStore {
client: VssClient,
store_id: String,
runtime: Runtime,
}

impl VssStore {
#[cfg(feature = "vss")]
pub(crate) fn new(base_url: &str, store_id: String) -> Self {
let client = VssClient::new(base_url);
let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
Self { client, store_id, runtime }
}

fn build_key(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<String> {
if key.is_empty() {
return Err(Error::new(ErrorKind::Other, "Empty key is not allowed"));
}
// But namespace and sub_namespace can be empty
if namespace.is_empty() {
Ok(key.to_string())
} else {
Ok(format!("{}#{}#{}", namespace, sub_namespace, key))
}
}

fn split_key(&self, key: &str) -> io::Result<(String, String, String)> {
let parts: Vec<&str> = key.split('#').collect();
match parts.as_slice() {
[namespace, sub_namespace, actual_key] => {
Ok((namespace.to_string(), sub_namespace.to_string(), actual_key.to_string()))
}
_ => Err(Error::new(ErrorKind::InvalidData, "Invalid key format")),
}
}

async fn list_all_keys(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>> {
let mut page_token = None;
let mut keys = vec![];
let key_prefix = format!("{}#{}", namespace, sub_namespace);
while page_token != Some("".to_string()) {
let request = ListKeyVersionsRequest {
store_id: self.store_id.to_string(),
key_prefix: Some(key_prefix.to_string()),
page_token,
page_size: None,
};

let response = self.client.list_key_versions(&request).await.map_err(|e| {
let msg = format!("Failed to list keys in {}/{}: {}", namespace, sub_namespace, e);
Error::new(ErrorKind::Other, msg)
})?;

for kv in response.key_versions {
keys.push(self.split_key(&kv.key)?.2);
}
page_token = response.next_page_token;
}
Ok(keys)
}
}

impl KVStore for VssStore {
fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<Vec<u8>> {
check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?;
let request = GetObjectRequest {
store_id: self.store_id.to_string(),
key: self.build_key(namespace, sub_namespace, key)?,
};
// self.runtime.spawn()
let resp =
tokio::task::block_in_place(|| self.runtime.block_on(self.client.get_object(&request)))
.map_err(|e| match e {
VssError::NoSuchKeyError(..) => {
let msg = format!(
"Failed to read as key could not be found: {}/{}. Details: {}",
namespace, key, e
);
Error::new(ErrorKind::NotFound, msg)
}
_ => {
let msg = format!("Failed to read from key {}/{}: {}", namespace, key, e);
Error::new(ErrorKind::Other, msg)
}
})?;
Ok(resp.value.unwrap().value)
}

fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> {
check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?;
let request = PutObjectRequest {
store_id: self.store_id.to_string(),
global_version: None,
transaction_items: vec![KeyValue {
key: self.build_key(namespace, sub_namespace, key)?,
version: -1,
value: buf.to_vec(),
}],
delete_items: vec![],
};

tokio::task::block_in_place(|| self.runtime.block_on(self.client.put_object(&request)))
.map_err(|e| {
let msg = format!("Failed to write to key {}/{}: {}", namespace, key, e);
Error::new(ErrorKind::Other, msg)
})?;

Ok(())
}

fn remove(
&self, namespace: &str, sub_namespace: &str, key: &str, _lazy: bool,
) -> io::Result<()> {
check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?;
let request = DeleteObjectRequest {
store_id: self.store_id.to_string(),
key_value: Some(KeyValue {
key: self.build_key(namespace, sub_namespace, key)?,
version: -1,
value: vec![],
}),
};

tokio::task::block_in_place(|| self.runtime.block_on(self.client.delete_object(&request)))
.map_err(|e| {
let msg = format!("Failed to delete key {}/{}: {}", namespace, key, e);
Error::new(ErrorKind::Other, msg)
})?;
Ok(())
}

fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>> {
check_namespace_key_validity(namespace, sub_namespace, None, "list")?;

let keys = tokio::task::block_in_place(|| {
self.runtime.block_on(self.list_all_keys(namespace, sub_namespace))
})
.map_err(|e| {
let msg = format!("Failed to retrieve keys in namespace: {} : {}", namespace, e);
Error::new(ErrorKind::Other, msg)
})?;

Ok(keys)
}
}

#[cfg(test)]
impl RefUnwindSafe for VssStore {}

#[cfg(test)]
#[cfg(feature = "vss-test")]
mod tests {
use super::*;
use crate::io::test_utils::do_read_write_remove_list_persist;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};

#[test]
fn read_write_remove_list_persist() {
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let mut rng = thread_rng();
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
let vss_store = VssStore::new(&vss_base_url, rand_store_id);

do_read_write_remove_list_persist(&vss_store);
}
}
23 changes: 23 additions & 0 deletions src/test/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,29 @@ fn channel_full_cycle() {
do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false);
}

#[test]
#[cfg(feature = "vss-test")]
fn channel_full_cycle_with_vss_store() {
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
println!("== Node A ==");
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
let config_a = random_config();
let mut builder_a = NodeBuilder::from_config(config_a);
builder_a.set_esplora_server(esplora_url.clone());
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let node_a = builder_a.build_with_vss_store(&vss_base_url, "node_1_store".to_string()).unwrap();
node_a.start().unwrap();

println!("\n== Node B ==");
let config_b = random_config();
let mut builder_b = NodeBuilder::from_config(config_b);
builder_b.set_esplora_server(esplora_url);
let node_b = builder_b.build_with_vss_store(&vss_base_url, "node_2_store".to_string()).unwrap();
node_b.start().unwrap();

do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false);
}

#[test]
fn channel_full_cycle_0conf() {
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
Expand Down

0 comments on commit d10bf2e

Please sign in to comment.