Skip to content

Commit

Permalink
Peer manager fixes, custom oms backoff, flaky test fixes
Browse files Browse the repository at this point in the history
- It should never be possible for peer manager index/hashmap links to be
  out of sync with the datastore. I've removed this possibility.
- Removed some unused but fairly complex features of the peer manager to
  reduce bug surface.
- Added `Backoff` trait and implemented ExponentialBackoff and
  ConstantBackoff (for tests).
- Added comms initializer function to be used with tests
- Fixed a race condition in `manage_multiple_transactions`
  • Loading branch information
sdbondi committed Dec 9, 2019
1 parent ebf448a commit 37f8204
Show file tree
Hide file tree
Showing 33 changed files with 463 additions and 665 deletions.
2 changes: 1 addition & 1 deletion applications/grpc_wallet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct Peers {

/// Entry point into the gRPC server binary
pub fn main() {
let _ = env_logger::init();
let _ = env_logger::try_init();
let matches = App::new("Tari Wallet gRPC server")
.version("0.1")
.arg(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,6 @@ pub fn random_string(len: usize) -> String {

#[test]
fn test_rpc_text_message_service() {
env_logger::init();
let mut rng = rand::OsRng::new().unwrap();
let listener_address1: NetAddress = "127.0.0.1:32775".parse().unwrap();
let secret_key1 = CommsSecretKey::random(&mut rng);
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_testnet_miner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct Settings {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = env_logger::init();
let _ = env_logger::try_init();

info!(target: LOG_TARGET, "Settings loaded");

Expand Down
2 changes: 0 additions & 2 deletions base_layer/core/src/base_node/test/comms_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ use crate::{
MemoryDatabase,
MmrTree,
MutableMmrState,
Validators,
},
mempool::{Mempool, MempoolConfig},
proof_of_work::DiffAdjManager,
test_utils::builders::{add_block_and_update_header, create_default_db, create_test_kernel, create_utxo},
validation::mocks::MockValidator,
};
use croaring::Bitmap;
use futures::{executor::block_on, StreamExt};
Expand Down
24 changes: 11 additions & 13 deletions base_layer/core/src/base_node/test/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
},
tx,
};
use futures::{join, select, stream::FusedStream, FutureExt, Stream, StreamExt};
use futures::{future, future::Either, join, stream::FusedStream, FutureExt, Stream, StreamExt};
use std::{
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -363,19 +363,17 @@ fn request_and_response_fetch_mmr_state() {
}

pub async fn event_stream_next<TStream>(mut stream: TStream, timeout: Duration) -> Option<TStream::Item>
where
TStream: Stream + FusedStream + Unpin,
TStream::Item: Clone,
{
loop {
select! {
item = stream.select_next_some() => {
return Some(item);
},
_ = tokio::timer::delay(Instant::now() + timeout).fuse() => { break; },
}
where TStream: Stream + FusedStream + Unpin {
let either = future::select(
stream.select_next_some(),
tokio::timer::delay(Instant::now() + timeout).fuse(),
)
.await;

match either {
Either::Left((v, _)) => Some(v),
Either::Right(_) => None,
}
None
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/test/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ fn receive_and_propagate_transaction() {
async_assert_eventually!(
bob_node.mempool.has_tx_with_excess_sig(&tx_excess_sig).unwrap(),
expect = TxStorageResponse::UnconfirmedPool,
max_attempts = 10,
max_attempts = 20,
interval = Duration::from_millis(1000)
);
async_assert_eventually!(
Expand Down
1 change: 0 additions & 1 deletion base_layer/core/src/test_utils/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use tari_p2p::{
use tari_service_framework::StackBuilder;
use tari_test_utils::address::get_next_local_address;
use tari_transactions::types::HashDigest;
use tempdir::TempDir;
use tokio::runtime::{Runtime, TaskExecutor};

/// The NodeInterfaces is used as a container for providing access to all the services and interfaces of a single node.
Expand Down
10 changes: 4 additions & 6 deletions base_layer/p2p/examples/pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ fn main() {
set_lan_address(&peer_identity);
}

let datastore_path = TempDir::new(random_string(8).as_str()).unwrap();
fs::create_dir_all(&datastore_path.path()).expect("Unable to create temporary directory");

let comms_config = CommsConfig {
node_identity: Arc::clone(&node_identity),
peer_connection_listening_address: format!("0.0.0.0:{}", port).parse().unwrap(),
Expand All @@ -187,12 +190,7 @@ fn main() {
requested_connection_timeout: Duration::from_millis(2000),
},
establish_connection_timeout: Duration::from_secs(10),
datastore_path: TempDir::new(random_string(8).as_str())
.unwrap()
.path()
.to_str()
.unwrap()
.to_string(),
datastore_path: datastore_path.path().to_str().unwrap().to_string(),
peer_database_name: random_string(8),
inbound_buffer_size: 10,
outbound_buffer_size: 10,
Expand Down
102 changes: 101 additions & 1 deletion base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
use crate::comms_connector::{InboundDomainConnector, PeerMessage};
use derive_error::Error;
use futures::{channel::mpsc, Sink};
use std::{error::Error, sync::Arc, time::Duration};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{error::Error, iter, sync::Arc, time::Duration};
use tari_comms::{
builder::{CommsBuilderError, CommsError, CommsNode},
connection::{net_address::ip::SocketAddress, NetAddress},
connection_manager::PeerConnectionConfig,
control_service::ControlServiceConfig,
outbound_message_service::ConstantBackoff,
peer_manager::{node_identity::NodeIdentityError, NodeIdentity},
CommsBuilder,
};
Expand Down Expand Up @@ -71,6 +73,104 @@ pub struct CommsConfig {
pub establish_connection_timeout: Duration,
}

// TODO: DRY up these initialization functions

/// Initialize Tari Comms configured for tests
pub fn initialize_local_test_comms<TSink>(
executor: TaskExecutor,
node_identity: Arc<NodeIdentity>,
connector: InboundDomainConnector<TSink>,
data_path: &str,
) -> Result<(CommsNode, Dht), CommsInitializationError>
where
TSink: Sink<Arc<PeerMessage>> + Unpin + Clone + Send + Sync + 'static,
TSink::Error: Error + Send + Sync,
{
let listener_address = node_identity.control_service_address();
let peer_database_name = {
let mut rng = thread_rng();
iter::repeat(())
.map(|_| rng.sample(Alphanumeric))
.take(8)
.collect::<String>()
};
let datastore = LMDBBuilder::new()
.set_path(data_path)
.set_environment_size(10)
.set_max_number_of_databases(1)
.add_database(&peer_database_name, lmdb_zero::db::CREATE)
.build()
.unwrap();
let peer_database = datastore.get_handle(&peer_database_name).unwrap();
let peer_database = LMDBWrapper::new(Arc::new(peer_database));

//---------------------------------- Comms --------------------------------------------//

// Create inbound and outbound channels
let (inbound_tx, inbound_rx) = mpsc::channel(10);
let (outbound_tx, outbound_rx) = mpsc::channel(10);

let comms = CommsBuilder::new(executor.clone())
.with_node_identity(node_identity)
.with_peer_storage(peer_database)
.with_inbound_sink(inbound_tx)
.with_outbound_stream(outbound_rx)
.with_outbound_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.configure_control_service(ControlServiceConfig {
listener_address,
socks_proxy_address: None,
requested_connection_timeout: Duration::from_millis(2000),
})
.configure_peer_connections(PeerConnectionConfig {
socks_proxy_address: None,
listening_address: "127.0.0.1:0".parse().expect("cannot fail"),
peer_connection_establish_timeout: Duration::from_secs(5),
..Default::default()
})
.build()
.map_err(CommsInitializationError::CommsBuilderError)?
.start()
.map_err(CommsInitializationError::CommsServicesError)?;

// Create a channel for outbound requests
let mut dht = comms_dht::DhtBuilder::from_comms(&comms)
.with_config(DhtConfig {
discovery_request_timeout: Duration::from_secs(1),
..Default::default()
})
.finish();

//---------------------------------- Inbound Pipeline --------------------------------------------//

// Connect inbound comms messages to the inbound pipeline and run it
ServicePipeline::new(
// Messages coming IN from comms to DHT
inbound_rx,
// Messages going OUT from DHT to connector (pubsub)
ServiceBuilder::new()
.layer(dht.inbound_middleware_layer())
.service(connector),
)
.with_shutdown_signal(comms.shutdown_signal())
.spawn_with(executor.clone());

//---------------------------------- Outbound Pipeline --------------------------------------------//

// Connect outbound message pipeline to comms, and run it
ServicePipeline::new(
// Requests coming IN from services to DHT
dht.take_outbound_receiver().expect("take outbound receiver only once"),
// Messages going OUT from DHT to comms
ServiceBuilder::new()
.layer(dht.outbound_middleware_layer())
.service(SinkMiddleware::new(outbound_tx)),
)
.with_shutdown_signal(comms.shutdown_signal())
.spawn_with(executor);

Ok((comms, dht))
}

/// Initialize Tari Comms
///
/// ## Arguments
Expand Down
25 changes: 3 additions & 22 deletions base_layer/p2p/tests/support/comms_and_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,16 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use futures::Sink;
use std::{error::Error, sync::Arc, time::Duration};
use std::{error::Error, sync::Arc};
use tari_comms::{
builder::CommsNode,
control_service::ControlServiceConfig,
peer_manager::{NodeIdentity, Peer, PeerFlags},
};
use tari_comms_dht::Dht;
use tari_p2p::{
comms_connector::{InboundDomainConnector, PeerMessage},
initialization::{initialize_comms, CommsConfig},
initialization::initialize_local_test_comms,
};
use tari_test_utils::random;
use tokio::runtime::TaskExecutor;

pub fn setup_comms_services<TSink>(
Expand All @@ -46,24 +44,7 @@ where
TSink: Sink<Arc<PeerMessage>> + Clone + Unpin + Send + Sync + 'static,
TSink::Error: Error + Send + Sync,
{
let comms_config = CommsConfig {
node_identity: Arc::clone(&node_identity),
peer_connection_listening_address: "127.0.0.1:0".parse().unwrap(),
socks_proxy_address: None,
control_service: ControlServiceConfig {
listener_address: node_identity.control_service_address(),
socks_proxy_address: None,
requested_connection_timeout: Duration::from_millis(2000),
},
datastore_path: data_path.to_string(),
establish_connection_timeout: Duration::from_secs(5),
peer_database_name: random::string(8),
inbound_buffer_size: 10,
outbound_buffer_size: 10,
dht: Default::default(),
};

let (comms, dht) = initialize_comms(executor, comms_config, publisher)
let (comms, dht) = initialize_local_test_comms(executor, node_identity, publisher, data_path)
.map(|(comms, dht)| (Arc::new(comms), dht))
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ tempdir = "0.3.7"

[dev-dependencies]
tari_comms_dht = { path = "../../comms/dht", version = "^0.0", features=["test-mocks"]}
tari_test_utils = { path = "../../infrastructure/test_utils", version = "^0.0"}

env_logger = "0.6.2"
prost = "0.5.0"
tari_test_utils = { path = "../../infrastructure/test_utils", version = "^0.0"}

[features]
test_harness = []
Expand Down
5 changes: 0 additions & 5 deletions base_layer/wallet/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,3 @@ pub mod support;
pub mod contacts_service;
pub mod transaction_service;
pub mod wallet;

#[macro_use]
extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
26 changes: 3 additions & 23 deletions base_layer/wallet/tests/support/comms_and_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,25 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::support::utils::random_string;
use futures::Sink;
use std::{error::Error, sync::Arc, time::Duration};
use std::{error::Error, sync::Arc};
use tari_comms::{
builder::CommsNode,
connection::NetAddress,
control_service::ControlServiceConfig,
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags},
types::CommsPublicKey,
};
use tari_comms_dht::{envelope::DhtMessageHeader, Dht};
use tari_p2p::{
comms_connector::{InboundDomainConnector, PeerMessage},
domain_message::DomainMessage,
initialization::{initialize_comms, CommsConfig},
initialization::initialize_local_test_comms,
};
use tokio::runtime::TaskExecutor;

pub fn setup_comms_services<TSink>(
executor: TaskExecutor,
node_identity: Arc<NodeIdentity>,
listening_address: NetAddress,
peers: Vec<NodeIdentity>,
publisher: InboundDomainConnector<TSink>,
database_path: String,
Expand All @@ -50,24 +47,7 @@ where
TSink: Sink<Arc<PeerMessage>> + Clone + Unpin + Send + Sync + 'static,
TSink::Error: Error + Send + Sync,
{
let comms_config = CommsConfig {
node_identity: Arc::clone(&node_identity),
peer_connection_listening_address: listening_address,
socks_proxy_address: None,
control_service: ControlServiceConfig {
listener_address: node_identity.control_service_address(),
socks_proxy_address: None,
requested_connection_timeout: Duration::from_millis(2000),
},
datastore_path: database_path,
establish_connection_timeout: Duration::from_secs(3),
peer_database_name: random_string(8),
inbound_buffer_size: 100,
outbound_buffer_size: 100,
dht: Default::default(),
};

let (comms, dht) = initialize_comms(executor, comms_config, publisher).unwrap();
let (comms, dht) = initialize_local_test_comms(executor, node_identity, publisher, &database_path).unwrap();

for p in peers {
let addr = p.control_service_address();
Expand Down
Loading

0 comments on commit 37f8204

Please sign in to comment.