Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(scan): Implement SubscribeResults request for scan service #8253

Merged
merged 22 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4552886
processes SubscribeResults messages
arya2 Feb 8, 2024
4a62b4d
send tx ids of results to the subscribe channel
arya2 Feb 8, 2024
4f90fe6
replaces BoxError with Report in scan_range
arya2 Feb 8, 2024
2f1d4d7
adds a watch channel for using subscribed_keys in scan_range
arya2 Feb 8, 2024
5d881f6
updates args to process_messages in test
arya2 Feb 8, 2024
2156265
adds a `subscribe` method to ScanTask for sending a SubscribeResults cmd
arya2 Feb 8, 2024
830d487
updates test for process_messages to cover subscribe cmds
arya2 Feb 8, 2024
fef53b7
impls SubscribeResult service request and updates sender type
arya2 Feb 8, 2024
738fcde
adds test for SubscribeResults scan service request
arya2 Feb 8, 2024
d9f31f9
adds acceptance test
arya2 Feb 8, 2024
8f077a7
Merge branch 'main' into scan-subscribe-results
arya2 Feb 9, 2024
26a5804
updates tests and imports
arya2 Feb 9, 2024
df566b6
fixes acceptance test by using spawn_blocking to avoid blocking async…
arya2 Feb 9, 2024
b0a5012
fixes test
arya2 Feb 9, 2024
82803cb
Merge branch 'main' into scan-subscribe-results
arya2 Feb 9, 2024
b6f5461
Applies suggestions from code review.
arya2 Feb 12, 2024
5f6f4b3
use tokio mpsc channel in scan task instead of std/blocking mpsc
arya2 Feb 12, 2024
c45fbe1
use tokio mpsc channel for results sender
arya2 Feb 12, 2024
4d3099a
adds `was_parsed_keys_empty` instead of checking that all the parsed …
arya2 Feb 12, 2024
adfc640
fixes test failures related to send errors in scan task
arya2 Feb 12, 2024
d39790e
returns height and key for scan results from subcribe_results results…
arya2 Feb 12, 2024
d3b6aa9
hide scan_service mod in zebra-node-service behind feature
arya2 Feb 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5829,6 +5829,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"tokio",
"zebra-chain",
]

Expand Down
2 changes: 1 addition & 1 deletion zebra-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ color-eyre = "0.6.2"

zcash_primitives = { version = "0.13.0-rc.1" }

zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.34" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.34", features = ["shielded-scan"] }

[build-dependencies]
tonic-build = "0.10.2"
3 changes: 3 additions & 0 deletions zebra-node-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ rpc-client = [
"serde_json",
]

shielded-scan = ["tokio"]

[dependencies]
zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.34" }

Expand All @@ -46,6 +48,7 @@ jsonrpc-core = { version = "18.0.0", optional = true }
reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls"], optional = true }
serde = { version = "1.0.196", optional = true }
serde_json = { version = "1.0.113", optional = true }
tokio = { version = "1.36.0", features = ["time"], optional = true }

[dev-dependencies]

Expand Down
1 change: 1 addition & 0 deletions zebra-node-services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ pub mod rpc_client;
/// parameterized by 'a), *not* that the object itself has 'static lifetime.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[cfg(feature = "shielded-scan")]
pub mod scan_service;
6 changes: 4 additions & 2 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! `zebra_scan::service::ScanService` request types.

use std::collections::HashSet;

use crate::BoxError;

/// The maximum number of keys that may be included in a request to the scan service
Expand All @@ -23,8 +25,8 @@ pub enum Request {
/// Accept keys and return transaction data
Results(Vec<String>),

/// TODO: Accept `KeyHash`es and return a channel receiver
SubscribeResults(Vec<()>),
/// Accept keys and return a channel receiver for transaction data
SubscribeResults(HashSet<String>),

/// Clear the results for a set of viewing keys
ClearResults(Vec<String>),
Expand Down
27 changes: 19 additions & 8 deletions zebra-node-services/src/scan_service/response.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
//! `zebra_scan::service::ScanService` response types.

use std::{
collections::BTreeMap,
sync::{mpsc, Arc},
};
use std::collections::BTreeMap;

use zebra_chain::{block::Height, transaction::Hash};
use zebra_chain::{block::Height, transaction};

/// A relevant transaction for a key and the block height where it was found.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScanResult {
/// The key that successfully decrypts the transaction
pub key: String,

/// The height of the block with the transaction
pub height: Height,

/// A transaction ID, which uniquely identifies mined v5 transactions,
/// and all v1-v4 transactions.
pub tx_id: transaction::Hash,
}

