Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rebase development #131

Merged
merged 175 commits into from
May 4, 2022
Merged
Changes from 1 commit
Commits
Show all changes
175 commits
Select commit Hold shift + click to select a range
c9a9bfe
test: fix coverage action (#4036)
Cifko Apr 26, 2022
d3d4acd
ci: move npm audit to development only (#4055)
stringhandler Apr 28, 2022
a66502b
chore(deps): bump async from 2.6.3 to 2.6.4 in /applications/launchpa…
dependabot[bot] Apr 28, 2022
9e0ec36
feat(tari_explorer): add total hashrate chart (#4054)
mrnaveira Apr 28, 2022
93bfb88
chore(deps): bump ejs from 3.1.6 to 3.1.7 in /applications/tari_web_e…
dependabot[bot] Apr 28, 2022
6a975ae
chore: remove unused else (#4051)
SWvheerden Apr 28, 2022
24f8aac
refactor(base-node): use existing peer feature methods to check if is…
mrnaveira Apr 28, 2022
5ebf129
fix: makes header consensus encoding infallible (#4045)
sdbondi Apr 28, 2022
fc1aa65
fix(wallet): do not prompt for password if given in config (#4040)
sdbondi Apr 28, 2022
0ab487f
test: cucumber saf test (#3135)
Cifko Apr 28, 2022
1fe5811
chore: obscure grpc error response (#3995)
Cifko Apr 28, 2022
d01d94f
test(covenant): improve test coverage (#4052)
sdbondi Apr 29, 2022
7097185
fix: weird behaviour of dates in base node banned peers (#4037)
mrnaveira Apr 29, 2022
aca072e
chore(deps): bump async from 2.6.3 to 2.6.4 in /applications/tari_web…
dependabot[bot] Apr 29, 2022
8b7877e
chore(deps): bump async from 3.2.1 to 3.2.3 in /integration_tests (#4…
dependabot[bot] Apr 29, 2022
7032667
docs: add key manager docs (#4050)
SWvheerden Apr 29, 2022
190d75a
fix: only count base nodes in peers count in base node status (#4039)
mrnaveira Apr 29, 2022
50a4d19
docs(comms): adds documentation for comms public interface (#4033)
sdbondi Apr 29, 2022
dacb3cd
refactor(dht): use CipherKey new type for diffie-hellman key (#4038)
sdbondi Apr 29, 2022
c37d1ba
refactor(rpc-macros): split into smaller functions (clippy) (#4063)
sdbondi Apr 29, 2022
b15d682
fix: update daily test configuration (#4049)
sdbondi Apr 30, 2022
2ad5c51
refactor(comms): reduce length of long functions (clippy) (#4065)
sdbondi May 3, 2022
ed8a769
test(cucumber): use separate FFI target dir (#4067)
sdbondi May 3, 2022
c2d60b3
fix(key-manager): remove floating point math from mnemonic code (#4064)
sdbondi May 3, 2022
8c78717
feat(p2p): adds tor.forward_address setting (#4070)
sdbondi May 3, 2022
a4e3be8
ci: fix coverage (#4071)
stringhandler May 3, 2022
6d21bc2
chore: remove deprecated ExtendBytes, update EpochTime (#3914)
sdbondi May 3, 2022
d2ceedc
chore(deps): bump ejs from 3.1.6 to 3.1.7 in /applications/tari_colle…
dependabot[bot] May 3, 2022
5b726a6
feat(collectibles): add list assets command (#3908)
stringhandler May 3, 2022
b34f79d
fix: support safe non-interactive mode (#4072)
CjS77 May 3, 2022
c27be5c
feat: allow network to be set by TARI_NETWORK env var (#4073)
sdbondi May 3, 2022
65f1147
chore: update launchpad backend (#4017)
CjS77 May 4, 2022
9bb3968
test: unignore working tests (#4020)
sdbondi May 4, 2022
06b95b1
Set up new Launchpad v2 with Tauri and CRA
tomaszantas Apr 12, 2022
ccc7e49
port (copy) backend from previous launchpad to launchpad_v2
tarnas14 Apr 13, 2022
d9eacd9
call tari backend for list of available docker images
tarnas14 Apr 13, 2022
6cb54c0
remove legacy main.rs from src-tauri
tarnas14 Apr 13, 2022
0465dc7
added .gitkeep to cra build directory to avoid CI panic
tarnas14 Apr 13, 2022
3003c72
changed javascript build to build launchpad_v2
tarnas14 Apr 13, 2022
560de6b
Set up React project tree
tomaszantas Apr 13, 2022
7ff9533
Add ESLint
tomaszantas Apr 13, 2022
23dfe66
Add no-console eslint warning
tomaszantas Apr 13, 2022
5f36889
Remove package-json.lock
tomaszantas Apr 13, 2022
9e91c9c
Switch tabs to spaces
tomaszantas Apr 13, 2022
ea1bdad
Add SVG icon components for extracted Figma icon set (#53)
corquaid Apr 13, 2022
0867486
Add initial content to the Readme.md (#52)
tomaszantas Apr 14, 2022
1079b85
Adding styled components, replacing yarn.lock with package-lock
corquaid Apr 14, 2022
6952c97
Updating README to use npm
corquaid Apr 14, 2022
dbb0ac8
Add colors & gradients, linting icon component files
corquaid Apr 14, 2022
6efd272
Add theme files, refactoring
corquaid Apr 15, 2022
269351c
Add color variables to gradients where possible
corquaid Apr 19, 2022
6f4d782
feat: launchpad CI (#55)
tomaszantas Apr 19, 2022
74dcf3f
quick and dirty one-file dropdown
tarnas14 Apr 19, 2022
5cc79a2
added label to select
tarnas14 Apr 20, 2022
d61566d
extracted value and options and made it configurable in the select
tarnas14 Apr 20, 2022
969d123
extract Select to a reusable dumb component
tarnas14 Apr 20, 2022
bbe6e8a
refactor WithTheme HoC to correctly set displayName
tarnas14 Apr 20, 2022
11bace2
extract transparent background to theme
tarnas14 Apr 20, 2022
3c8a17b
fix types for styledComponents in Select
tarnas14 Apr 20, 2022
065d5ed
add basic tests to Select component
tarnas14 Apr 20, 2022
5aefcf0
typescript children: ReactNode typing
tarnas14 Apr 20, 2022
c8cc12f
change darkBackground prop name to inverted
tarnas14 Apr 20, 2022
b58da6f
use default export in Select component
tarnas14 Apr 20, 2022
e5161dd
move type declarations to separate types.ts files
tarnas14 Apr 20, 2022
2fbad7f
fix test to use correct import
tarnas14 Apr 20, 2022
5323b25
feat: add typography and fonts
corquaid Apr 19, 2022
5f21b1c
add jsdoc to Select component
tarnas14 Apr 20, 2022
f2abe74
improve jsdoc
tarnas14 Apr 21, 2022
cb4f142
feat: main layout (#64)
tomaszantas Apr 21, 2022
dc19bfb
improved styling with the inverted values
tarnas14 Apr 21, 2022
3353e6c
fix Select tests - pass correct theme
tarnas14 Apr 21, 2022
53e92b0
remove last darkBackground reference
tarnas14 Apr 21, 2022
2fc2b13
avoid using optional chaining to not angry CI gods
tarnas14 Apr 21, 2022
174130b
hooked up prettier to eslint and fixed all
tarnas14 Apr 21, 2022
671b9d3
rename MyListboxProps to SelectProps
tarnas14 Apr 21, 2022
48115c5
feat(primitive): add text component
corquaid Apr 20, 2022
084b888
refactor: move globalStyles, set default Text type
corquaid Apr 20, 2022
96925da
remove unnecessary fragment, formatting
corquaid Apr 20, 2022
22fa81d
chore: eslint rule update and readme addition (#79)
tarnas14 Apr 21, 2022
1c27201
Fixed CI tests, refactor d.ts files
corquaid Apr 21, 2022
5480b26
Removed GlobalStyle, moved fonts to App.css
corquaid Apr 21, 2022
c376e3e
lint fix with prettier after merge
tarnas14 Apr 21, 2022
c551f0b
dirty one file implementation of a box
tarnas14 Apr 21, 2022
0208ee2
move Box component to separate files
tarnas14 Apr 21, 2022
f87e023
add all styles declaration to DefaultTheme in custom.d.ts
tarnas14 Apr 21, 2022
165c1c7
add jsdoc to Box component
tarnas14 Apr 22, 2022
da1b953
add basic test for Box component
tarnas14 Apr 22, 2022
df8f4a7
update Select tests to conform to test standard
tarnas14 Apr 22, 2022
9c9773b
fix Box test
tarnas14 Apr 22, 2022
b435f03
Launchpad on Github Actions (#88)
tomaszantas Apr 22, 2022
da44613
Add tag component files, update theme
corquaid Apr 22, 2022
dca3a4a
Add unit tests
corquaid Apr 22, 2022
7235021
More unit tests, changes from PR comments
corquaid Apr 25, 2022
f6786b6
Adjust the audit job (#91)
tomaszantas Apr 25, 2022
db18629
feat: footer & keyboardkeys (#86)
tomaszantas Apr 25, 2022
6153fa9
Improve Switch component (#89)
tomaszantas Apr 25, 2022
dea7c75
chore: tests for icons (#97)
tomaszantas Apr 26, 2022
15eadf1
layout for inactive state of base node
tarnas14 Apr 22, 2022
eb7aa80
prepared layout for dark (running) base node
tarnas14 Apr 22, 2022
4e53b94
extract view component from base node container
tarnas14 Apr 23, 2022
7981587
connect base node container to store; improve styling
tarnas14 Apr 24, 2022
f96da71
cleanup and document Loading component
tarnas14 Apr 24, 2022
0845b65
fix unused payload in base node slice
tarnas14 Apr 24, 2022
807abcf
add placeholder for "running" tag on basenode
tarnas14 Apr 24, 2022
42b980f
add Running tag to a running base node container
tarnas14 Apr 26, 2022
541aff0
add tests for Loading indicator
tarnas14 Apr 26, 2022
0e813cc
move base node store slice to /store/baseNode
tarnas14 Apr 26, 2022
5525d2a
reverse Network type import
tarnas14 Apr 26, 2022
aa60163
feat: tabs and ts issue (#94)
tomaszantas Apr 27, 2022
c490a3a
Remove unused props and imports (#99)
tomaszantas Apr 27, 2022
42cd949
Fix box sizing of the main container (#102)
tomaszantas Apr 27, 2022
d1718e7
Add Text test (#106)
tomaszantas Apr 27, 2022
2e0ca32
Change the size of large tags to 26px (#104)
tomaszantas Apr 27, 2022
3be621c
Polishing Select component: text color and spacing (#105)
tomaszantas Apr 28, 2022
d4e744b
prepared ui for password input
tarnas14 Apr 27, 2022
0aaab69
include tari signed in wallet password page
tarnas14 Apr 27, 2022
a0fb844
prepared main wallet layout (without inputs)
tarnas14 Apr 27, 2022
4a3972c
add loading prop to button
tarnas14 Apr 27, 2022
bf10a96
connect wallet to store
tarnas14 Apr 27, 2022
62a834c
fix tabs component to memoize tabs content
tarnas14 Apr 27, 2022
5a96d9c
move lines from wallet components to locales
tarnas14 Apr 27, 2022
f1a7998
remove Send funds button
tarnas14 Apr 27, 2022
b35efc4
add password input and disabling submit button below certain password…
tarnas14 Apr 27, 2022
466f952
border-box sizing for box
tarnas14 Apr 27, 2022
a6d75fa
change loading indicator to be relatively positioned in button
tarnas14 Apr 27, 2022
43ec284
cleaned up Button disabled styling; removed unnecessary variants and …
tarnas14 Apr 27, 2022
2abd7c7
allowing loading and disabled to be controlled separately
tarnas14 Apr 27, 2022
d48bd48
rename Chart icon component
tarnas14 Apr 27, 2022
393a3ca
add WalletContainer basic tests
tarnas14 Apr 27, 2022
f4b1419
make sure buttons always have the same height/width
tarnas14 Apr 27, 2022
df7ff08
Add password strength + smiley icons
corquaid Apr 25, 2022
35edd29
Add TextInput component files, fix password strength icons, update th…
corquaid Apr 28, 2022
9798e10
Add unit tests
corquaid Apr 28, 2022
80d81dc
Add JSDoc
corquaid Apr 28, 2022
a52947c
Add unit tests for new icons, delete icon images from assets
corquaid Apr 28, 2022
4777fa1
Modal component
tarnas14 Apr 28, 2022
bafdac0
extracted modal styling to separate files
tarnas14 Apr 28, 2022
00f157b
add modal tests
tarnas14 Apr 28, 2022
9297719
add tari wallet box with emoji toggle
tarnas14 Apr 27, 2022
db86190
extract wallet components and styles to separate files
tarnas14 Apr 28, 2022
d74647b
justify base node to center
tarnas14 Apr 28, 2022
bc13688
introduce CenteredLayout component to layout basenode and wallet cont…
tarnas14 Apr 28, 2022
d48175a
Add copy/paste/select keyboard functionality
corquaid Apr 29, 2022
ef420b1
dont clear password field
tarnas14 Apr 29, 2022
ea4632d
fix tari wallet id box
tarnas14 Apr 29, 2022
bc6ac98
extract reusable Input component and built TextInput on top of it
tarnas14 May 1, 2022
5dc11ff
add PasswordInput and use it in wallet password box
tarnas14 May 1, 2022
1ba43df
disabling input icon when whole component is disabled
tarnas14 May 1, 2022
785cd9f
set up static layout of settings modal
tarnas14 Apr 28, 2022
72ba638
make the cancel button on settings "secondary"
tarnas14 May 1, 2022
e801ee9
fix buttons in icon for settings button in title bar
tarnas14 May 1, 2022
8c6ade5
created static layout for wallet settings
tarnas14 May 1, 2022
6a6c246
extracted components to separate files, connected SettingsContainer t…
tarnas14 May 2, 2022
535586f
allow opening settings on specific page
tarnas14 May 2, 2022
933e3e1
connecting wallet settings page component to store
tarnas14 May 2, 2022
058a268
showing `running` indicator on wallet box conditionally
tarnas14 May 2, 2022
a030ee7
pending indicator on button in wallet settings instead of running tag
tarnas14 May 2, 2022
dc04a27
extracted styled components from CopyBox
tarnas14 May 2, 2022
4e535ea
cleaned up WalletSettings component with extracted styles and locales
tarnas14 May 2, 2022
a106f47
dont block wallet settings if wallet is locked
tarnas14 May 4, 2022
e550ee1
Fixing Tag component background styling, App.tsx typo
corquaid May 2, 2022
4112143
Fixing svg icon colour attributes
corquaid May 2, 2022
bb21941
feat: mining dashboard (#108)
tomaszantas May 2, 2022
a99ec4a
add JSDoc for copybox
tarnas14 May 4, 2022
cb1ec54
remove Link component, use Button with href on wallet settings
tarnas14 May 4, 2022
b1335b3
moved react gui over to launchpad
tarnas14 May 4, 2022
e00fd75
update github actions to point to gui-react directory
tarnas14 May 4, 2022
c64a27b
remove launchpad_v2 workspace from root Cargo.toml
tarnas14 May 4, 2022
db37a59
remove reference to tauri-apps/cli from gui-react (not needed)
tarnas14 May 4, 2022
aacc079
Remove useBootstrapper
tomaszantas May 4, 2022
dff8b17
added `dev-vue` package.json script to launch vue version of the appl…
tarnas14 May 4, 2022
77949ee
remove useBoostrapper flag from tauri.vue.conf.json after bumping to rc9
tarnas14 May 4, 2022
aabe7fa
remove tauri scripts from gui-react
tarnas14 May 4, 2022
cceed77
Fix typography letter-spacing
corquaid May 4, 2022
c2d1925
Merge branch 'launchpad_such_wow' into launchpad/rebase_development
tarnas14 May 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(dht): use CipherKey new type for diffie-hellman key (tari-pr…
…oject#4038)

Description
---
- Document the DHT public interface
- Move forward layer to `outbound` module
- Remove unnecessary allocation from `encrypt` helper
- Return `CipherKey` new type from Diffie-Hellman helper function 
- Changes `encrypt` and `decrypt` to take in `CipherKey` newtype
- implement zeroize on drop for `CipherKey` new type

Motivation and Context
---
Documentation. 

`generate_ecdh_secret` returned a public key, which implies that it is safe to share publicly.
The `CipherKey` new type will zero it's contents before releasing it's memory.

The forward layer didn't have anything to do with SAF so shouldn't be located in that module.

How Has This Been Tested?
---
Existing tests pass
Manually, no breaking changes.

cargo doc --no-deps
sdbondi authored Apr 29, 2022
commit dacb3cd443a5702636b6cba174d148357deec056
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
@@ -323,8 +323,6 @@ async fn configure_comms_and_dht(
}

// Hook up DHT messaging middlewares
// TODO: messaging events should be optional
let (messaging_events_sender, _) = broadcast::channel(1);
let messaging_pipeline = pipeline::Builder::new()
.outbound_buffer_size(config.outbound_buffer_size)
.with_outbound_pipeline(outbound_rx, |sink| {
@@ -339,6 +337,8 @@ async fn configure_comms_and_dht(
)
.build();

// TODO: messaging events should be optional
let (messaging_events_sender, _) = broadcast::channel(1);
comms = comms.add_protocol_extension(MessagingProtocolExtension::new(
messaging_events_sender,
messaging_pipeline,
1 change: 1 addition & 0 deletions comms/dht/Cargo.toml
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ serde = "1.0.90"
serde_derive = "1.0.90"
thiserror = "1.0.26"
tower = { version = "0.4", features = ["full"] }
zeroize = "1.4.0"

# Uncomment for tokio tracing via tokio-console (needs "tracing" features)
#console-subscriber = "0.1.3"
16 changes: 14 additions & 2 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
@@ -63,6 +63,7 @@ use crate::{

const LOG_TARGET: &str = "comms::dht::actor";

/// Error type for the DHT actor
#[derive(Debug, Error)]
pub enum DhtActorError {
#[error("MPSC channel is disconnected")]
@@ -93,6 +94,7 @@ impl<T> From<mpsc::error::SendError<T>> for DhtActorError {
}
}

/// Request type for the DHT actor
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DhtRequest {
@@ -143,20 +145,23 @@ impl Display for DhtRequest {
}
}

/// DHT actor requester
#[derive(Clone)]
pub struct DhtRequester {
sender: mpsc::Sender<DhtRequest>,
}

impl DhtRequester {
pub fn new(sender: mpsc::Sender<DhtRequest>) -> Self {
pub(crate) fn new(sender: mpsc::Sender<DhtRequest>) -> Self {
Self { sender }
}

/// Send a Join message to the network
pub async fn send_join(&mut self) -> Result<(), DhtActorError> {
self.sender.send(DhtRequest::SendJoin).await.map_err(Into::into)
}

/// Select peers by [BroadcastStrategy](crate::broadcast_strategy::BroadcastStrategy]
pub async fn select_peers(&mut self, broadcast_strategy: BroadcastStrategy) -> Result<Vec<NodeId>, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
@@ -165,6 +170,7 @@ impl DhtRequester {
reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
}

/// Adds a message hash to the dedup cache.
pub async fn add_message_to_dedup_cache(
&mut self,
message_hash: Vec<u8>,
@@ -182,6 +188,7 @@ impl DhtRequester {
reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
}

/// Gets the number of hits for a given message hash.
pub async fn get_message_cache_hit_count(&mut self, message_hash: Vec<u8>) -> Result<u32, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
@@ -191,6 +198,7 @@ impl DhtRequester {
reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
}

/// Returns the deserialized metadata value for the given key
pub async fn get_metadata<T: MessageFormat>(&mut self, key: DhtMetadataKey) -> Result<Option<T>, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender.send(DhtRequest::GetMetadata(key, reply_tx)).await?;
@@ -202,6 +210,7 @@ impl DhtRequester {
}
}

/// Sets the metadata value for the given key
pub async fn set_metadata<T: MessageFormat>(&mut self, key: DhtMetadataKey, value: T) -> Result<(), DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
let bytes = value.to_binary().map_err(DhtActorError::FailedToSerializeValue)?;
@@ -223,6 +232,7 @@ impl DhtRequester {
}
}

/// DHT actor. Responsible for executing DHT-related tasks.
pub struct DhtActor {
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
@@ -237,7 +247,8 @@ pub struct DhtActor {
}

impl DhtActor {
pub fn new(
/// Create a new DhtActor
pub(crate) fn new(
config: Arc<DhtConfig>,
conn: DbConnection,
node_identity: Arc<NodeIdentity>,
@@ -268,6 +279,7 @@ impl DhtActor {
}
}

/// Spawns the DHT actor on a new task.
pub fn spawn(self) {
task::spawn(async move {
if let Err(err) = self.run().await {
13 changes: 13 additions & 0 deletions comms/dht/src/broadcast_strategy.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,10 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # Broadcast strategy
//!
//! Describes a strategy for selecting peers and active connections when sending messages.

use std::{
fmt,
fmt::{Display, Formatter},
@@ -29,6 +33,7 @@ use tari_comms::{peer_manager::node_id::NodeId, types::CommsPublicKey};

use crate::envelope::NodeDestination;

/// Parameters for the [ClosestNodes](self::BroadcastStrategy::ClosestNodes) broadcast strategy.
#[derive(Debug, Clone)]
pub struct BroadcastClosestRequest {
pub node_id: NodeId,
@@ -48,6 +53,7 @@ impl Display for BroadcastClosestRequest {
}
}

/// Describes a strategy for selecting peers and active connections when sending messages.
#[derive(Debug, Clone)]
pub enum BroadcastStrategy {
/// Send to a particular peer matching the given node ID
@@ -101,11 +107,14 @@ impl BroadcastStrategy {
}
}

/// Returns true if the strategy is to send directly to the peer, otherwise false
pub fn is_direct(&self) -> bool {
use BroadcastStrategy::{DirectNodeId, DirectPublicKey};
matches!(self, DirectNodeId(_) | DirectPublicKey(_))
}

/// Returns a reference to the NodeId used in the `DirectNodeId` strategy, otherwise None if the strategy is not
/// `DirectNodeId`.
pub fn direct_node_id(&self) -> Option<&NodeId> {
use BroadcastStrategy::DirectNodeId;
match self {
@@ -114,6 +123,8 @@ impl BroadcastStrategy {
}
}

/// Returns a reference to the `CommsPublicKey` used in the `DirectPublicKey` strategy, otherwise None if the
/// strategy is not `DirectPublicKey`.
pub fn direct_public_key(&self) -> Option<&CommsPublicKey> {
use BroadcastStrategy::DirectPublicKey;
match self {
@@ -122,6 +133,8 @@ impl BroadcastStrategy {
}
}

/// Returns the `CommsPublicKey` used in the `DirectPublicKey` strategy, otherwise None if the strategy is not
/// `DirectPublicKey`.
pub fn into_direct_public_key(self) -> Option<Box<CommsPublicKey>> {
use BroadcastStrategy::DirectPublicKey;
match self {
43 changes: 27 additions & 16 deletions comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! A builder for customizing and constructing the DHT

use std::{sync::Arc, time::Duration};

use tari_comms::{connectivity::ConnectivityRequester, NodeIdentity, PeerManager};
@@ -35,14 +37,21 @@ use crate::{
DhtConfig,
};

/// Builder for the DHT.
///
/// ```rust
/// # use tari_comms_dht::{DbConnectionUrl, Dht};
/// let builder = Dht::builder().mainnet().with_database_url(DbConnectionUrl::Memory);
/// // let dht = builder.build(...).unwrap();
/// ```
#[derive(Debug, Clone, Default)]
pub struct DhtBuilder {
config: DhtConfig,
outbound_tx: Option<mpsc::Sender<DhtOutboundRequest>>,
}

impl DhtBuilder {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
#[cfg(test)]
config: DhtConfig::default_local_test(),
@@ -52,87 +61,89 @@ impl DhtBuilder {
}
}

/// Specify a complete [DhtConfig](crate::DhtConfig).
pub fn with_config(&mut self, config: DhtConfig) -> &mut Self {
self.config = config;
self
}

/// Default configuration for local test environments.
pub fn local_test(&mut self) -> &mut Self {
self.config = DhtConfig::default_local_test();
self
}

/// Sets the DHT protocol version.
pub fn with_protocol_version(&mut self, protocol_version: DhtProtocolVersion) -> &mut Self {
self.config.protocol_version = protocol_version;
self
}

/// Sets whether SAF messages are automatically requested on every new connection to a SAF node.
pub fn set_auto_store_and_forward_requests(&mut self, enabled: bool) -> &mut Self {
self.config.saf.auto_request = enabled;
self
}

/// Sets the mpsc sender that is hooked up to the outbound messaging pipeline.
pub fn with_outbound_sender(&mut self, outbound_tx: mpsc::Sender<DhtOutboundRequest>) -> &mut Self {
self.outbound_tx = Some(outbound_tx);
self
}

/// Use the default testnet configuration.
pub fn testnet(&mut self) -> &mut Self {
self.config = DhtConfig::default_testnet();
self
}

/// Use the default mainnet configuration.
pub fn mainnet(&mut self) -> &mut Self {
self.config = DhtConfig::default_mainnet();
self
}

/// Sets the [DbConnectionUrl](crate::DbConnectionUrl).
pub fn with_database_url(&mut self, database_url: DbConnectionUrl) -> &mut Self {
self.config.database_url = database_url;
self
}

pub fn with_dedup_cache_trim_interval(&mut self, trim_interval: Duration) -> &mut Self {
self.config.dedup_cache_trim_interval = trim_interval;
self
}

pub fn with_dedup_cache_capacity(&mut self, capacity: usize) -> &mut Self {
self.config.dedup_cache_capacity = capacity;
self
}

pub fn with_dedup_discard_hit_count(&mut self, max_hit_count: usize) -> &mut Self {
self.config.dedup_allowed_message_occurrences = max_hit_count;
self
}

/// The number of connections to random peers that should be maintained.
/// Connections to random peers are reshuffled every `DhtConfig::connectivity::random_pool_refresh_interval`.
pub fn with_num_random_nodes(&mut self, n: usize) -> &mut Self {
self.config.num_random_nodes = n;
self
}

/// The number of neighbouring peers that the DHT should try maintain connections to.
pub fn with_num_neighbouring_nodes(&mut self, n: usize) -> &mut Self {
self.config.num_neighbouring_nodes = n;
self.config.saf.num_neighbouring_nodes = n;
self
}

/// The number of peers to send a message using the
/// [Broadcast](crate::broadcast_strategy::BroadcastStrategy::Propagate) strategy.
pub fn with_propagation_factor(&mut self, propagation_factor: usize) -> &mut Self {
self.config.propagation_factor = propagation_factor;
self
}

/// The number of peers to send a message broadcast using the
/// [Broadcast](crate::broadcast_strategy::BroadcastStrategy::Broadcast) strategy.
pub fn with_broadcast_factor(&mut self, broadcast_factor: usize) -> &mut Self {
self.config.broadcast_factor = broadcast_factor;
self
}

/// The length of time to wait for a discovery reply after a discovery message has been sent.
pub fn with_discovery_timeout(&mut self, timeout: Duration) -> &mut Self {
self.config.discovery_request_timeout = timeout;
self
}

/// Enables automatically sending a join/announce message when connected to enough peers on the network.
pub fn enable_auto_join(&mut self) -> &mut Self {
self.config.auto_join = true;
self
7 changes: 5 additions & 2 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
@@ -101,14 +101,17 @@ pub struct DhtConfig {
}

impl DhtConfig {
/// Default testnet configuration
pub fn default_testnet() -> Self {
Default::default()
}

/// Default mainnet configuration
pub fn default_mainnet() -> Self {
Default::default()
}

/// Default local test configuration
pub fn default_local_test() -> Self {
Self {
database_url: DbConnectionUrl::Memory,
@@ -171,7 +174,7 @@ pub struct DhtConnectivityConfig {
pub update_interval: Duration,
/// The interval to change the random pool peers.
/// Default: 2 hours
pub random_pool_refresh: Duration,
pub random_pool_refresh_interval: Duration,
/// Length of cooldown when high connection failure rates are encountered. Default: 45s
pub high_failure_rate_cooldown: Duration,
/// The minimum desired ratio of TCPv4 to Tor connections. TCPv4 addresses have some significant cost to create,
@@ -185,7 +188,7 @@ impl Default for DhtConnectivityConfig {
fn default() -> Self {
Self {
update_interval: Duration::from_secs(2 * 60),
random_pool_refresh: Duration::from_secs(2 * 60 * 60),
random_pool_refresh_interval: Duration::from_secs(2 * 60 * 60),
high_failure_rate_cooldown: Duration::from_secs(45),
minimum_desired_tcpv4_node_ratio: 0.1,
}
29 changes: 17 additions & 12 deletions comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,16 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # DHT Connectivity Actor
//!
//! Responsible for ensuring DHT network connectivity to a neighbouring and random peer set. This includes joining the
//! network when the node has established some peer connections (e.g to seed peers). It maintains neighbouring and
//! random peer pools and instructs the comms `ConnectivityManager` to establish those connections. Once a configured
//! percentage of these peers is online, the node is established on the DHT network.
//!
//! The DHT connectivity actor monitors the connectivity state (using `ConnectivityEvent`s) and attempts
//! to maintain connectivity to the network as peers come and go.
#[cfg(test)]
mod test;

@@ -50,6 +60,7 @@ use crate::{connectivity::metrics::MetricsError, event::DhtEvent, DhtActorError,

const LOG_TARGET: &str = "comms::dht::connectivity";

/// Error type for the DHT connectivity actor.
#[derive(Debug, Error)]
pub enum DhtConnectivityError {
#[error("ConnectivityError: {0}")]
@@ -62,16 +73,8 @@ pub enum DhtConnectivityError {
MetricError(#[from] MetricsError),
}

/// # DHT Connectivity Actor
///
/// Responsible for ensuring DHT network connectivity to a neighbouring and random peer set. This includes joining the
/// network when the node has established some peer connections (e.g to seed peers). It maintains neighbouring and
/// random peer pools and instructs the comms `ConnectivityManager` to establish those connections. Once a configured
/// percentage of these peers is online, the node is established on the DHT network.
///
/// The DHT connectivity actor monitors the connectivity state (using `ConnectivityEvent`s) and attempts
/// to maintain connectivity to the network as peers come and go.
pub struct DhtConnectivity {
/// DHT connectivity actor.
pub(crate) struct DhtConnectivity {
config: Arc<DhtConfig>,
peer_manager: Arc<PeerManager>,
node_identity: Arc<NodeIdentity>,
@@ -133,7 +136,9 @@ impl DhtConnectivity {
task::spawn(async move {
log_mdc::extend(mdc.clone());
debug!(target: LOG_TARGET, "Waiting for connectivity manager to start");
let _result = self.connectivity.wait_started().await;
if let Err(err) = self.connectivity.wait_started().await {
error!(target: LOG_TARGET, "Comms connectivity failed to start: {}", err);
}
log_mdc::extend(mdc.clone());
match self.run(connectivity_events).await {
Ok(_) => Ok(()),
@@ -442,7 +447,7 @@ impl DhtConnectivity {
async fn refresh_random_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
let should_refresh = self.config.num_random_nodes > 0 &&
self.random_pool_last_refresh
.map(|instant| instant.elapsed() >= self.config.connectivity.random_pool_refresh)
.map(|instant| instant.elapsed() >= self.config.connectivity.random_pool_refresh_interval)
.unwrap_or(true);
if should_refresh {
self.refresh_random_pool().await?;
47 changes: 29 additions & 18 deletions comms/dht/src/crypt.rs
Original file line number Diff line number Diff line change
@@ -35,56 +35,64 @@ use tari_crypto::{
keys::{DiffieHellmanSharedSecret, PublicKey},
tari_utilities::{epoch_time::EpochTime, ByteArray},
};
use zeroize::{Zeroize, ZeroizeOnDrop};

use crate::{
envelope::{DhtMessageFlags, DhtMessageHeader, DhtMessageType, NodeDestination},
outbound::DhtOutboundError,
version::DhtProtocolVersion,
};

pub fn generate_ecdh_secret<PK>(secret_key: &PK::K, public_key: &PK) -> PK
#[derive(Debug, Clone, Zeroize, ZeroizeOnDrop)]
pub struct CipherKey(chacha20::Key);

/// Generates a Diffie-Hellman secret `kx.G` as a `chacha20::Key` given secret scalar `k` and public key `P = x.G`.
pub fn generate_ecdh_secret<PK>(secret_key: &PK::K, public_key: &PK) -> CipherKey
where PK: PublicKey + DiffieHellmanSharedSecret<PK = PK> {
PK::shared_secret(secret_key, public_key)
// TODO: PK will still leave the secret in released memory. Implementing Zerioze on RistrettoPublicKey is not
// currently possible because (Compressed)RistrettoPoint does not implement it.
let k = PK::shared_secret(secret_key, public_key);
CipherKey(*Key::from_slice(k.as_bytes()))
}

pub fn decrypt(cipher_key: &CommsPublicKey, cipher_text: &[u8]) -> Result<Vec<u8>, DhtOutboundError> {
/// Decrypts cipher text using ChaCha20 stream cipher given the cipher key and cipher text with integral nonce.
pub fn decrypt(cipher_key: &CipherKey, cipher_text: &[u8]) -> Result<Vec<u8>, DhtOutboundError> {
if cipher_text.len() < size_of::<Nonce>() {
return Err(DhtOutboundError::CipherError(
"Cipher text is not long enough to include nonce".to_string(),
));
}

let (nonce, cipher_text) = cipher_text.split_at(size_of::<Nonce>());
let nonce = Nonce::from_slice(nonce);
let mut cipher_text = cipher_text.to_vec();

let key = Key::from_slice(cipher_key.as_bytes()); // 32-bytes
let mut cipher = ChaCha20::new(key, nonce);

let mut cipher = ChaCha20::new(&cipher_key.0, nonce);
cipher.apply_keystream(cipher_text.as_mut_slice());

Ok(cipher_text)
}

pub fn encrypt(cipher_key: &CommsPublicKey, plain_text: &[u8]) -> Result<Vec<u8>, DhtOutboundError> {
/// Encrypt the plain text using the ChaCha20 stream cipher
pub fn encrypt(cipher_key: &CipherKey, plain_text: &[u8]) -> Vec<u8> {
let mut nonce = [0u8; size_of::<Nonce>()];

OsRng.fill_bytes(&mut nonce);
let nonce_ga = Nonce::from_slice(&nonce);

let key = Key::from_slice(cipher_key.as_bytes()); // 32-bytes
let mut cipher = ChaCha20::new(key, nonce_ga);
let nonce_ga = Nonce::from_slice(&nonce);
let mut cipher = ChaCha20::new(&cipher_key.0, nonce_ga);

// Cloning the plain text to avoid a caller thinking we have encrypted in place and losing the integral nonce added
// below
let mut plain_text_clone = plain_text.to_vec();

cipher.apply_keystream(plain_text_clone.as_mut_slice());

let mut ciphertext_integral_nonce = nonce.to_vec();
let mut ciphertext_integral_nonce = Vec::with_capacity(nonce.len() + plain_text_clone.len());
ciphertext_integral_nonce.extend(&nonce);
ciphertext_integral_nonce.append(&mut plain_text_clone);
Ok(ciphertext_integral_nonce)
ciphertext_integral_nonce
}

/// Generates a challenge for the origin MAC.
pub fn create_origin_mac_challenge(header: &DhtMessageHeader, body: &[u8]) -> Challenge {
create_origin_mac_challenge_parts(
header.version,
@@ -97,6 +105,7 @@ pub fn create_origin_mac_challenge(header: &DhtMessageHeader, body: &[u8]) -> Ch
)
}

/// Generates a challenge for the origin MAC.
pub fn create_origin_mac_challenge_parts(
protocol_version: DhtProtocolVersion,
destination: &NodeDestination,
@@ -108,7 +117,7 @@ pub fn create_origin_mac_challenge_parts(
) -> Challenge {
let mut mac_challenge = Challenge::new();
mac_challenge.update(&protocol_version.to_bytes());
mac_challenge.update(destination.to_inner_bytes().as_slice());
mac_challenge.update(destination.as_inner_bytes());
mac_challenge.update(&(message_type as i32).to_le_bytes());
mac_challenge.update(&flags.bits().to_le_bytes());
if let Some(t) = expires {
@@ -129,16 +138,18 @@ mod test {

#[test]
fn encrypt_decrypt() {
let key = CommsPublicKey::default();
let pk = CommsPublicKey::default();
let key = CipherKey(*chacha20::Key::from_slice(pk.as_bytes()));
let plain_text = "Last enemy position 0830h AJ 9863".as_bytes().to_vec();
let encrypted = encrypt(&key, &plain_text).unwrap();
let encrypted = encrypt(&key, &plain_text);
let decrypted = decrypt(&key, &encrypted).unwrap();
assert_eq!(decrypted, plain_text);
}

#[test]
fn decrypt_fn() {
let key = CommsPublicKey::default();
let pk = CommsPublicKey::default();
let key = CipherKey(*chacha20::Key::from_slice(pk.as_bytes()));
let cipher_text =
from_hex("24bf9e698e14938e93c09e432274af7c143f8fb831f344f244ef02ca78a07ddc28b46fec536a0ca5c04737a604")
.unwrap();
4 changes: 4 additions & 0 deletions comms/dht/src/dedup/mod.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,10 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # Dedup Cache
//!
//! Keeps track of messages seen before by this node and discards duplicates.
mod dedup_cache;

use std::task::Poll;
6 changes: 3 additions & 3 deletions comms/dht/src/dht.rs
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ use crate::{
event::{DhtEventReceiver, DhtEventSender},
filter,
inbound,
inbound::{DecryptedDhtMessage, DhtInboundMessage, MetricsLayer},
inbound::{DecryptedDhtMessage, DhtInboundMessage, ForwardLayer, MetricsLayer},
logging_middleware::MessageLoggingLayer,
network_discovery::DhtNetworkDiscovery,
outbound,
@@ -318,7 +318,7 @@ impl Dht {
Arc::clone(&self.node_identity),
self.store_and_forward_requester(),
))
.layer(store_forward::ForwardLayer::new(
.layer(ForwardLayer::new(
self.outbound_requester(),
self.node_identity.features().contains(PeerFeatures::DHT_STORE_FORWARD),
))
@@ -596,7 +596,7 @@ mod test {
// Encrypt for someone else
let node_identity2 = make_node_identity();
let ecdh_key = crypt::generate_ecdh_secret(node_identity2.secret_key(), node_identity2.public_key());
let encrypted_bytes = crypt::encrypt(&ecdh_key, &msg.to_encoded_bytes()).unwrap();
let encrypted_bytes = crypt::encrypt(&ecdh_key, &msg.to_encoded_bytes());
let dht_envelope = make_dht_envelope(
&node_identity2,
encrypted_bytes,
14 changes: 14 additions & 0 deletions comms/dht/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,20 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # DHT discovery protocol
//!
//! This protocol broadcasts an encrypted discovery message to the destination peer.
//! The source of this message is unknown to other network peers without using heuristic-based network analysis.
//! This method of discovery requires both peers to be online.
//!
//! The protocol functions as follows:
//! 1. Broadcast an encrypted [Discovery](crate::envelope::DhtMessageType) message destined for the peer containing the
//! necessary details to connect to this peer.
//! 1. If the peer is online, it may decrypt the message and view the peer
//! connection details.
//! 1. The peer may then add the peer and attempt to connect to it.
//! 1. Once a direct connection is established, the discovery is complete.
mod error;
pub use error::DhtDiscoveryError;

14 changes: 11 additions & 3 deletions comms/dht/src/domain_message.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ use std::cmp;

use rand::{rngs::OsRng, RngCore};

/// Trait that exposes conversion to a protobuf i32 enum type.
pub trait ToProtoEnum {
fn as_i32(&self) -> i32;
}
@@ -34,36 +35,42 @@ impl ToProtoEnum for i32 {
}
}

/// Domain message to be sent to another peer.
#[derive(Debug, Clone)]
pub struct OutboundDomainMessage<T> {
inner: T,
message_type: i32,
}

impl<T> OutboundDomainMessage<T> {
/// Create a new outbound domain message
pub fn new<M: ToProtoEnum>(message_type: &M, message: T) -> Self {
Self {
inner: message,
message_type: message_type.as_i32(),
}
}

/// Consumes this instance returning the inner message.
pub fn into_inner(self) -> T {
self.inner
}

pub fn to_propagation_header(&self) -> MessageHeader {
/// Returns a propagation message header
pub(crate) fn to_propagation_header(&self) -> MessageHeader {
MessageHeader::for_propagation(self.message_type)
}

pub fn to_header(&self) -> MessageHeader {
/// Creates a MessageHeader for this outbound message
pub(crate) fn to_header(&self) -> MessageHeader {
MessageHeader::new(self.message_type)
}
}

pub use crate::proto::message_header::MessageHeader;

impl MessageHeader {
/// Creates a new message header with the given message type and random nonce.
pub fn new(message_type: i32) -> Self {
Self {
message_type,
@@ -73,7 +80,8 @@ impl MessageHeader {
}
}

pub fn for_propagation(message_type: i32) -> Self {
/// Creates a new message header with the given message type and a fixed nonce.
pub(crate) fn for_propagation(message_type: i32) -> Self {
const PROPAGATION_NONCE: u64 = 0;
Self {
message_type,
16 changes: 12 additions & 4 deletions comms/dht/src/envelope.rs
Original file line number Diff line number Diff line change
@@ -271,15 +271,18 @@ pub enum NodeDestination {
}

impl NodeDestination {
pub fn to_inner_bytes(&self) -> Vec<u8> {
/// Returns the slice of bytes of the `CommsPublicKey` or `NodeId`. Returns an empty slice if the destination is
/// `Unknown`.
pub fn as_inner_bytes(&self) -> &[u8] {
use NodeDestination::{NodeId, PublicKey, Unknown};
match self {
Unknown => Vec::default(),
PublicKey(pk) => pk.to_vec(),
NodeId(node_id) => node_id.to_vec(),
Unknown => &[],
PublicKey(pk) => pk.as_bytes(),
NodeId(node_id) => node_id.as_bytes(),
}
}

/// Returns a reference to the `CommsPublicKey` if the destination is `CommsPublicKey`.
pub fn public_key(&self) -> Option<&CommsPublicKey> {
use NodeDestination::{NodeId, PublicKey, Unknown};
match self {
@@ -289,6 +292,7 @@ impl NodeDestination {
}
}

/// Returns a reference to the `NodeId` if the destination is `NodeId`.
pub fn node_id(&self) -> Option<&NodeId> {
use NodeDestination::{NodeId, PublicKey, Unknown};
match self {
@@ -298,16 +302,20 @@ impl NodeDestination {
}
}

/// Returns the NodeId for this destination, deriving it from the PublicKey if necessary or returning None if the
/// destination is `Unknown`.
pub fn to_derived_node_id(&self) -> Option<NodeId> {
self.node_id()
.cloned()
.or_else(|| self.public_key().map(NodeId::from_public_key))
}

/// Returns true if the destination is `Unknown`, otherwise false.
pub fn is_unknown(&self) -> bool {
matches!(self, NodeDestination::Unknown)
}

/// Returns true if the NodeIdentity NodeId or PublicKey is equal to this destination.
#[inline]
pub fn equals_node_identity(&self, other: &NodeIdentity) -> bool {
self == other.node_id() || self == other.public_key()
1 change: 1 addition & 0 deletions comms/dht/src/event.rs
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ use crate::network_discovery::DhtNetworkDiscoveryRoundInfo;
pub type DhtEventSender = broadcast::Sender<Arc<DhtEvent>>;
pub type DhtEventReceiver = broadcast::Receiver<Arc<DhtEvent>>;

/// Events emitted by the DHT actor.
#[derive(Debug)]
#[non_exhaustive]
pub enum DhtEvent {
5 changes: 3 additions & 2 deletions comms/dht/src/inbound/decryption.rs
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ use tower::{layer::Layer, Service, ServiceExt};

use crate::{
crypt,
crypt::CipherKey,
envelope::DhtMessageHeader,
inbound::message::{DecryptedDhtMessage, DhtInboundMessage},
proto::envelope::OriginMac,
@@ -301,7 +302,7 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
}

fn attempt_decrypt_origin_mac(
shared_secret: &CommsPublicKey,
shared_secret: &CipherKey,
dht_header: &DhtMessageHeader,
) -> Result<(CommsPublicKey, Vec<u8>), DecryptionError> {
let encrypted_origin_mac = Some(&dht_header.origin_mac)
@@ -333,7 +334,7 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
}

fn attempt_decrypt_message_body(
shared_secret: &CommsPublicKey,
shared_secret: &CipherKey,
message_body: &[u8],
) -> Result<EnvelopeBody, DecryptionError> {
let decrypted =
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
// Copyright 2019, The Tari Project
// Copyright 2022. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::task::Poll;

@@ -30,9 +30,8 @@ use tower::{layer::Layer, Service, ServiceExt};

use crate::{
envelope::NodeDestination,
inbound::DecryptedDhtMessage,
inbound::{error::DhtInboundError, DecryptedDhtMessage},
outbound::{OutboundMessageRequester, SendMessageParams},
store_forward::error::StoreAndForwardError,
};

const LOG_TARGET: &str = "comms::dht::storeforward::forward";
@@ -166,7 +165,7 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
Ok(())
}

async fn forward(&mut self, message: &DecryptedDhtMessage) -> Result<(), StoreAndForwardError> {
async fn forward(&mut self, message: &DecryptedDhtMessage) -> Result<(), DhtInboundError> {
let DecryptedDhtMessage {
source_peer,
decryption_result,
6 changes: 6 additions & 0 deletions comms/dht/src/inbound/mod.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! DHT middleware layers for inbound messages.
mod decryption;
pub use decryption::DecryptionLayer;

@@ -29,10 +31,14 @@ pub use deserialize::DeserializeLayer;
mod dht_handler;
pub use dht_handler::DhtHandlerLayer;

mod forward;
pub use forward::ForwardLayer;

mod metrics;
pub use metrics::MetricsLayer;

mod error;

mod message;

pub use message::{DecryptedDhtMessage, DhtInboundMessage};
78 changes: 14 additions & 64 deletions comms/dht/src/lib.rs
Original file line number Diff line number Diff line change
@@ -29,14 +29,15 @@
//! `InboundMessage`(comms) -> _DHT Inbound Middleware_ -> `DhtInboundMessage`(domain)
//!
//! The DHT inbound middleware consist of:
//! * `DeserializeMiddleware` deserializes the body of an `InboundMessage` into a `DhtEnvelope`.
//! * `DecryptionMiddleware` attempts to decrypt the body of a `DhtEnvelope` if required. The result of that decryption
//! (success or failure) is passed to the next service.
//! * `ForwardMiddleware` uses the result of the decryption to determine if the message is destined for this node or
//! not. If not, the message will be forwarded to the applicable peers using the OutboundRequester (i.e. the outbound
//! DHT middleware).
//! * `DhtHandlerMiddleware` handles DHT messages, such as `Join` and `Discover`. If the messages are _not_ DHT messages
//! the `next_service` is called.
//! * metrics: monitors the number of inbound messages
//! * decryption: deserializes and decrypts the `InboundMessage` and produces a
//! [DecryptedDhtMessage](crate::inbound::DecryptedDhtMessage).
//! * dedup: discards the message if previously received.
//! * logging: message logging
//! * SAF storage: stores certain messages for other peers in the SAF store.
//! * message storage: forwards messages for other peers.
//! * SAF message handler: handles SAF protocol messages (requests for SAF messages, SAF message responses).
//! * DHT message handler: handles DHT protocol messages (discovery, join etc.)
//!
//! #### Outbound Message Flow
//!
@@ -50,63 +51,12 @@
//! `DhtOutboundRequest` (domain) -> _DHT Outbound Middleware_ -> `OutboundMessage` (comms)
//!
//! The DHT outbound middleware consist of:
//! * `BroadcastMiddleware` produces multiple outbound messages according on the `BroadcastStrategy` from the received
//! * broadcast layer: produces multiple outbound messages according on the `BroadcastStrategy` from the received
//! `DhtOutboundRequest` message. The `next_service` is called for each resulting message.
//! * `EncryptionMiddleware` encrypts the body of a message if `DhtMessagheFlags::ENCRYPTED` is given. The result is
//! passed onto the `next_service`.
//! * `SerializeMiddleware` wraps the body in a `DhtEnvelope`, serializes the result, constructs an `OutboundMessage`
//! and calls `next_service`. Typically, `next_service` will be a `SinkMiddleware` which send the message to the comms
//! OMS.
//
//! ## Usage
//!
//! ```edition2018,compile_fail
//! #use tari_comms::middleware::ServicePipeline;
//! #use tari_comms_dht::DhtBuilder;
//! #use tari_comms::middleware::sink::SinkMiddleware;
//! #use tari_comms::peer_manager::NodeIdentity;
//! #use rand::rngs::OsRng;
//! #use std::sync::Arc;
//! #use tari_comms::CommsBuilder;
//! #use tokio::runtime::Runtime;
//! #use tokio::sync::mpsc;
//!
//! let runtime = Runtime::new().unwrap();
//! // Channel from comms to inbound dht
//! let (comms_in_tx, comms_in_rx)= mpsc::channel(100);
//! let (comms_out_tx, comms_out_rx)= mpsc::channel(100);
//! let node_identity = NodeIdentity::random(&mut OsRng::new().unwrap(), "127.0.0.1:9000".parse().unwrap())
//! .map(Arc::new).unwrap();
//! let comms = CommsBuilder::new(runtime.executor())
//! // Messages coming from comms
//! .with_inbound_sink(comms_in_tx)
//! // Messages going to comms
//! .with_outbound_stream(comms_out_rx)
//! .with_node_identity(node_identity)
//! .build()
//! .unwrap();
//! let peer_manager = comms.start().unwrap().peer_manager();
//! let dht = Dht::builder().build(node_identity, peer_manager)?;
//!
//! let inbound_pipeline = ServicePipeline::new(
//! comms_in_rx,
//! // In Tari's case, the service would be a InboundMessageConnector in `tari_p2p`
//! dht.inbound_middleware_layer(/* some service which uses DhtInboundMessage */ )
//! );
//! // Use the given executor to spawn calls to the middleware
//! inbound_pipeline.spawn_with(rt.executor());
//!
//! let outbound_pipeline = ServicePipeline::new(
//! dht.take_outbound_receiver(),
//! // SinkMiddleware sends the resulting OutboundMessages to the comms OMS
//! dht.outbound_middleware_layer(SinkMiddleware::new(comms_out_tx))
//! );
//! // Use the given executor to spawn calls to the middleware
//! outbound_pipeline.spawn_with(rt.executor());
//!
//! let oms = dht.outbound_requester();
//! oms.send_message(...).await;
//! ```
//! * message logger layer.
//! * serialization: wraps the body in a [DhtOutboundMessage](crate::outbound::DhtOutboundMessage), serializes the
//! result, constructs an `OutboundMessage` and calls `next_service`. Typically, `next_service` will be a
//! `SinkMiddleware` which send the message to comms messaging.
#![recursion_limit = "256"]
#[macro_use]
2 changes: 2 additions & 0 deletions comms/dht/src/logging_middleware.rs
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ pub struct MessageLoggingLayer<'a, R> {
}

impl<'a, R> MessageLoggingLayer<'a, R> {
/// Creates a new logging middleware layer
pub fn new<T: Into<Cow<'a, str>>>(prefix_msg: T) -> Self {
Self {
prefix_msg: prefix_msg.into(),
@@ -55,6 +56,7 @@ where
}
}

/// [Service](https://tower-rs.github.io/tower/tower_service/) for DHT message logging.
#[derive(Clone)]
pub struct MessageLoggingService<'a, S> {
prefix_msg: Cow<'a, str>,
4 changes: 2 additions & 2 deletions comms/dht/src/outbound/broadcast.rs
Original file line number Diff line number Diff line change
@@ -500,7 +500,7 @@ where S: Service<DhtOutboundMessage, Response = (), Error = PipelineError>
let (e_secret_key, e_public_key) = CommsPublicKey::random_keypair(&mut OsRng);
let shared_ephemeral_secret = crypt::generate_ecdh_secret(&e_secret_key, &**public_key);
// Encrypt the message with the body
let encrypted_body = crypt::encrypt(&shared_ephemeral_secret, &body)?;
let encrypted_body = crypt::encrypt(&shared_ephemeral_secret, &body);

let mac_challenge = crypt::create_origin_mac_challenge_parts(
self.protocol_version,
@@ -514,7 +514,7 @@ where S: Service<DhtOutboundMessage, Response = (), Error = PipelineError>
// Sign the encrypted message
let origin_mac = create_origin_mac(&self.node_identity, mac_challenge)?;
// Encrypt and set the origin field
let encrypted_origin_mac = crypt::encrypt(&shared_ephemeral_secret, &origin_mac)?;
let encrypted_origin_mac = crypt::encrypt(&shared_ephemeral_secret, &origin_mac);
Ok((
Some(Arc::new(e_public_key)),
Some(encrypted_origin_mac.into()),
4 changes: 3 additions & 1 deletion comms/dht/src/outbound/mod.rs
Original file line number Diff line number Diff line change
@@ -20,14 +20,16 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! DHT middleware layers for outbound messages.
mod broadcast;
pub use broadcast::BroadcastLayer;

mod error;
pub use error::DhtOutboundError;

pub(crate) mod message;
pub use message::{DhtOutboundRequest, OutboundEncryption, SendMessageResponse};
pub use message::{DhtOutboundMessage, DhtOutboundRequest, OutboundEncryption, SendMessageResponse};

mod message_params;
pub use message_params::SendMessageParams;
3 changes: 3 additions & 0 deletions comms/dht/src/peer_validator.rs
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ use crate::DhtConfig;

const LOG_TARGET: &str = "dht::network_discovery::peer_validator";

/// Validation errors for peers shared on the network
#[derive(Debug, thiserror::Error)]
pub enum PeerValidatorError {
#[error("Node ID was invalid for peer '{peer}'")]
@@ -44,12 +45,14 @@ pub enum PeerValidatorError {
PeerManagerError(#[from] PeerManagerError),
}

/// Validator for Peers
pub struct PeerValidator<'a> {
peer_manager: &'a PeerManager,
config: &'a DhtConfig,
}

impl<'a> PeerValidator<'a> {
/// Creates a new peer validator
pub fn new(peer_manager: &'a PeerManager, config: &'a DhtConfig) -> Self {
Self { peer_manager, config }
}
2 changes: 2 additions & 0 deletions comms/dht/src/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! DHT RPC interface defining RPC methods for peer sharing.
#[cfg(test)]
mod mock;
#[cfg(test)]
10 changes: 10 additions & 0 deletions comms/dht/src/storage/connection.rs
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ use crate::storage::error::StorageError;
const LOG_TARGET: &str = "comms::dht::storage::connection";
const SQLITE_POOL_SIZE: usize = 16;

/// Describes how to connect to the database (currently, SQLite).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(into = "String", try_from = "String")]
pub enum DbConnectionUrl {
@@ -52,10 +53,12 @@ pub enum DbConnectionUrl {
}

impl DbConnectionUrl {
/// Use a file to store the database
pub fn file<P: AsRef<Path>>(path: P) -> Self {
DbConnectionUrl::File(path.as_ref().to_path_buf())
}

/// Returns a database connection string
pub fn to_url_string(&self) -> String {
use DbConnectionUrl::{File, Memory, MemoryShared};
match self {
@@ -96,17 +99,20 @@ impl TryFrom<String> for DbConnectionUrl {
}
}

/// A SQLite database connection
#[derive(Clone)]
pub struct DbConnection {
pool: SqliteConnectionPool,
}

impl DbConnection {
/// Connect to an ephemeral database in memory
#[cfg(test)]
pub fn connect_memory(name: String) -> Result<Self, StorageError> {
Self::connect_url(&DbConnectionUrl::MemoryShared(name))
}

/// Connect using the given [DbConnectionUrl](self::DbConnectionUrl).
pub fn connect_url(db_url: &DbConnectionUrl) -> Result<Self, StorageError> {
debug!(target: LOG_TARGET, "Connecting to database using '{:?}'", db_url);

@@ -122,6 +128,7 @@ impl DbConnection {
Ok(Self::new(pool))
}

/// Connect and migrate the database, once complete, a handle to the migrated database is returned.
pub fn connect_and_migrate(db_url: &DbConnectionUrl) -> Result<Self, StorageError> {
let conn = Self::connect_url(db_url)?;
let output = conn.migrate()?;
@@ -133,10 +140,13 @@ impl DbConnection {
Self { pool }
}

/// Fetch a connection from the pool. This function synchronously blocks the current thread for up to 60 seconds or
/// until a connection is available.
pub fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, StorageError> {
self.pool.get_pooled_connection().map_err(StorageError::DieselR2d2Error)
}

/// Run database migrations
pub fn migrate(&self) -> Result<String, StorageError> {
embed_migrations!("./migrations");

6 changes: 6 additions & 0 deletions comms/dht/src/storage/database.rs
Original file line number Diff line number Diff line change
@@ -29,23 +29,27 @@ use crate::{
storage::{dht_setting_entry::NewDhtMetadataEntry, DhtMetadataKey},
};

/// DHT database containing DHT key/value metadata
#[derive(Clone)]
pub struct DhtDatabase {
connection: DbConnection,
}

impl DhtDatabase {
/// Create a new DHT database using the provided connection
pub fn new(connection: DbConnection) -> Self {
Self { connection }
}

/// Get a value for the given key, or None if that value has not been set.
pub fn get_metadata_value<T: MessageFormat>(&self, key: DhtMetadataKey) -> Result<Option<T>, StorageError> {
match self.get_metadata_value_bytes(key)? {
Some(bytes) => T::from_binary(&bytes).map(Some).map_err(Into::into),
None => Ok(None),
}
}

/// Get the raw bytes for the given key, or None if that value has not been set.
pub fn get_metadata_value_bytes(&self, key: DhtMetadataKey) -> Result<Option<Vec<u8>>, StorageError> {
let conn = self.connection.get_pooled_connection()?;
dht_metadata::table
@@ -58,11 +62,13 @@ impl DhtDatabase {
})
}

/// Set the value for the given key
pub fn set_metadata_value<T: MessageFormat>(&self, key: DhtMetadataKey, value: &T) -> Result<(), StorageError> {
let bytes = value.to_binary()?;
self.set_metadata_value_bytes(key, bytes)
}

/// Set the raw bytes for the given key
pub fn set_metadata_value_bytes(&self, key: DhtMetadataKey, value: Vec<u8>) -> Result<(), StorageError> {
let conn = self.connection.get_pooled_connection()?;
diesel::replace_into(dht_metadata::table)
3 changes: 3 additions & 0 deletions comms/dht/src/storage/dht_setting_entry.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ use std::fmt;

use crate::schema::dht_metadata;

/// Supported metadata keys for the DHT database
#[derive(Debug, Clone, Copy)]
pub enum DhtMetadataKey {
/// Timestamp each time the DHT is shut down
@@ -38,13 +39,15 @@ impl fmt::Display for DhtMetadataKey {
}
}

/// Struct used to create a new metadata entry
#[derive(Clone, Debug, Insertable)]
#[table_name = "dht_metadata"]
pub struct NewDhtMetadataEntry {
pub key: String,
pub value: Vec<u8>,
}

/// Struct used that contains a metadata entry
#[derive(Clone, Debug, Queryable, Identifiable)]
#[table_name = "dht_metadata"]
pub struct DhtMetadataEntry {
1 change: 1 addition & 0 deletions comms/dht/src/storage/error.rs
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ use tari_utilities::message_format::MessageFormatError;
use thiserror::Error;
use tokio::task;

/// Error type for DHT storage
#[derive(Debug, Error)]
pub enum StorageError {
#[error("ConnectionError: {0}")]
2 changes: 2 additions & 0 deletions comms/dht/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! DHT storage maintains persistent DHT state including SAF messages and other DHT metadata.
mod connection;
pub use connection::{DbConnection, DbConnectionUrl};

1 change: 1 addition & 0 deletions comms/dht/src/store_forward/config.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ use std::time::Duration;

use serde::{Deserialize, Serialize};

/// Store and forward configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SafConfig {
1 change: 1 addition & 0 deletions comms/dht/src/store_forward/error.rs
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ use thiserror::Error;

use crate::{actor::DhtActorError, envelope::DhtMessageError, outbound::DhtOutboundError, storage::StorageError};

/// Error type for SAF
#[derive(Debug, Error)]
pub enum StoreAndForwardError {
#[error("DhtMessageError: {0}")]
3 changes: 2 additions & 1 deletion comms/dht/src/store_forward/local_state.rs
Original file line number Diff line number Diff line change
@@ -27,8 +27,9 @@ use std::{

use tari_comms::peer_manager::NodeId;

/// Keeps track of the current pending SAF requests.
#[derive(Debug, Clone, Default)]
pub struct SafLocalState {
pub(crate) struct SafLocalState {
inflight_saf_requests: HashMap<NodeId, (usize, Instant)>,
}

5 changes: 2 additions & 3 deletions comms/dht/src/store_forward/mod.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! Stores messages for a limited time for other offline peers to request later.
type SafResult<T> = Result<T, StoreAndForwardError>;

mod service;
@@ -34,9 +36,6 @@ pub use error::StoreAndForwardError;
mod config;
pub use config::SafConfig;

mod forward;
pub use forward::ForwardLayer;

mod message;

mod saf_handler;
1 change: 1 addition & 0 deletions comms/dht/src/store_forward/saf_handler/layer.rs
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ use crate::{
store_forward::{SafConfig, StoreAndForwardRequester},
};

/// Layer responsible for handling SAF protocol messages.
pub struct MessageHandlerLayer {
config: SafConfig,
saf_requester: StoreAndForwardRequester,
2 changes: 1 addition & 1 deletion comms/dht/src/store_forward/saf_handler/task.rs
Original file line number Diff line number Diff line change
@@ -276,7 +276,7 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
let message_tag = message.dht_header.message_tag;

if let Err(err) = self.check_saf_messages_were_requested(&source_node_id).await {
// TODO: Peer send SAF messages we didn't request?? #banheuristics
// TODO: Peer sent SAF messages we didn't request?? #banheuristics
warn!(target: LOG_TARGET, "SAF response check failed: {}", err);
return Ok(());
}
26 changes: 21 additions & 5 deletions comms/dht/src/store_forward/service.rs
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ const LOG_TARGET: &str = "comms::dht::storeforward::actor";
/// This involves cleaning up messages which have been stored too long according to their priority
const CLEANUP_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 mins

/// Query object for fetching stored messages
#[derive(Debug, Clone)]
pub struct FetchStoredMessageQuery {
public_key: Box<CommsPublicKey>,
@@ -69,6 +70,7 @@ pub struct FetchStoredMessageQuery {
}

impl FetchStoredMessageQuery {
/// Creates a new stored message request for
pub fn new(public_key: Box<CommsPublicKey>, node_id: Box<NodeId>) -> Self {
Self {
public_key,
@@ -78,21 +80,25 @@ impl FetchStoredMessageQuery {
}
}

/// Modify query to only include messages since the given date.
pub fn with_messages_since(&mut self, since: DateTime<Utc>) -> &mut Self {
self.since = Some(since);
self
}

/// Modify query to request a certain category of messages.
pub fn with_response_type(&mut self, response_type: SafResponseType) -> &mut Self {
self.response_type = response_type;
self
}

pub fn since(&self) -> Option<DateTime<Utc>> {
#[cfg(test)]
pub(crate) fn since(&self) -> Option<DateTime<Utc>> {
self.since
}
}

/// Request types for the SAF actor.
#[derive(Debug)]
pub enum StoreAndForwardRequest {
FetchMessages(FetchStoredMessageQuery, oneshot::Sender<SafResult<Vec<StoredMessage>>>),
@@ -104,16 +110,18 @@ pub enum StoreAndForwardRequest {
MarkSafResponseReceived(NodeId, oneshot::Sender<Option<Duration>>),
}

/// Store and forward actor handle.
#[derive(Clone)]
pub struct StoreAndForwardRequester {
sender: mpsc::Sender<StoreAndForwardRequest>,
}

impl StoreAndForwardRequester {
pub fn new(sender: mpsc::Sender<StoreAndForwardRequest>) -> Self {
pub(crate) fn new(sender: mpsc::Sender<StoreAndForwardRequest>) -> Self {
Self { sender }
}

/// Fetch messages according to the given query from this node's local DB and return them.
pub async fn fetch_messages(&mut self, request: FetchStoredMessageQuery) -> SafResult<Vec<StoredMessage>> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
@@ -123,6 +131,7 @@ impl StoreAndForwardRequester {
reply_rx.await.map_err(|_| StoreAndForwardError::RequestCancelled)?
}

/// Insert a message into the local storage DB.
pub async fn insert_message(&mut self, message: NewStoredMessage) -> SafResult<bool> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
@@ -132,6 +141,7 @@ impl StoreAndForwardRequester {
reply_rx.await.map_err(|_| StoreAndForwardError::RequestCancelled)?
}

/// Remove messages from the local storage DB.
pub async fn remove_messages(&mut self, message_ids: Vec<i32>) -> SafResult<()> {
self.sender
.send(StoreAndForwardRequest::RemoveMessages(message_ids))
@@ -140,6 +150,7 @@ impl StoreAndForwardRequester {
Ok(())
}

/// Remove all messages older than the given `DateTime`.
pub async fn remove_messages_older_than(&mut self, threshold: DateTime<Utc>) -> SafResult<()> {
self.sender
.send(StoreAndForwardRequest::RemoveMessagesOlderThan(threshold))
@@ -148,6 +159,7 @@ impl StoreAndForwardRequester {
Ok(())
}

/// Send a request for SAF messages from the given peer.
pub async fn request_saf_messages_from_peer(&mut self, node_id: NodeId) -> SafResult<()> {
self.sender
.send(StoreAndForwardRequest::SendStoreForwardRequestToPeer(node_id))
@@ -156,6 +168,7 @@ impl StoreAndForwardRequester {
Ok(())
}

/// Send a request for SAF messages from neighbouring peers.
pub async fn request_saf_messages_from_neighbours(&mut self) -> SafResult<()> {
self.sender
.send(StoreAndForwardRequest::SendStoreForwardRequestNeighbours)
@@ -164,7 +177,8 @@ impl StoreAndForwardRequester {
Ok(())
}

pub async fn mark_saf_response_received(&mut self, peer: NodeId) -> SafResult<Option<Duration>> {
/// Updates internal SAF state that a SAF response has been received, removing it from the pending list.
pub(crate) async fn mark_saf_response_received(&mut self, peer: NodeId) -> SafResult<Option<Duration>> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(StoreAndForwardRequest::MarkSafResponseReceived(peer, reply_tx))
@@ -174,6 +188,7 @@ impl StoreAndForwardRequester {
}
}

/// Store and forward actor.
pub struct StoreAndForwardService {
config: SafConfig,
dht_requester: DhtRequester,
@@ -191,7 +206,8 @@ pub struct StoreAndForwardService {
}

impl StoreAndForwardService {
pub fn new(
/// Creates a new store and forward actor
pub(crate) fn new(
config: SafConfig,
conn: DbConnection,
peer_manager: Arc<PeerManager>,
@@ -220,7 +236,7 @@ impl StoreAndForwardService {
}
}

pub fn spawn(self) {
pub(crate) fn spawn(self) {
debug!(target: LOG_TARGET, "Store and forward service started");
task::spawn(self.run());
}
1 change: 1 addition & 0 deletions comms/dht/src/store_forward/store.rs
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ pub struct StoreLayer {
}

impl StoreLayer {
/// New store layer.
pub fn new(
config: SafConfig,
peer_manager: Arc<PeerManager>,
4 changes: 2 additions & 2 deletions comms/dht/src/test_utils/makers.rs
Original file line number Diff line number Diff line change
@@ -100,7 +100,7 @@ pub fn make_dht_header(
origin_mac = make_valid_origin_mac(node_identity, challenge);
if flags.is_encrypted() {
let shared_secret = crypt::generate_ecdh_secret(e_secret_key, node_identity.public_key());
origin_mac = crypt::encrypt(&shared_secret, &origin_mac).unwrap()
origin_mac = crypt::encrypt(&shared_secret, &origin_mac);
}
}
DhtMessageHeader {
@@ -170,7 +170,7 @@ pub fn make_dht_envelope(
let (e_secret_key, e_public_key) = make_keypair();
if flags.is_encrypted() {
let shared_secret = crypt::generate_ecdh_secret(&e_secret_key, node_identity.public_key());
message = crypt::encrypt(&shared_secret, &message).unwrap();
message = crypt::encrypt(&shared_secret, &message);
}
let header = make_dht_header(
node_identity,
2 changes: 2 additions & 0 deletions comms/dht/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! Provides a number of DHT mocks and other testing-related utilities.
macro_rules! unwrap_oms_send_msg {
($var:expr, reply_value=$reply_value:expr) => {
match $var {
7 changes: 7 additions & 0 deletions comms/dht/src/version.rs
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ use serde::{Deserialize, Serialize};

use crate::envelope::DhtMessageError;

/// Versions for the DHT protocol
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(try_from = "u32", into = "u32")]
pub enum DhtProtocolVersion {
@@ -39,25 +40,30 @@ pub enum DhtProtocolVersion {
}

impl DhtProtocolVersion {
/// Returns the latest version
pub fn latest() -> Self {
DhtProtocolVersion::v2()
}

/// Returns v1 version
pub fn v1() -> Self {
DhtProtocolVersion::V1 { minor: 0 }
}

/// Returns v2 version
pub fn v2() -> Self {
DhtProtocolVersion::V2 { minor: 0 }
}

/// Returns the byte representation for the version
pub fn to_bytes(self) -> Vec<u8> {
let mut buf = Vec::with_capacity(4 * 2);
buf.write_all(&self.as_major().to_le_bytes()).unwrap();
buf.write_all(&self.as_minor().to_le_bytes()).unwrap();
buf
}

/// Returns the major version number
pub fn as_major(&self) -> u32 {
use DhtProtocolVersion::{V1, V2};
match self {
@@ -66,6 +72,7 @@ impl DhtProtocolVersion {
}
}

/// Returns the minor version number
pub fn as_minor(&self) -> u32 {
use DhtProtocolVersion::{V1, V2};
match self {
2 changes: 1 addition & 1 deletion comms/dht/tests/dht.rs
Original file line number Diff line number Diff line change
@@ -892,7 +892,7 @@ async fn dht_repropagate() {
.unwrap();
}

// This relies on the DHT being set with .with_dedup_discard_hit_count(3)
// This relies on the DHT being set with dedup_allowed_message_occurrences = 3
receive_and_repropagate(&mut node_B, &out_msg).await;
receive_and_repropagate(&mut node_C, &out_msg).await;
receive_and_repropagate(&mut node_A, &out_msg).await;