Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Tokio 1.x #213

Merged
merged 14 commits into from
Feb 9, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.10.8-dev
- introduce `--report-file` (and `GooseDefault::ReportFile`) to optionally generate an HTML report when the load test completes
- upgrade to `tokio` 1.x, and switch to `flume` for all multi-producer, multi-consumer channels

## 0.10.7 Nov 16, 2020
- account for time spent doing things other than sleeping, maintaining more consistency when displaying statistics and shutting down
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ license = "Apache-2.0"
[dependencies]
chrono = "0.4"
ctrlc = "3.1"
flume = "0.10"
futures = "0.3"
gumdrop = "0.8"
http = "0.2"
Expand All @@ -24,12 +25,12 @@ num_cpus = "1.0"
num-format = "0.4"
rand = "0.7"
regex = "1"
reqwest = { version = "0.10", default-features = false, features = ["cookies", "json"] }
reqwest = { version = "0.11", default-features = false, features = ["cookies", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11"
serde_json = "1.0"
simplelog = "0.8"
tokio = { version = "0.2.20", features = ["fs", "io-util", "macros", "rt-core", "sync", "time"] }
tokio = { version = "1", features = ["fs", "io-util", "macros", "rt-multi-thread", "time"] }
url = "2.1"

# optional dependencies
Expand Down
45 changes: 21 additions & 24 deletions src/goose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::{future::Future, pin::Pin, time::Instant};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::sync::{Mutex, RwLock};
use url::Url;

use crate::metrics::GooseMetric;
Expand Down Expand Up @@ -334,24 +334,21 @@ pub enum GooseTaskError {
/// `.raw_request`.
RequestFailed { raw_request: GooseRawRequest },
/// The request was canceled (this happens when the throttle is enabled and
/// the load test finished). The mpsc SendError can be found in `.source`.
/// A `GooseRawRequest` has not yet been constructed, so is not available in
/// this error.
RequestCanceled {
source: mpsc::error::SendError<bool>,
},
/// the load test finished). A `GooseRawRequest` has not yet been constructed,
// so is not available in this error.
RequestCanceled { source: flume::SendError<bool> },
/// There was an error sending the metrics for a request to the parent thread.
/// The `GooseRawRequest` that was not recorded can be extracted from the error
/// chain, available inside `.source`.
MetricsFailed {
source: mpsc::error::SendError<GooseMetric>,
source: flume::SendError<GooseMetric>,
},
/// Attempt to send debug detail to logger failed.
/// There was an error sending debug information to the logger thread. The
/// `GooseDebug` that was not logged can be extracted from the error chain,
/// available inside `.source`.
LoggerFailed {
source: mpsc::error::SendError<Option<GooseDebug>>,
source: flume::SendError<Option<GooseDebug>>,
},
/// Attempted an unrecognized HTTP request method. The unrecognized method
/// is available in `.method`.
Expand Down Expand Up @@ -426,24 +423,24 @@ impl From<url::ParseError> for GooseTaskError {
}

/// When the throttle is enabled and the load test ends, the throttle channel is
/// shut down. This causes mpsc SendError, which gets automatically converted to
/// `RequestCanceled`.
impl From<mpsc::error::SendError<bool>> for GooseTaskError {
fn from(source: mpsc::error::SendError<bool>) -> GooseTaskError {
/// shut down. This causes a SendError, which gets automatically converted to
// `RequestCanceled`.
impl From<flume::SendError<bool>> for GooseTaskError {
fn from(source: flume::SendError<bool>) -> GooseTaskError {
GooseTaskError::RequestCanceled { source }
}
}

/// Attempt to send metrics to the parent thread failed.
impl From<mpsc::error::SendError<GooseMetric>> for GooseTaskError {
fn from(source: mpsc::error::SendError<GooseMetric>) -> GooseTaskError {
impl From<flume::SendError<GooseMetric>> for GooseTaskError {
fn from(source: flume::SendError<GooseMetric>) -> GooseTaskError {
GooseTaskError::MetricsFailed { source }
}
}

/// Attempt to send logs to the logger thread failed.
impl From<mpsc::error::SendError<Option<GooseDebug>>> for GooseTaskError {
fn from(source: mpsc::error::SendError<Option<GooseDebug>>) -> GooseTaskError {
impl From<flume::SendError<Option<GooseDebug>>> for GooseTaskError {
fn from(source: flume::SendError<Option<GooseDebug>>) -> GooseTaskError {
GooseTaskError::LoggerFailed { source }
}
}
Expand Down Expand Up @@ -908,13 +905,13 @@ pub struct GooseUser {
/// A local copy of the global GooseConfiguration.
pub config: GooseConfiguration,
/// Channel to logger.
pub debug_logger: Option<mpsc::UnboundedSender<Option<GooseDebug>>>,
pub debug_logger: Option<flume::Sender<Option<GooseDebug>>>,
/// Channel to throttle.
pub throttle: Option<mpsc::Sender<bool>>,
pub throttle: Option<flume::Sender<bool>>,
/// Normal tasks are optionally throttled, test_start and test_stop tasks are not.
pub is_throttled: bool,
/// Channel to parent.
pub channel_to_parent: Option<mpsc::UnboundedSender<GooseMetric>>,
pub channel_to_parent: Option<flume::Sender<GooseMetric>>,
/// An index into the internal `GooseTest.weighted_users, indicating which weighted GooseTaskSet is running.
pub weighted_users_index: usize,
/// A weighted list of all tasks that run when the user first starts.
Expand Down Expand Up @@ -1424,9 +1421,9 @@ impl GooseUser {
/// metrics.
///
/// Calls to `user.goose_send()` returns a `Result` containing a `GooseResponse` on success,
/// and a `tokio::sync::mpsc::error::SendError<bool>` on failure. Failure only happens when
/// `--throttle-requests` is enabled and the load test completes. The `GooseResponse` object
// contains a copy of the request made
/// and a `flume::SendError<bool>` on failure. Failure only happens when `--throttle-requests`
/// is enabled and the load test completes. The `GooseResponse` object contains a copy of the
/// request made
/// ([`goose.request`](https://docs.rs/goose/*/goose/goose/struct.GooseRawRequest)), and the
/// Reqwest response ([`goose.response`](https://docs.rs/reqwest/*/reqwest/struct.Response.html)).
///
Expand Down Expand Up @@ -1457,7 +1454,7 @@ impl GooseUser {
// ...wait until there's room to add a token to the throttle channel before proceeding.
debug!("GooseUser: waiting on throttle");
// Will result in GooseTaskError::RequestCanceled if this fails.
self.throttle.clone().unwrap().send(true).await?;
self.throttle.clone().unwrap().send_async(true).await?;
};

let started = Instant::now();
Expand Down
72 changes: 35 additions & 37 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,8 @@ use std::sync::{
};
use std::{fmt, io, time};
use tokio::fs::File;
use tokio::io::BufWriter;
use tokio::prelude::*;
use tokio::sync::mpsc;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::runtime::Runtime;
use url::Url;

use crate::goose::{
Expand All @@ -460,7 +459,7 @@ lazy_static! {
type WeightedGooseTasks = Vec<Vec<(usize, String)>>;

type DebugLoggerHandle = Option<tokio::task::JoinHandle<()>>;
type DebugLoggerChannel = Option<mpsc::UnboundedSender<Option<GooseDebug>>>;
type DebugLoggerChannel = Option<flume::Sender<Option<GooseDebug>>>;

/// Worker ID to aid in tracing logs when running a Gaggle.
pub fn get_worker_id() -> usize {
Expand Down Expand Up @@ -744,17 +743,17 @@ pub struct GooseAttackRunState {
/// happen regularly.
drift_timer: tokio::time::Instant,
/// Unbounded sender used by all GooseUser threads to send metrics to parent.
all_threads_metrics_tx: mpsc::UnboundedSender<GooseMetric>,
all_threads_metrics_tx: flume::Sender<GooseMetric>,
/// Unbounded receiver used by Goose parent to receive metrics from GooseUsers.
metrics_rx: mpsc::UnboundedReceiver<GooseMetric>,
metrics_rx: flume::Receiver<GooseMetric>,
/// Optional unbounded receiver for logger thread, if enabled.
debug_logger: DebugLoggerHandle,
/// Optional unbounded sender from all GooseUsers to logger thread, if enabled.
all_threads_debug_logger_tx: DebugLoggerChannel,
/// Optional receiver for all GooseUsers from throttle thread, if enabled.
throttle_threads_tx: Option<mpsc::Sender<bool>>,
throttle_threads_tx: Option<flume::Sender<bool>>,
/// Optional sender for throttle thread, if enabled.
parent_to_throttle_tx: Option<mpsc::Sender<bool>>,
parent_to_throttle_tx: Option<flume::Sender<bool>>,
/// Optional buffered writer for requests log file, if enabled.
requests_file: Option<BufWriter<File>>,
/// Optional unbuffered writer for html-formatted report file, if enabled.
Expand All @@ -765,7 +764,7 @@ pub struct GooseAttackRunState {
/// Collection of all GooseUser threads so they can be stopped later.
users: Vec<tokio::task::JoinHandle<()>>,
/// All unbounded senders to allow communication with GooseUser threads.
user_channels: Vec<mpsc::UnboundedSender<GooseUserCommand>>,
user_channels: Vec<flume::Sender<GooseUserCommand>>,
/// Timer tracking when to display running metrics, if enabled.
running_metrics_timer: std::time::Instant,
/// Boolean flag indicating if running metrics should be displayed.
Expand Down Expand Up @@ -2298,7 +2297,7 @@ impl GooseAttack {
if self.attack_mode == AttackMode::Manager {
#[cfg(feature = "gaggle")]
{
let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
self = rt.block_on(manager::manager_main(self));
}

Expand All @@ -2313,7 +2312,7 @@ impl GooseAttack {
else if self.attack_mode == AttackMode::Worker {
#[cfg(feature = "gaggle")]
{
let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
self = rt.block_on(worker::worker_main(&self));
}

Expand All @@ -2327,7 +2326,7 @@ impl GooseAttack {
}
// Start goose in single-process mode.
else {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
self = rt.block_on(self.start_attack(None))?;
}

Expand Down Expand Up @@ -2398,15 +2397,15 @@ impl GooseAttack {
"".to_string()
};
// If the logger isn't configured, return immediately.
if self.configuration.debug_file == "" {
if self.configuration.debug_file.is_empty() {
return Ok((None, None));
}

// Create an unbounded channel allowing GooseUser threads to log errors.
let (all_threads_debug_logger, logger_receiver): (
mpsc::UnboundedSender<Option<GooseDebug>>,
mpsc::UnboundedReceiver<Option<GooseDebug>>,
) = mpsc::unbounded_channel();
flume::Sender<Option<GooseDebug>>,
flume::Receiver<Option<GooseDebug>>,
) = flume::unbounded();
// Launch a new thread for logging.
let logger_thread = tokio::spawn(logger::logger_main(
self.configuration.clone(),
Expand All @@ -2420,9 +2419,9 @@ impl GooseAttack {
&self,
) -> (
// A channel used by GooseClients to throttle requests.
Option<mpsc::Sender<bool>>,
Option<flume::Sender<bool>>,
// A channel used by parent to tell throttle the load test is complete.
Option<mpsc::Sender<bool>>,
Option<flume::Sender<bool>>,
) {
// If the throttle isn't enabled, return immediately.
if self.configuration.throttle_requests == 0 {
Expand All @@ -2431,13 +2430,15 @@ impl GooseAttack {

// Create a bounded channel allowing single-sender multi-receiver to throttle
// GooseUser threads.
let (all_threads_throttle, throttle_receiver): (mpsc::Sender<bool>, mpsc::Receiver<bool>) =
mpsc::channel(self.configuration.throttle_requests);
let (all_threads_throttle, throttle_receiver): (
flume::Sender<bool>,
flume::Receiver<bool>,
) = flume::bounded(self.configuration.throttle_requests);

// Create a channel allowing the parent to inform the throttle thread when the
// load test is finished. Even though we only send one message, we can't use a
// oneshot channel as we don't want to block waiting for a message.
let (parent_to_throttle_tx, throttle_rx) = mpsc::channel(1);
let (parent_to_throttle_tx, throttle_rx) = flume::bounded(1);

// Launch a new thread for throttling, no need to rejoin it.
let _ = Some(tokio::spawn(throttle::throttle_main(
Expand All @@ -2446,15 +2447,15 @@ impl GooseAttack {
throttle_rx,
)));

let mut sender = all_threads_throttle.clone();
let sender = all_threads_throttle.clone();
// We start from 1 instead of 0 to intentionally fill all but one slot in the
// channel to avoid a burst of traffic during startup. The channel then provides
// an implementation of the leaky bucket algorithm as a queue. Requests have to
// add a token to the bucket before making a request, and are blocked until this
// throttle thread "leaks out" a token thereby creating space. More information
// can be found at: https://en.wikipedia.org/wiki/Leaky_bucket
for _ in 1..self.configuration.throttle_requests {
let _ = sender.send(true).await;
let _ = sender.send_async(true).await;
}

(Some(all_threads_throttle), Some(parent_to_throttle_tx))
Expand Down Expand Up @@ -2549,9 +2550,9 @@ impl GooseAttack {
// Create a single channel used to send metrics from GooseUser threads
// to parent thread.
let (all_threads_metrics_tx, metrics_rx): (
mpsc::UnboundedSender<GooseMetric>,
mpsc::UnboundedReceiver<GooseMetric>,
) = mpsc::unbounded_channel();
flume::Sender<GooseMetric>,
flume::Receiver<GooseMetric>,
) = flume::unbounded();

// If enabled, spawn a logger thread.
let (debug_logger, all_threads_debug_logger_tx) = self.setup_debug_logger()?;
Expand Down Expand Up @@ -2691,9 +2692,9 @@ impl GooseAttack {

// Create a per-thread channel allowing parent thread to control child threads.
let (parent_sender, thread_receiver): (
mpsc::UnboundedSender<GooseUserCommand>,
mpsc::UnboundedReceiver<GooseUserCommand>,
) = mpsc::unbounded_channel();
flume::Sender<GooseUserCommand>,
flume::Receiver<GooseUserCommand>,
) = flume::unbounded();
goose_attack_run_state.user_channels.push(parent_sender);

if self.get_debug_file_path()?.is_some() {
Expand Down Expand Up @@ -2759,10 +2760,7 @@ impl GooseAttack {
} else {
// If displaying running metrics, be sure we wake up often enough to
// display them at the configured rate.
let running_metrics = match self.configuration.running_metrics {
Some(r) => r,
None => 0,
};
let running_metrics = self.configuration.running_metrics.unwrap_or(0);

// Otherwise, sleep until the next time something needs to happen.
let sleep_duration = if running_metrics > 0
Expand All @@ -2782,7 +2780,7 @@ impl GooseAttack {
// If enough users have been spawned, move onto the next attack phase.
if self.metrics.users >= self.weighted_users.len() {
// Pause a tenth of a second waiting for the final user to fully start up.
tokio::time::delay_for(tokio::time::Duration::from_millis(100)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

if self.attack_mode == AttackMode::Worker {
info!(
Expand Down Expand Up @@ -2847,8 +2845,8 @@ impl GooseAttack {
}

// If throttle is enabled, tell throttle thread the load test is over.
if let Some(mut tx) = goose_attack_run_state.parent_to_throttle_tx.clone() {
let _ = tx.send(false).await;
if let Some(tx) = goose_attack_run_state.parent_to_throttle_tx.clone() {
let _ = tx.send(false);
}

// Take the users vector out of the GooseAttackRunState object so it can be
Expand Down Expand Up @@ -3342,7 +3340,7 @@ impl GooseAttack {
});
};
// Be sure the file flushes to disk.
report_file.flush();
report_file.flush().await?;

info!(
"wrote html report file to: {}",
Expand Down
7 changes: 3 additions & 4 deletions src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@

use serde_json::json;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::io::BufWriter;
use tokio::prelude::*;
use tokio::sync::mpsc;

use crate::goose::GooseDebug;
use crate::GooseConfiguration;
Expand All @@ -95,7 +94,7 @@ use crate::GooseConfiguration;
/// GooseUser threads. This function is not intended to be invoked manually.
pub async fn logger_main(
configuration: GooseConfiguration,
mut log_receiver: mpsc::UnboundedReceiver<Option<GooseDebug>>,
log_receiver: flume::Receiver<Option<GooseDebug>>,
) {
// Determine if a debug file has been configured.
let mut debug_file_path: Option<String> = None;
Expand Down Expand Up @@ -125,7 +124,7 @@ pub async fn logger_main(
}

// Loop waiting for and writing error logs from GooseUser threads.
while let Some(message) = log_receiver.recv().await {
while let Ok(message) = log_receiver.recv_async().await {
if let Some(goose_debug) = message {
// All Options are defined above, search for formatted_log.
if let Some(file) = debug_file.as_mut() {
Expand Down
Loading