#[derive(Debug)]
/// Response types for `zebra_scan::service::ScanService`
Expand All @@ -24,14 +35,14 @@ pub enum Response {
/// Response to [`Results`](super::request::Request::Results) request
///
/// We use the nested `BTreeMap` so we don't repeat any piece of response data.
Results(BTreeMap<String, BTreeMap<Height, Vec<Hash>>>),
Results(BTreeMap<String, BTreeMap<Height, Vec<transaction::Hash>>>),

/// Response to [`DeleteKeys`](super::request::Request::DeleteKeys) request
DeletedKeys,

/// Response to [`ClearResults`](super::request::Request::ClearResults) request
ClearedResults,

/// Response to `SubscribeResults` request
SubscribeResults(mpsc::Receiver<Arc<Hash>>),
/// Response to [`SubscribeResults`](super::request::Request::SubscribeResults) request
SubscribeResults(tokio::sync::mpsc::Receiver<ScanResult>),
}
2 changes: 1 addition & 1 deletion zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ zcash_primitives = "0.13.0-rc.1"

zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["shielded-scan"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.33" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.34", features = ["shielded-scan"] }
zebra-grpc = { path = "../zebra-grpc", version = "0.1.0-alpha.1" }

chrono = { version = "0.4.33", default-features = false, features = ["clock", "std", "serde"] }
Expand Down
2 changes: 1 addition & 1 deletion zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub fn spawn_init(
tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;
let (_cmd_sender, cmd_receiver) = std::sync::mpsc::channel();
let (_cmd_sender, cmd_receiver) = tokio::sync::mpsc::channel(1);
scan::start(state, chain_tip_change, storage, cmd_receiver).await
}
.in_current_span(),
Expand Down
13 changes: 10 additions & 3 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod scan_task;
pub use scan_task::{ScanTask, ScanTaskCommand};

#[cfg(any(test, feature = "proptest-impl"))]
use std::sync::mpsc::Receiver;
use tokio::sync::mpsc::Receiver;

/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
Expand Down Expand Up @@ -165,8 +165,15 @@ impl Service<Request> for ScanService {
.boxed();
}

