From 5b09f8e2b630685d9ff748eae772b9798954f6ff Mon Sep 17 00:00:00 2001 From: Brian Pearce Date: Tue, 18 Apr 2023 13:23:01 +0200 Subject: [PATCH] feat: chat scaffold (#5244) Description --- - Refactor the contacts crate for better organization. - Expand contacts with types, and capabilities for managing messaging over the existing comms network - Add cucumber tests to provide support for new functionality Motivation and Context --- We want to expand Tari's communication network to handle p2p chat services. This PR sets us off in the right direction for the core Tari processes to be able to handle message passing and storage. Currently chat clients will be able to send, and receive messages when online or offline. Utilizing store and forward for sending communications offline. How Has This Been Tested? --- Via cucumber only. The chat client within cucumber is currently the only interface for trials. Caveats and known issues --- - [ ] Encryption at rest: We need to change the message storage to support encryption at rest --------- Co-authored-by: SW van Heerden --- Cargo.lock | 37 +++- .../tari_console_wallet/log4rs_sample.yml | 22 ++ .../src/ui/state/app_state.rs | 2 +- .../src/ui/state/wallet_event_monitor.rs | 2 +- .../tari_console_wallet/src/ui/ui_contact.rs | 2 +- base_layer/contacts/Cargo.toml | 19 +- base_layer/contacts/build.rs | 31 +++ .../down.sql | 3 + .../2023-03-14-101758_create_messages/up.sql | 9 + base_layer/contacts/proto/message.proto | 18 ++ .../contacts/src/contacts_service/error.rs | 3 + .../contacts/src/contacts_service/handle.rs | 28 ++- .../contacts/src/contacts_service/mod.rs | 20 +- .../src/contacts_service/proto/mod.rs | 23 +++ .../contacts/src/contacts_service/service.rs | 105 +++++++++- .../src/contacts_service/storage/database.rs | 64 +++--- .../src/contacts_service/storage/mod.rs | 1 + .../src/contacts_service/storage/sqlite_db.rs | 195 +++--------------- .../storage/types/contacts.rs | 184 +++++++++++++++++ .../storage/types/messages.rs | 112 ++++++++++ .../src/contacts_service/storage/types/mod.rs | 24 +++ .../src/contacts_service/types/contact.rs | 67 ++++++ .../src/contacts_service/types/message.rs | 93 +++++++++ .../contacts_service/types/message_builder.rs | 68 ++++++ .../src/contacts_service/types/mod.rs | 30 +++ base_layer/contacts/src/schema.rs | 10 + base_layer/contacts/tests/contacts_service.rs | 12 +- .../core/src/base_node/service/service.rs | 2 +- base_layer/core/tests/mempool.rs | 4 +- base_layer/p2p/src/proto/message_type.proto | 1 + .../p2p/src/services/liveness/service.rs | 2 +- .../protocols/transaction_send_protocol.rs | 2 +- .../tasks/send_finalized_transaction.rs | 2 +- .../tasks/send_transaction_cancelled.rs | 2 +- .../tasks/send_transaction_reply.rs | 2 +- base_layer/wallet/src/wallet.rs | 3 +- base_layer/wallet/tests/wallet.rs | 3 +- base_layer/wallet_ffi/src/callback_handler.rs | 2 +- .../wallet_ffi/src/callback_handler_tests.rs | 2 +- base_layer/wallet_ffi/src/lib.rs | 4 +- comms/dht/src/outbound/requester.rs | 29 ++- integration_tests/Cargo.toml | 5 +- integration_tests/log4rs/base_node.yml | 5 + integration_tests/log4rs/cucumber.yml | 21 ++ integration_tests/log4rs/wallet.yml | 22 ++ integration_tests/tests/cucumber.rs | 116 ++++++++++- integration_tests/tests/features/Chat.feature | 50 +++++ .../tests/utils/base_node_process.rs | 5 +- .../tests/utils/chat_client/Cargo.toml | 24 +++ .../tests/utils/chat_client/src/client.rs | 171 +++++++++++++++ .../tests/utils/chat_client/src/database.rs | 55 +++++ .../tests/utils/chat_client/src/lib.rs | 27 +++ .../tests/utils/chat_client/src/networking.rs | 145 +++++++++++++ 53 files changed, 1645 insertions(+), 245 deletions(-) create mode 100644 base_layer/contacts/build.rs create mode 100644 base_layer/contacts/migrations/2023-03-14-101758_create_messages/down.sql create mode 100644 base_layer/contacts/migrations/2023-03-14-101758_create_messages/up.sql create mode 100644 base_layer/contacts/proto/message.proto create mode 100644 base_layer/contacts/src/contacts_service/proto/mod.rs create mode 100644 base_layer/contacts/src/contacts_service/storage/types/contacts.rs create mode 100644 base_layer/contacts/src/contacts_service/storage/types/messages.rs create mode 100644 base_layer/contacts/src/contacts_service/storage/types/mod.rs create mode 100644 base_layer/contacts/src/contacts_service/types/contact.rs create mode 100644 base_layer/contacts/src/contacts_service/types/message.rs create mode 100644 base_layer/contacts/src/contacts_service/types/message_builder.rs create mode 100644 base_layer/contacts/src/contacts_service/types/mod.rs create mode 100644 integration_tests/tests/features/Chat.feature create mode 100644 integration_tests/tests/utils/chat_client/Cargo.toml create mode 100644 integration_tests/tests/utils/chat_client/src/client.rs create mode 100644 integration_tests/tests/utils/chat_client/src/database.rs create mode 100644 integration_tests/tests/utils/chat_client/src/lib.rs create mode 100644 integration_tests/tests/utils/chat_client/src/networking.rs diff --git a/Cargo.lock b/Cargo.lock index 31ac9d42ad..0ce2a60029 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5251,6 +5251,26 @@ dependencies = [ "zeroize", ] +[[package]] +name = "tari_chat_client" +version = "0.48.0-pre.1" +dependencies = [ + "anyhow", + "diesel", + "lmdb-zero", + "tari_common", + "tari_common_sqlite", + "tari_common_types", + "tari_comms", + "tari_comms_dht", + "tari_contacts", + "tari_p2p", + "tari_service_framework", + "tari_shutdown", + "tari_storage", + "tari_test_utils", +] + [[package]] name = "tari_common" version = "0.50.0-pre.0" @@ -5475,8 +5495,10 @@ dependencies = [ "diesel", "diesel_migrations", "futures 0.3.26", - "libsqlite3-sys", "log", + "num-derive", + "num-traits", + "prost 0.9.0", "rand 0.7.3", "tari_common", "tari_common_sqlite", @@ -5493,6 +5515,7 @@ dependencies = [ "thiserror", "tokio", "tower", + "uuid", ] [[package]] @@ -5616,11 +5639,14 @@ dependencies = [ "tari_app_utilities", "tari_base_node", "tari_base_node_grpc_client", + "tari_chat_client", "tari_common", + "tari_common_sqlite", "tari_common_types", "tari_comms", "tari_comms_dht", "tari_console_wallet", + "tari_contacts", "tari_core", "tari_crypto", "tari_merge_mining_proxy", @@ -6763,6 +6789,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "936e4b492acfd135421d8dca4b1aa80a7bfc26e702ef3af710e0752684df5372" +[[package]] +name = "uuid" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb" +dependencies = [ + "getrandom 0.2.8", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/applications/tari_console_wallet/log4rs_sample.yml b/applications/tari_console_wallet/log4rs_sample.yml index ed01144f82..34fab5373e 100644 --- a/applications/tari_console_wallet/log4rs_sample.yml +++ b/applications/tari_console_wallet/log4rs_sample.yml @@ -80,6 +80,23 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # An appender named "contacts" that writes to a file with a custom pattern encoder + contacts: + kind: rolling_file + path: "{{log_dir}}/log/wallet/contacts.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "{{log_dir}}/log/wallet/contacts.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # root (to base_layer) root: level: debug @@ -116,6 +133,11 @@ loggers: appenders: - network additive: false + contacts: + level: info + appenders: + - contacts + additive: false comms::noise: level: error appenders: diff --git a/applications/tari_console_wallet/src/ui/state/app_state.rs b/applications/tari_console_wallet/src/ui/state/app_state.rs index 8c102b20c1..8ff31563ad 100644 --- a/applications/tari_console_wallet/src/ui/state/app_state.rs +++ b/applications/tari_console_wallet/src/ui/state/app_state.rs @@ -42,7 +42,7 @@ use tari_comms::{ net_address::{MultiaddressesWithStats, PeerAddressSource}, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, }; -use tari_contacts::contacts_service::{handle::ContactsLivenessEvent, storage::database::Contact}; +use tari_contacts::contacts_service::{handle::ContactsLivenessEvent, types::Contact}; use tari_core::transactions::{ tari_amount::{uT, MicroTari}, transaction_components::OutputFeatures, diff --git a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs index 42938e7ba6..32ea9c33f4 100644 --- a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -266,7 +266,7 @@ impl WalletEventMonitor { ); self.trigger_contacts_refresh().await; } - ContactsLivenessEvent::NetworkSilence => {} + ContactsLivenessEvent::NetworkSilence => {}, } } Err(broadcast::error::RecvError::Lagged(n)) => { diff --git a/applications/tari_console_wallet/src/ui/ui_contact.rs b/applications/tari_console_wallet/src/ui/ui_contact.rs index 1426aa975a..d10266d459 100644 --- a/applications/tari_console_wallet/src/ui/ui_contact.rs +++ b/applications/tari_console_wallet/src/ui/ui_contact.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use chrono::{DateTime, Local}; -use tari_contacts::contacts_service::storage::database::Contact; +use tari_contacts::contacts_service::types::Contact; #[derive(Debug, Clone)] pub struct UiContact { diff --git a/base_layer/contacts/Cargo.toml b/base_layer/contacts/Cargo.toml index 488d445062..cf64de0250 100644 --- a/base_layer/contacts/Cargo.toml +++ b/base_layer/contacts/Cargo.toml @@ -8,34 +8,37 @@ edition = "2018" [dependencies] tari_common = { path = "../../common" } +tari_common_sqlite = { path = "../../common_sqlite" } tari_common_types = { path = "../../base_layer/common_types" } tari_comms = { path = "../../comms/core" } +tari_comms_dht = { path = "../../comms/dht" } tari_crypto = { version = "0.16.11"} tari_p2p = { path = "../p2p", features = ["auto-update"] } tari_service_framework = { path = "../service_framework" } tari_shutdown = { path = "../../infrastructure/shutdown" } -tari_common_sqlite = { path = "../../common_sqlite" } tari_utilities = "0.4.10" -tokio = { version = "1.23", features = ["sync", "macros"] } chrono = { version = "0.4.19", default-features = false, features = ["serde"] } diesel = { version = "2.0.3", features = ["sqlite", "serde_json", "chrono", "64-column-tables"] } diesel_migrations = "2.0.0" futures = { version = "^0.3.1", features = ["compat", "std"] } -libsqlite3-sys = { version = "0.25.1", features = ["bundled"], optional = true } log = "0.4.6" +num-derive = "0.3.3" +num-traits = "0.2.15" +prost = "0.9" rand = "0.7.3" thiserror = "1.0.26" +tokio = { version = "1.23", features = ["sync", "macros"] } tower = "0.4" +uuid = { version = "1.3.1", features = ["v4"] } [dev-dependencies] -tari_test_utils = { path = "../../infrastructure/test_utils" } tari_comms_dht = { path = "../../comms/dht", features = ["test-mocks"] } +tari_test_utils = { path = "../../infrastructure/test_utils" } tempfile = "3.1.0" -[features] -default=["bundled_sqlite"] -bundled_sqlite = ["libsqlite3-sys"] +[build-dependencies] +tari_common = { path = "../../common" } [package.metadata.cargo-machete] -ignored = ["libsqlite3-sys"] # this is so we can run cargo machete without getting false positive about macro dependancies +ignored = ["prost"] # this is so we can run cargo machete without getting false positive about macro dependancies diff --git a/base_layer/contacts/build.rs b/base_layer/contacts/build.rs new file mode 100644 index 0000000000..f1858cc133 --- /dev/null +++ b/base_layer/contacts/build.rs @@ -0,0 +1,31 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +fn main() -> Result<(), Box> { + tari_common::build::ProtobufCompiler::new() + .proto_paths(&["proto"]) + .include_paths(&["proto"]) + .emit_rerun_if_changed_directives() + .compile() + .unwrap(); + Ok(()) +} diff --git a/base_layer/contacts/migrations/2023-03-14-101758_create_messages/down.sql b/base_layer/contacts/migrations/2023-03-14-101758_create_messages/down.sql new file mode 100644 index 0000000000..358d2f591e --- /dev/null +++ b/base_layer/contacts/migrations/2023-03-14-101758_create_messages/down.sql @@ -0,0 +1,3 @@ +DROP INDEX idx_messages_address; + +DROP TABLE IF EXISTS messages; diff --git a/base_layer/contacts/migrations/2023-03-14-101758_create_messages/up.sql b/base_layer/contacts/migrations/2023-03-14-101758_create_messages/up.sql new file mode 100644 index 0000000000..1d8b4f0020 --- /dev/null +++ b/base_layer/contacts/migrations/2023-03-14-101758_create_messages/up.sql @@ -0,0 +1,9 @@ +CREATE TABLE messages ( + address BLOB NOT NULL, + message_id BLOB PRIMARY KEY NOT NULL, + body BLOB NOT NULL, + stored_at TIMESTAMP NOT NULL, + direction INTEGER NOT NULL +); + +CREATE INDEX idx_messages_address ON messages (address); diff --git a/base_layer/contacts/proto/message.proto b/base_layer/contacts/proto/message.proto new file mode 100644 index 0000000000..e528929527 --- /dev/null +++ b/base_layer/contacts/proto/message.proto @@ -0,0 +1,18 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +syntax = "proto3"; +package tari.contacts.chat; + +message Message { + bytes body = 1; + bytes address = 2; + DirectionEnum direction = 3; + uint64 stored_at = 4; + bytes message_id = 5; +} + +enum DirectionEnum { + Inbound = 0; + Outbound = 1; +} \ No newline at end of file diff --git a/base_layer/contacts/src/contacts_service/error.rs b/base_layer/contacts/src/contacts_service/error.rs index 634a100822..8bc6794e5f 100644 --- a/base_layer/contacts/src/contacts_service/error.rs +++ b/base_layer/contacts/src/contacts_service/error.rs @@ -23,6 +23,7 @@ use diesel::result::Error as DieselError; use tari_common_sqlite::error::SqliteStorageError; use tari_comms::connectivity::ConnectivityError; +use tari_comms_dht::outbound::DhtOutboundError; use tari_p2p::services::liveness::error::LivenessError; use tari_service_framework::reply_channel::TransportChannelError; use thiserror::Error; @@ -44,6 +45,8 @@ pub enum ContactsServiceError { LivenessError(#[from] LivenessError), #[error("ConnectivityError error: `{0}`")] ConnectivityError(#[from] ConnectivityError), + #[error("Outbound comms error: `{0}`")] + OutboundCommsError(#[from] DhtOutboundError), } #[derive(Debug, Error)] diff --git a/base_layer/contacts/src/contacts_service/handle.rs b/base_layer/contacts/src/contacts_service/handle.rs index 165a112d34..7008b31c79 100644 --- a/base_layer/contacts/src/contacts_service/handle.rs +++ b/base_layer/contacts/src/contacts_service/handle.rs @@ -35,7 +35,7 @@ use tower::Service; use crate::contacts_service::{ error::ContactsServiceError, service::{ContactMessageType, ContactOnlineStatus}, - storage::database::Contact, + types::{Contact, Message}, }; #[derive(Debug, Clone, PartialEq, Eq)] @@ -130,6 +130,8 @@ pub enum ContactsServiceRequest { RemoveContact(TariAddress), GetContacts, GetContactOnlineStatus(Contact), + SendMessage(TariAddress, Message), + GetAllMessages(TariAddress), } #[derive(Debug)] @@ -139,6 +141,8 @@ pub enum ContactsServiceResponse { Contact(Contact), Contacts(Vec), OnlineStatus(ContactOnlineStatus), + Messages(Vec), + MessageSent, } #[derive(Clone)] @@ -224,4 +228,26 @@ impl ContactsServiceHandle { _ => Err(ContactsServiceError::UnexpectedApiResponse), } } + + pub async fn get_all_messages(&mut self, pk: TariAddress) -> Result, ContactsServiceError> { + match self + .request_response_service + .call(ContactsServiceRequest::GetAllMessages(pk)) + .await?? + { + ContactsServiceResponse::Messages(messages) => Ok(messages), + _ => Err(ContactsServiceError::UnexpectedApiResponse), + } + } + + pub async fn send_message(&mut self, message: Message) -> Result<(), ContactsServiceError> { + match self + .request_response_service + .call(ContactsServiceRequest::SendMessage(message.address.clone(), message)) + .await?? + { + ContactsServiceResponse::MessageSent => Ok(()), + _ => Err(ContactsServiceError::UnexpectedApiResponse), + } + } } diff --git a/base_layer/contacts/src/contacts_service/mod.rs b/base_layer/contacts/src/contacts_service/mod.rs index 7d7a13e18d..8ada6521c8 100644 --- a/base_layer/contacts/src/contacts_service/mod.rs +++ b/base_layer/contacts/src/contacts_service/mod.rs @@ -22,15 +22,18 @@ pub mod error; pub mod handle; +pub mod proto; pub mod service; pub mod storage; +pub mod types; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use futures::future; use log::*; use tari_comms::connectivity::ConnectivityRequester; -use tari_p2p::services::liveness::LivenessHandle; +use tari_comms_dht::Dht; +use tari_p2p::{comms_connector::SubscriptionFactory, services::liveness::LivenessHandle}; use tari_service_framework::{ async_trait, reply_channel, @@ -54,16 +57,23 @@ where T: ContactsBackend backend: Option, contacts_auto_ping_interval: Duration, contacts_online_ping_window: usize, + subscription_factory: Arc, } impl ContactsServiceInitializer where T: ContactsBackend { - pub fn new(backend: T, contacts_auto_ping_interval: Duration, online_ping_window: usize) -> Self { + pub fn new( + backend: T, + subscription_factory: Arc, + contacts_auto_ping_interval: Duration, + online_ping_window: usize, + ) -> Self { Self { backend: Some(backend), contacts_auto_ping_interval, contacts_online_ping_window: online_ping_window, + subscription_factory, } } } @@ -90,9 +100,11 @@ where T: ContactsBackend + 'static let contacts_auto_ping_interval = self.contacts_auto_ping_interval; let contacts_online_ping_window = self.contacts_online_ping_window; + let subscription_factory = self.subscription_factory.clone(); context.spawn_when_ready(move |handles| async move { let liveness = handles.expect_handle::(); let connectivity = handles.expect_handle::(); + let dht = handles.expect_handle::(); let service = ContactsService::new( ContactsDatabase::new(backend), @@ -100,6 +112,8 @@ where T: ContactsBackend + 'static handles.get_shutdown_signal(), liveness, connectivity, + dht, + subscription_factory, publisher, contacts_auto_ping_interval, contacts_online_ping_window, diff --git a/base_layer/contacts/src/contacts_service/proto/mod.rs b/base_layer/contacts/src/contacts_service/proto/mod.rs new file mode 100644 index 0000000000..978869c643 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/proto/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +include!(concat!(env!("OUT_DIR"), "/tari.contacts.chat.rs")); diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index bc34570aa1..627c56fa00 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -30,20 +30,34 @@ use std::{ use chrono::{NaiveDateTime, Utc}; use futures::{pin_mut, StreamExt}; use log::*; +use tari_common_types::tari_address::TariAddress; use tari_comms::connectivity::{ConnectivityEvent, ConnectivityRequester}; -use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle, MetadataKey, PingPongEvent}; +use tari_comms_dht::{domain_message::OutboundDomainMessage, outbound::OutboundEncryption, Dht}; +use tari_p2p::{ + comms_connector::SubscriptionFactory, + domain_message::DomainMessage, + services::{ + liveness::{LivenessEvent, LivenessHandle, MetadataKey, PingPongEvent}, + utils::{map_decode, ok_or_skip_result}, + }, + tari_message::TariMessageType, +}; use tari_service_framework::reply_channel; use tari_shutdown::ShutdownSignal; +use tari_utilities::ByteArray; use tokio::sync::broadcast; use crate::contacts_service::{ error::ContactsServiceError, handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceRequest, ContactsServiceResponse}, - storage::database::{Contact, ContactsBackend, ContactsDatabase}, + proto, + storage::database::{ContactsBackend, ContactsDatabase}, + types::{Contact, Message}, }; const LOG_TARGET: &str = "contacts::contacts_service"; const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3; +pub const SUBSCRIPTION_LABEL: &str = "Chat"; #[derive(Debug, Clone, PartialEq, Eq)] pub enum ContactMessageType { @@ -91,6 +105,8 @@ where T: ContactsBackend + 'static liveness: LivenessHandle, liveness_data: Vec, connectivity: ConnectivityRequester, + dht: Dht, + subscription_factory: Arc, event_publisher: broadcast::Sender>, number_of_rounds_no_pings: u16, contacts_auto_ping_interval: Duration, @@ -109,6 +125,8 @@ where T: ContactsBackend + 'static shutdown_signal: ShutdownSignal, liveness: LivenessHandle, connectivity: ConnectivityRequester, + dht: Dht, + subscription_factory: Arc, event_publisher: broadcast::Sender>, contacts_auto_ping_interval: Duration, contacts_online_ping_window: usize, @@ -120,6 +138,8 @@ where T: ContactsBackend + 'static liveness, liveness_data: Vec::new(), connectivity, + dht, + subscription_factory, event_publisher, number_of_rounds_no_pings: 0, contacts_auto_ping_interval, @@ -141,6 +161,13 @@ where T: ContactsBackend + 'static let connectivity_events = self.connectivity.get_event_subscription(); pin_mut!(connectivity_events); + let chat_messages = self + .subscription_factory + .get_subscription(TariMessageType::Chat, SUBSCRIPTION_LABEL) + .map(map_decode::) + .filter_map(ok_or_skip_result); + pin_mut!(chat_messages); + let shutdown = self .shutdown_signal .take() @@ -156,6 +183,13 @@ where T: ContactsBackend + 'static debug!(target: LOG_TARGET, "Contacts Service started"); loop { tokio::select! { + // Incoming chat messages + Some(msg) = chat_messages.next() => { + if let Err(err) = self.handle_incoming_message(msg).await { + warn!(target: LOG_TARGET, "Failed to handle incoming chat message: {}", err); + } + }, + Some(request_context) = request_stream.next() => { let (request, reply_tx) = request_context.split(); let response = self.handle_request(request).await.map_err(|e| { @@ -177,7 +211,7 @@ where T: ContactsBackend + 'static Ok(event) = connectivity_events.recv() => { self.handle_connectivity_event(event); - } + }, _ = shutdown.wait() => { info!(target: LOG_TARGET, "Contacts service shutting down because it received the shutdown signal"); @@ -203,10 +237,10 @@ where T: ContactsBackend + 'static }, ContactsServiceRequest::UpsertContact(c) => { self.db.upsert_contact(c.clone())?; - self.liveness.check_add_monitored_peer(c.node_id).await?; + self.liveness.check_add_monitored_peer(c.node_id.clone()).await?; info!( target: LOG_TARGET, - "Contact Saved: \nAlias: {}\nAddress: {} ", c.alias, c.address + "Contact Saved: \nAlias: {}\nAddress: {}\nNodeId: {}", c.alias, c.address, c.node_id ); Ok(ContactsServiceResponse::ContactSaved) }, @@ -232,6 +266,47 @@ where T: ContactsBackend + 'static let result = self.get_online_status(&contact).await; Ok(result.map(ContactsServiceResponse::OnlineStatus)?) }, + ContactsServiceRequest::GetAllMessages(pk) => { + let result = self.db.get_messages(pk); + Ok(result.map(ContactsServiceResponse::Messages)?) + }, + ContactsServiceRequest::SendMessage(address, mut message) => { + let contact = match self.db.get_contact(address.clone()) { + Ok(contact) => contact, + Err(_) => Contact::from(&address), + }; + + let ob_message = OutboundDomainMessage::from(message.clone()); + let encryption = OutboundEncryption::EncryptFor(Box::new(address.public_key().clone())); + + match self.get_online_status(&contact).await { + Ok(ContactOnlineStatus::Online) => { + info!(target: LOG_TARGET, "Chat message being sent directed"); + let mut comms_outbound = self.dht.outbound_requester(); + + comms_outbound + .send_direct_encrypted( + address.public_key().clone(), + ob_message, + encryption, + "contact service messaging".to_string(), + ) + .await?; + }, + Err(e) => return Err(e), + _ => { + let mut comms_outbound = self.dht.outbound_requester(); + comms_outbound + .closest_broadcast(address.public_key().clone(), encryption, vec![], ob_message) + .await?; + }, + } + + message.stored_at = Utc::now().naive_utc().timestamp() as u64; + self.db.save_message(message)?; + + Ok(ContactsServiceResponse::MessageSent) + }, } } @@ -304,6 +379,26 @@ where T: ContactsBackend + 'static Ok(()) } + async fn handle_incoming_message( + &mut self, + msg: DomainMessage, + ) -> Result<(), ContactsServiceError> { + if let Some(source_public_key) = msg.authenticated_origin { + let DomainMessage::<_> { inner: msg, .. } = msg; + + let message = Message::from(msg.clone()); + let message = Message { + address: TariAddress::from_public_key(&source_public_key, message.address.network()), + stored_at: Utc::now().naive_utc().timestamp() as u64, + ..msg.into() + }; + + self.db.save_message(message).expect("Couldn't save the message"); + } + + Ok(()) + } + async fn get_online_status(&self, contact: &Contact) -> Result { let mut online_status = ContactOnlineStatus::NeverSeen; if let Some(peer_data) = self.connectivity.get_peer_info(contact.node_id.clone()).await? { diff --git a/base_layer/contacts/src/contacts_service/storage/database.rs b/base_layer/contacts/src/contacts_service/storage/database.rs index e70a60898c..849f5cb5b8 100644 --- a/base_layer/contacts/src/contacts_service/storage/database.rs +++ b/base_layer/contacts/src/contacts_service/storage/database.rs @@ -30,39 +30,13 @@ use log::*; use tari_common_types::tari_address::TariAddress; use tari_comms::peer_manager::NodeId; -use crate::contacts_service::error::ContactsServiceStorageError; +use crate::contacts_service::{ + error::ContactsServiceStorageError, + types::{Contact, Message}, +}; const LOG_TARGET: &str = "contacts::contacts_service::database"; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Contact { - pub alias: String, - pub address: TariAddress, - pub node_id: NodeId, - pub last_seen: Option, - pub latency: Option, - pub favourite: bool, -} - -impl Contact { - pub fn new( - alias: String, - address: TariAddress, - last_seen: Option, - latency: Option, - favourite: bool, - ) -> Self { - Self { - alias, - node_id: NodeId::from_key(address.public_key()), - address, - last_seen, - latency, - favourite, - } - } -} - /// This trait defines the functionality that a database backend need to provide for the Contacts Service pub trait ContactsBackend: Send + Sync + Clone { /// Retrieve the record associated with the provided DbKey @@ -76,12 +50,15 @@ pub enum DbKey { Contact(TariAddress), ContactId(NodeId), Contacts, + Messages(TariAddress), } pub enum DbValue { Contact(Box), Contacts(Vec), TariAddress(Box), + Message(Box), + Messages(Vec), } #[allow(clippy::large_enum_variant)] @@ -91,6 +68,7 @@ pub enum DbKeyValuePair { } pub enum WriteOperation { + Insert(Box), Upsert(Box), UpdateLastSeen(Box), Remove(DbKey), @@ -177,11 +155,32 @@ where T: ContactsBackend + 'static .ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::Contact(address.clone())))?; match result { DbValue::Contact(c) => Ok(*c), - DbValue::Contacts(_) | DbValue::TariAddress(_) => Err(ContactsServiceStorageError::UnexpectedResult( + _ => Err(ContactsServiceStorageError::UnexpectedResult( "Incorrect response from backend.".to_string(), )), } } + + pub fn get_messages(&self, address: TariAddress) -> Result, ContactsServiceStorageError> { + let key = DbKey::Messages(address); + let db_clone = self.db.clone(); + match db_clone.fetch(&key) { + Ok(None) => log_error( + key, + ContactsServiceStorageError::UnexpectedResult("Could not retrieve messages".to_string()), + ), + Ok(Some(DbValue::Messages(messages))) => Ok(messages), + Ok(Some(other)) => unexpected_result(key, other), + Err(e) => log_error(key, e), + } + } + + pub fn save_message(&self, message: Message) -> Result<(), ContactsServiceStorageError> { + self.db + .write(WriteOperation::Insert(Box::new(DbValue::Message(Box::new(message)))))?; + + Ok(()) + } } fn unexpected_result(req: DbKey, res: DbValue) -> Result { @@ -196,6 +195,7 @@ impl Display for DbKey { DbKey::Contact(c) => f.write_str(&format!("Contact: {:?}", c)), DbKey::ContactId(id) => f.write_str(&format!("Contact: {:?}", id)), DbKey::Contacts => f.write_str("Contacts"), + DbKey::Messages(c) => f.write_str(&format!("Messages for id: {:?}", c)), } } } @@ -206,6 +206,8 @@ impl Display for DbValue { DbValue::Contact(_) => f.write_str("Contact"), DbValue::Contacts(_) => f.write_str("Contacts"), DbValue::TariAddress(_) => f.write_str("Address"), + DbValue::Messages(_) => f.write_str("Messages"), + DbValue::Message(_) => f.write_str("Message"), } } } diff --git a/base_layer/contacts/src/contacts_service/storage/mod.rs b/base_layer/contacts/src/contacts_service/storage/mod.rs index dd5a7bbc6e..4c3855d1b1 100644 --- a/base_layer/contacts/src/contacts_service/storage/mod.rs +++ b/base_layer/contacts/src/contacts_service/storage/mod.rs @@ -22,3 +22,4 @@ pub mod database; pub mod sqlite_db; +pub mod types; diff --git a/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs b/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs index 3977c69942..d7acb3a530 100644 --- a/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs +++ b/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs @@ -22,24 +22,22 @@ use std::{convert::TryFrom, io::Write, sync::Arc}; -use chrono::NaiveDateTime; -use diesel::{prelude::*, result::Error as DieselError, SqliteConnection}; +use diesel::result::Error as DieselError; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; -use tari_common_sqlite::{ - error::SqliteStorageError, - sqlite_connection_pool::PooledDbConnection, - util::diesel_ext::ExpectedRowsExtension, -}; +use tari_common_sqlite::{error::SqliteStorageError, sqlite_connection_pool::PooledDbConnection}; use tari_common_types::tari_address::TariAddress; -use tari_comms::peer_manager::NodeId; use tari_utilities::ByteArray; -use crate::{ - contacts_service::{ - error::ContactsServiceStorageError, - storage::database::{Contact, ContactsBackend, DbKey, DbKeyValuePair, DbValue, WriteOperation}, +use crate::contacts_service::{ + error::ContactsServiceStorageError, + storage::{ + database::{ContactsBackend, DbKey, DbKeyValuePair, DbValue, WriteOperation}, + types::{ + contacts::{ContactSql, UpdateContact}, + messages::{MessagesSql, MessagesSqlInsert}, + }, }, - schema::contacts, + types::{Contact, Message}, }; const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); @@ -106,6 +104,16 @@ where TContactServiceDbConnection: PooledDbConnection, _>>()?, )), + DbKey::Messages(address) => match MessagesSql::find_by_address(&address.to_bytes(), &mut conn) { + Ok(messages) => Some(DbValue::Messages( + messages + .iter() + .map(|m| Message::try_from(m.clone()).expect("Couldn't cast MessageSql to Message")) + .collect::>(), + )), + Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => None, + Err(e) => return Err(e), + }, }; Ok(result) @@ -162,6 +170,12 @@ where TContactServiceDbConnection: PooledDbConnection return Err(e), }, DbKey::Contacts => return Err(ContactsServiceStorageError::OperationNotSupported), + DbKey::Messages(_pk) => return Err(ContactsServiceStorageError::OperationNotSupported), + }, + WriteOperation::Insert(i) => { + if let DbValue::Message(m) = *i { + MessagesSqlInsert::from(*m).commit(&mut conn)?; + } }, } @@ -169,155 +183,6 @@ where TContactServiceDbConnection: PooledDbConnection, - node_id: Vec, - alias: String, - last_seen: Option, - latency: Option, - favourite: i32, -} - -impl ContactSql { - /// Write this struct to the database - pub fn commit(&self, conn: &mut SqliteConnection) -> Result<(), ContactsServiceStorageError> { - diesel::insert_into(contacts::table) - .values(self.clone()) - .execute(conn)?; - Ok(()) - } - - /// Return all contacts - pub fn index(conn: &mut SqliteConnection) -> Result, ContactsServiceStorageError> { - Ok(contacts::table.load::(conn)?) - } - - /// Find a particular Contact by their address, if it exists - pub fn find_by_address( - address: &[u8], - conn: &mut SqliteConnection, - ) -> Result { - Ok(contacts::table - .filter(contacts::address.eq(address)) - .first::(conn)?) - } - - /// Find a particular Contact by their node ID, if it exists - pub fn find_by_node_id( - node_id: &[u8], - conn: &mut SqliteConnection, - ) -> Result { - Ok(contacts::table - .filter(contacts::node_id.eq(node_id)) - .first::(conn)?) - } - - /// Find a particular Contact by their address, and update it if it exists, returning the affected record - pub fn find_by_address_and_update( - conn: &mut SqliteConnection, - address: &[u8], - updated_contact: UpdateContact, - ) -> Result { - // Note: `get_result` not implemented for SQLite - diesel::update(contacts::table.filter(contacts::address.eq(address))) - .set(updated_contact) - .execute(conn) - .num_rows_affected_or_not_found(1)?; - ContactSql::find_by_address(address, conn) - } - - /// Find a particular Contact by their address, and delete it if it exists, returning the affected record - pub fn find_by_address_and_delete( - conn: &mut SqliteConnection, - address: &[u8], - ) -> Result { - // Note: `get_result` not implemented for SQLite - let contact = ContactSql::find_by_address(address, conn)?; - if diesel::delete(contacts::table.filter(contacts::address.eq(address))).execute(conn)? == 0 { - return Err(ContactsServiceStorageError::ValuesNotFound); - } - Ok(contact) - } - - /// Find a particular Contact by their node ID, and update it if it exists, returning the affected record - pub fn find_by_node_id_and_update( - conn: &mut SqliteConnection, - node_id: &[u8], - updated_contact: UpdateContact, - ) -> Result { - // Note: `get_result` not implemented for SQLite - diesel::update(contacts::table.filter(contacts::node_id.eq(node_id))) - .set(updated_contact) - .execute(conn) - .num_rows_affected_or_not_found(1)?; - ContactSql::find_by_node_id(node_id, conn) - } - - /// Find a particular Contact by their node ID, and delete it if it exists, returning the affected record - pub fn find_by_node_id_and_delete( - conn: &mut SqliteConnection, - node_id: &[u8], - ) -> Result { - // Note: `get_result` not implemented for SQLite - let contact = ContactSql::find_by_node_id(node_id, conn)?; - if diesel::delete(contacts::table.filter(contacts::node_id.eq(node_id))).execute(conn)? == 0 { - return Err(ContactsServiceStorageError::ValuesNotFound); - } - Ok(contact) - } -} - -/// Conversion from an Contact to the Sql datatype form -impl TryFrom for Contact { - type Error = ContactsServiceStorageError; - - #[allow(clippy::cast_sign_loss)] - fn try_from(o: ContactSql) -> Result { - let address = TariAddress::from_bytes(&o.address).map_err(|_| ContactsServiceStorageError::ConversionError)?; - Ok(Self { - // Public key must always be the master data source for node ID here - node_id: NodeId::from_key(address.public_key()), - address, - alias: o.alias, - last_seen: o.last_seen, - latency: o.latency.map(|val| val as u32), - favourite: match o.favourite { - 0 => false, - 1 => true, - _ => return Err(ContactsServiceStorageError::ConversionError), - }, - }) - } -} - -/// Conversion from a Contact to the Sql datatype form -#[allow(clippy::cast_possible_wrap)] -impl From for ContactSql { - fn from(o: Contact) -> Self { - Self { - // Public key must always be the master data source for node ID here - node_id: NodeId::from_key(o.address.public_key()).to_vec(), - address: o.address.to_bytes().to_vec(), - alias: o.alias, - last_seen: o.last_seen, - latency: o.latency.map(|val| val as i32), - favourite: i32::from(o.favourite), - } - } -} - -#[derive(AsChangeset)] -#[diesel(table_name = contacts)] -pub struct UpdateContact { - alias: Option, - last_seen: Option>, - latency: Option>, - favourite: Option, -} - #[cfg(test)] mod test { use std::convert::{TryFrom, TryInto}; @@ -333,9 +198,9 @@ mod test { use tari_test_utils::{paths::with_temp_dir, random::string}; use super::*; - use crate::contacts_service::storage::{ - database::Contact, - sqlite_db::{ContactSql, UpdateContact}, + use crate::contacts_service::{ + storage::types::contacts::{ContactSql, UpdateContact}, + types::Contact, }; #[test] diff --git a/base_layer/contacts/src/contacts_service/storage/types/contacts.rs b/base_layer/contacts/src/contacts_service/storage/types/contacts.rs new file mode 100644 index 0000000000..0878b2c230 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/storage/types/contacts.rs @@ -0,0 +1,184 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::convert::TryFrom; + +use chrono::NaiveDateTime; +use diesel::{prelude::*, SqliteConnection}; +use tari_common_sqlite::util::diesel_ext::ExpectedRowsExtension; +use tari_common_types::tari_address::TariAddress; +use tari_comms::peer_manager::NodeId; +use tari_utilities::ByteArray; + +use crate::{ + contacts_service::{error::ContactsServiceStorageError, types::Contact}, + schema::contacts, +}; + +/// A Sql version of the Contact struct +#[derive(Clone, Debug, Queryable, Insertable, PartialEq, Eq)] +#[diesel(table_name = contacts)] +pub struct ContactSql { + pub address: Vec, + node_id: Vec, + pub alias: String, + last_seen: Option, + latency: Option, + pub favourite: i32, +} + +impl ContactSql { + /// Write this struct to the database + pub fn commit(&self, conn: &mut SqliteConnection) -> Result<(), ContactsServiceStorageError> { + diesel::insert_into(contacts::table) + .values(self.clone()) + .execute(conn)?; + Ok(()) + } + + /// Return all contacts + pub fn index(conn: &mut SqliteConnection) -> Result, ContactsServiceStorageError> { + Ok(contacts::table.load::(conn)?) + } + + /// Find a particular Contact by their address, if it exists + pub fn find_by_address( + address: &[u8], + conn: &mut SqliteConnection, + ) -> Result { + Ok(contacts::table + .filter(contacts::address.eq(address)) + .first::(conn)?) + } + + /// Find a particular Contact by their node ID, if it exists + pub fn find_by_node_id( + node_id: &[u8], + conn: &mut SqliteConnection, + ) -> Result { + Ok(contacts::table + .filter(contacts::node_id.eq(node_id)) + .first::(conn)?) + } + + /// Find a particular Contact by their address, and update it if it exists, returning the affected record + pub fn find_by_address_and_update( + conn: &mut SqliteConnection, + address: &[u8], + updated_contact: UpdateContact, + ) -> Result { + // Note: `get_result` not implemented for SQLite + diesel::update(contacts::table.filter(contacts::address.eq(address))) + .set(updated_contact) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + ContactSql::find_by_address(address, conn) + } + + /// Find a particular Contact by their address, and delete it if it exists, returning the affected record + pub fn find_by_address_and_delete( + conn: &mut SqliteConnection, + address: &[u8], + ) -> Result { + // Note: `get_result` not implemented for SQLite + let contact = ContactSql::find_by_address(address, conn)?; + if diesel::delete(contacts::table.filter(contacts::address.eq(address))).execute(conn)? == 0 { + return Err(ContactsServiceStorageError::ValuesNotFound); + } + Ok(contact) + } + + /// Find a particular Contact by their node ID, and update it if it exists, returning the affected record + pub fn find_by_node_id_and_update( + conn: &mut SqliteConnection, + node_id: &[u8], + updated_contact: UpdateContact, + ) -> Result { + // Note: `get_result` not implemented for SQLite + diesel::update(contacts::table.filter(contacts::node_id.eq(node_id))) + .set(updated_contact) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + ContactSql::find_by_node_id(node_id, conn) + } + + /// Find a particular Contact by their node ID, and delete it if it exists, returning the affected record + pub fn find_by_node_id_and_delete( + conn: &mut SqliteConnection, + node_id: &[u8], + ) -> Result { + // Note: `get_result` not implemented for SQLite + let contact = ContactSql::find_by_node_id(node_id, conn)?; + diesel::delete(contacts::table.filter(contacts::node_id.eq(node_id))) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + Ok(contact) + } +} + +/// Conversion from an Contact to the Sql datatype form +impl TryFrom for Contact { + type Error = ContactsServiceStorageError; + + #[allow(clippy::cast_sign_loss)] + fn try_from(o: ContactSql) -> Result { + let address = TariAddress::from_bytes(&o.address).map_err(|_| ContactsServiceStorageError::ConversionError)?; + Ok(Self { + // Public key must always be the master data source for node ID here + node_id: NodeId::from_key(address.public_key()), + address, + alias: o.alias, + last_seen: o.last_seen, + latency: o.latency.map(|val| val as u32), + favourite: match o.favourite { + 0 => false, + 1 => true, + _ => return Err(ContactsServiceStorageError::ConversionError), + }, + }) + } +} + +/// Conversion from a Contact to the Sql datatype form +#[allow(clippy::cast_possible_wrap)] +impl From for ContactSql { + fn from(o: Contact) -> Self { + Self { + // Public key must always be the master data source for node ID here + node_id: NodeId::from_key(o.address.public_key()).to_vec(), + address: o.address.to_bytes().to_vec(), + alias: o.alias, + last_seen: o.last_seen, + latency: o.latency.map(|val| val as i32), + favourite: i32::from(o.favourite), + } + } +} + +#[derive(AsChangeset)] +#[diesel(table_name = contacts)] +pub struct UpdateContact { + pub alias: Option, + pub last_seen: Option>, + pub latency: Option>, + pub favourite: Option, +} diff --git a/base_layer/contacts/src/contacts_service/storage/types/messages.rs b/base_layer/contacts/src/contacts_service/storage/types/messages.rs new file mode 100644 index 0000000000..ad4426d86b --- /dev/null +++ b/base_layer/contacts/src/contacts_service/storage/types/messages.rs @@ -0,0 +1,112 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::convert::TryFrom; + +use chrono::NaiveDateTime; +use diesel::{prelude::*, SqliteConnection}; +use tari_common_types::tari_address::TariAddress; + +use crate::{ + contacts_service::{ + error::ContactsServiceStorageError, + types::{Direction, Message}, + }, + schema::messages, +}; + +/// A Sql version of the Contact struct +#[derive(Clone, Debug, Insertable, PartialEq, Eq)] +#[diesel(table_name = messages)] +#[diesel(primary_key(message_id))] +pub struct MessagesSqlInsert { + pub address: Vec, + pub body: Vec, + pub direction: i32, + pub stored_at: NaiveDateTime, + pub message_id: Vec, +} + +#[derive(Clone, Debug, Queryable, PartialEq, Eq, QueryableByName)] +#[diesel(table_name = messages)] +#[diesel(primary_key(message_id))] +pub struct MessagesSql { + pub address: Vec, + pub message_id: Vec, + pub body: Vec, + pub stored_at: NaiveDateTime, + pub direction: i32, +} + +impl MessagesSqlInsert { + /// Write this struct to the database + pub fn commit(&self, conn: &mut SqliteConnection) -> Result<(), ContactsServiceStorageError> { + diesel::insert_into(messages::table) + .values(self.clone()) + .execute(conn)?; + Ok(()) + } +} + +impl MessagesSql { + /// Find a particular message by their address, if it exists + pub fn find_by_address( + address: &[u8], + conn: &mut SqliteConnection, + ) -> Result, ContactsServiceStorageError> { + Ok(messages::table + .filter(messages::address.eq(address)) + .load::(conn)?) + } +} + +/// Conversion from an Message to the Sql datatype form +impl TryFrom for Message { + type Error = ContactsServiceStorageError; + + #[allow(clippy::cast_sign_loss)] + fn try_from(o: MessagesSql) -> Result { + let address = TariAddress::from_bytes(&o.address).map_err(|_| ContactsServiceStorageError::ConversionError)?; + Ok(Self { + address, + direction: Direction::from_byte(o.direction as u8) + .unwrap_or_else(|| panic!("Direction from byte {}", o.direction)), + stored_at: o.stored_at.timestamp() as u64, + body: o.body, + message_id: o.message_id, + }) + } +} + +/// Conversion from a Contact to the Sql datatype form +#[allow(clippy::cast_possible_wrap)] +impl From for MessagesSqlInsert { + fn from(o: Message) -> Self { + Self { + address: o.address.to_bytes().to_vec(), + direction: i32::from(o.direction.as_byte()), + stored_at: NaiveDateTime::from_timestamp_opt(o.stored_at as i64, 0).unwrap(), + body: o.body, + message_id: o.message_id, + } + } +} diff --git a/base_layer/contacts/src/contacts_service/storage/types/mod.rs b/base_layer/contacts/src/contacts_service/storage/types/mod.rs new file mode 100644 index 0000000000..eeb833bfa9 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/storage/types/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +pub mod contacts; +pub mod messages; diff --git a/base_layer/contacts/src/contacts_service/types/contact.rs b/base_layer/contacts/src/contacts_service/types/contact.rs new file mode 100644 index 0000000000..19153a786c --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/contact.rs @@ -0,0 +1,67 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use chrono::NaiveDateTime; +use tari_common_types::tari_address::TariAddress; +use tari_comms::peer_manager::NodeId; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Contact { + pub alias: String, + pub address: TariAddress, + pub node_id: NodeId, + pub last_seen: Option, + pub latency: Option, + pub favourite: bool, +} + +impl Contact { + pub fn new( + alias: String, + address: TariAddress, + last_seen: Option, + latency: Option, + favourite: bool, + ) -> Self { + Self { + alias, + node_id: NodeId::from_key(address.public_key()), + address, + last_seen, + latency, + favourite, + } + } +} + +impl From<&TariAddress> for Contact { + fn from(address: &TariAddress) -> Self { + Self { + alias: address.to_emoji_string(), + address: address.clone(), + node_id: NodeId::from_key(address.public_key()), + last_seen: None, + latency: None, + favourite: false, + } + } +} diff --git a/base_layer/contacts/src/contacts_service/types/message.rs b/base_layer/contacts/src/contacts_service/types/message.rs new file mode 100644 index 0000000000..957009ed9b --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/message.rs @@ -0,0 +1,93 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use num_derive::FromPrimitive; +use num_traits::FromPrimitive; +use tari_common_types::tari_address::TariAddress; +use tari_comms_dht::domain_message::OutboundDomainMessage; +use tari_p2p::tari_message::TariMessageType; +use tari_utilities::ByteArray; + +use crate::contacts_service::proto; + +#[derive(Clone, Debug, Default)] +pub struct Message { + pub body: Vec, + pub address: TariAddress, + pub direction: Direction, + pub stored_at: u64, + pub message_id: Vec, +} + +#[repr(u8)] +#[derive(FromPrimitive, Debug, Copy, Clone)] +pub enum Direction { + Inbound = 0, + Outbound = 1, +} + +impl Direction { + pub fn as_byte(self) -> u8 { + self as u8 + } + + pub fn from_byte(value: u8) -> Option { + FromPrimitive::from_u8(value) + } +} + +impl Default for Direction { + fn default() -> Self { + Self::Outbound + } +} + +impl From for Message { + fn from(message: proto::Message) -> Self { + Self { + body: message.body, + address: TariAddress::from_bytes(&message.address).expect("Couldn't parse address"), + // A Message from a proto::Message will always be an inbound message + direction: Direction::Inbound, + stored_at: message.stored_at, + message_id: message.message_id, + } + } +} + +impl From for proto::Message { + fn from(message: Message) -> Self { + Self { + body: message.body, + address: message.address.to_bytes().to_vec(), + direction: i32::from(message.direction.as_byte()), + stored_at: message.stored_at, + message_id: message.message_id, + } + } +} + +impl From for OutboundDomainMessage { + fn from(message: Message) -> Self { + Self::new(&TariMessageType::Chat, message.into()) + } +} diff --git a/base_layer/contacts/src/contacts_service/types/message_builder.rs b/base_layer/contacts/src/contacts_service/types/message_builder.rs new file mode 100644 index 0000000000..96e95ee3e5 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/message_builder.rs @@ -0,0 +1,68 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use tari_common_types::tari_address::TariAddress; +use tari_utilities::ByteArray; +use uuid::Uuid; + +use crate::contacts_service::types::Message; + +#[derive(Clone, Debug, Default)] +pub struct MessageBuilder { + inner: Message, +} + +impl MessageBuilder { + pub fn new() -> Self { + let message_id = Uuid::new_v4().into_bytes().to_vec(); + + Self { + inner: Message { + message_id, + ..Message::default() + }, + } + } + + pub fn address(&self, address: TariAddress) -> Self { + Self { + inner: Message { + address, + ..self.inner.clone() + }, + } + } + + pub fn message(&self, body: String) -> Self { + let body = body.into_bytes(); + Self { + inner: Message { + body, + ..self.inner.clone() + }, + } + } + + pub fn build(&self) -> Message { + self.inner.clone() + } +} diff --git a/base_layer/contacts/src/contacts_service/types/mod.rs b/base_layer/contacts/src/contacts_service/types/mod.rs new file mode 100644 index 0000000000..89b83734b5 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/mod.rs @@ -0,0 +1,30 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +mod contact; +pub use contact::Contact; + +mod message; +pub use message::{Direction, Message}; + +mod message_builder; +pub use message_builder::MessageBuilder; diff --git a/base_layer/contacts/src/schema.rs b/base_layer/contacts/src/schema.rs index 86b35c8fb9..25f9d3e061 100644 --- a/base_layer/contacts/src/schema.rs +++ b/base_layer/contacts/src/schema.rs @@ -10,3 +10,13 @@ diesel::table! { favourite -> Integer, } } + +diesel::table! { + messages (message_id) { + address -> Binary, + message_id -> Binary, + body -> Binary, + stored_at -> Timestamp, + direction -> Integer, + } +} diff --git a/base_layer/contacts/tests/contacts_service.rs b/base_layer/contacts/tests/contacts_service.rs index 29922b223f..16f5f3d76c 100644 --- a/base_layer/contacts/tests/contacts_service.rs +++ b/base_layer/contacts/tests/contacts_service.rs @@ -32,9 +32,10 @@ use tari_contacts::contacts_service::{ error::{ContactsServiceError, ContactsServiceStorageError}, handle::ContactsServiceHandle, storage::{ - database::{Contact, ContactsBackend, DbKey}, + database::{ContactsBackend, DbKey}, sqlite_db::ContactsServiceSqliteDatabase, }, + types::Contact, ContactsServiceInitializer, }; use tari_crypto::keys::PublicKey as PublicKeyTrait; @@ -92,7 +93,7 @@ pub fn setup_contacts_service( allow_test_addresses: true, listener_liveness_allowlist_cidrs: StringList::new(), listener_liveness_max_sessions: 0, - user_agent: "tari/test-wallet".to_string(), + user_agent: "tari/test-contacts-service".to_string(), rpc_max_simultaneous_sessions: 0, rpc_max_sessions_per_peer: 0, listener_liveness_check_interval: None, @@ -114,9 +115,14 @@ pub fn setup_contacts_service( max_allowed_ping_failures: 0, // Peer with failed ping-pong will never be removed ..Default::default() }, + peer_message_subscription_factory.clone(), + )) + .add_initializer(ContactsServiceInitializer::new( + backend, peer_message_subscription_factory, + Duration::from_secs(5), + 2, )) - .add_initializer(ContactsServiceInitializer::new(backend, Duration::from_secs(5), 2)) .build(); let handles = runtime.block_on(fut).expect("Service initialization failed"); diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index 5622e6a5f4..0ff14288ea 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -391,7 +391,7 @@ async fn handle_incoming_request( ); let send_message_response = outbound_message_service - .send_direct( + .send_direct_unencrypted( origin_public_key, OutboundDomainMessage::new(&TariMessageType::BaseNodeResponse, message), "Outbound response message from base node".to_string(), diff --git a/base_layer/core/tests/mempool.rs b/base_layer/core/tests/mempool.rs index a0cc0be8e9..b1dc9aa9c5 100644 --- a/base_layer/core/tests/mempool.rs +++ b/base_layer/core/tests/mempool.rs @@ -854,7 +854,7 @@ async fn receive_and_propagate_transaction() { alice_node .outbound_message_service - .send_direct( + .send_direct_unencrypted( bob_node.node_identity.public_key().clone(), OutboundDomainMessage::new( &TariMessageType::NewTransaction, @@ -866,7 +866,7 @@ async fn receive_and_propagate_transaction() { .unwrap(); alice_node .outbound_message_service - .send_direct( + .send_direct_unencrypted( carol_node.node_identity.public_key().clone(), OutboundDomainMessage::new( &TariMessageType::NewTransaction, diff --git a/base_layer/p2p/src/proto/message_type.proto b/base_layer/p2p/src/proto/message_type.proto index 7363b2e23d..3073a95550 100644 --- a/base_layer/p2p/src/proto/message_type.proto +++ b/base_layer/p2p/src/proto/message_type.proto @@ -13,6 +13,7 @@ enum TariMessageType { // -- NetMessages -- TariMessageTypePingPong = 1; + TariMessageTypeChat = 2; // -- Blockchain messages -- diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index 0844872272..2d63c27eb6 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -226,7 +226,7 @@ where async fn send_pong(&mut self, nonce: u64, dest: CommsPublicKey) -> Result<(), LivenessError> { let msg = PingPongMessage::pong_with_metadata(nonce, self.state.metadata().clone()); self.outbound_messaging - .send_direct( + .send_direct_unencrypted( dest, OutboundDomainMessage::new(&TariMessageType::PingPong, msg), "Sending pong".to_string(), diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs index 3c475f2df2..794dc8d86d 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs @@ -665,7 +665,7 @@ where match self .resources .outbound_message_service - .send_direct( + .send_direct_unencrypted( self.dest_address.public_key().clone(), OutboundDomainMessage::new(&TariMessageType::SenderPartialTransaction, proto_message.clone()), "transaction send".to_string(), diff --git a/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs b/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs index 66ca04aab2..2b438de1e5 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs @@ -108,7 +108,7 @@ pub async fn send_finalized_transaction_message_direct( let mut store_and_forward_send_result = false; let mut direct_send_result = false; match outbound_message_service - .send_direct( + .send_direct_unencrypted( destination_public_key.clone(), OutboundDomainMessage::new( &TariMessageType::TransactionFinalized, diff --git a/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs b/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs index 5cac558ee4..da51dd654c 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs @@ -40,7 +40,7 @@ pub async fn send_transaction_cancelled_message( // Send both direct and SAF we are not going to monitor the progress on these messages for potential resend as // they are just courtesy messages let _send_message_response = outbound_message_service - .send_direct( + .send_direct_unencrypted( destination_public_key.clone(), OutboundDomainMessage::new(&TariMessageType::TransactionCancelled, proto_message.clone()), "transaction cancelled".to_string(), diff --git a/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs b/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs index cb822904a5..462ba452e1 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs @@ -96,7 +96,7 @@ pub async fn send_transaction_reply_direct( .try_into() .map_err(TransactionServiceError::ServiceError)?; match outbound_message_service - .send_direct( + .send_direct_unencrypted( inbound_transaction.source_address.public_key().clone(), OutboundDomainMessage::new(&TariMessageType::ReceiverPartialTransactionReply, proto_message.clone()), "wallet transaction reply".to_string(), diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index ad56e0594c..e16c78db68 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -208,10 +208,11 @@ where max_allowed_ping_failures: 0, // Peer with failed ping-pong will never be removed ..Default::default() }, - peer_message_subscription_factory, + peer_message_subscription_factory.clone(), )) .add_initializer(ContactsServiceInitializer::new( contacts_backend, + peer_message_subscription_factory, config.contacts_auto_ping_interval, config.contacts_online_ping_window, )) diff --git a/base_layer/wallet/tests/wallet.rs b/base_layer/wallet/tests/wallet.rs index 4a5e9a33c3..6a64d3d0f7 100644 --- a/base_layer/wallet/tests/wallet.rs +++ b/base_layer/wallet/tests/wallet.rs @@ -43,7 +43,8 @@ use tari_comms_dht::{store_forward::SafConfig, DhtConfig}; use tari_contacts::contacts_service::{ handle::ContactsLivenessEvent, service::ContactMessageType, - storage::{database::Contact, sqlite_db::ContactsServiceSqliteDatabase}, + storage::sqlite_db::ContactsServiceSqliteDatabase, + types::Contact, }; use tari_core::{ consensus::ConsensusManager, diff --git a/base_layer/wallet_ffi/src/callback_handler.rs b/base_layer/wallet_ffi/src/callback_handler.rs index 3265520b29..44b625ab30 100644 --- a/base_layer/wallet_ffi/src/callback_handler.rs +++ b/base_layer/wallet_ffi/src/callback_handler.rs @@ -379,7 +379,7 @@ where TBackend: TransactionBackend + 'static ); self.trigger_contacts_refresh(data.deref().clone()); } - ContactsLivenessEvent::NetworkSilence => {} + ContactsLivenessEvent::NetworkSilence => {}, } } Err(broadcast::error::RecvError::Lagged(n)) => { diff --git a/base_layer/wallet_ffi/src/callback_handler_tests.rs b/base_layer/wallet_ffi/src/callback_handler_tests.rs index 0fc474ca4c..f5fc269f03 100644 --- a/base_layer/wallet_ffi/src/callback_handler_tests.rs +++ b/base_layer/wallet_ffi/src/callback_handler_tests.rs @@ -25,7 +25,7 @@ mod test { use tari_contacts::contacts_service::{ handle::{ContactsLivenessData, ContactsLivenessEvent}, service::{ContactMessageType, ContactOnlineStatus}, - storage::database::Contact, + types::Contact, }; use tari_core::transactions::{ tari_amount::{uT, MicroTari}, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 0ebbbb04cd..fa7794b7fe 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -100,7 +100,7 @@ use tari_comms::{ types::CommsPublicKey, }; use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig}; -use tari_contacts::contacts_service::storage::database::Contact; +use tari_contacts::contacts_service::types::Contact; use tari_core::{ borsh::FromBytes, consensus::ConsensusManager, @@ -210,7 +210,7 @@ pub struct TariUnblindedOutputs(Vec); pub struct TariContacts(Vec); -pub type TariContact = tari_contacts::contacts_service::storage::database::Contact; +pub type TariContact = tari_contacts::contacts_service::types::Contact; pub type TariCompletedTransaction = tari_wallet::transaction_service::storage::models::CompletedTransaction; pub type TariTransactionSendStatus = tari_wallet::transaction_service::handle::TransactionSendStatus; pub type TariFeePerGramStats = tari_wallet::transaction_service::handle::FeePerGramStatsResponse; diff --git a/comms/dht/src/outbound/requester.rs b/comms/dht/src/outbound/requester.rs index 0ac3e2e619..33f6bd4568 100644 --- a/comms/dht/src/outbound/requester.rs +++ b/comms/dht/src/outbound/requester.rs @@ -51,7 +51,34 @@ impl OutboundMessageRequester { } /// Send directly to a peer. If the peer does not exist in the peer list, a discovery will be initiated. - pub async fn send_direct( + pub async fn send_direct_encrypted( + &mut self, + dest_public_key: CommsPublicKey, + message: OutboundDomainMessage, + encryption: OutboundEncryption, + source_info: String, + ) -> Result + where + T: prost::Message, + { + self.send_message( + SendMessageParams::new() + .with_debug_info(format!("Send direct to {} from {}", &dest_public_key, source_info)) + .direct_public_key(dest_public_key.clone()) + .with_discovery(true) + .with_encryption(encryption) + .with_destination(dest_public_key.into()) + .finish(), + message, + ) + .await? + .resolve() + .await + .map_err(Into::into) + } + + /// Send directly to a peer unencrypted. If the peer does not exist in the peer list, a discovery will be initiated. + pub async fn send_direct_unencrypted( &mut self, dest_public_key: CommsPublicKey, message: OutboundDomainMessage, diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 3c887fe03b..5a120acb27 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -11,11 +11,14 @@ tari_app_grpc = { path = "../applications/tari_app_grpc" } tari_app_utilities = { path = "../applications/tari_app_utilities" } tari_base_node = { path = "../applications/tari_base_node" } tari_base_node_grpc_client = { path = "../clients/rust/base_node_grpc_client" } +tari_chat_client = { path = "tests/utils/chat_client" } tari_common = { path = "../common" } +tari_common_sqlite = { path = "../common_sqlite" } tari_common_types = { path = "../base_layer/common_types" } tari_comms = { path = "../comms/core" } tari_comms_dht = { path = "../comms/dht" } tari_console_wallet = { path = "../applications/tari_console_wallet" } +tari_contacts = { path = "../base_layer/contacts" } tari_core = { path = "../base_layer/core" } tari_merge_mining_proxy = { path = "../applications/tari_merge_mining_proxy" } tari_miner = { path = "../applications/tari_miner" } @@ -24,8 +27,8 @@ tari_script = { path = "../infrastructure/tari_script" } tari_shutdown = { path = "../infrastructure/shutdown" } tari_utilities = "0.4.10" tari_wallet = { path = "../base_layer/wallet" } -tari_wallet_grpc_client = { path = "../clients/rust/wallet_grpc_client" } tari_wallet_ffi = { path = "../base_layer/wallet_ffi" } +tari_wallet_grpc_client = { path = "../clients/rust/wallet_grpc_client" } anyhow = "1.0.53" async-trait = "0.1.50" diff --git a/integration_tests/log4rs/base_node.yml b/integration_tests/log4rs/base_node.yml index bba162e0e9..094df031c2 100644 --- a/integration_tests/log4rs/base_node.yml +++ b/integration_tests/log4rs/base_node.yml @@ -109,6 +109,11 @@ loggers: level: debug appenders: - network + # Route log events sent to the "contacts" logger to the "network" appender + contacts: + level: debug + appenders: + - network # Route log events sent to the "p2p" logger to the "network" appender p2p: level: debug diff --git a/integration_tests/log4rs/cucumber.yml b/integration_tests/log4rs/cucumber.yml index f5a1c8263d..c79b190338 100644 --- a/integration_tests/log4rs/cucumber.yml +++ b/integration_tests/log4rs/cucumber.yml @@ -63,6 +63,21 @@ appenders: pattern: "{{log_dir}}/log/wallet.{}.log" encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{X(grpc)}] {f}.{L} {i} [{t}] {l:5} {m}{n}" + base_layer_contacts: + kind: rolling_file + path: "{{log_dir}}/log/contacts.log" + policy: + kind: compound + trigger: + kind: size + limit: 100mb + roller: + kind: fixed_window + base: 1 + count: 10 + pattern: "{{log_dir}}/log/contacts.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{X(grpc)}] {f}.{L} {i} [{t}] {l:5} {m}{n}" # An appender named "other" that writes to a file with a custom pattern encoder other: kind: rolling_file @@ -79,6 +94,7 @@ appenders: pattern: "{{log_dir}}/log/other.{}.log" encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{X(grpc)}] {f}.{L} {i} [{t}] {l:5} {m}{n}" + # We don't want prints during cucumber test, everything useful will in logs. # root: # level: warn @@ -114,6 +130,11 @@ loggers: level: debug appenders: - network + # Route log events sent to the "contacts" logger to the "network" appender + contacts: + level: debug + appenders: + - base_layer_contacts # Route log events sent to the "p2p" logger to the "network" appender p2p: level: debug diff --git a/integration_tests/log4rs/wallet.yml b/integration_tests/log4rs/wallet.yml index e42b0ba386..c7c805f7d8 100644 --- a/integration_tests/log4rs/wallet.yml +++ b/integration_tests/log4rs/wallet.yml @@ -69,6 +69,23 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] {l:5} {m}{n}" + # An appender named "contacts" that writes to a file with a custom pattern encoder + contacts: + kind: rolling_file + path: "log/wallet/contacts.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "log/wallet/contacts.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # Set the default logging level to "warn" and attach the "stdout" appender to the root root: level: warn @@ -93,6 +110,11 @@ loggers: appenders: - network additive: false + contacts: + level: debug + appenders: + - contacts + additive: false comms::noise: level: error appenders: diff --git a/integration_tests/tests/cucumber.rs b/integration_tests/tests/cucumber.rs index dfcad43f9b..f14bb80f83 100644 --- a/integration_tests/tests/cucumber.rs +++ b/integration_tests/tests/cucumber.rs @@ -38,18 +38,19 @@ use cucumber::{event::ScenarioFinished, gherkin::Scenario, given, then, when, Wo use futures::StreamExt; use indexmap::IndexMap; use log::*; -use rand::Rng; +use rand::{rngs::OsRng, Rng}; use serde_json::Value; use tari_app_grpc::tari_rpc::{self as grpc}; use tari_app_utilities::utilities::UniPublicKey; use tari_base_node::BaseNodeConfig; use tari_base_node_grpc_client::grpc::{GetBlocksRequest, ListHeadersRequest}; +use tari_chat_client::Client; use tari_common::{configuration::Network, initialize_logging}; use tari_common_types::{ tari_address::TariAddress, types::{BlindingFactor, ComAndPubSignature, Commitment, PrivateKey, PublicKey}, }; -use tari_comms::multiaddr::Multiaddr; +use tari_comms::{multiaddr::Multiaddr, peer_manager::PeerFeatures, NodeIdentity}; use tari_console_wallet::{ BurnTariArgs, CliCommands, @@ -61,6 +62,7 @@ use tari_console_wallet::{ SetBaseNodeArgs, WhoisArgs, }; +use tari_contacts::contacts_service::service::ContactOnlineStatus; use tari_core::{ blocks::Block, consensus::ConsensusManager, @@ -100,8 +102,9 @@ use thiserror::Error; use tokio::runtime::Runtime; use crate::utils::{ - base_node_process::{spawn_base_node, spawn_base_node_with_config, BaseNodeProcess}, + base_node_process::{get_base_dir, spawn_base_node, spawn_base_node_with_config, BaseNodeProcess}, get_peer_addresses, + get_port, merge_mining_proxy::{register_merge_mining_proxy_process, MergeMiningProxyProcess}, miner::{ mine_block, @@ -147,6 +150,7 @@ pub struct TariWorld { miners: IndexMap, ffi_wallets: IndexMap, wallets: IndexMap, + chat_clients: IndexMap, merge_mining_proxies: IndexMap, transactions: IndexMap, wallet_addresses: IndexMap, // values are strings representing tari addresses @@ -277,10 +281,11 @@ impl TariWorld { pub async fn after(&mut self, _scenario: &Scenario) { self.base_nodes.clear(); - self.seed_nodes.clear(); - self.wallets.clear(); + self.chat_clients.clear(); self.ffi_wallets.clear(); self.miners.clear(); + self.seed_nodes.clear(); + self.wallets.clear(); } } @@ -4980,6 +4985,107 @@ async fn merge_mining_ask_for_block_header_by_hash(world: &mut TariWorld, mining world.last_merge_miner_response = merge_miner.get_block_header_by_hash(hash).await; } +#[when(expr = "I have a chat client {word} connected to seed node {word}")] +async fn chat_client_connected_to_base_node(world: &mut TariWorld, name: String, seed_node_name: String) { + let base_node = world.get_node(&seed_node_name).unwrap(); + + let port = get_port(18000..18499).unwrap(); + let temp_dir_path = get_base_dir() + .join("chat_clients") + .join(format!("port_{}", port)) + .join(name.clone()); + let address = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{}", port)).unwrap(); + let identity = NodeIdentity::random(&mut OsRng, address, PeerFeatures::COMMUNICATION_NODE); + + let mut client = Client::new( + identity, + vec![base_node.identity.to_peer()], + temp_dir_path, + Network::LocalNet, + ); + client.initialize().await; + + world.chat_clients.insert(name, client); +} + +#[when(expr = "I have a chat client {word} with no peers")] +async fn chat_client_with_no_peers(world: &mut TariWorld, name: String) { + let port = get_port(18000..18499).unwrap(); + let temp_dir_path = get_base_dir() + .join("chat_clients") + .join(format!("port_{}", port)) + .join(name.clone()); + let address = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{}", port)).unwrap(); + let identity = NodeIdentity::random(&mut OsRng, address, PeerFeatures::COMMUNICATION_NODE); + + let mut client = Client::new(identity, vec![], temp_dir_path, Network::LocalNet); + client.initialize().await; + + world.chat_clients.insert(name, client); +} + +#[when(regex = r"^I use (.+) to send a message '(.+)' to (.*)$")] +async fn send_message_to(world: &mut TariWorld, sender: String, message: String, receiver: String) { + let sender = world.chat_clients.get(&sender).unwrap(); + let receiver = world.chat_clients.get(&receiver).unwrap(); + let address = TariAddress::from_public_key(receiver.identity.public_key(), Network::LocalNet); + + sender.send_message(address, message).await; +} + +#[then(expr = "{word} will have {int} message(s) with {word}")] +async fn receive_n_messages(world: &mut TariWorld, receiver: String, message_count: u64, sender: String) { + let receiver: &Client = world.chat_clients.get(&receiver).unwrap(); + let sender = world.chat_clients.get(&sender).unwrap(); + let address = TariAddress::from_public_key(sender.identity.public_key(), Network::LocalNet); + + let mut messages = vec![]; + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + messages = receiver.get_messages(&address).await; + + if messages.len() as u64 == message_count { + return; + } + + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + } + + panic!( + "Receiver {} only received {}/{} messages", + receiver.identity.node_id(), + messages.len(), + message_count + ) +} + +#[when(expr = "{word} adds {word} as a contact")] +async fn add_as_contact(world: &mut TariWorld, sender: String, receiver: String) { + let receiver: &Client = world.chat_clients.get(&receiver).unwrap(); + let sender: &Client = world.chat_clients.get(&sender).unwrap(); + + let address = TariAddress::from_public_key(receiver.identity.public_key(), Network::LocalNet); + + sender.add_contact(&address).await; +} + +#[when(expr = "{word} waits for contact {word} to be online")] +async fn wait_for_contact_to_be_online(world: &mut TariWorld, client: String, contact: String) { + let client: &Client = world.chat_clients.get(&client).unwrap(); + let contact: &Client = world.chat_clients.get(&contact).unwrap(); + + let address = TariAddress::from_public_key(contact.identity.public_key(), Network::LocalNet); + + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + if ContactOnlineStatus::Online == client.check_online_status(&address).await { + return; + } + + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + } + + panic!("Contact {} never came online", contact.identity.node_id(),) +} + fn flush_stdout(buffer: &Arc>>) { // After each test we flush the stdout to the logs. info!( diff --git a/integration_tests/tests/features/Chat.feature b/integration_tests/tests/features/Chat.feature new file mode 100644 index 0000000000..f18d34031f --- /dev/null +++ b/integration_tests/tests/features/Chat.feature @@ -0,0 +1,50 @@ +# Copyright 2023 The Tari Project +# SPDX-License-Identifier: BSD-3-Clause + +Feature: Chat messaging + + Scenario: A message is propagated between nodes via 3rd party + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + Then CHAT_B will have 1 message with CHAT_A + + Scenario: A message is sent directly between nodes + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When CHAT_A adds CHAT_B as a contact + When I stop node SEED_A + When CHAT_A waits for contact CHAT_B to be online + When I use CHAT_A to send a message 'Hey there' to CHAT_B + Then CHAT_B will have 1 message with CHAT_A + + Scenario: Message counts are distinct + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When I have a chat client CHAT_C connected to seed node SEED_A + + When CHAT_A adds CHAT_B as a contact + When CHAT_A adds CHAT_C as a contact + When CHAT_B adds CHAT_C as a contact + When CHAT_C adds CHAT_B as a contact + When I stop node SEED_A + + When I use CHAT_A to send a message 'Message 1 from a to b' to CHAT_B + When I use CHAT_A to send a message 'Message 2 from a to b' to CHAT_B + When I use CHAT_A to send a message 'Message 1 from a to c' to CHAT_C + + When I use CHAT_B to send a message 'Message 1 from b to c' to CHAT_C + When I use CHAT_B to send a message 'Message 2 from b to c' to CHAT_C + + When I use CHAT_C to send a message 'Message 1 from c to b' to CHAT_B + + Then CHAT_B will have 2 messages with CHAT_A + Then CHAT_B will have 3 messages with CHAT_C + Then CHAT_C will have 1 messages with CHAT_A + Then CHAT_C will have 3 messages with CHAT_B + Then CHAT_A will have 2 messages with CHAT_B + Then CHAT_A will have 1 messages with CHAT_C + diff --git a/integration_tests/tests/utils/base_node_process.rs b/integration_tests/tests/utils/base_node_process.rs index b96b0e56cc..930a3bd2bb 100644 --- a/integration_tests/tests/utils/base_node_process.rs +++ b/integration_tests/tests/utils/base_node_process.rs @@ -46,6 +46,7 @@ use crate::{ TariWorld, }; +#[derive(Clone)] pub struct BaseNodeProcess { pub name: String, pub port: u64, @@ -180,11 +181,7 @@ pub async fn spawn_base_node_with_config( .tcp .listener_address .clone()]); - // base_node_config.base_node.p2p.datastore_path = temp_dir_path.to_path_buf(); - // base_node_config.base_node.p2p.peer_database_name = "peer_db.mdb".to_string(); base_node_config.base_node.p2p.dht = DhtConfig::default_local_test(); - // base_node_config.base_node.p2p.dht.database_url = - // DbConnectionUrl::File(temp_dir_path.clone().join("dht.sqlite")); base_node_config.base_node.p2p.dht.network_discovery.enabled = true; base_node_config.base_node.p2p.allow_test_addresses = true; base_node_config.base_node.storage.orphan_storage_capacity = 10; diff --git a/integration_tests/tests/utils/chat_client/Cargo.toml b/integration_tests/tests/utils/chat_client/Cargo.toml new file mode 100644 index 0000000000..e3bf6ebbfc --- /dev/null +++ b/integration_tests/tests/utils/chat_client/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "tari_chat_client" +authors = ["The Tari Development Community"] +description = "Tari cucumber chat client" +license = "BSD-3-Clause" +version = "0.48.0-pre.1" +edition = "2018" + +[dependencies] +tari_common = { path = "../../../../common" } +tari_common_sqlite = { path = "../../../../common_sqlite" } +tari_common_types = { path = "../../../../base_layer/common_types" } +tari_comms = { path = "../../../../comms/core" } +tari_comms_dht = { path = "../../../../comms/dht" } +tari_contacts = { path = "../../../../base_layer/contacts" } +tari_p2p = { path = "../../../../base_layer/p2p" } +tari_service_framework= { path = "../../../../base_layer/service_framework" } +tari_shutdown = { path = "../../../../infrastructure/shutdown" } +tari_storage = { path = "../../../../infrastructure/storage" } +tari_test_utils = { path = "../../../../infrastructure/test_utils" } + +anyhow = "1.0.41" +diesel = { version = "2.0.3", features = ["sqlite", "r2d2", "serde_json", "chrono", "64-column-tables"] } +lmdb-zero = "0.4.4" diff --git a/integration_tests/tests/utils/chat_client/src/client.rs b/integration_tests/tests/utils/chat_client/src/client.rs new file mode 100644 index 0000000000..e4536673b8 --- /dev/null +++ b/integration_tests/tests/utils/chat_client/src/client.rs @@ -0,0 +1,171 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{ + fmt::{Debug, Formatter}, + path::PathBuf, + sync::Arc, + time::Duration, +}; + +use tari_common_types::tari_address::TariAddress; +use tari_comms::{peer_manager::Peer, CommsNode, NodeIdentity}; +use tari_contacts::contacts_service::{ + handle::ContactsServiceHandle, + service::ContactOnlineStatus, + types::{Message, MessageBuilder}, +}; +use tari_p2p::Network; +use tari_shutdown::Shutdown; + +use crate::{database, networking}; + +#[derive(Clone)] +pub struct Client { + pub base_dir: PathBuf, + pub contacts: Option, + pub identity: Arc, + pub network: Network, + pub seed_peers: Vec, + pub shutdown: Shutdown, +} + +impl Debug for Client { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Client") + .field("base_dir", &self.base_dir) + .field("identity", &self.identity) + .field("network", &self.network) + .field("seed_peers", &self.seed_peers) + .field("shutdown", &self.shutdown) + .finish() + } +} + +impl Drop for Client { + fn drop(&mut self) { + self.quit(); + } +} + +impl Client { + pub fn new(identity: NodeIdentity, seed_peers: Vec, base_dir: PathBuf, network: Network) -> Self { + Self { + identity: Arc::new(identity), + base_dir, + seed_peers, + shutdown: Shutdown::new(), + contacts: None, + network, + } + } + + pub async fn add_contact(&self, address: &TariAddress) { + if let Some(mut contacts_service) = self.contacts.clone() { + contacts_service + .upsert_contact(address.into()) + .await + .expect("Contact wasn't added"); + } + } + + pub async fn check_online_status(&self, address: &TariAddress) -> ContactOnlineStatus { + if let Some(mut contacts_service) = self.contacts.clone() { + let contact = contacts_service + .get_contact(address.clone()) + .await + .expect("Client does not have contact"); + + return contacts_service + .get_contact_online_status(contact) + .await + .expect("Failed to get status"); + } + + ContactOnlineStatus::Offline + } + + pub async fn send_message(&self, receiver: TariAddress, message: String) { + if let Some(mut contacts_service) = self.contacts.clone() { + contacts_service + .send_message(MessageBuilder::new().message(message).address(receiver).build()) + .await + .expect("Message wasn't sent"); + } + } + + pub async fn get_messages(&self, sender: &TariAddress) -> Vec { + let mut messages = vec![]; + if let Some(mut contacts_service) = self.contacts.clone() { + messages = contacts_service + .get_all_messages(sender.clone()) + .await + .expect("Messages not fetched"); + } + + messages + } + + pub async fn initialize(&mut self) { + println!("initializing chat"); + + let signal = self.shutdown.to_signal(); + let db = database::create_chat_storage(self.base_dir.clone()).unwrap(); + + let (contacts, comms_node) = networking::start( + self.identity.clone(), + self.base_dir.clone(), + self.seed_peers.clone(), + self.network, + db, + signal, + ) + .await + .unwrap(); + + if !self.seed_peers.is_empty() { + loop { + println!("Waiting for peer connections..."); + match wait_for_connectivity(comms_node.clone()).await { + Ok(_) => break, + Err(e) => println!("{}. Still waiting...", e), + } + } + } + + self.contacts = Some(contacts); + + println!("Connections established") + } + + pub fn quit(&mut self) { + self.shutdown.trigger(); + } +} + +pub async fn wait_for_connectivity(comms: CommsNode) -> anyhow::Result<()> { + comms + .connectivity() + .wait_for_connectivity(Duration::from_secs(30)) + .await?; + Ok(()) +} diff --git a/integration_tests/tests/utils/chat_client/src/database.rs b/integration_tests/tests/utils/chat_client/src/database.rs new file mode 100644 index 0000000000..3b2d0f8215 --- /dev/null +++ b/integration_tests/tests/utils/chat_client/src/database.rs @@ -0,0 +1,55 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{convert::TryInto, path::PathBuf}; + +use diesel::{Connection, SqliteConnection}; +use tari_common_sqlite::{ + connection::{DbConnection, DbConnectionUrl}, + error::StorageError, +}; +use tari_storage::lmdb_store::{LMDBBuilder, LMDBConfig}; +use tari_test_utils::random::string; + +pub fn create_chat_storage(base_path: PathBuf) -> Result { + std::fs::create_dir_all(&base_path).unwrap(); + let db_name = format!("{}.sqlite3", string(8).as_str()); + let db_path = format!("{}/{}", base_path.to_str().unwrap(), db_name); + let url: DbConnectionUrl = db_path.clone().try_into().unwrap(); + + // Create the db + let _db = SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {}", db_path)); + + DbConnection::connect_url(&url) +} + +pub fn create_peer_storage(base_path: PathBuf) { + std::fs::create_dir_all(&base_path).unwrap(); + + LMDBBuilder::new() + .set_path(&base_path) + .set_env_config(LMDBConfig::default()) + .set_max_number_of_databases(1) + .add_database("peerdb", lmdb_zero::db::CREATE) + .build() + .unwrap(); +} diff --git a/integration_tests/tests/utils/chat_client/src/lib.rs b/integration_tests/tests/utils/chat_client/src/lib.rs new file mode 100644 index 0000000000..561f08a5df --- /dev/null +++ b/integration_tests/tests/utils/chat_client/src/lib.rs @@ -0,0 +1,27 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +mod client; +pub use client::Client; + +pub mod database; +pub mod networking; diff --git a/integration_tests/tests/utils/chat_client/src/networking.rs b/integration_tests/tests/utils/chat_client/src/networking.rs new file mode 100644 index 0000000000..4dd0cb3640 --- /dev/null +++ b/integration_tests/tests/utils/chat_client/src/networking.rs @@ -0,0 +1,145 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{path::PathBuf, sync::Arc, time::Duration}; + +use tari_common::configuration::MultiaddrList; +use tari_common_sqlite::connection::DbConnection; +// Re-exports +pub use tari_comms::{ + multiaddr::Multiaddr, + peer_manager::{NodeIdentity, PeerFeatures}, +}; +use tari_comms::{peer_manager::Peer, CommsNode, UnspawnedCommsNode}; +use tari_comms_dht::{store_forward::SafConfig, DhtConfig, NetworkDiscoveryConfig}; +use tari_contacts::contacts_service::{ + handle::ContactsServiceHandle, + storage::sqlite_db::ContactsServiceSqliteDatabase, + ContactsServiceInitializer, +}; +use tari_p2p::{ + comms_connector::pubsub_connector, + initialization::{spawn_comms_using_transport, P2pInitializer}, + services::liveness::{LivenessConfig, LivenessInitializer}, + Network, + P2pConfig, + PeerSeedsConfig, + TcpTransportConfig, + TransportConfig, +}; +use tari_service_framework::StackBuilder; +use tari_shutdown::ShutdownSignal; + +use crate::database; + +pub async fn start( + node_identity: Arc, + base_path: PathBuf, + seed_peers: Vec, + network: Network, + db: DbConnection, + shutdown_signal: ShutdownSignal, +) -> anyhow::Result<(ContactsServiceHandle, CommsNode)> { + database::create_peer_storage(base_path.clone()); + let backend = ContactsServiceSqliteDatabase::init(db); + + let (publisher, subscription_factory) = pubsub_connector(100, 50); + let in_msg = Arc::new(subscription_factory); + + let transport_config = TransportConfig::new_tcp(TcpTransportConfig { + listener_address: node_identity.first_public_address(), + ..TcpTransportConfig::default() + }); + + let mut config = P2pConfig { + datastore_path: base_path.clone(), + dht: DhtConfig { + network_discovery: NetworkDiscoveryConfig { + enabled: true, + ..NetworkDiscoveryConfig::default() + }, + saf: SafConfig { + auto_request: true, + ..Default::default() + }, + ..DhtConfig::default_local_test() + }, + transport: transport_config.clone(), + allow_test_addresses: true, + public_addresses: MultiaddrList::from(vec![node_identity.first_public_address()]), + user_agent: "tari/chat-client/0.0.1".to_string(), + ..P2pConfig::default() + }; + config.set_base_path(base_path.clone()); + + let seed_config = PeerSeedsConfig { + peer_seeds: seed_peers + .iter() + .map(|p| format!("{}::{}", p.public_key, p.addresses.best().unwrap().address())) + .collect::>() + .into(), + ..PeerSeedsConfig::default() + }; + + let fut = StackBuilder::new(shutdown_signal) + .add_initializer(P2pInitializer::new( + config, + seed_config, + network, + node_identity, + publisher, + )) + .add_initializer(LivenessInitializer::new( + LivenessConfig { + auto_ping_interval: Some(Duration::from_secs(1)), + num_peers_per_round: 0, // No random peers + max_allowed_ping_failures: 0, // Peer with failed ping-pong will never be removed + ..Default::default() + }, + in_msg.clone(), + )) + .add_initializer(ContactsServiceInitializer::new( + backend, + in_msg, + Duration::from_secs(5), + 2, + )) + .build(); + + let mut handles = fut.await.expect("Service initialization failed"); + + let comms = handles + .take_handle::() + .expect("P2pInitializer was not added to the stack or did not add UnspawnedCommsNode"); + + let peer_manager = comms.peer_manager(); + for peer in seed_peers { + peer_manager.add_peer(peer).await?; + } + + let comms = spawn_comms_using_transport(comms, transport_config).await.unwrap(); + handles.register(comms); + + let comms = handles.expect_handle::(); + let contacts = handles.expect_handle::(); + Ok((contacts, comms)) +}