Request::SubscribeResults(_key_hashes) => {
// TODO: send key_hashes and mpsc::Sender to scanner task, return mpsc::Receiver to caller
Request::SubscribeResults(keys) => {
let mut scan_task = self.scan_task.clone();

return async move {
let results_receiver = scan_task.subscribe(keys)?;

Ok(Response::SubscribeResults(results_receiver))
}
.boxed();
}

Request::ClearResults(keys) => {
Expand Down
9 changes: 6 additions & 3 deletions zebra-scan/src/service/scan_task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Types and method implementations for [`ScanTask`]

use std::sync::{mpsc, Arc};
use std::sync::Arc;

use color_eyre::Report;
use tokio::task::JoinHandle;
Expand All @@ -25,14 +25,17 @@ pub struct ScanTask {
pub handle: Arc<JoinHandle<Result<(), Report>>>,

/// Task command channel sender
pub cmd_sender: mpsc::Sender<ScanTaskCommand>,
pub cmd_sender: tokio::sync::mpsc::Sender<ScanTaskCommand>,
}

/// The size of the command channel buffer
const SCAN_TASK_BUFFER_SIZE: usize = 100;

impl ScanTask {
/// Spawns a new [`ScanTask`].
pub fn spawn(db: Storage, state: scan::State, chain_tip_change: ChainTipChange) -> Self {
// TODO: Use a bounded channel or move this logic to the scan service or another service.
let (cmd_sender, cmd_receiver) = mpsc::channel();
let (cmd_sender, cmd_receiver) = tokio::sync::mpsc::channel(SCAN_TASK_BUFFER_SIZE);

Self {
handle: Arc::new(scan::spawn_init(db, state, chain_tip_change, cmd_receiver)),
Expand Down
89 changes: 65 additions & 24 deletions zebra-scan/src/service/scan_task/commands.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
//! Types and method implementations for [`ScanTaskCommand`]

use std::{
collections::HashMap,
sync::{
mpsc::{self, Receiver, TryRecvError},
Arc,
},
};
use std::collections::{HashMap, HashSet};

use color_eyre::{eyre::eyre, Report};
use tokio::sync::oneshot;
use tokio::sync::{
mpsc::{error::TrySendError, Receiver, Sender},
oneshot,
};

use zcash_primitives::{sapling::SaplingIvk, zip32::DiversifiableFullViewingKey};
use zebra_chain::{block::Height, parameters::Network, transaction::Transaction};
use zebra_chain::{block::Height, parameters::Network};
use zebra_node_services::scan_service::response::ScanResult;
use zebra_state::SaplingScanningKey;

use crate::scan::sapling_key_to_scan_block_keys;

use super::ScanTask;

const RESULTS_SENDER_BUFFER_SIZE: usize = 100;

#[derive(Debug)]
/// Commands that can be sent to [`ScanTask`]
pub enum ScanTaskCommand {
Expand All @@ -40,13 +40,12 @@ pub enum ScanTaskCommand {
},

/// Start sending results for key hashes to `result_sender`
// TODO: Implement this command (#8206)
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,
result_sender: Sender<ScanResult>,

/// Key hashes to send the results of to result channel
keys: Vec<String>,
keys: HashSet<String>,
},
}

Expand All @@ -57,17 +56,26 @@ impl ScanTask {
///
/// Returns newly registered keys for scanning.
pub fn process_messages(
cmd_receiver: &Receiver<ScanTaskCommand>,
parsed_keys: &mut HashMap<
cmd_receiver: &mut tokio::sync::mpsc::Receiver<ScanTaskCommand>,
registered_keys: &mut HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
>,
network: Network,
) -> Result<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height)>,
(
HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>, Height),
>,
HashMap<SaplingScanningKey, Sender<ScanResult>>,
),
Report,
> {
use tokio::sync::mpsc::error::TryRecvError;

let mut new_keys = HashMap::new();
let mut new_result_senders = HashMap::new();
let sapling_activation_height = network.sapling_activation_height();

loop {
Expand All @@ -90,7 +98,9 @@ impl ScanTask {
// Don't accept keys that:
// 1. the scanner already has, and
// 2. were already submitted.
if parsed_keys.contains_key(&key.0) && !new_keys.contains_key(&key.0) {
if registered_keys.contains_key(&key.0)
&& !new_keys.contains_key(&key.0)
{
return None;
}

Expand All @@ -116,42 +126,55 @@ impl ScanTask {

new_keys.extend(keys.clone());

parsed_keys.extend(
registered_keys.extend(
keys.into_iter()
.map(|(key, (dfvks, ivks, _))| (key, (dfvks, ivks))),
);
}

ScanTaskCommand::RemoveKeys { done_tx, keys } => {
for key in keys {
parsed_keys.remove(&key);
registered_keys.remove(&key);
new_keys.remove(&key);
}

// Ignore send errors for the done notification, caller is expected to use a timeout.
let _ = done_tx.send(());
}

_ => continue,
ScanTaskCommand::SubscribeResults {
result_sender,
keys,
} => {
let keys = keys
.into_iter()
.filter(|key| registered_keys.contains_key(key));

for key in keys {
new_result_senders.insert(key, result_sender.clone());
}
}
}
}

Ok(new_keys)
Ok((new_keys, new_result_senders))
}

/// Sends a command to the scan task
pub fn send(
&mut self,
command: ScanTaskCommand,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
) -> Result<(), tokio::sync::mpsc::error::TrySendError<ScanTaskCommand>> {
self.cmd_sender.try_send(command)
}

/// Sends a message to the scan task to remove the provided viewing keys.
///
/// Returns a oneshot channel receiver to notify the caller when the keys have been removed.
pub fn remove_keys(
&mut self,
keys: &[String],
) -> Result<oneshot::Receiver<()>, mpsc::SendError<ScanTaskCommand>> {
) -> Result<oneshot::Receiver<()>, TrySendError<ScanTaskCommand>> {
let (done_tx, done_rx) = oneshot::channel();

self.send(ScanTaskCommand::RemoveKeys {
Expand All @@ -166,11 +189,29 @@ impl ScanTask {
pub fn register_keys(
&mut self,
keys: Vec<(String, Option<u32>)>,
) -> Result<oneshot::Receiver<Vec<String>>, mpsc::SendError<ScanTaskCommand>> {
) -> Result<oneshot::Receiver<Vec<String>>, TrySendError<ScanTaskCommand>> {
let (rsp_tx, rsp_rx) = oneshot::channel();

self.send(ScanTaskCommand::RegisterKeys { keys, rsp_tx })?;

Ok(rsp_rx)
}

/// Sends a message to the scan task to start sending the results for the provided viewing keys to a channel.
///
/// Returns the channel receiver.
pub fn subscribe(
&mut self,
keys: HashSet<SaplingScanningKey>,
) -> Result<Receiver<ScanResult>, TrySendError<ScanTaskCommand>> {
// TODO: Use a bounded channel
let (result_sender, result_receiver) =
tokio::sync::mpsc::channel(RESULTS_SENDER_BUFFER_SIZE);

self.send(ScanTaskCommand::SubscribeResults {
result_sender,
keys,
})
.map(|_| result_receiver)
}
}
Loading
Loading