From af778d6b64a08a716222f59724fb15342eb82c17 Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Thu, 4 Aug 2022 08:30:08 -0700 Subject: [PATCH] restyle to match dropshot, omicron, plus general style clean-up (#35) --- examples/demo-provision.rs | 26 +- examples/trip.rs | 12 +- src/dag.rs | 52 +- src/example_provision.rs | 83 ++- src/lib.rs | 31 +- src/rust_features.rs | 8 +- src/saga_action_error.rs | 163 +++-- src/saga_action_func.rs | 92 ++- src/saga_action_generic.rs | 169 +++-- src/saga_exec.rs | 985 +++++++++++++----------------- src/saga_log.rs | 147 ++--- src/sec.rs | 617 ++++++++----------- src/store.rs | 82 +-- tests/test_smoke.rs | 24 +- tests/test_unregistered_action.rs | 4 +- 15 files changed, 1070 insertions(+), 1425 deletions(-) diff --git a/examples/demo-provision.rs b/examples/demo-provision.rs index 1deaa81..030e5fa 100644 --- a/examples/demo-provision.rs +++ b/examples/demo-provision.rs @@ -1,6 +1,4 @@ -/*! - * Command-line tool for demo'ing saga interfaces - */ +//! Command-line tool for demo'ing saga interfaces use anyhow::anyhow; use anyhow::Context; @@ -59,10 +57,8 @@ enum Demo { }, } -/* - * We use a hardcoded SagaId for ease of automated testing. See the note in - * demo_prov_server_alloc(). - */ +// We use a hardcoded SagaId for ease of automated testing. See the note in +// demo_prov_server_alloc(). 
fn make_saga_id() -> SagaId { SagaId(Uuid::parse_str("049b2522-308d-442e-bc65-9bfaef863597").unwrap()) } @@ -103,9 +99,7 @@ fn make_example_action_registry() -> Arc> { Arc::new(registry) } -/* - * "dot" subcommand - */ +// "dot" subcommand async fn cmd_dot() -> Result<(), anyhow::Error> { let params = ExampleParams { @@ -117,9 +111,7 @@ async fn cmd_dot() -> Result<(), anyhow::Error> { Ok(()) } -/* - * "info" subcommand - */ +// "info" subcommand async fn cmd_info() -> Result<(), anyhow::Error> { let log = make_log(); @@ -147,9 +139,7 @@ async fn cmd_info() -> Result<(), anyhow::Error> { Ok(()) } -/* - * "print-log" subcommand - */ +// "print-log" subcommand #[derive(Debug, StructOpt)] struct PrintLogArgs { @@ -166,9 +156,7 @@ async fn cmd_print_log(args: &PrintLogArgs) -> Result<(), anyhow::Error> { Ok(()) } -/* - * "run" subcommand - */ +// "run" subcommand #[derive(Debug, StructOpt)] struct RunArgs { diff --git a/examples/trip.rs b/examples/trip.rs index e95b93c..6b4ebdb 100644 --- a/examples/trip.rs +++ b/examples/trip.rs @@ -290,7 +290,7 @@ async fn saga_refund_card( async fn saga_book_hotel( action_context: ActionContext, ) -> Result { - /* ... */ + // ... let trip_context = action_context.user_data(); let params = action_context.saga_params::()?; let hotel_name = ¶ms.hotel_name; @@ -301,7 +301,7 @@ async fn saga_book_hotel( async fn saga_cancel_hotel( action_context: ActionContext, ) -> Result<(), anyhow::Error> { - /* ... */ + // ... let trip_context = action_context.user_data(); let confirmation: HotelReservation = action_context.lookup("hotel")?; // ... (make request to another service -- must not fail) @@ -311,7 +311,7 @@ async fn saga_cancel_hotel( async fn saga_book_flight( action_context: ActionContext, ) -> Result { - /* ... */ + // ... 
let trip_context = action_context.user_data(); let params = action_context.saga_params::()?; let flight_info = ¶ms.flight_info; @@ -322,7 +322,7 @@ async fn saga_book_flight( async fn saga_cancel_flight( action_context: ActionContext, ) -> Result<(), anyhow::Error> { - /* ... */ + // ... let trip_context = action_context.user_data(); let confirmation: FlightReservation = action_context.lookup("flight")?; // ... (make request to another service -- must not fail) @@ -332,7 +332,7 @@ async fn saga_cancel_flight( async fn saga_book_car( action_context: ActionContext, ) -> Result { - /* ... */ + // ... let trip_context = action_context.user_data(); let params = action_context.saga_params::()?; let car_info = ¶ms.car_info; @@ -343,7 +343,7 @@ async fn saga_book_car( async fn saga_cancel_car( action_context: ActionContext, ) -> Result<(), anyhow::Error> { - /* ... */ + // ... let trip_context = action_context.user_data(); let confirmation: CarReservation = action_context.lookup("car")?; // ... (make request to another service -- must not fail) diff --git a/src/dag.rs b/src/dag.rs index df3fc9b..7885399 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -38,7 +38,7 @@ use std::sync::Arc; use thiserror::Error; use uuid::Uuid; -/** Unique identifier for a Saga (an execution of a saga template) */ +/// Unique identifier for a Saga (an execution of a saga template) #[derive( Clone, Copy, @@ -54,13 +54,11 @@ use uuid::Uuid; pub struct SagaId(pub Uuid); // TODO-cleanup figure out how to use custom_derive here? NewtypeDebug! { () pub struct SagaId(Uuid); } -/* - * TODO-design In the Oxide consumer, we probably want to have the serialized - * form of ids have a prefix describing the type. This seems consumer-specific, - * though. Is there a good way to do support that? Maybe the best way to do - * this is to have the consumer have their own enum or trait that impls Display - * using the various ids provided by consumers. 
- */ +// TODO-design In the Oxide consumer, we probably want to have the serialized +// form of ids have a prefix describing the type. This seems consumer-specific, +// though. Is there a good way to do support that? Maybe the best way to do +// this is to have the consumer have their own enum or trait that impls Display +// using the various ids provided by consumers. NewtypeDisplay! { () pub struct SagaId(Uuid); } NewtypeFrom! { () pub struct SagaId(Uuid); } @@ -343,9 +341,9 @@ pub struct SagaDag { /// the actual DAG representation /// /// Unlike [`Dag`], [`SagaDag`]'s graph can contain any type of [`Node`]. - /// There is always exactly one [`InternalNode::Start`] node and exactly one - /// [`InternalNode::End`] node. The graph can contain subsagas, which are - /// always bracketed by [`InternalNode::SubsagaStart`] and + /// There is always exactly one [`InternalNode::Start`] node and exactly + /// one [`InternalNode::End`] node. The graph can contain subsagas, + /// which are always bracketed by [`InternalNode::SubsagaStart`] and /// [`InternalNode::SubsagaEnd`] nodes. pub(crate) graph: Graph, /// the index of the [`InternalNode::Start`] node for this Saga @@ -445,12 +443,12 @@ pub struct Dag { /// /// This graph does *not* contain a [`InternalNode::Start`] or /// [`InternalNode::End`] node. Those only make sense for `Dag`s that will - /// become top-level sagas (as opposed to subsagas). Instead, we keep track - /// of the first group of DAG (root nodes) and the last group of DAG nodes - /// (leaf nodes). Later, we'll wrap this `Dag` in either [`SagaDag`] (for - /// use as a top-level saga), in which case we'll add the start and end - /// nodes, or we'll use it as a subsaga, in which case we'll add - /// SubsagaStart and SubsagaEnd nodes. + /// become top-level sagas (as opposed to subsagas). Instead, we keep + /// track of the first group of DAG (root nodes) and the last group of + /// DAG nodes (leaf nodes). 
Later, we'll wrap this `Dag` in either + /// [`SagaDag`] (for use as a top-level saga), in which case we'll add + /// the start and end nodes, or we'll use it as a subsaga, in which + /// case we'll add SubsagaStart and SubsagaEnd nodes. graph: Graph, /// the initial nodes (root nodes) of the DAG @@ -522,7 +520,7 @@ enum DagBuilderErrorKind { #[error( "subsaga node {0:?} has parameters that come from node {1:?}, but it \ - does not depend on any such node" + does not depend on any such node" )] /// A subsaga was appended whose parameters were supposed to come from a /// node that does not exist or that the subsaga does not depend on. @@ -839,8 +837,8 @@ mod test { )); assert_eq!( error.to_string(), - "building saga \"test-saga\": \ - saga must end with exactly one node" + "building saga \"test-saga\": saga must end with exactly \ + one node" ); } }; @@ -870,8 +868,8 @@ mod test { assert!(matches!(error.kind, DagBuilderErrorKind::EmptyStage)); assert_eq!( error.to_string(), - "building saga \"test-saga\": \ - attempted to append 0 nodes in parallel" + "building saga \"test-saga\": attempted to append 0 nodes in \ + parallel" ); } @@ -889,8 +887,8 @@ mod test { ); assert_eq!( error.to_string(), - "building saga \"test-saga\": \ - name was used multiple times in the same Dag: \"a\"" + "building saga \"test-saga\": name was used multiple times in the \ + same Dag: \"a\"" ); // error case: a DAG that duplicates names (indirect ancestor) @@ -954,9 +952,9 @@ mod test { ); assert_eq!( error.to_string(), - "building saga \"test-saga\": \ - subsaga node \"b\" has parameters that come from node \"barf\", \ - but it does not depend on any such node" + "building saga \"test-saga\": subsaga node \"b\" has parameters \ + that come from node \"barf\", but it does not depend on any such \ + node" ); // error case: subsaga depends on params node that doesn't exist diff --git a/src/example_provision.rs b/src/example_provision.rs index b0f5bdd..34cc329 100644 --- 
a/src/example_provision.rs +++ b/src/example_provision.rs @@ -1,6 +1,4 @@ -/*! - * Common code shared by examples - */ +//! Common code shared by examples use crate::ActionContext; use crate::ActionError; @@ -17,30 +15,28 @@ use serde::Serialize; use std::sync::Arc; use thiserror::Error; -/* - * Demo provision saga: - * - * create instance (database) - * | | | - * +------+ + +-------------+ - * | | | - * v v v - * alloc IP create volume pick server - * | | | - * +------+--+ v - * | allocate server resources - * | | - * +-------------------+ - * | - * v - * configure instance (server) - * | - * v - * attach volume - * | - * v - * boot instance - */ +// Demo provision saga: +// +// create instance (database) +// | | | +// +------+ + +-------------+ +// | | | +// v v v +// alloc IP create volume pick server +// | | | +// +------+--+ v +// | allocate server resources +// | | +// +-------------------+ +// | +// v +// configure instance (server) +// | +// v +// attach volume +// | +// v +// boot instance #[doc(hidden)] #[derive(Debug)] @@ -86,7 +82,7 @@ struct ServerAllocResult { server_id: u64, } -/* TODO-cleanup can we implement this generically? */ +// TODO-cleanup can we implement this generically? 
impl From for ActionError { fn from(t: ExampleError) -> ActionError { ActionError::action_failed(t) @@ -248,8 +244,8 @@ async fn demo_prov_instance_create( sgctx.node_label(), params.instance_name ); - /* exercise saga parameters */ - /* make up an instance ID */ + // exercise saga parameters + // make up an instance ID let instance_id = 1211u64; Ok(instance_id) } @@ -258,10 +254,10 @@ async fn demo_prov_vpc_alloc_ip( sgctx: SagaExampleContext, ) -> ExFuncResult { eprintln!("running action: {}", sgctx.node_label()); - /* exercise using some data from a previous node */ + // exercise using some data from a previous node let instance_id = sgctx.lookup::("instance_id")?; assert_eq!(instance_id, 1211); - /* make up an IP (simulate allocation) */ + // make up an IP (simulate allocation) let ip = String::from("10.120.121.122"); Ok(ip) } @@ -270,9 +266,9 @@ async fn demo_prov_vpc_alloc_ip( async fn demo_prov_server_pick(sgctx: SagaExampleContext) -> ExFuncResult { eprintln!("running action: {}", sgctx.node_label()); let params = sgctx.saga_params::()?; - /* exercise subsaga parameters */ + // exercise subsaga parameters assert_eq!(params.number_of_things, 1); - /* make up ("allocate") a new server id */ + // make up ("allocate") a new server id let server_id = 1212u64; Ok(server_id) } @@ -284,12 +280,12 @@ async fn demo_prov_server_reserve( eprintln!("running action: {}", sgctx.node_label()); let params = sgctx.saga_params::()?; - /* exercise subsaga parameters */ + // exercise subsaga parameters assert_eq!(params.number_of_things, 1); - /* exercise using data from previous nodes */ + // exercise using data from previous nodes let server_id = sgctx.lookup::("server_id")?; assert_eq!(server_id, 1212); - /* package this up for downstream consumers */ + // package this up for downstream consumers Ok(ServerAllocResult { server_id }) } @@ -297,9 +293,9 @@ async fn demo_prov_volume_create( sgctx: SagaExampleContext, ) -> ExFuncResult { eprintln!("running action: {}", 
sgctx.node_label()); - /* exercise using data from previous nodes */ + // exercise using data from previous nodes assert_eq!(sgctx.lookup::("instance_id")?, 1211); - /* make up ("allocate") a volume id */ + // make up ("allocate") a volume id let volume_id = 1213u64; Ok(volume_id) } @@ -308,7 +304,7 @@ async fn demo_prov_instance_configure( sgctx: SagaExampleContext, ) -> ExFuncResult<()> { eprintln!("running action: {}", sgctx.node_label()); - /* exercise using data from previous nodes */ + // exercise using data from previous nodes assert_eq!(sgctx.lookup::("instance_id")?, 1211); let params = sgctx.saga_params::()?; @@ -325,7 +321,7 @@ async fn demo_prov_volume_attach( sgctx: SagaExampleContext, ) -> ExFuncResult<()> { eprintln!("running action: {}", sgctx.node_label()); - /* exercise using data from previous nodes */ + // exercise using data from previous nodes assert_eq!(sgctx.lookup::("instance_id")?, 1211); assert_eq!(sgctx.lookup::("volume_id")?, 1213); @@ -339,11 +335,12 @@ async fn demo_prov_instance_boot( sgctx: SagaExampleContext, ) -> ExFuncResult<()> { eprintln!("running action: {}", sgctx.node_label()); - /* exercise using data from previous nodes */ + // exercise using data from previous nodes assert_eq!(sgctx.lookup::("instance_id")?, 1211); assert_eq!(sgctx.lookup::("volume_id")?, 1213); - // We know there is only one instance of the subsaga that created a server id + // We know there is only one instance of the subsaga that created a server + // id assert_eq!( sgctx.lookup::("server_alloc")?.server_id, 1212 diff --git a/src/lib.rs b/src/lib.rs index d79e8b1..83e4941 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,15 +19,12 @@ //! //! This crate is necessarily somewhat complex to use. **For a detailed, //! documented example, see examples/trip.rs.** -//! 
#![deny(elided_lifetimes_in_paths)] -/* - * We disable the warning for unstable name collisions because we deliberately - * have some conflicts in rust_features.rs (corresponding to backports of - * unstable features). If and when these features are stabilized, we should see - * warnings that our backported versions are unused and we can remove them. - */ +// We disable the warning for unstable name collisions because we deliberately +// have some conflicts in rust_features.rs (corresponding to backports of +// unstable features). If and when these features are stabilized, we should see +// warnings that our backported versions are unused and we can remove them. #![allow(unstable_name_collisions)] mod dag; @@ -41,12 +38,10 @@ mod saga_log; mod sec; mod store; -/* - * TODO-cleanup The example_provision stuff should probably be in a separate - * crate that depends on "steno". That would ensure it only uses public - * interfaces. However, the "steno" crate wants to have an example that uses - * this crate, hence our problem. - */ +// TODO-cleanup The example_provision stuff should probably be in a separate +// crate that depends on "steno". That would ensure it only uses public +// interfaces. However, the "steno" crate wants to have an example that uses +// this crate, hence our problem. pub use example_provision::load_example_actions; pub use example_provision::make_example_provision_dag; pub use example_provision::ExampleContext; @@ -92,12 +87,10 @@ pub use store::SagaCachedState; pub use store::SagaCreateParams; pub use store::SecStore; -/* - * TODO-cleanup This ought not to be exposed. It's here because we expose - * SagaTemplateGeneric, which is important, and it has a function that uses this - * type. This ought to be a sealed trait where this function is private or - * something. - */ +// TODO-cleanup This ought not to be exposed. It's here because we expose +// SagaTemplateGeneric, which is important, and it has a function that uses this +// type. 
This ought to be a sealed trait where this function is private or +// something. pub use sec::SecExecClient; #[macro_use] diff --git a/src/rust_features.rs b/src/rust_features.rs index 7131a36..d11787d 100644 --- a/src/rust_features.rs +++ b/src/rust_features.rs @@ -1,10 +1,6 @@ -/*! - * Backports of useful unstable Rust features. - */ +//! Backports of useful unstable Rust features. -/* - * feature(option_expect_none) - */ +// feature(option_expect_none) pub trait ExpectNone { fn expect_none(self, message: &'static str); } diff --git a/src/saga_action_error.rs b/src/saga_action_error.rs index d10585d..7f79178 100644 --- a/src/saga_action_error.rs +++ b/src/saga_action_error.rs @@ -7,57 +7,56 @@ use serde::Serialize; use std::fmt::Display; use thiserror::Error; -/** - * An error produced by a saga action - * - * On failure, actions always return an `ActionError`. This type can represent - * a failure from Steno itself or a failure produced by the consumer (e.g., an - * action whose body fails for some reason). The various specific errors are - * documented below. - * - * You can use your own error type with [`ActionError`]. As long as it meets - * the requirements of [`ActionData`], you can wrap your error in an - * [`ActionError::ActionFailed`] variant using [`ActionError::action_failed()`]. - * Given an [`ActionError::ActionFailed`] variant, you can get your specific - * type back out again using [`ActionError::convert()`]. - * - * Note that the conversion back to your specific error type can fail! This - * looks like a downcast, but it's not. `ActionError`s are typically recorded in - * the saga log and interpreted later, possibly after a crash and recovery. - * Whether there was an intervening crash or not, the conversion here - * deserializes the error from the log into your custom error type. This won't - * work if your error type is incompatible with the one that was used to - * serialize the error in the first place. 
- * - * # Example - * - * ```rust - * use serde::Deserialize; - * use serde::Serialize; - * use steno::ActionError; - * - * #[derive(Debug, Deserialize, Serialize)] - * struct MyError { message: String } - * - * fn my_func_that_fails() -> Result<(), ActionError> { - * Err(ActionError::action_failed(MyError { message: "boom!".to_owned() })) - * } - * - * fn handle_error(error: ActionError) { - * match error.convert::() { - * Ok(my_error) => { - * eprintln!("my action failed because: {}", my_error.message); - * } - * Err(other_error) => { - * eprintln!( - * "my action failed because the framework had a problem: {}", - * other_error.to_string() - * ); - * } - * } - * } - * ``` - */ +/// An error produced by a saga action +/// +/// On failure, actions always return an `ActionError`. This type can represent +/// a failure from Steno itself or a failure produced by the consumer (e.g., an +/// action whose body fails for some reason). The various specific errors are +/// documented below. +/// +/// You can use your own error type with [`ActionError`]. As long as it meets +/// the requirements of [`ActionData`], you can wrap your error in an +/// [`ActionError::ActionFailed`] variant using +/// [`ActionError::action_failed()`]. Given an [`ActionError::ActionFailed`] +/// variant, you can get your specific type back out again using +/// [`ActionError::convert()`]. +/// +/// Note that the conversion back to your specific error type can fail! This +/// looks like a downcast, but it's not. `ActionError`s are typically recorded +/// in the saga log and interpreted later, possibly after a crash and recovery. +/// Whether there was an intervening crash or not, the conversion here +/// deserializes the error from the log into your custom error type. This won't +/// work if your error type is incompatible with the one that was used to +/// serialize the error in the first place. 
+/// +/// # Example +/// +/// ```rust +/// use serde::Deserialize; +/// use serde::Serialize; +/// use steno::ActionError; +/// +/// #[derive(Debug, Deserialize, Serialize)] +/// struct MyError { message: String } +/// +/// fn my_func_that_fails() -> Result<(), ActionError> { +/// Err(ActionError::action_failed(MyError { message: "boom!".to_owned() })) +/// } +/// +/// fn handle_error(error: ActionError) { +/// match error.convert::() { +/// Ok(my_error) => { +/// eprintln!("my action failed because: {}", my_error.message); +/// } +/// Err(other_error) => { +/// eprintln!( +/// "my action failed because the framework had a problem: {}", +/// other_error.to_string() +/// ); +/// } +/// } +/// } +/// ``` #[derive(Clone, Debug, Deserialize, Error, JsonSchema, Serialize)] pub enum ActionError { /// Action failed due to a consumer-specific error @@ -85,18 +84,14 @@ pub enum ActionError { } impl ActionError { - /** - * Wrap a consumer-provided error in an [`ActionError`] - */ - /* - * TODO-design Is there a way for us to provide this implementation - * automatically? It would be nice if a consumer could use their own error - * type, use `?` in the body of their function, and then have that get - * wrapped in an ActionError. We'd like to provide a blanket impl for any - * supported error type to convert it to ActionError. But ActionError is - * itself a supported error type (not necessarily by design), so this - * doesn't work. - */ + /// Wrap a consumer-provided error in an [`ActionError`] + // TODO-design Is there a way for us to provide this implementation + // automatically? It would be nice if a consumer could use their own error + // type, use `?` in the body of their function, and then have that get + // wrapped in an ActionError. We'd like to provide a blanket impl for any + // supported error type to convert it to ActionError. But ActionError is + // itself a supported error type (not necessarily by design), so this + // doesn't work. 
pub fn action_failed(user_error: E) -> ActionError { match serde_json::to_value(user_error) { Ok(source_error) => ActionError::ActionFailed { source_error }, @@ -104,28 +99,26 @@ impl ActionError { } } - /** - * Try to convert the error to a specific consumer error - * - * This function streamlines the most common use case by decomposing the - * error into one of three cases: - * - * 1. If the error can be converted to the specific error type `E` (which - * means that this is the `ActionError::ActionFailed` variant and the - * wrapped error could be deserialized to `E`), this function returns - * `Ok(E)`. - * - * 2. If the error is the `ActionError::ActionFailed` variant but could not - * be converted to type `E`, this function returns `Err(ActionError)` - * where the error is the `ActionError::DeserializeFailed`. This is - * either a bug in the current program or an unexpected operational - * error, as might happen if incompatible versions of the saga executor - * are deployed. Most consumers will propagate this error up and - * eventually abandon the saga. - * - * 3. If the error is any other variant, the error itself is returned as - * `Err(ActionError)`. Most consumers will propagate this error up. - */ + /// Try to convert the error to a specific consumer error + /// + /// This function streamlines the most common use case by decomposing the + /// error into one of three cases: + /// + /// 1. If the error can be converted to the specific error type `E` (which + /// means that this is the `ActionError::ActionFailed` variant and the + /// wrapped error could be deserialized to `E`), this function returns + /// `Ok(E)`. + /// + /// 2. If the error is the `ActionError::ActionFailed` variant but could not + /// be converted to type `E`, this function returns `Err(ActionError)` + /// where the error is the `ActionError::DeserializeFailed`. 
This is + /// either a bug in the current program or an unexpected operational + /// error, as might happen if incompatible versions of the saga executor + /// are deployed. Most consumers will propagate this error up and + /// eventually abandon the saga. + /// + /// 3. If the error is any other variant, the error itself is returned as + /// `Err(ActionError)`. Most consumers will propagate this error up. pub fn convert(self) -> Result { match self { ActionError::ActionFailed { source_error } => { diff --git a/src/saga_action_func.rs b/src/saga_action_func.rs index 93cd572..d1d7c22 100644 --- a/src/saga_action_func.rs +++ b/src/saga_action_func.rs @@ -18,40 +18,34 @@ use std::fmt::Debug; use std::future::Future; use std::sync::Arc; -/** - * Result of a function that implements a saga action - */ -/* - * This differs from [`ActionResult`] because [`ActionResult`] returns a generic - * type. The function-oriented interface allows you to return more specific - * types as long as they implement the [`ActionData`] trait. - * - * TODO-design There's no reason that the generic interface couldn't also look - * like this. We have this mechanism here to allow `ActionFunc` functions - * to return specific types while storing the generic thing inside the - * framework. We do this translation in the impl of `ActionFunc`. But we - * could instead create another layer above `Action` that does this. This gets - * complicated and doesn't seem especially useful yet. - */ +/// Result of a function that implements a saga action +// This differs from [`ActionResult`] because [`ActionResult`] returns a generic +// type. The function-oriented interface allows you to return more specific +// types as long as they implement the [`ActionData`] trait. +// +// TODO-design There's no reason that the generic interface couldn't also look +// like this. 
We have this mechanism here to allow `ActionFunc` functions +// to return specific types while storing the generic thing inside the +// framework. We do this translation in the impl of `ActionFunc`. But we +// could instead create another layer above `Action` that does this. This gets +// complicated and doesn't seem especially useful yet. pub type ActionFuncResult = Result; -/** - * Trait that expresses the requirements for async functions to be used with - * `ActionFunc`. This exists just to express the relationships between the types - * involved in the function, so that they don't have to be repeated everywhere. - * You don't need to implement it yourself -- a blanket implementation is - * provided. - */ +/// Trait that expresses the requirements for async functions to be used with +/// `ActionFunc`. This exists just to express the relationships between the +/// types involved in the function, so that they don't have to be repeated +/// everywhere. You don't need to implement it yourself -- a blanket +/// implementation is provided. pub trait ActionFn<'c, S: SagaType>: Send + Sync + 'static { - /** Type returned when the future finally resolves. */ + /// Type returned when the future finally resolves. type Output; - /** Type of the future returned when the function is called. */ + /// Type of the future returned when the function is called. type Future: Future + Send + 'c; - /** Call the function. */ + /// Call the function. 
fn act(&'c self, ctx: ActionContext) -> Self::Future; } -/* Blanket impl for Fn types returning futures */ +// Blanket impl for Fn types returning futures impl<'c, F, S, FF> ActionFn<'c, S> for F where S: SagaType, @@ -65,10 +59,8 @@ where } } -/** - * Implementation of [`Action`] that uses ordinary functions for the action and - * undo action - */ +/// Implementation of [`Action`] that uses ordinary functions for the action and +/// undo action pub struct ActionFunc { name: ActionName, action_func: ActionFuncType, @@ -76,15 +68,13 @@ pub struct ActionFunc { } impl ActionFunc { - /** - * Construct an [`Action`] from a pair of functions, using `action_func` for - * the action and `undo_func` for the undo action - * - * The result is returned as `Arc` so that it can be used - * directly where `Action`s are expected. (The struct `ActionFunc` has no - * interfaces of its own so there's generally no need to have the specific - * type.) - */ + /// Construct an [`Action`] from a pair of functions, using `action_func` + /// for the action and `undo_func` for the undo action + /// + /// The result is returned as `Arc` so that it can be used + /// directly where `Action`s are expected. (The struct `ActionFunc` has no + /// interfaces of its own so there's generally no need to have the specific + /// type.) pub fn new_action( name: Name, action_func: ActionFuncType, @@ -109,14 +99,10 @@ impl ActionFunc { } } -/* - * TODO-cleanup why can't new_action_noop_undo live in the Action namespace? - */ +// TODO-cleanup why can't new_action_noop_undo live in the Action namespace? -/** - * Given a function `f`, return an `ActionFunc` that uses `f` as the action and - * provides a no-op undo function (which does nothing and always succeeds). - */ +/// Given a function `f`, return an `ActionFunc` that uses `f` as the action and +/// provides a no-op undo function (which does nothing and always succeeds). 
pub fn new_action_noop_undo( name: Name, f: ActionFuncType, @@ -153,11 +139,9 @@ where ) -> BoxFuture<'_, ActionResult> { Box::pin(async move { let fut = self.action_func.act(sgctx); - /* - * Execute the caller's function and translate its type into the - * generic JsonValue that the framework uses to store action - * outputs. - */ + // Execute the caller's function and translate its type into the + // generic JsonValue that the framework uses to store action + // outputs. fut.await .and_then(|func_output| { serde_json::to_value(func_output) @@ -183,10 +167,8 @@ impl Debug for ActionFunc { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - /* - * The type name for a function includes its name, so it's a handy - * summary for debugging. - */ + // The type name for a function includes its name, so it's a handy + // summary for debugging. f.write_str(&type_name::()) } } diff --git a/src/saga_action_generic.rs b/src/saga_action_generic.rs index c7aaebf..3aef096 100644 --- a/src/saga_action_generic.rs +++ b/src/saga_action_generic.rs @@ -13,39 +13,34 @@ use serde::Serialize; use std::fmt::Debug; use std::sync::Arc; -/** - * Collection of consumer-provided types, effectively defining the type - * signature of a saga - * - * This trait bundles a bunch of consumer-provided types that are used - * throughout Steno to avoid a sprawl of type parameters and duplicated trait - * bounds. - */ +/// Collection of consumer-provided types, effectively defining the type +/// signature of a saga +/// +/// This trait bundles a bunch of consumer-provided types that are used +/// throughout Steno to avoid a sprawl of type parameters and duplicated trait +/// bounds. pub trait SagaType: Debug + 'static { - /** - * Type for the consumer's context object - * - * When beginning execution of a saga with - * [`crate::SecClient::saga_create()`] or resuming a previous execution with - * [`crate::SecClient::saga_resume()`], consumers provide a context object - * with this type. 
This object is not persistent. Rather, it provides - * programming interfaces the consumer wants available from within actions. - * For example, this could include HTTP clients that will be used by the - * action to make requests to dependent services. This object is made - * available to actions via [`crate::ActionContext::user_data()`]. There's - * one context for the life of each saga's execution. - */ + /// Type for the consumer's context object + /// + /// When beginning execution of a saga with + /// [`crate::SecClient::saga_create()`] or resuming a previous execution + /// with [`crate::SecClient::saga_resume()`], consumers provide a + /// context object with this type. This object is not persistent. + /// Rather, it provides programming interfaces the consumer wants + /// available from within actions. For example, this could include HTTP + /// clients that will be used by the action to make requests to + /// dependent services. This object is made available to actions via + /// [`crate::ActionContext::user_data()`]. There's one context for the + /// life of each saga's execution. type ExecContextType: Debug + Send + Sync + 'static; } -/** - * Data produced by the consumer that may need to be serialized to the saga log - * - * This type is used for saga parameters and the output data and errors from an - * individual action. It's essentially a synonym for `Debug + DeserializeOwned - * + Serialize + Send + Sync`. Consumers are not expected to impl this - * directly. - */ +/// Data produced by the consumer that may need to be serialized to the saga log +/// +/// This type is used for saga parameters and the output data and errors from an +/// individual action. It's essentially a synonym for `Debug + DeserializeOwned +/// + Serialize + Send + Sync`. Consumers are not expected to impl this +/// directly. 
pub trait ActionData: Debug + DeserializeOwned + Serialize + Send + Sync + 'static { @@ -55,90 +50,78 @@ impl ActionData { } -/** - * Result of a saga action - * - * In this generic Action interface, actions return a pretty generic - * `serde_json::Value`. This is something that we can store uniformly, - * serialize to the log, and deserialize into a more specific type when the - * consumer asks for that. (By contrast, the `ActionFunc` impl is a little - * fancier. It allows consumers to return anything that _can_ be serialized. - * That's why consumers should prefer that interface and not this one.) - */ +/// Result of a saga action +/// +/// In this generic Action interface, actions return a pretty generic +/// `serde_json::Value`. This is something that we can store uniformly, +/// serialize to the log, and deserialize into a more specific type when the +/// consumer asks for that. (By contrast, the `ActionFunc` impl is a little +/// fancier. It allows consumers to return anything that _can_ be serialized. +/// That's why consumers should prefer that interface and not this one.) // TODO-cleanup can we drop this Arc? pub type ActionResult = Result, ActionError>; -/** Result of a saga undo action */ +/// Result of a saga undo action // TODO-design what should the error type here be? Maybe something that can // encompass "general framework error"? This might put the saga into a "needs // attention" state? pub type UndoResult = Result<(), anyhow::Error>; -/** - * Building blocks of sagas - * - * Each node in a saga graph is represented with some kind of `Action`, - * which provides entry points to asynchronously execute an action and its - * corresponding undo action. A saga is essentially a directed acyclic graph of - * these actions with dependencies between them. Each action consumes an - * [`ActionContext`] and asynchronously produces an [`ActionResult`]. The - * primary implementor for most consumers is [`crate::ActionFunc`]. 
- * - * Actions should be stateless. Any state is supposed to be stored via the - * framework. So it should be easy to make Actions Send and Sync. This is - * important because we want to be able to have multiple references to the same - * Action in multiple threads -- as might happen if the same action appeared - * multiple times in the saga or in different sagas. - */ +/// Building blocks of sagas +/// +/// Each node in a saga graph is represented with some kind of `Action`, +/// which provides entry points to asynchronously execute an action and its +/// corresponding undo action. A saga is essentially a directed acyclic graph +/// of these actions with dependencies between them. Each action consumes an +/// [`ActionContext`] and asynchronously produces an [`ActionResult`]. The +/// primary implementor for most consumers is [`crate::ActionFunc`]. +/// +/// Actions should be stateless. Any state is supposed to be stored via the +/// framework. So it should be easy to make Actions Send and Sync. This is +/// important because we want to be able to have multiple references to the same +/// Action in multiple threads -- as might happen if the same action appeared +/// multiple times in the saga or in different sagas. pub trait Action: Debug + Send + Sync { - /** - * Executes the action for this saga node, whatever that is. Actions - * function like requests in distributed sagas: critically, they must be - * idempotent. This means that multiple calls to the action have the - * same result on the system as a single call, although the action is not - * necessarily required to return the same result. - * - * As an example, generating a UUID to represent an object is a common saga - * action: if called repeatedly, it may generate different results, but it - * has no side effects on the rest of the system. 
Similarly, using a - * generated UUID in a subsequent action to create an object may help ensure - * that the side effects appear the same, regardless of how many times the - * action has been invoked. - * - * Actions should be very careful in using interfaces outside of - * [`ActionContext`] -- we want them to be as self-contained as possible to - * ensure idempotence and to minimize versioning issues. - * - * On success, this function produces a serialized output. This output will - * be stored persistently, keyed by the _name_ of the current saga node. - * Subsequent stages can access this data with [`ActionContext::lookup`]. - * This is the _only_ supported means of sharing state across actions within - * a saga. - * - * The output of the last node in the DAG becomes the output of the saga. - */ + /// Executes the action for this saga node, whatever that is. Actions + /// function like requests in distributed sagas: critically, they must be + /// idempotent. This means that multiple calls to the action have the + /// same result on the system as a single call, although the action is not + /// necessarily required to return the same result. + /// + /// As an example, generating a UUID to represent an object is a common saga + /// action: if called repeatedly, it may generate different results, but it + /// has no side effects on the rest of the system. Similarly, using a + /// generated UUID in a subsequent action to create an object may help + /// ensure that the side effects appear the same, regardless of how many + /// times the action has been invoked. + /// + /// Actions should be very careful in using interfaces outside of + /// [`ActionContext`] -- we want them to be as self-contained as possible to + /// ensure idempotence and to minimize versioning issues. + /// + /// On success, this function produces a serialized output. This output + /// will be stored persistently, keyed by the _name_ of the current saga + /// node. 
Subsequent stages can access this data with + /// [`ActionContext::lookup`]. This is the _only_ supported means of + /// sharing state across actions within a saga. + /// + /// The output of the last node in the DAG becomes the output of the saga. fn do_it( &self, sgctx: ActionContext, ) -> BoxFuture<'_, ActionResult>; - /** - * Executes the undo action for this saga node, whatever that is. - */ + /// Executes the undo action for this saga node, whatever that is. fn undo_it( &self, sgctx: ActionContext, ) -> BoxFuture<'_, UndoResult>; - /** - * Return the name of the action used as the key in the ActionRegistry - */ + /// Return the name of the action used as the key in the ActionRegistry fn name(&self) -> ActionName; } -/* - * Special action implementations - */ +// Special action implementations /// [`Action`] impl that emits a value known when the DAG is created /// @@ -171,7 +154,7 @@ where } } -/** Simulates an error at a given spot in the saga graph */ +/// Simulates an error at a given spot in the saga graph #[derive(Debug)] pub struct ActionInjectError {} @@ -182,7 +165,7 @@ impl Action for ActionInjectError { } fn undo_it(&self, _: ActionContext) -> BoxFuture<'_, UndoResult> { - /* We should never undo an action that failed. */ + // We should never undo an action that failed. unimplemented!(); } diff --git a/src/saga_exec.rs b/src/saga_exec.rs index 1ff1b3e..0776713 100644 --- a/src/saga_exec.rs +++ b/src/saga_exec.rs @@ -44,12 +44,10 @@ use std::sync::Arc; use tokio::sync::broadcast; use tokio::task::JoinHandle; -/* - * TODO-design Should we go even further and say that each node is its own - * struct with incoming channels from parents (to notify when done), from - * children (to notify when undone), and to each direction as well? Then the - * whole thing is a message passing exercise? 
- */ +// TODO-design Should we go even further and say that each node is its own +// struct with incoming channels from parents (to notify when done), from +// children (to notify when undone), and to each direction as well? Then the +// whole thing is a message passing exercise? struct SgnsDone(Arc); struct SgnsFailed(ActionError); struct SgnsUndone(UndoMode); @@ -64,7 +62,7 @@ impl SagaNodeStateType for SgnsDone {} impl SagaNodeStateType for SgnsFailed {} impl SagaNodeStateType for SgnsUndone {} -/* TODO-design Is this right? Is the trait supposed to be empty? */ +// TODO-design Is this right? Is the trait supposed to be empty? trait SagaNodeRest: Send + Sync { fn propagate( &self, @@ -92,9 +90,7 @@ impl SagaNodeRest for SagaNode { .expect_none("node finished twice (storing output)"); if self.node_id == exec.dag.end_node { - /* - * If we've completed the last node, the saga is done. - */ + // If we've completed the last node, the saga is done. assert_eq!(live_state.exec_state, SagaCachedState::Running); assert_eq!(graph.node_count(), live_state.node_outputs.len()); live_state.mark_saga_done(); @@ -102,12 +98,10 @@ impl SagaNodeRest for SagaNode { } if live_state.exec_state == SagaCachedState::Unwinding { - /* - * If the saga is currently unwinding, then this node finishing - * doesn't unblock any other nodes. However, it potentially - * unblocks undoing itself. We'll only proceed if all of our child - * nodes are "undone" already. - */ + // If the saga is currently unwinding, then this node finishing + // doesn't unblock any other nodes. However, it potentially + // unblocks undoing itself. We'll only proceed if all of our child + // nodes are "undone" already. if neighbors_all(graph, &self.node_id, Outgoing, |child| { live_state.nodes_undone.contains_key(child) }) { @@ -116,10 +110,8 @@ impl SagaNodeRest for SagaNode { return; } - /* - * Under normal execution, this node's completion means it's time to - * check dependent nodes to see if they're now runnable. 
- */ + // Under normal execution, this node's completion means it's time to + // check dependent nodes to see if they're now runnable. for child in graph.neighbors_directed(self.node_id, Outgoing) { if neighbors_all(graph, &child, Incoming, |parent| { live_state.node_outputs.contains_key(parent) @@ -148,16 +140,14 @@ impl SagaNodeRest for SagaNode { .expect_none("node finished twice (storing error)"); if live_state.exec_state == SagaCachedState::Unwinding { - /* - * This node failed while we're already unwinding. We don't - * need to kick off unwinding again. We could in theory - * immediately move this node to "undone" and unblock its - * dependents, but for consistency with a simpler algorithm, - * we'll wait for unwinding to propagate from the end node. - * If all of our children are already undone, however, we - * must go ahead and mark ourselves undone and propagate - * that. - */ + // This node failed while we're already unwinding. We don't + // need to kick off unwinding again. We could in theory + // immediately move this node to "undone" and unblock its + // dependents, but for consistency with a simpler algorithm, + // we'll wait for unwinding to propagate from the end node. + // If all of our children are already undone, however, we + // must go ahead and mark ourselves undone and propagate + // that. if neighbors_all(graph, &self.node_id, Outgoing, |child| { live_state.nodes_undone.contains_key(child) }) { @@ -168,10 +158,8 @@ impl SagaNodeRest for SagaNode { new_node.propagate(exec, live_state); } } else { - /* - * Begin the unwinding process. Start with the end node: mark - * it trivially "undone" and propagate that. - */ + // Begin the unwinding process. Start with the end node: mark + // it trivially "undone" and propagate that. 
live_state.exec_state = SagaCachedState::Unwinding; assert_ne!(self.node_id, exec.dag.end_node); let new_node = SagaNode { @@ -201,34 +189,26 @@ impl SagaNodeRest for SagaNode { .expect_none("node already undone"); if self.node_id == exec.dag.start_node { - /* - * If we've undone the start node, the saga is done. - */ + // If we've undone the start node, the saga is done. live_state.mark_saga_done(); return; } - /* - * During unwinding, a node's becoming undone means it's time to check - * ancestor nodes to see if they're now undoable. - */ + // During unwinding, a node's becoming undone means it's time to check + // ancestor nodes to see if they're now undoable. for parent in graph.neighbors_directed(self.node_id, Incoming) { if neighbors_all(graph, &parent, Outgoing, |child| { live_state.nodes_undone.contains_key(child) }) { - /* - * We're ready to undo "parent". We don't know whether it's - * finished running, on the todo queue, or currenting - * outstanding. (It should not be on the undo queue!) - * TODO-design Here's an awful approach just intended to let us - * flesh out more of the rest of this to better understand how - * to manage state. - */ + // We're ready to undo "parent". We don't know whether it's + // finished running, on the todo queue, or currenting + // outstanding. (It should not be on the undo queue!) + // TODO-design Here's an awful approach just intended to let us + // flesh out more of the rest of this to better understand how + // to manage state. match live_state.node_exec_state(parent) { - /* - * If the node never started or if it failed, we can - * just mark it undone without doing anything else. - */ + // If the node never started or if it failed, we can + // just mark it undone without doing anything else. 
NodeExecState::Blocked => { let new_node = SagaNode { node_id: parent, @@ -249,20 +229,16 @@ impl SagaNodeRest for SagaNode { NodeExecState::QueuedToRun | NodeExecState::TaskInProgress => { - /* - * If we're running an action for this task, there's - * nothing we can do right now, but we'll handle it when - * it finishes. We could do better with queued (and - * there's a TODO-design in kick_off_ready() to do so), - * but this isn't wrong as-is. - */ + // If we're running an action for this task, there's + // nothing we can do right now, but we'll handle it when + // it finishes. We could do better with queued (and + // there's a TODO-design in kick_off_ready() to do so), + // but this isn't wrong as-is. continue; } NodeExecState::Done => { - /* - * We have to actually run the undo action. - */ + // We have to actually run the undo action. live_state.queue_undo.push(parent); } @@ -270,8 +246,8 @@ impl SagaNodeRest for SagaNode { | NodeExecState::UndoInProgress | NodeExecState::Undone(_) => { panic!( - "already undoing or undone node \ - whose child was just now undone" + "already undoing or undone node whose child was \ + just now undone" ); } } @@ -280,73 +256,61 @@ impl SagaNodeRest for SagaNode { } } -/** - * Message sent from (tokio) task that executes an action to the executor - * indicating that the action has completed - */ +/// Message sent from (tokio) task that executes an action to the executor +/// indicating that the action has completed struct TaskCompletion { - /* - * TODO-cleanup can this be removed? The node field is a SagaNode, which has - * a node_id. - */ + // TODO-cleanup can this be removed? The node field is a SagaNode, which + // has a node_id. 
node_id: NodeIndex, node: Box>, } -/** - * Context provided to the (tokio) task that executes an action - */ +/// Context provided to the (tokio) task that executes an action struct TaskParams { dag: Arc, user_context: Arc, - /** - * Handle to the saga's live state - * - * This is used only to update state for status purposes. We want to avoid - * any tight coupling between this task and the internal state. - */ + /// Handle to the saga's live state + /// + /// This is used only to update state for status purposes. We want to + /// avoid any tight coupling between this task and the internal state. live_state: Arc>, - /** id of the graph node whose action we're running */ + /// id of the graph node whose action we're running node_id: NodeIndex, - /** channel over which to send completion message */ + /// channel over which to send completion message done_tx: mpsc::Sender>, - /** Ancestor tree for this node. See [`ActionContext`]. */ + /// Ancestor tree for this node. See [`ActionContext`]. // TODO-cleanup there's no reason this should be an Arc. ancestor_tree: Arc>>, - /** Saga parameters for the closest enclosing saga */ + /// Saga parameters for the closest enclosing saga saga_params: Arc, - /** The action itself that we're executing. */ + /// The action itself that we're executing. action: Arc>, } -/** - * Executes a saga - * - * Call `SagaExecutor.run()` to get a Future. You must `await` this Future to - * actually execute the saga. - */ -/* - * TODO Lots more could be said here, but the basic idea matches distributed - * sagas. - * This will be a good place to put things like concurrency limits, canarying, - * etc. - * - * TODO Design note: SagaExecutor's constructors consume Arc and store Arc - * to reference the user-provided context "E". This makes it easy for us to - * pass references to the various places that need it. It would seem nice if - * the constructor accepted "E" and stored that, since "E" is already Send + - * Sync + 'static. 
There are two challenges here: (1) There are a bunch of - * other types that store a reference to E, including TaskParams and - * ActionContext, the latter of which is exposed to the user. These would have - * to store &E, which would be okay, but they'd need to have annoying lifetime - * parameters. (2) child sagas (and so child saga executors) are a thing. - * Since there's only one "E", the child would have to reference &E, which means - * it would need a lifetime parameter on it _and_ that might mean it would have - * to be a different type than SagaExecutor, even though they're otherwise the - * same. - */ +/// Executes a saga +/// +/// Call `SagaExecutor.run()` to get a Future. You must `await` this Future to +/// actually execute the saga. +// TODO Lots more could be said here, but the basic idea matches distributed +// sagas. +// This will be a good place to put things like concurrency limits, canarying, +// etc. +// +// TODO Design note: SagaExecutor's constructors consume Arc and store Arc +// to reference the user-provided context "E". This makes it easy for us to +// pass references to the various places that need it. It would seem nice if +// the constructor accepted "E" and stored that, since "E" is already Send + +// Sync + 'static. There are two challenges here: (1) There are a bunch of +// other types that store a reference to E, including TaskParams and +// ActionContext, the latter of which is exposed to the user. These would have +// to store &E, which would be okay, but they'd need to have annoying lifetime +// parameters. (2) child sagas (and so child saga executors) are a thing. +// Since there's only one "E", the child would have to reference &E, which means +// it would need a lifetime parameter on it _and_ that might mean it would have +// to be a different type than SagaExecutor, even though they're otherwise the +// same. 
#[derive(Debug)] pub struct SagaExecutor { #[allow(dead_code)] @@ -355,13 +319,13 @@ pub struct SagaExecutor { dag: Arc, action_registry: Arc>, - /** Channel for monitoring execution completion */ + /// Channel for monitoring execution completion finish_tx: broadcast::Sender<()>, - /** Unique identifier for this saga (an execution of a saga template) */ + /// Unique identifier for this saga (an execution of a saga template) saga_id: SagaId, - /** For each node, the NodeIndex of the start of its saga or subsaga */ + /// For each node, the NodeIndex of the start of its saga or subsaga node_saga_start: BTreeMap, live_state: Arc>, @@ -375,7 +339,7 @@ enum RecoveryDirection { } impl SagaExecutor { - /** Create an executor to run the given saga. */ + /// Create an executor to run the given saga. pub fn new( log: slog::Logger, saga_id: SagaId, @@ -396,10 +360,8 @@ impl SagaExecutor { ) } - /** - * Create an executor to run the given saga that may have already - * started, using the given log events. - */ + /// Create an executor to run the given saga that may have already + /// started, using the given log events. pub fn new_recover( log: slog::Logger, saga_id: SagaId, @@ -409,21 +371,19 @@ impl SagaExecutor { sec_hdl: SecExecClient, sglog: SagaLog, ) -> Result, anyhow::Error> { - /* Before anything else, do some basic checks on the DAG. */ + // Before anything else, do some basic checks on the DAG. Self::validate_saga(&dag, ®istry).with_context(|| { format!("validating saga {:?}", dag.saga_name()) })?; - /* - * During recovery, there's a fine line between operational errors and - * programmer errors. If we discover semantically invalid saga state, - * that's an operational error that we must handle gracefully. We use - * lots of assertions to check invariants about our own process for - * loading the state. We panic if those are violated. 
For example, if - * we find that we've loaded the same node twice, that's a bug in this - * code right here (which walks each node of the graph exactly once), - * not a result of corrupted database state. - */ + // During recovery, there's a fine line between operational errors and + // programmer errors. If we discover semantically invalid saga state, + // that's an operational error that we must handle gracefully. We use + // lots of assertions to check invariants about our own process for + // loading the state. We panic if those are violated. For example, if + // we find that we've loaded the same node twice, that's a bug in this + // code right here (which walks each node of the graph exactly once), + // not a result of corrupted database state. let forward = !sglog.unwinding(); let mut live_state = SagaExecLiveState { exec_state: if forward { @@ -445,12 +405,10 @@ impl SagaExecutor { let mut loaded = BTreeSet::new(); let graph = &dag.graph; - /* - * Precompute a mapping from each node to the start of its containing - * saga or subsaga. This is used for quickly finding each node's saga - * parameters and also when building ancestor trees for skipping over - * entire subsagas. - */ + // Precompute a mapping from each node to the start of its containing + // saga or subsaga. This is used for quickly finding each node's saga + // parameters and also when building ancestor trees for skipping over + // entire subsagas. let nodes_sorted = toposort(&graph, None).expect("saga DAG had cycles"); let node_saga_start = { let mut node_saga_start = BTreeMap::new(); @@ -497,8 +455,8 @@ impl SagaExecutor { }; *node_saga_start.get(&ancestor).expect( - "expected to compute ancestor's \ - subsaga start node first", + "expected to compute ancestor's subsaga start \ + node first", ) } }; @@ -507,10 +465,8 @@ impl SagaExecutor { node_saga_start }; - /* - * Iterate in the direction of current execution: for normal execution, - * a standard topological sort. 
For unwinding, reverse that. - */ + // Iterate in the direction of current execution: for normal execution, + // a standard topological sort. For unwinding, reverse that. let graph_nodes = { let mut nodes = nodes_sorted; if !forward { @@ -523,19 +479,17 @@ impl SagaExecutor { let node_status = live_state.sglog.load_status_for_node(node_id.into()); - /* - * Validate this node's state against its parent nodes' states. By - * induction, this validates everything in the graph from the start - * or end node to the current node. - */ + // Validate this node's state against its parent nodes' states. By + // induction, this validates everything in the graph from the start + // or end node to the current node. for parent in graph.neighbors_directed(node_id, Incoming) { let parent_status = live_state.sglog.load_status_for_node(parent.into()); if !recovery_validate_parent(parent_status, node_status) { return Err(anyhow!( - "recovery for saga {}: node {:?}: \ - load status is \"{:?}\", which is illegal for \ - parent load status \"{:?}\"", + "recovery for saga {}: node {:?}: load status is \ + \"{:?}\", which is illegal for parent load status \ + \"{:?}\"", saga_id, node_id, node_status, @@ -570,35 +524,31 @@ impl SagaExecutor { SagaNodeLoadStatus::NeverStarted => { match direction { RecoveryDirection::Forward(true) => { - /* - * We're recovering a node in the forward direction - * where all parents completed successfully. Add it - * to the ready queue. - */ + // We're recovering a node in the forward direction + // where all parents completed successfully. Add it + // to the ready queue. live_state.queue_todo.push(node_id); } RecoveryDirection::Unwind(true) => { - /* - * We're recovering a node in the reverse direction - * (unwinding) whose children have all been - * undone and which has never started. Just mark - * it undone. 
- * TODO-design Does this suggest a better way to do - * this might be to simply load all the state that - * we have into the SagaExecLiveState and execute - * the saga as normal, but have normal execution - * check for cached values instead of running - * actions? In a sense, this makes the recovery - * path look like the normal path rather than having - * the normal path look like the recovery path. On - * the other hand, it seems kind of nasty to have to - * hold onto the recovery state for the duration. - * It doesn't make it a whole lot easier to test or - * have fewer code paths, in a real sense. It moves - * those code paths to normal execution, but they're - * still bifurcated from the case where we didn't - * recover the saga. - */ + // We're recovering a node in the reverse direction + // (unwinding) whose children have all been + // undone and which has never started. Just mark + // it undone. + // TODO-design Does this suggest a better way to do + // this might be to simply load all the state that + // we have into the SagaExecLiveState and execute + // the saga as normal, but have normal execution + // check for cached values instead of running + // actions? In a sense, this makes the recovery + // path look like the normal path rather than having + // the normal path look like the recovery path. On + // the other hand, it seems kind of nasty to have to + // hold onto the recovery state for the duration. + // It doesn't make it a whole lot easier to test or + // have fewer code paths, in a real sense. It moves + // those code paths to normal execution, but they're + // still bifurcated from the case where we didn't + // recover the saga. live_state .nodes_undone .insert(node_id, UndoMode::ActionNeverRan); @@ -607,19 +557,15 @@ impl SagaExecutor { } } SagaNodeLoadStatus::Started => { - /* - * Whether we're unwinding or not, we have to finish - * execution of this action. 
- */ + // Whether we're unwinding or not, we have to finish + // execution of this action. live_state.queue_todo.push(node_id); } SagaNodeLoadStatus::Succeeded(output) => { - /* - * If the node has finished executing and not started - * undoing, and if we're unwinding and the children have - * all finished undoing, then it's time to undo this - * one. - */ + // If the node has finished executing and not started + // undoing, and if we're unwinding and the children have + // all finished undoing, then it's time to undo this + // one. assert!(!live_state.node_errors.contains_key(&node_id)); live_state .node_outputs @@ -636,12 +582,10 @@ impl SagaExecutor { .insert(node_id, error.clone()) .expect_none("recovered node twice (failure case)"); - /* - * If the node failed, and we're unwinding, and the children - * have all been undone, it's time to undo this one. - * But we just mark it undone -- we don't execute the - * undo action. - */ + // If the node failed, and we're unwinding, and the children + // have all been undone, it's time to undo this one. + // But we just mark it undone -- we don't execute the + // undo action. if let RecoveryDirection::Unwind(true) = direction { live_state .nodes_undone @@ -649,27 +593,21 @@ impl SagaExecutor { } } SagaNodeLoadStatus::UndoStarted(output) => { - /* - * We know we're unwinding. (Otherwise, we should have - * failed validation earlier.) Execute the undo action. - */ + // We know we're unwinding. (Otherwise, we should have + // failed validation earlier.) Execute the undo action. assert!(!forward); live_state.queue_undo.push(node_id); - /* - * We still need to record the output because it's available - * to the undo action. - */ + // We still need to record the output because it's available + // to the undo action. live_state .node_outputs .insert(node_id, Arc::clone(output)) .expect_none("recovered node twice (undo case)"); } SagaNodeLoadStatus::UndoFinished => { - /* - * Again, we know we're unwinding. 
We've also finished - * undoing this node. - */ + // Again, we know we're unwinding. We've also finished + // undoing this node. assert!(!forward); live_state .nodes_undone @@ -677,16 +615,12 @@ impl SagaExecutor { } } - /* - * TODO-correctness is it appropriate to have side effects in an - * assertion here? - */ + // TODO-correctness is it appropriate to have side effects in an + // assertion here? assert!(loaded.insert(node_id)); } - /* - * Check our done conditions. - */ + // Check our done conditions. if live_state.node_outputs.contains_key(&dag.end_node) || live_state.nodes_undone.contains_key(&dag.start_node) { @@ -707,9 +641,7 @@ impl SagaExecutor { }) } - /** - * Validates some basic properties of the saga - */ + /// Validates some basic properties of the saga // Many of these properties may be validated when we construct the saga DAG. // Checking them again here makes sure that we gracefully handle a case // where we got an invalid DAG in some other way (e.g., bad database state). @@ -767,8 +699,8 @@ impl SagaExecutor { ); ensure!( nsubsaga_start == nsubsaga_end, - "bad saga graph (found {} subsaga start nodes \ - but {} subsaga end nodes)", + "bad saga graph (found {} subsaga start nodes but {} subsaga end \ + nodes)", nsubsaga_start, nsubsaga_end ); @@ -783,15 +715,13 @@ impl SagaExecutor { Ok(()) } - /** - * Builds the "ancestor tree" for a node whose dependencies have all - * completed - * - * The ancestor tree for a node is a map whose keys are strings that - * identify ancestor nodes in the graph and whose values represent the - * outputs from those nodes. This is used by [`ActionContext::lookup`]. - * See where we use this function in poll() for more details. - */ + /// Builds the "ancestor tree" for a node whose dependencies have all + /// completed + /// + /// The ancestor tree for a node is a map whose keys are strings that + /// identify ancestor nodes in the graph and whose values represent the + /// outputs from those nodes. 
This is used by [`ActionContext::lookup`]. + /// See where we use this function in poll() for more details. fn make_ancestor_tree( &self, tree: &mut BTreeMap>, @@ -899,33 +829,27 @@ impl SagaExecutor { } } - /** - * Simulates an error at a given node in the saga graph - * - * When execution reaches this node, instead of running the normal action - * for this node, an error will be generated and processed as though the - * action itself had produced the error. - */ + /// Simulates an error at a given node in the saga graph + /// + /// When execution reaches this node, instead of running the normal action + /// for this node, an error will be generated and processed as though the + /// action itself had produced the error. pub async fn inject_error(&self, node_id: NodeIndex) { let mut live_state = self.live_state.lock().await; live_state.injected_errors.insert(node_id); } - /** - * Runs the saga - * - * This might be running a saga that has never been started before or - * one that has been recovered from persistent state. - */ + /// Runs the saga + /// + /// This might be running a saga that has never been started before or + /// one that has been recovered from persistent state. async fn run_saga(&self) { { - /* - * TODO-design Every SagaExec should be able to run_saga() exactly - * once. We don't really want to let you re-run it and get a new - * message on finish_tx. However, we _do_ want to handle this - * particular case when we've recovered a "done" saga and the - * consumer has run() it (once). - */ + // TODO-design Every SagaExec should be able to run_saga() exactly + // once. We don't really want to let you re-run it and get a new + // message on finish_tx. However, we _do_ want to handle this + // particular case when we've recovered a "done" saga and the + // consumer has run() it (once). 
let live_state = self.live_state.lock().await; if live_state.exec_state == SagaCachedState::Done { self.finish_tx.send(()).expect("failed to send finish message"); @@ -934,49 +858,41 @@ impl SagaExecutor { } } - /* - * Allocate the channel used for node tasks to tell us when they've - * completed. In practice, each node can enqueue only two messages in - * its lifetime: one for completion of the action, and one for - * completion of the compensating action. We bound this channel's size - * at twice the graph node count for this worst case. - */ + // Allocate the channel used for node tasks to tell us when they've + // completed. In practice, each node can enqueue only two messages in + // its lifetime: one for completion of the action, and one for + // completion of the compensating action. We bound this channel's size + // at twice the graph node count for this worst case. let (tx, mut rx) = mpsc::channel(2 * self.dag.graph.node_count()); loop { self.kick_off_ready(&tx).await; - /* - * Process any messages available on our channel. - * It shouldn't be possible to get None back here. That would mean - * that all of the consumers have closed their ends, but we still - * have a consumer of our own in "tx". - * TODO-robustness Can we assert that there are outstanding tasks - * when we block on this channel? - */ + // Process any messages available on our channel. + // It shouldn't be possible to get None back here. That would mean + // that all of the consumers have closed their ends, but we still + // have a consumer of our own in "tx". + // TODO-robustness Can we assert that there are outstanding tasks + // when we block on this channel? let message = rx.next().await.expect("broken tx"); let task = { let mut live_state = self.live_state.lock().await; live_state.node_task_done(message.node_id) }; - /* - * This should really not take long, as there's nothing else this - * task does after sending the message that we just received. 
It's - * good to wait here to make sure things are cleaned up. - * TODO-robustness can we enforce that this won't take long? - */ + // This should really not take long, as there's nothing else this + // task does after sending the message that we just received. It's + // good to wait here to make sure things are cleaned up. + // TODO-robustness can we enforce that this won't take long? task.await.expect("node task failed unexpectedly"); let mut live_state = self.live_state.lock().await; let prev_state = live_state.exec_state; message.node.propagate(&self, &mut live_state); - /* - * TODO-cleanup This condition ought to be simplified. We want to - * update the saga state when we become Unwinding (which we do here) - * and when we become Done (which we do below). There may be a - * better place to put this logic that's less ad hoc. - */ + // TODO-cleanup This condition ought to be simplified. We want to + // update the saga state when we become Unwinding (which we do here) + // and when we become Done (which we do below). There may be a + // better place to put this logic that's less ad hoc. if live_state.exec_state == SagaCachedState::Unwinding && prev_state != SagaCachedState::Unwinding { @@ -996,42 +912,34 @@ impl SagaExecutor { live_state.sec_hdl.saga_update(SagaCachedState::Done).await; } - /* - * Kick off any nodes that are ready to run. (Right now, we kick off - * everything, so it might seem unnecessary to store this vector in - * "self" to begin with. However, the intent is to add capacity limits, - * in which case we may return without having scheduled everything, and - * we want to track whatever's still ready to go.) - * TODO revisit dance with the vec to satisfy borrow rules - * TODO implement unwinding - */ + // Kick off any nodes that are ready to run. (Right now, we kick off + // everything, so it might seem unnecessary to store this vector in + // "self" to begin with. 
However, the intent is to add capacity limits, + // in which case we may return without having scheduled everything, and + // we want to track whatever's still ready to go.) + // TODO revisit dance with the vec to satisfy borrow rules + // TODO implement unwinding async fn kick_off_ready( &self, tx: &mpsc::Sender>, ) { let mut live_state = self.live_state.lock().await; - /* - * TODO is it possible to deadlock with a concurrency limit given that - * we always do "todo" before "undo"? - */ + // TODO is it possible to deadlock with a concurrency limit given that + // we always do "todo" before "undo"? let todo_queue = live_state.queue_todo.clone(); live_state.queue_todo = Vec::new(); for node_id in todo_queue { - /* - * TODO-design It would be good to check whether the saga is - * unwinding, and if so, whether this action has ever started - * running before. If not, then we can send this straight to - * undoing without doing any more work here. What we're - * doing here should also be safe, though. We run the action - * regardless, and when we complete it, we'll undo it. - */ - /* - * TODO we could be much more efficient without copying this tree - * each time. - */ + // TODO-design It would be good to check whether the saga is + // unwinding, and if so, whether this action has ever started + // running before. If not, then we can send this straight to + // undoing without doing any more work here. What we're + // doing here should also be safe, though. We run the action + // regardless, and when we complete it, we'll undo it. + // TODO we could be much more efficient without copying this tree + // each time. let mut ancestor_tree = BTreeMap::new(); self.make_ancestor_tree( &mut ancestor_tree, @@ -1071,11 +979,9 @@ impl SagaExecutor { live_state.queue_undo = Vec::new(); for node_id in undo_queue { - /* - * TODO commonize with code above - * TODO we could be much more efficient without copying this tree - * each time. 
- */ + // TODO commonize with code above + // TODO we could be much more efficient without copying this tree + // each time. let mut ancestor_tree = BTreeMap::new(); self.make_ancestor_tree( &mut ancestor_tree, @@ -1149,20 +1055,16 @@ impl SagaExecutor { } } - /** - * Body of a (tokio) task that executes an action. - */ + /// Body of a (tokio) task that executes an action. async fn exec_node(task_params: TaskParams) { let node_id = task_params.node_id; { - /* - * TODO-liveness We don't want to hold this lock across a call - * to the database. It's fair to say that if the database - * hangs, the saga's corked anyway, but we should at least be - * able to view its state, and we can't do that with this - * design. - */ + // TODO-liveness We don't want to hold this lock across a call + // to the database. It's fair to say that if the database + // hangs, the saga's corked anyway, but we should at least be + // able to view its state, and we can't do that with this + // design. let mut live_state = task_params.live_state.lock().await; let load_status = live_state.sglog.load_status_for_node(node_id.into()); @@ -1205,14 +1107,10 @@ impl SagaExecutor { SagaExecutor::finish_task(task_params, node).await; } - /** - * Body of a (tokio) task that executes a compensation action. - */ - /* - * TODO-cleanup This has a lot in common with exec_node(), but enough - * different that it doesn't make sense to parametrize that one. Still, it - * sure would be nice to clean this up. - */ + /// Body of a (tokio) task that executes a compensation action. + // TODO-cleanup This has a lot in common with exec_node(), but enough + // different that it doesn't make sense to parametrize that one. Still, it + // sure would be nice to clean this up. 
async fn undo_node(task_params: TaskParams) { let node_id = task_params.node_id; @@ -1246,10 +1144,8 @@ impl SagaExecutor { dag: Arc::clone(&task_params.dag), user_context: Arc::clone(&task_params.user_context), }); - /* - * TODO-robustness We have to figure out what it means to fail here and - * what we want to do about it. - */ + // TODO-robustness We have to figure out what it means to fail here and + // what we want to do about it. exec_future.await.unwrap(); let node = Box::new(SagaNode { node_id, @@ -1276,19 +1172,15 @@ impl SagaExecutor { .expect("unexpected channel failure"); } - /* - * TODO-design Today, callers that invoke run() maintain a handle to the - * SagaExec so that they can control and check the status of execution. - * But ideally, once a caller has invoked run(), they wouldn't be able to - * call it again; and ideally, they wouldn't be able to get the result of - * the saga until run() had finished. One way we might do this is to - * have run() consume the WfExec, return immediately an object that can be - * used only for status and control, and provide a method on that object - * that turns into the result. - */ - /** - * Runs the saga to completion asynchronously - */ + // TODO-design Today, callers that invoke run() maintain a handle to the + // SagaExec so that they can control and check the status of execution. + // But ideally, once a caller has invoked run(), they wouldn't be able to + // call it again; and ideally, they wouldn't be able to get the result of + // the saga until run() had finished. One way we might do this is to + // have run() consume the WfExec, return immediately an object that can be + // used only for status and control, and provide a method on that object + // that turns into the result. 
+ /// Runs the saga to completion asynchronously pub fn run(&self) -> impl Future + '_ { let mut rx = self.finish_tx.subscribe(); @@ -1298,21 +1190,17 @@ impl SagaExecutor { } } - /** - * Returns a [`SagaResult`] describing the result of the saga, including - * data produced by its actions. - * - * # Panics - * - * If the saga has not yet completed. - */ + /// Returns a [`SagaResult`] describing the result of the saga, including + /// data produced by its actions. + /// + /// # Panics + /// + /// If the saga has not yet completed. pub fn result(&self) -> SagaResult { - /* - * TODO-cleanup is there a way to make this safer? If we could know - * that there were no other references to the live_state (which should - * be true, if we're done), then we could consume it, as well as "self", - * and avoid several copies below. - */ + // TODO-cleanup is there a way to make this safer? If we could know + // that there were no other references to the live_state (which should + // be true, if we're done), then we could consume it, as well as "self", + // and avoid several copies below. let live_state = self .live_state .try_lock() @@ -1322,14 +1210,12 @@ impl SagaExecutor { if live_state.nodes_undone.contains_key(&self.dag.start_node) { assert!(live_state.nodes_undone.contains_key(&self.dag.end_node)); - /* - * Choosing the first node_id in node_errors will find the - * topologically-first node that failed. This may not be the one - * that actually triggered the saga to fail, but it could have done - * so. (That is, if there were another action that failed that - * triggered the saga to fail, this one did not depend on it, so it - * could as well have happened in the other order.) - */ + // Choosing the first node_id in node_errors will find the + // topologically-first node that failed. This may not be the one + // that actually triggered the saga to fail, but it could have done + // so. 
(That is, if there were another action that failed that + // triggered the saga to fail, this one did not depend on it, so it + // could as well have happened in the other order.) let (error_node_id, error_source) = live_state.node_errors.iter().next().unwrap(); let error_node_name = self @@ -1387,7 +1273,7 @@ impl SagaExecutor { let graph = &self.dag.graph; let topo_visitor = Topo::new(graph); for node in topo_visitor.iter(graph) { - /* Record the current execution state for this node. */ + // Record the current execution state for this node. node_exec_states.insert(node, live_state.node_exec_state(node)); } @@ -1402,54 +1288,50 @@ impl SagaExecutor { } } -/** - * Encapsulates the (mutable) execution state of a saga - */ -/* - * This is linked to a `SagaExecutor` and protected by a Mutex. The state is - * mainly modified by [`SagaExecutor::run_saga`]. We may add methods for - * controlling the saga (e.g., pausing), which would modify this as well. - * We also intend to add methods for viewing saga state, which will take the - * lock to read state. - * - * If the view of a saga were just (1) that it's running, and maybe (2) a - * set of outstanding actions, then we might take a pretty different approach - * here. We might create a read-only view object that's populated periodically - * by the saga executor. This still might be the way to go, but at the - * moment we anticipate wanting pretty detailed debug information (like what - * outputs were produced by what steps), so the view would essentially be a - * whole copy of this object. - * TODO This would be a good place for a debug log. - */ +/// Encapsulates the (mutable) execution state of a saga +// This is linked to a `SagaExecutor` and protected by a Mutex. The state is +// mainly modified by [`SagaExecutor::run_saga`]. We may add methods for +// controlling the saga (e.g., pausing), which would modify this as well. 
+// We also intend to add methods for viewing saga state, which will take the +// lock to read state. +// +// If the view of a saga were just (1) that it's running, and maybe (2) a +// set of outstanding actions, then we might take a pretty different approach +// here. We might create a read-only view object that's populated periodically +// by the saga executor. This still might be the way to go, but at the +// moment we anticipate wanting pretty detailed debug information (like what +// outputs were produced by what steps), so the view would essentially be a +// whole copy of this object. +// TODO This would be a good place for a debug log. #[derive(Debug)] struct SagaExecLiveState { - /** Unique identifier for this saga (an execution of a saga template) */ + /// Unique identifier for this saga (an execution of a saga template) saga_id: SagaId, sec_hdl: SecExecClient, - /** Overall execution state */ + /// Overall execution state exec_state: SagaCachedState, - /** Queue of nodes that have not started but whose deps are satisfied */ + /// Queue of nodes that have not started but whose deps are satisfied queue_todo: Vec, - /** Queue of nodes whose undo action needs to be run. */ + /// Queue of nodes whose undo action needs to be run. queue_undo: Vec, - /** Outstanding tokio tasks for each node in the graph */ + /// Outstanding tokio tasks for each node in the graph node_tasks: BTreeMap>, - /** Outputs saved by completed actions. */ + /// Outputs saved by completed actions. node_outputs: BTreeMap>, - /** Set of undone nodes. */ + /// Set of undone nodes. nodes_undone: BTreeMap, - /** Errors produced by failed actions. */ + /// Errors produced by failed actions. 
node_errors: BTreeMap, - /** Persistent state */ + /// Persistent state sglog: SagaLog, - /** Injected errors */ + /// Injected errors injected_errors: BTreeSet, } @@ -1490,20 +1372,16 @@ impl fmt::Display for NodeExecState { } impl SagaExecLiveState { - /* - * TODO-design The current implementation does not use explicit state. In - * most cases, this made things better than before because each hunk of code - * was structured to accept only nodes in states that were valid. But - * there are a few cases where we need a bit more state than we're currently - * keeping. This function is used there. - * - * It's especially questionable to use load_status here -- or is that the - * way we should go more generally? See TODO-design in new_recover(). - */ + // TODO-design The current implementation does not use explicit state. In + // most cases, this made things better than before because each hunk of code + // was structured to accept only nodes in states that were valid. But + // there are a few cases where we need a bit more state than we're currently + // keeping. This function is used there. + // + // It's especially questionable to use load_status here -- or is that the + // way we should go more generally? See TODO-design in new_recover(). fn node_exec_state(&self, node_id: NodeIndex) -> NodeExecState { - /* - * This seems like overkill but it seems helpful to validate state. - */ + // This seems like overkill but it seems helpful to validate state. 
let mut set: BTreeSet = BTreeSet::new(); let load_status = self.sglog.load_status_for_node(node_id.into()); if let Some(undo_mode) = self.nodes_undone.get(&node_id) { @@ -1567,9 +1445,7 @@ impl SagaExecLiveState { } } -/** - * Summarizes the final state of a saga execution - */ +/// Summarizes the final state of a saga execution #[derive(Clone, Debug)] pub struct SagaResult { pub saga_id: SagaId, @@ -1577,9 +1453,7 @@ pub struct SagaResult { pub kind: Result, } -/** - * Provides access to outputs from a saga that completed successfully - */ +/// Provides access to outputs from a saga that completed successfully #[derive(Clone, Debug)] pub struct SagaResultOk { saga_output: Arc, @@ -1587,9 +1461,7 @@ pub struct SagaResultOk { } impl SagaResultOk { - /** - * Returns the final output of the saga (the output from the last node) - */ + /// Returns the final output of the saga (the output from the last node) pub fn saga_output( &self, ) -> Result { @@ -1598,13 +1470,11 @@ impl SagaResultOk { .map_err(ActionError::new_deserialize) } - /** - * Returns the data produced by a node in the saga. - * - * # Panics - * - * If the saga has no node called `name`. - */ + /// Returns the data produced by a node in the saga. + /// + /// # Panics + /// + /// If the saga has no node called `name`. pub fn lookup_node_output( &self, name: &str, @@ -1624,39 +1494,33 @@ impl SagaResultOk { } } -/** - * Provides access to failure details for a saga that failed - * - * When a saga fails, it's always one action's failure triggers failure of the - * saga. It's possible that other actions also failed, but only if they were - * running concurrently. This structure represents one of these errors, any of - * which could have caused the saga to fail, depending on the order in which - * they completed. - */ -/* - * TODO-coverage We should test that sagas do the right thing when two actions - * fail concurrently. 
- * - * We don't allow callers to access outputs from a saga that failed - * because it's not obvious yet why this would be useful and it's too - * easy to shoot yourself in the foot by not checking whether the saga - * failed. In practice, the enum that wraps this type ensures that the caller - * has checked for failure, so it wouldn't be unreasonable to provide outputs - * here. (A strong case: there are cases where it's useful to get outputs even - * while the saga is running, as might happen for a saga that generates a - * database record whose id you want to return to a client without waiting for - * the saga to complete. It's silly to let you get this id while the saga is - * running, but not after it's failed.) - */ +/// Provides access to failure details for a saga that failed +/// +/// When a saga fails, it's always one action's failure triggers failure of the +/// saga. It's possible that other actions also failed, but only if they were +/// running concurrently. This structure represents one of these errors, any of +/// which could have caused the saga to fail, depending on the order in which +/// they completed. +// TODO-coverage We should test that sagas do the right thing when two actions +// fail concurrently. +// +// We don't allow callers to access outputs from a saga that failed +// because it's not obvious yet why this would be useful and it's too +// easy to shoot yourself in the foot by not checking whether the saga +// failed. In practice, the enum that wraps this type ensures that the caller +// has checked for failure, so it wouldn't be unreasonable to provide outputs +// here. (A strong case: there are cases where it's useful to get outputs even +// while the saga is running, as might happen for a saga that generates a +// database record whose id you want to return to a client without waiting for +// the saga to complete. It's silly to let you get this id while the saga is +// running, but not after it's failed.) 
#[derive(Clone, Debug)] pub struct SagaResultErr { pub error_node_name: NodeName, pub error_source: ActionError, } -/** - * Summarizes in-progress execution state of a saga - */ +/// Summarizes in-progress execution state of a saga #[derive(Clone, Debug)] pub struct SagaExecStatus { saga_id: SagaId, @@ -1850,8 +1714,8 @@ impl<'a> PrintOrderer<'a> { // // * Whenever a subsaga starts we want to print its children before any // parallel nodes. - // * Whenever a subsaga ends, we check to see if there are any - // parallel nodes before we look for children. + // * Whenever a subsaga ends, we check to see if there are any parallel + // nodes before we look for children. // * Whenever there is a simple node, we check to see if there are // parallel nodes before we look for children. // @@ -1922,10 +1786,10 @@ impl<'a> PrintOrderer<'a> { // Return true if there is a parallel node. // // Side effects: - // * If there is a parallel node we remove it from the nodes in the top - // of the stack. - // * If this is the last parallel node, we pop the top of the stack - // and reduce the indent level. + // * If there is a parallel node we remove it from the nodes in the top of + // the stack. + // * If this is the last parallel node, we pop the top of the stack and + // reduce the indent level. fn next_parallel_node(&mut self) -> bool { if let Some(StackEntry::Parallel(nodes)) = self.stack.last_mut() { if let Some(next_idx) = nodes.pop() { @@ -1941,10 +1805,8 @@ impl<'a> PrintOrderer<'a> { } } -/** -* Return true if all neighbors of `node_id` in the given `direction` -* return true for the predicate `test`. -*/ +/// Return true if all neighbors of `node_id` in the given `direction` +/// return true for the predicate `test`. fn neighbors_all( graph: &Graph, node_id: &NodeIndex, @@ -1963,39 +1825,33 @@ where return true; } -/** - * Returns true if the parent node's load status is valid for the given child - * node's load status. 
- */ +/// Returns true if the parent node's load status is valid for the given child +/// node's load status. fn recovery_validate_parent( parent_status: &SagaNodeLoadStatus, child_status: &SagaNodeLoadStatus, ) -> bool { match child_status { - /* - * If the child node has started, finished successfully, or even started - * undoing, the only allowed status for the parent node is "done". The - * states prior to "done" are ruled out because we execute nodes in - * dependency order. "failed" is ruled out because we do not execute - * nodes whose parents failed. The undoing states are ruled out because - * we unwind in reverse-dependency order, so we cannot have started - * undoing the parent if the child node has not finished undoing. (A - * subtle but important implementation detail is that we do not undo a - * node that has not started execution. If we did, then the "undo - * started" load state could be associated with a parent that failed.) - */ + // If the child node has started, finished successfully, or even started + // undoing, the only allowed status for the parent node is "done". The + // states prior to "done" are ruled out because we execute nodes in + // dependency order. "failed" is ruled out because we do not execute + // nodes whose parents failed. The undoing states are ruled out because + // we unwind in reverse-dependency order, so we cannot have started + // undoing the parent if the child node has not finished undoing. (A + // subtle but important implementation detail is that we do not undo a + // node that has not started execution. If we did, then the "undo + // started" load state could be associated with a parent that failed.) SagaNodeLoadStatus::Started | SagaNodeLoadStatus::Succeeded(_) | SagaNodeLoadStatus::UndoStarted(_) => { matches!(parent_status, SagaNodeLoadStatus::Succeeded(_)) } - /* - * If the child node has failed, this looks just like the previous case, - * except that the parent node could be UndoStarted or UndoFinished. 
- * That's possible because we don't undo a failed node, so after undoing - * the parents, the log state would still show "failed". - */ + // If the child node has failed, this looks just like the previous case, + // except that the parent node could be UndoStarted or UndoFinished. + // That's possible because we don't undo a failed node, so after undoing + // the parents, the log state would still show "failed". SagaNodeLoadStatus::Failed(_) => { matches!( parent_status, @@ -2005,10 +1861,8 @@ fn recovery_validate_parent( ) } - /* - * If we've finished undoing the child node, then the parent must be - * either "done" or one of the undoing states. - */ + // If we've finished undoing the child node, then the parent must be + // either "done" or one of the undoing states. SagaNodeLoadStatus::UndoFinished => matches!( parent_status, SagaNodeLoadStatus::Succeeded(_) @@ -2016,10 +1870,8 @@ fn recovery_validate_parent( | SagaNodeLoadStatus::UndoFinished ), - /* - * If a node has never started, the only illegal states for a parent are - * those associated with undoing, since the child must be undone first. - */ + // If a node has never started, the only illegal states for a parent are + // those associated with undoing, since the child must be undone first. SagaNodeLoadStatus::NeverStarted => matches!( parent_status, SagaNodeLoadStatus::NeverStarted @@ -2030,9 +1882,7 @@ fn recovery_validate_parent( } } -/** - * Action's handle to the saga subsystem - */ +/// Action's handle to the saga subsystem // Any APIs that are useful for actions should hang off this object. It should // have enough state to know which node is invoking the API. pub struct ActionContext { @@ -2044,16 +1894,14 @@ pub struct ActionContext { } impl ActionContext { - /** - * Retrieves a piece of data stored by a previous (ancestor) node in the - * current saga. The data is identified by `name`, the name of the ancestor - * node. 
- * - * # Panics - * - * This function panics if there was no data previously stored with name - * `name` (which means there was no ancestor node with that name). - */ + /// Retrieves a piece of data stored by a previous (ancestor) node in the + /// current saga. The data is identified by `name`, the name of the + /// ancestor node. + /// + /// # Panics + /// + /// This function panics if there was no data previously stored with name + /// `name` (which means there was no ancestor node with that name). pub fn lookup( &self, name: &str, @@ -2070,13 +1918,11 @@ impl ActionContext { .map_err(ActionError::new_deserialize) } - /** - * Returns the saga parameters for the current action - * - * If this action is being run as a subsaga, this returns the saga - * parameters for the subsaga. This way actions don't have to care whether - * they're running in a saga or not. - */ + /// Returns the saga parameters for the current action + /// + /// If this action is being run as a subsaga, this returns the saga + /// parameters for the subsaga. This way actions don't have to care whether + /// they're running in a saga or not. 
pub fn saga_params( &self, ) -> Result { @@ -2089,38 +1935,28 @@ impl ActionContext { .map_err(ActionError::new_deserialize) } - /** - * Returns the human-readable label for the current saga node - */ + /// Returns the human-readable label for the current saga node pub fn node_label(&self) -> String { self.dag.get(self.node_id).unwrap().label() } - /** - * Returns the consumer-provided context for the current saga - */ + /// Returns the consumer-provided context for the current saga pub fn user_data(&self) -> &UserType::ExecContextType { &self.user_context } } -/** - * Converts a NodeIndex (used by the graph representation to identify a node) to - * a [`SagaNodeId`] (used elsewhere in this module to identify a node) - */ +/// Converts a NodeIndex (used by the graph representation to identify a node) +/// to a [`SagaNodeId`] (used elsewhere in this module to identify a node) impl From for SagaNodeId { fn from(node_id: NodeIndex) -> SagaNodeId { - /* - * We (must) verify elsewhere that node indexes fit within a u32. - */ + // We (must) verify elsewhere that node indexes fit within a u32. SagaNodeId::from(u32::try_from(node_id.index()).unwrap()) } } -/** - * Wrapper for SagaLog.record_now() that maps internal node indexes to - * stable node ids. - */ +/// Wrapper for SagaLog.record_now() that maps internal node indexes to +/// stable node ids. // TODO Consider how we do map internal node indexes to stable node ids. // TODO clean up this interface async fn record_now( @@ -2131,45 +1967,36 @@ async fn record_now( let saga_id = live_state.saga_id; let node_id = node.into(); - /* - * The only possible failure here today is attempting to record an event - * that's illegal for the current node state. That's a bug in this - * program. - */ + // The only possible failure here today is attempting to record an event + // that's illegal for the current node state. That's a bug in this + // program. 
let event = SagaNodeEvent { saga_id, node_id, event_type }; live_state.sglog.record(&event).unwrap(); live_state.sec_hdl.record(event).await; } -/** - * Consumer's handle for querying and controlling the execution of a single saga - */ +/// Consumer's handle for querying and controlling the execution of a single +/// saga pub trait SagaExecManager: fmt::Debug + Send + Sync { - /** Run the saga to completion. */ + /// Run the saga to completion. fn run(&self) -> BoxFuture<'_, ()>; - /** - * Return the result of the saga - * - * The returned [`SagaResult`] has interfaces for querying saga outputs and - * error information. - * - * # Panics - * - * If the saga has not finished when this function is called. - */ + /// Return the result of the saga + /// + /// The returned [`SagaResult`] has interfaces for querying saga outputs and + /// error information. + /// + /// # Panics + /// + /// If the saga has not finished when this function is called. fn result(&self) -> SagaResult; - /** - * Returns fine-grained information about saga execution - */ + /// Returns fine-grained information about saga execution fn status(&self) -> BoxFuture<'_, SagaExecStatus>; - /** - * Replaces the action at the specified node with one that just generates an - * error - * - * See [`Dag::get_index()`] to get the node_id for a node. - */ + /// Replaces the action at the specified node with one that just generates + /// an error + /// + /// See [`Dag::get_index()`] to get the node_id for a node. fn inject_error(&self, node_id: NodeIndex) -> BoxFuture<'_, ()>; } @@ -2206,8 +2033,8 @@ mod test { Node::constant(name, serde_json::Value::Null) } - // Assert that the names match the NodeNames of the constant nodes at the given - // indexes. + // Assert that the names match the NodeNames of the constant nodes at the + // given indexes. 
fn constant_names_match( names: &[&str], indexes: &[NodeIndex], @@ -2408,9 +2235,10 @@ mod test { let entries = orderer.print_order(); // It's super tedious to test by asserting on each entry as in the - // prior tests. Instead, we generate a string and compare it to expected output. - // The output is generated by the test, so it won't change, and we can use - // a different format for actual `SagaExecStatus` output. + // prior tests. Instead, we generate a string and compare it to expected + // output. The output is generated by the test, so it won't + // change, and we can use a different format for actual + // `SagaExecStatus` output. let actual = print_for_testing(&entries, &saga_dag); let expected = "\ Start { params: Null } @@ -2425,7 +2253,8 @@ SubsagaStart { saga_name: \"test-subsaga\", params_node_name: \"d\" } Constant { name: \"b\", value: Null } Constant { name: \"c\", value: Null } Constant { name: \"d\", value: Null } - SubsagaStart { saga_name: \"test-nested-subsaga\", params_node_name: \"d\" } + SubsagaStart { saga_name: \"test-nested-subsaga\", params_node_name: \"d\" \ + } Constant { name: \"a\", value: Null } \"Parallel: \" Constant { name: \"b\", value: Null } @@ -2434,7 +2263,8 @@ SubsagaStart { saga_name: \"test-subsaga\", params_node_name: \"d\" } SubsagaEnd { name: \"e\" } \"Parallel: \" Constant { name: \"f\", value: Null } - SubsagaStart { saga_name: \"test-nested-subsaga\", params_node_name: \"e\" } + SubsagaStart { saga_name: \"test-nested-subsaga\", params_node_name: \ + \"e\" } Constant { name: \"a\", value: Null } \"Parallel: \" Constant { name: \"b\", value: Null } @@ -2536,7 +2366,8 @@ mod proptests { prop_oneof![ // Parallel nodes must contain at least 2 nodes prop::collection::vec(inner.clone(), 2..10).prop_map(|v| { - // Ensure that Parallel nodes do not contain parallel nodes + // Ensure that Parallel nodes do not contain parallel + // nodes if v.iter().any(|node_desc| node_desc.is_parallel()) { NodeDesc::Subsaga(v) } else { @@ 
-2565,11 +2396,13 @@ mod proptests { // structure, not saga behavior. let params_node_name = "0"; - // Always append a node named "0", so our lookups work for subsaga params + // Always append a node named "0", so our lookups work for subsaga + // params dag.append(Node::constant(params_node_name, serde_json::Value::Null)); - // Node names are just numbers that we can increment and convert to strings. - // Each DAG uses the same set, since they are namespaced. + // Node names are just numbers that we can increment and convert to + // strings. Each DAG uses the same set, since they are + // namespaced. let mut node_name = 1; for node in nodes { @@ -2604,7 +2437,8 @@ mod proptests { node_name += 1; } NodeDesc::Parallel(_) => panic!( - "Strategy Generation Error: Nested `NodeDesc::Parallel` not allowed!" + "Strategy Generation Error: Nested \ + `NodeDesc::Parallel` not allowed!" ), } } @@ -2725,10 +2559,12 @@ mod proptests { // This node was not appended in parallel prop_assert!(!parallel); - // This node has multiple ancestors, meaning that its parents were appended in parallel + // This node has multiple ancestors, meaning + // that its parents were appended in parallel prop_assert!(num_ancestors(dag, *idx) > 1); - // We need to pop the stack to get back in sync with the Dag + // We need to pop the stack to get back in sync + // with the Dag indent_stack.pop(); } } @@ -2741,23 +2577,27 @@ mod proptests { *indent_level ); - // This node has multiple ancestors, meaning that its parents were appended in parallel + // This node has multiple ancestors, meaning + // that its parents were appended in parallel prop_assert!(num_ancestors(dag, *idx) > 1); prop_assert_eq!( &IndentStackEntry::Parallel, indent_stack.last().unwrap() ); - // The last parallel node has already been seen in the - // print output, because the indent_level has been - // reduced. Pop the stack to compensate. 
+ // The last parallel node has already been seen + // in the + // print output, because the indent_level has + // been reduced. + // Pop the stack to compensate. indent_stack.pop(); } indent_stack.push(IndentStackEntry::Subsaga); } InternalNode::SubsagaEnd { .. } => { // SubsagaEnd nodes should always be at the - // de-indented and aligned with the outer indent level. + // de-indented and aligned with the outer indent + // level. prop_assert!(!indent_stack.is_empty()); prop_assert_eq!( indent_stack.len() - 1, @@ -2767,7 +2607,8 @@ mod proptests { indent_stack.last().unwrap(), &IndentStackEntry::Subsaga ); - // We need to pop the stack to get back in sync with the Dag + // We need to pop the stack to get back in sync with + // the Dag indent_stack.pop(); } } diff --git a/src/saga_log.rs b/src/saga_log.rs index 61232fc..7b060b2 100644 --- a/src/saga_log.rs +++ b/src/saga_log.rs @@ -12,16 +12,13 @@ use std::fmt; use std::sync::Arc; use thiserror::Error; -/** - * Unique identifier for a saga node - */ -/* - * We use a newtype for SagaNodeId for the usual reasons. What about the - * underlying representation? The Omicron consumer is going to store these in - * CockroachDB, which makes `i64` the most natural numeric type. There's no - * need for signed values here, so we choose `u32` as large enough for our - * purposes, unsigned, and can be infallibly converted to an `i64`. - */ +/// Unique identifier for a saga node +// We use a newtype for SagaNodeId for the usual reasons. What about the +// underlying representation? The Omicron consumer is going to store these in +// CockroachDB, which makes `i64` the most natural numeric type. There's no +// need for signed values here, so we choose `u32` as large enough for our +// purposes, unsigned, and can be infallibly converted to an `i64`. +// // TODO-cleanup figure out how to use custom_derive here? #[derive( Deserialize, @@ -43,8 +40,8 @@ NewtypeFrom! 
{ () pub struct SagaNodeId(u32); } #[derive(Debug, Clone, Error)] pub enum SagaLogError { #[error( - "event type {event_type} is illegal with current \ - load status {current_status:?}" + "event type {event_type} is illegal with current load status \ + {current_status:?}" )] IllegalEventForState { current_status: SagaNodeLoadStatus, @@ -52,16 +49,14 @@ pub enum SagaLogError { }, } -/** - * An entry in the saga log - */ +/// An entry in the saga log #[derive(Clone, Deserialize, Serialize)] pub struct SagaNodeEvent { - /** id of the saga */ + /// id of the saga pub saga_id: SagaId, - /** id of the saga node */ + /// id of the saga node pub node_id: SagaNodeId, - /** what's indicated by this event */ + /// what's indicated by this event pub event_type: SagaNodeEventType, } @@ -71,25 +66,23 @@ impl fmt::Debug for SagaNodeEvent { } } -/** - * Event types that may be found in the log for a particular action - * - * (This is not a general-purpose debug log, but more like an intent log for - * recovering the action's state in the event of an executor crash. That - * doesn't mean we can't put debugging information here, though.) - */ +/// Event types that may be found in the log for a particular action +/// +/// (This is not a general-purpose debug log, but more like an intent log for +/// recovering the action's state in the event of an executor crash. That +/// doesn't mean we can't put debugging information here, though.) 
#[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] pub enum SagaNodeEventType { - /** The action has started running */ + /// The action has started running Started, - /** The action completed successfully (with output data) */ + /// The action completed successfully (with output data) Succeeded(Arc), - /** The action failed */ + /// The action failed Failed(ActionError), - /** The undo action has started running */ + /// The undo action has started running UndoStarted, - /** The undo action has finished */ + /// The undo action has finished UndoFinished, } @@ -111,36 +104,34 @@ impl SagaNodeEventType { } } -/** - * Persistent status for a saga node - * - * The events present in the log determine the _persistent status_ of the node. - * You can think of this like a single summary of the state of this action, - * based solely on the persistent state. When recovering from a crash, the - * saga executor uses this status to determine what to do next. We also - * maintain this for each SagaLog to identify illegal transitions at runtime. - * - * A node's status is very nearly identified by the type of the last event seen. - * It's cleaner to have a first-class summary here. - */ +/// Persistent status for a saga node +/// +/// The events present in the log determine the _persistent status_ of the node. +/// You can think of this like a single summary of the state of this action, +/// based solely on the persistent state. When recovering from a crash, the +/// saga executor uses this status to determine what to do next. We also +/// maintain this for each SagaLog to identify illegal transitions at runtime. +/// +/// A node's status is very nearly identified by the type of the last event +/// seen. It's cleaner to have a first-class summary here. 
#[derive(Clone, Debug)] pub enum SagaNodeLoadStatus { - /** The action never started running */ + /// The action never started running NeverStarted, - /** The action has started running */ + /// The action has started running Started, - /** The action completed successfully (with output data) */ + /// The action completed successfully (with output data) Succeeded(Arc), - /** The action failed */ + /// The action failed Failed(ActionError), - /** The undo action has started running (with output data from success) */ + /// The undo action has started running (with output data from success) UndoStarted(Arc), - /** The undo action has finished */ + /// The undo action has finished UndoFinished, } impl SagaNodeLoadStatus { - /** Returns the new status for a node after recording the given event. */ + /// Returns the new status for a node after recording the given event. fn next_status( &self, event_type: &SagaNodeEventType, @@ -172,9 +163,7 @@ impl SagaNodeLoadStatus { } } -/** - * Write to a saga's log - */ +/// Write to a saga's log #[derive(Clone, Debug)] pub struct SagaLog { saga_id: SagaId, @@ -199,29 +188,26 @@ impl SagaLog { ) -> Result { let mut log = Self::new_empty(saga_id); - /* - * Sort the events by the event type. This ensures that if there's at - * least one valid sequence of events, then we'll replay the events in a - * valid sequence. Thus, if we fail to replay below, then the log is - * corrupted somehow. (Remember, the wall timestamp is never used for - * correctness.) For debugging purposes, this is a little disappointing: - * most likely, the events are already in a valid order that reflects - * when they actually happened. However, there's nothing to guarantee - * that unless we make it so, and our simple approach for doing so here - * destroys the sequential order. This should only really matter for a - * person looking at the sequence of entries (as they appear in memory) - * for debugging. - */ + // Sort the events by the event type. 
This ensures that if there's at + // least one valid sequence of events, then we'll replay the events in a + // valid sequence. Thus, if we fail to replay below, then the log is + // corrupted somehow. (Remember, the wall timestamp is never used for + // correctness.) For debugging purposes, this is a little disappointing: + // most likely, the events are already in a valid order that reflects + // when they actually happened. However, there's nothing to guarantee + // that unless we make it so, and our simple approach for doing so here + // destroys the sequential order. This should only really matter for a + // person looking at the sequence of entries (as they appear in memory) + // for debugging. events.sort_by_key(|f| match f.event_type { - /* - * TODO-cleanup Is there a better way to do this? We want to sort - * by the event type, where event types are compared by the order - * they're defined in SagaEventType. We could almost use derived - * PartialOrd and PartialEq implementations for SagaEventType, except - * that one variant has a payload that does _not_ necessarily - * implement PartialEq or PartialOrd. It seems like that means we - * have to implement this by hand. - */ + // TODO-cleanup Is there a better way to do this? We want to sort + // by the event type, where event types are compared by the order + // they're defined in SagaEventType. We could almost use derived + // PartialOrd and PartialEq implementations for SagaEventType, + // except that one variant has a payload that does _not_ + // necessarily implement PartialEq or PartialOrd. It + // seems like that means we have to implement this by + // hand. SagaNodeEventType::Started => 1, SagaNodeEventType::Succeeded(_) => 2, SagaNodeEventType::Failed(_) => 3, @@ -229,14 +215,12 @@ impl SagaLog { SagaNodeEventType::UndoFinished => 5, }); - /* - * Replay the events for this saga. - */ + // Replay the events for this saga. 
for event in events { if event.saga_id != saga_id { return Err(anyhow!( - "found an event in the log for a \ - different saga ({}) than requested ({})", + "found an event in the log for a different saga ({}) than \ + requested ({})", event.saga_id, saga_id, )); @@ -291,9 +275,7 @@ impl SagaLog { } } -/** - * Handle for pretty-printing a SagaLog (using the `fmt::Debug` trait) - */ +/// Handle for pretty-printing a SagaLog (using the `fmt::Debug` trait) pub struct SagaLogPretty<'a> { log: &'a SagaLog, } @@ -316,7 +298,6 @@ impl<'a> fmt::Debug for SagaLogPretty<'a> { } } -// // TODO-testing lots of automated tests are possible here, but let's see if the // abstraction makes any sense first. // diff --git a/src/sec.rs b/src/sec.rs index 3426329..aea4575 100644 --- a/src/sec.rs +++ b/src/sec.rs @@ -1,57 +1,55 @@ -/*! - * Saga Execution Coordinator - * - * The Saga Execution Coordinator ("SEC") manages the execution of a group of - * sagas, providing interfaces for running new sagas, recovering sagas that were - * running in a previous lifetime, listing sagas, querying the state of a saga, - * and providing some control over sagas (e.g., to inject errors). - * - * The implementation is grouped into - * - * * [`sec()`], a function to start up an SEC - * * an `SecClient`, which Steno consumers use to interact with the SEC - * * an `Sec`: a background task that owns the list of sagas and their overall - * runtime state. (The detailed runtime state is owned by a separate - * `SagaExecutor` type internally.) - * * a number of `SecExecClient` objects, which individual saga executors use to - * communicate back to the Sec (to communicate progress, record persistent - * state, etc.) 
- * - * The control flow of these components and their messages is shown in the - * below diagram: - * - * +---------+ - * | Saga | - * |Consumer |--+ - * +---------+ | - * | - * Saga API - * | - * | +-------------+ +-------------+ - * | | | | | - * | | SEC | | SEC | - * +-->| Client |----------------->| (task) | - * | | SecClientMsg | | - * | | | | - * +-------------+ +-------------+ - * ^ - * | - * | - * SecExecMsg - * | - * | - * +-------------+ - * | | - * | SagaExecutor| - * | (future) | - * | | - * | | - * +-------------+ - * - * The Steno consumer is responsible for implementing an `SecStore` to store - * persistent state. There's an [`crate::InMemorySecStore`] to play around - * with. - */ +//! Saga Execution Coordinator +//! +//! The Saga Execution Coordinator ("SEC") manages the execution of a group of +//! sagas, providing interfaces for running new sagas, recovering sagas that +//! were running in a previous lifetime, listing sagas, querying the state of a +//! saga, and providing some control over sagas (e.g., to inject errors). +//! +//! The implementation is grouped into +//! +//! * [`sec()`], a function to start up an SEC +//! * an `SecClient`, which Steno consumers use to interact with the SEC +//! * an `Sec`: a background task that owns the list of sagas and their overall +//! runtime state. (The detailed runtime state is owned by a separate +//! `SagaExecutor` type internally.) +//! * a number of `SecExecClient` objects, which individual saga executors use +//! to communicate back to the Sec (to communicate progress, record persistent +//! state, etc.) +//! +//! The control flow of these components and their messages is shown in the +//! below diagram: +//! +//! +---------+ +//! | Saga | +//! |Consumer |--+ +//! +---------+ | +//! | +//! Saga API +//! | +//! | +-------------+ +-------------+ +//! | | | | | +//! | | SEC | | SEC | +//! +-->| Client |----------------->| (task) | +//! | | SecClientMsg | | +//! | | | | +//! 
+-------------+ +-------------+ +//! ^ +//! | +//! | +//! SecExecMsg +//! | +//! | +//! +-------------+ +//! | | +//! | SagaExecutor| +//! | (future) | +//! | | +//! | | +//! +-------------+ +//! +//! The Steno consumer is responsible for implementing an `SecStore` to store +//! persistent state. There's an [`crate::InMemorySecStore`] to play around +//! with. #![allow(clippy::large_enum_variant)] @@ -88,41 +86,32 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::oneshot; -/* - * SEC client side (handle used by Steno consumers) - */ - -/** - * Maximum number of messages for the SEC that can be queued from the client - * - * This is very small. These messages are commands, and the client always waits - * for a response. So it makes little difference to latency or throughput - * whether the client waits up front for available buffer space or waits instead - * on the response channel (with the request buffered in the queue). - */ +// SEC client side (handle used by Steno consumers) + +/// Maximum number of messages for the SEC that can be queued from the client +/// +/// This is very small. These messages are commands, and the client always +/// waits for a response. So it makes little difference to latency or +/// throughput whether the client waits up front for available buffer space or +/// waits instead on the response channel (with the request buffered in the +/// queue). const SEC_CLIENT_MAXQ_MESSAGES: usize = 2; -/** - * Maximum number of messages for the SEC that can be queued from SagaExecutors - * - * As with clients, we keep the queue small. This may mean that SagaExecutors - * get stuck behind the SEC, but that's preferable to bloat or more implicit - * queueing delays. - */ +/// Maximum number of messages for the SEC that can be queued from SagaExecutors +/// +/// As with clients, we keep the queue small. This may mean that SagaExecutors +/// get stuck behind the SEC, but that's preferable to bloat or more implicit +/// queueing delays. 
const SEC_EXEC_MAXQ_MESSAGES: usize = 2; -/** - * Creates a new Saga Execution Coordinator - */ +/// Creates a new Saga Execution Coordinator pub fn sec(log: slog::Logger, sec_store: Arc) -> SecClient { let (cmd_tx, cmd_rx) = mpsc::channel(SEC_CLIENT_MAXQ_MESSAGES); let (exec_tx, exec_rx) = mpsc::channel(SEC_EXEC_MAXQ_MESSAGES); - /* - * We spawn a new task rather than return a `Future` for the caller to - * poll because we want to make sure the Sec can't be dropped unless - * shutdown() has been invoked on the client. - */ + // We spawn a new task rather than return a `Future` for the caller to + // poll because we want to make sure the Sec can't be dropped unless + // shutdown() has been invoked on the client. let task = tokio::spawn(async move { let sec = Sec { log, @@ -140,12 +129,11 @@ pub fn sec(log: slog::Logger, sec_store: Arc) -> SecClient { SecClient { cmd_tx, task: Some(task), shutdown: false } } -/** - * Client handle for a Saga Execution Coordinator (SEC) - * - * This is the interface through which Steno consumers create new sagas, recover - * sagas that were created in previous lifetimes, list sagas, and so on. - */ +/// Client handle for a Saga Execution Coordinator (SEC) +/// +/// This is the interface through which Steno consumers create new sagas, +/// recover sagas that were created in previous lifetimes, list sagas, and so +/// on. #[derive(Debug)] pub struct SecClient { cmd_tx: mpsc::Sender, @@ -154,12 +142,10 @@ pub struct SecClient { } impl SecClient { - /** - * Creates a new saga, which may later started with [`Self::saga_start`]. - * - * This function asynchronously returns a `Future` that can be used to wait - * for the saga to finish. It's also safe to cancel (drop) this Future. - */ + /// Creates a new saga, which may later started with [`Self::saga_start`]. + /// + /// This function asynchronously returns a `Future` that can be used to wait + /// for the saga to finish. It's also safe to cancel (drop) this Future. 
pub async fn saga_create( &self, saga_id: SagaId, @@ -183,12 +169,10 @@ impl SecClient { .await } - /** - * Resume a saga that was previously running - * - * This function asynchronously returns a `Future` that can be used to wait - * for the saga to finish. It's also safe to cancel (drop) this Future. - */ + /// Resume a saga that was previously running + /// + /// This function asynchronously returns a `Future` that can be used to wait + /// for the saga to finish. It's also safe to cancel (drop) this Future. pub async fn saga_resume( &self, saga_id: SagaId, @@ -220,10 +204,8 @@ impl SecClient { .await } - /** - * Start running (or resume running) a saga that was created with - * [`SecClient::saga_create()`] or [`SecClient::saga_resume()`]. - */ + /// Start running (or resume running) a saga that was created with + /// [`SecClient::saga_create()`] or [`SecClient::saga_resume()`]. pub async fn saga_start( &self, saga_id: SagaId, @@ -232,9 +214,7 @@ impl SecClient { self.sec_cmd(ack_rx, SecClientMsg::SagaStart { ack_tx, saga_id }).await } - /** - * List known sagas - */ + /// List known sagas pub async fn saga_list( &self, marker: Option, @@ -245,17 +225,13 @@ impl SecClient { .await } - /** - * Fetch information about one saga - */ + /// Fetch information about one saga pub async fn saga_get(&self, saga_id: SagaId) -> Result { let (ack_tx, ack_rx) = oneshot::channel(); self.sec_cmd(ack_rx, SecClientMsg::SagaGet { ack_tx, saga_id }).await } - /** - * Inject an error into one saga - */ + /// Inject an error into one saga pub async fn saga_inject_error( &self, saga_id: SagaId, @@ -269,9 +245,7 @@ impl SecClient { .await } - /** - * Shut down the SEC and wait for it to come to rest. - */ + /// Shut down the SEC and wait for it to come to rest. 
pub async fn shutdown(mut self) { self.shutdown = true; self.cmd_tx.send(SecClientMsg::Shutdown).await.unwrap_or_else( @@ -284,14 +258,12 @@ impl SecClient { .expect("failed to join on SEC task"); } - /** - * Sends `msg` to the SEC and waits for a response on `ack_rx` - * - * The SEC is not expected to shut down until we issue the shutdown command, - * which only happens when the consumer has given up ownership of this - * object. So we can assume that the SEC is still running and that these - * channel operations will not fail. - */ + /// Sends `msg` to the SEC and waits for a response on `ack_rx` + /// + /// The SEC is not expected to shut down until we issue the shutdown + /// command, which only happens when the consumer has given up ownership + /// of this object. So we can assume that the SEC is still running and + /// that these channel operations will not fail. async fn sec_cmd( &self, ack_rx: oneshot::Receiver, @@ -307,24 +279,22 @@ impl SecClient { impl Drop for SecClient { fn drop(&mut self) { if !self.shutdown { - /* - * If we get here, there should be no outstanding requests on this - * channel, in which case there must be buffer space and try_send() - * ought not to fail for running out of space. It may fail if the - * other side is closed, but that should only happen if the SEC task - * panicked. - */ + // If we get here, there should be no outstanding requests on this + // channel, in which case there must be buffer space and try_send() + // ought not to fail for running out of space. It may fail if the + // other side is closed, but that should only happen if the SEC task + // panicked. 
let _ = self.cmd_tx.try_send(SecClientMsg::Shutdown); } } } -/** External consumer's view of a saga */ +/// External consumer's view of a saga #[derive(Clone, Debug, JsonSchema, Serialize)] pub struct SagaView { pub id: SagaId, - /* TODO-debugging impl an appropriate Serialize here */ + // TODO-debugging impl an appropriate Serialize here #[serde(skip)] pub state: SagaStateView, @@ -342,13 +312,11 @@ impl SagaView { } } - /** - * Returns an object that impl's serde's `Deserialize` and `Serialize` - * traits - * - * This is mainly intended for tooling and demoing. Production state - * serialization happens via the [`SecStore`]. - */ + /// Returns an object that impl's serde's `Deserialize` and `Serialize` + /// traits + /// + /// This is mainly intended for tooling and demoing. Production state + /// serialization happens via the [`SecStore`]. pub fn serialized(&self) -> SagaSerialized { SagaSerialized { saga_id: self.id, @@ -358,24 +326,24 @@ impl SagaView { } } -/** State-specific parts of a consumer's view of a saga */ +/// State-specific parts of a consumer's view of a saga #[derive(Debug, Clone)] pub enum SagaStateView { - /** The saga is ready to start running */ + /// The saga is ready to start running Ready { - /** initial execution status */ + /// initial execution status status: SagaExecStatus, }, - /** The saga is still running */ + /// The saga is still running Running { - /** current execution status */ + /// current execution status status: SagaExecStatus, }, - /** The saga has finished running */ + /// The saga has finished running Done { - /** final execution status */ + /// final execution status status: SagaExecStatus, - /** final result */ + /// final result result: SagaResult, }, } @@ -415,7 +383,7 @@ impl SagaStateView { } } - /** Returns the status summary for this saga */ + /// Returns the status summary for this saga pub fn status(&self) -> &SagaExecStatus { match self { SagaStateView::Ready { status } => status, @@ -425,100 +393,88 @@ impl 
SagaStateView { } } -/* - * SEC Client/Server interface - */ - -/** - * Message passed from the [`SecClient`] to the [`Sec`] - */ -/* - * TODO-cleanup This might be cleaner using separate named structs for the - * enums, similar to what we do for SecStep. - */ +// SEC Client/Server interface + +/// Message passed from the [`SecClient`] to the [`Sec`] +// TODO-cleanup This might be cleaner using separate named structs for the +// enums, similar to what we do for SecStep. enum SecClientMsg { - /** - * Creates a new saga - * - * The response includes a Future that can be used to wait for the saga to - * finish. The caller can ignore this. - */ + /// Creates a new saga + /// + /// The response includes a Future that can be used to wait for the saga to + /// finish. The caller can ignore this. SagaCreate { - /** response channel */ + /// response channel ack_tx: oneshot::Sender< Result, anyhow::Error>, >, - /** caller-defined id (must be unique) */ + /// caller-defined id (must be unique) saga_id: SagaId, - /** user-type-specific parameters */ + /// user-type-specific parameters template_params: Box, - /** The user created DAG */ + /// The user created DAG dag: Arc, }, - /** - * Resumes a saga from a previous lifetime (i.e., after a restart) - * - * The response includes a Future that can be used to wait for the saga to - * finish. The caller can ignore this. - */ + /// Resumes a saga from a previous lifetime (i.e., after a restart) + /// + /// The response includes a Future that can be used to wait for the saga to + /// finish. The caller can ignore this. 
SagaResume { - /** response channel */ + /// response channel ack_tx: oneshot::Sender< Result, anyhow::Error>, >, - /** unique id of the saga (from persistent state) */ + /// unique id of the saga (from persistent state) saga_id: SagaId, - /** user-type-specific parameters */ + /// user-type-specific parameters template_params: Box, - /** The user created DAG */ + /// The user created DAG dag: Arc, }, - /** Start (or resume) running a saga */ + /// Start (or resume) running a saga SagaStart { - /** response channel */ + /// response channel ack_tx: oneshot::Sender>, - /** id of the saga to start running */ + /// id of the saga to start running saga_id: SagaId, }, - /** List sagas */ + /// List sagas SagaList { - /** response channel */ + /// response channel ack_tx: oneshot::Sender>, - /** marker (where in the ID space to start listing from) */ + /// marker (where in the ID space to start listing from) marker: Option, - /** maximum number of sagas to return */ + /// maximum number of sagas to return limit: NonZeroU32, }, - /** Fetch information about one saga */ + /// Fetch information about one saga SagaGet { - /** response channel */ + /// response channel ack_tx: oneshot::Sender>, - /** id of saga to fetch information about */ + /// id of saga to fetch information about saga_id: SagaId, }, - /** Inject an error at a specific action in the saga */ + /// Inject an error at a specific action in the saga SagaInjectError { - /** response channel */ + /// response channel ack_tx: oneshot::Sender>, - /** id of saga to fetch information about */ + /// id of saga to fetch information about saga_id: SagaId, - /** - * id of the node to inject the error (see - * [`SagaTemplateMetadata::node_for_name`]) - */ + /// id of the node to inject the error (see + /// [`SagaTemplateMetadata::node_for_name`]) node_id: NodeIndex, }, - /** Shut down the SEC */ + /// Shut down the SEC Shutdown, } @@ -559,11 +515,9 @@ impl fmt::Debug for SecClientMsg { } } -/** - * This trait erases the 
type parameters on a [`SagaTemplate`], user context,
- * and user parameters so that we can more easily pass it through a channel.
- * TODO(AJS) - rename since template no longer exists?
- */
+/// This trait erases the type parameters on a [`SagaTemplate`], user context,
+/// and user parameters so that we can more easily pass it through a channel.
+/// TODO(AJS) - rename since template no longer exists?
 trait TemplateParams: Send + fmt::Debug {
     fn into_exec(
         self: Box,
         log: slog::Logger,
         saga_id: SagaId,
         sec_hdl: SecExecClient,
     ) -> Result, anyhow::Error>;
 }
 
-/**
- * Stores a template and user context in a way where the
- * user-defined types can be erased with [`TemplateParams`]
- *
- * This version is for the "create" case, where we know the specific
- * [`SagaType`] for these values. See [`SecClient::saga_create`].
- */
+/// Stores a template and user context in a way where the
+/// user-defined types can be erased with [`TemplateParams`]
+///
+/// This version is for the "create" case, where we know the specific
+/// [`SagaType`] for these values. See [`SecClient::saga_create`].
 #[derive(Debug)]
 struct TemplateParamsForCreate {
     dag: Arc,
@@ -608,14 +560,12 @@ where
     }
 }
 
-/**
- * Stores a template, saga parameters, and user context in a way where the
- * user-defined types can be erased with [`TemplateParams]
- *
- * This version is for the "resume" case, where we know the specific context
- * type, but not the parameters or template type. We also have a saga log in
- * this case. See [`SecClient::saga_resume()`].
- */
+/// Stores a template, saga parameters, and user context in a way where the
+/// user-defined types can be erased with [`TemplateParams`]
+///
+/// This version is for the "resume" case, where we know the specific context
+/// type, but not the parameters or template type. We also have a saga log in
+/// this case. See [`SecClient::saga_resume()`].
#[derive(Debug)] struct TemplateParamsForRecover { dag: Arc, @@ -646,14 +596,10 @@ where } } -/* - * SEC internal client side (handle used by SagaExecutor) - */ +// SEC internal client side (handle used by SagaExecutor) -/** - * Handle used by `SagaExecutor` for sending messages back to the SEC - */ -/* TODO-cleanup This should be pub(crate). See lib.rs. */ +/// Handle used by `SagaExecutor` for sending messages back to the SEC +// TODO-cleanup This should be pub(crate). See lib.rs. #[derive(Debug)] pub struct SecExecClient { saga_id: SagaId, @@ -661,7 +607,7 @@ pub struct SecExecClient { } impl SecExecClient { - /** Write `event` to the saga log */ + /// Write `event` to the saga log pub async fn record(&self, event: SagaNodeEvent) { assert_eq!(event.saga_id, self.saga_id); let (ack_tx, ack_rx) = oneshot::channel(); @@ -672,9 +618,7 @@ impl SecExecClient { .await } - /** - * Update the cached state for the saga - */ + /// Update the cached state for the saga pub async fn saga_update(&self, update: SagaCachedState) { let (ack_tx, ack_rx) = oneshot::channel(); self.sec_send( @@ -702,70 +646,62 @@ impl SecExecClient { ack_rx: oneshot::Receiver, msg: SecExecMsg, ) -> T { - /* - * TODO-robustness How does shutdown interact (if this channel gets - * closed)? - */ + // TODO-robustness How does shutdown interact (if this channel gets + // closed)? 
self.exec_tx.send(msg).await.unwrap(); ack_rx.await.unwrap() } } -/** - * Message passed from the [`SecExecClient`] to the [`Sec`] - */ +/// Message passed from the [`SecExecClient`] to the [`Sec`] #[derive(Debug)] enum SecExecMsg { - /** Fetch the status of a saga */ + /// Fetch the status of a saga SagaGet(SagaGetData), - /** Record an event to the saga log */ + /// Record an event to the saga log LogEvent(SagaLogEventData), - /** Update the cached state of a saga */ + /// Update the cached state of a saga UpdateCachedState(SagaUpdateCacheData), } -/** See [`SecExecMsg::SagaGet`] */ -/* TODO-cleanup commonize with the client's SagaGet */ +/// See [`SecExecMsg::SagaGet`] +// TODO-cleanup commonize with the client's SagaGet #[derive(Debug)] struct SagaGetData { - /** response channel */ + /// response channel ack_tx: oneshot::Sender>, - /** saga being updated */ + /// saga being updated saga_id: SagaId, } -/** See [`SecExecMsg::LogEvent`] */ +/// See [`SecExecMsg::LogEvent`] #[derive(Debug)] struct SagaLogEventData { - /** response channel */ + /// response channel ack_tx: oneshot::Sender<()>, - /** event to be recorded to the saga log */ + /// event to be recorded to the saga log event: SagaNodeEvent, } -/** See [`SecExecMsg::UpdateCachedState`] */ +/// See [`SecExecMsg::UpdateCachedState`] #[derive(Debug)] struct SagaUpdateCacheData { - /** response channel */ + /// response channel ack_tx: oneshot::Sender<()>, - /** saga being updated */ + /// saga being updated saga_id: SagaId, - /** updated state */ + /// updated state updated_state: SagaCachedState, } -/* - * SEC server side (background task) - */ +// SEC server side (background task) -/** - * The `Sec` (Saga Execution Coordinator) is responsible for tracking and - * running sagas - * - * Steno consumers create this via [`sec()`]. - */ +/// The `Sec` (Saga Execution Coordinator) is responsible for tracking and +/// running sagas +/// +/// Steno consumers create this via [`sec()`]. 
struct Sec { log: slog::Logger, sagas: BTreeMap, @@ -778,30 +714,28 @@ struct Sec { } impl Sec { - /** Body of the SEC's task */ + /// Body of the SEC's task async fn run(mut self) { - /* - * Until we're asked to shutdown, wait for any sagas to finish or for - * messages to be received on the command channel. - * - * It's important to avoid waiting for any Futures to complete in the - * body of this loop aside from those that we're explicitly selecting - * on. Bad things can happen if such a Future were to block on some - * operation that requires the loop in order to proceed. For example, - * the Futures generated by cmd_saga_get() and cmd_saga_list() both - * block on the SagaExecutor, which can in turn block on the Sec in - * order to write log entries to the SecStore. It's critical that we're - * able to respond to requests from the SagaExecutor to write log - * entries to the SecStore even when we're blocked on that SagaExecutor - * to fetch its status. As much as possible, any time the Sec needs to - * do async work, that should be wrapped in a Future that's inserted - * into `self.futures`. That way we can poll on it with all the other - * work we have to do. - * - * Another failure mode to consider is if writes to the SecStore hang. - * We still want status requests from the SecClient to complete. This - * is another reason to avoid any sort of blocking in the main loop. - */ + // Until we're asked to shutdown, wait for any sagas to finish or for + // messages to be received on the command channel. + // + // It's important to avoid waiting for any Futures to complete in the + // body of this loop aside from those that we're explicitly selecting + // on. Bad things can happen if such a Future were to block on some + // operation that requires the loop in order to proceed. 
For example, + // the Futures generated by cmd_saga_get() and cmd_saga_list() both + // block on the SagaExecutor, which can in turn block on the Sec in + // order to write log entries to the SecStore. It's critical that we're + // able to respond to requests from the SagaExecutor to write log + // entries to the SecStore even when we're blocked on that SagaExecutor + // to fetch its status. As much as possible, any time the Sec needs to + // do async work, that should be wrapped in a Future that's inserted + // into `self.futures`. That way we can poll on it with all the other + // work we have to do. + // + // Another failure mode to consider is if writes to the SecStore hang. + // We still want status requests from the SecClient to complete. This + // is another reason to avoid any sort of blocking in the main loop. info!(&self.log, "SEC running"); while !self.shutdown || !self.futures.is_empty() { tokio::select! { @@ -864,9 +798,7 @@ impl Sec { } } - /* - * Dispatch functions for miscellaneous async work - */ + // Dispatch functions for miscellaneous async work fn dispatch_work(&mut self, step: SecStep) { match step { @@ -883,10 +815,10 @@ impl Sec { let exec_tx = self.exec_tx.clone(); let sec_hdl = SecExecClient { saga_id, exec_tx }; - /* Prepare a channel used to wait for the saga to finish. */ + // Prepare a channel used to wait for the saga to finish. let (done_tx, done_rx) = oneshot::channel(); - /* Create the executor to run this saga. */ + // Create the executor to run this saga. let maybe_exec = rec.template_params.into_exec(log.new(o!()), saga_id, sec_hdl); if let Err(e) = maybe_exec { @@ -919,16 +851,14 @@ impl Sec { self.do_saga_start(saga_id).unwrap(); } - /* Return a Future that the consumer can use to wait for the saga. */ + // Return a Future that the consumer can use to wait for the saga. 
Sec::client_respond( &log, ack_tx, Ok(async move { - /* - * It should not be possible for the receive to fail because the - * other side will not be closed while the saga is still - * running. - */ + // It should not be possible for the receive to fail because the + // other side will not be closed while the saga is still + // running. done_rx.await.unwrap_or_else(|_| { panic!("failed to wait for saga to finish") }) @@ -1014,9 +944,7 @@ impl Sec { } } - /* - * Dispatch functions for consumer client messages - */ + // Dispatch functions for consumer client messages fn dispatch_client_message(&mut self, message: SecClientMsg) { match message { @@ -1078,7 +1006,7 @@ impl Sec { "saga_id" => saga_id.to_string(), "saga_name" => dag.saga_name.to_string(), )); - /* TODO-log Figure out the way to log JSON objects to a JSON drain */ + // TODO-log Figure out the way to log JSON objects to a JSON drain // TODO(AJS) - Get rid of this unwrap? let serialized_dag = serde_json::to_value(&dag) .map_err(ActionError::new_serialize) @@ -1088,9 +1016,7 @@ impl Sec { "dag" => serde_json::to_string(&serialized_dag).unwrap() ); - /* - * Before doing anything else, create a persistent record for this saga. - */ + // Before doing anything else, create a persistent record for this saga. let saga_create = SagaCreateParams { id: saga_id, name: dag.saga_name.clone(), @@ -1135,7 +1061,7 @@ impl Sec { "saga_id" => saga_id.to_string(), "saga_name" => dag.saga_name.to_string(), )); - /* TODO-log Figure out the way to log JSON objects to a JSON drain */ + // TODO-log Figure out the way to log JSON objects to a JSON drain // TODO(AJS) - Get rid of this unwrap? let serialized_dag = serde_json::to_value(&dag) .map_err(ActionError::new_serialize) @@ -1161,14 +1087,12 @@ impl Sec { limit: NonZeroU32, ) { trace!(&self.log, "saga_list"); - /* TODO-cleanup */ + // TODO-cleanup let log = self.log.new(o!()); - /* - * We always expect to be able to go from NonZeroU32 to usize. 
This - * would only not be true on systems with usize < 32 bits, which seems - * an unlikely target for us. - */ + // We always expect to be able to go from NonZeroU32 to usize. This + // would only not be true on systems with usize < 32 bits, which seems + // an unlikely target for us. let limit = usize::try_from(limit.get()).unwrap(); let futures = match marker { None => self @@ -1200,12 +1124,10 @@ impl Sec { ); } - /* - * TODO-cleanup We should define a useful error type for the SEC. This - * function can only produce a NotFound, and we use `()` just to - * communicate that there's only one kind of error here (so that the caller - * can produce an appropriate NotFound instead of a generic error). - */ + // TODO-cleanup We should define a useful error type for the SEC. This + // function can only produce a NotFound, and we use `()` just to + // communicate that there's only one kind of error here (so that the caller + // can produce an appropriate NotFound instead of a generic error). fn cmd_saga_get( &self, ack_tx: oneshot::Sender>, @@ -1270,17 +1192,13 @@ impl Sec { } fn cmd_shutdown(&mut self) { - /* - * TODO We probably want to stop executing any sagas that are running at - * this point. - */ + // TODO We probably want to stop executing any sagas that are running at + // this point. info!(&self.log, "initiating shutdown"); self.shutdown = true; } - /* - * Dispatch functions for SagaExecutor messages - */ + // Dispatch functions for SagaExecutor messages fn dispatch_exec_message(&mut self, exec_message: SecExecMsg) { let log = self.log.new(o!()); @@ -1347,9 +1265,7 @@ impl Sec { } } -/** - * Represents the internal state of a saga in the [`Sec`] - */ +/// Represents the internal state of a saga in the [`Sec`] struct Saga { id: SagaId, log: slog::Logger, @@ -1359,55 +1275,52 @@ struct Saga { #[derive(Debug)] pub enum SagaRunState { - /** Saga is ready to be run */ + /// Saga is ready to be run Ready { - /** Handle to executor (for status, etc.) 
*/ + /// Handle to executor (for status, etc.) exec: Arc, - /** Notify when the saga is done */ + /// Notify when the saga is done waiter: oneshot::Sender, }, - /** Saga is currently running */ + /// Saga is currently running Running { - /** Handle to executor (for status, etc.) */ + /// Handle to executor (for status, etc.) exec: Arc, - /** Notify when the saga is done */ + /// Notify when the saga is done waiter: oneshot::Sender, }, - /** Saga has finished */ + /// Saga has finished Done { - /** Final execution status */ + /// Final execution status status: SagaExecStatus, - /** Overall saga result */ + /// Overall saga result result: SagaResult, }, } -/** - * Describes the next step that an SEC needs to take in order to process a - * command, execute a saga, or any other asynchronous work - * - * This provides a uniform interface that can be processed in the body of the - * SEC loop. - * - * In some cases, it would seem clearer to write straight-line async code to - * handle a complete client request. However, that code would wind up borrowing - * the Sec (sometimes mutably) for the duration of async work. It's important - * to avoid that here in order to avoid deadlock or blocking all operations in - * pathological conditions (e.g., when writes to the database hang). - */ +/// Describes the next step that an SEC needs to take in order to process a +/// command, execute a saga, or any other asynchronous work +/// +/// This provides a uniform interface that can be processed in the body of the +/// SEC loop. +/// +/// In some cases, it would seem clearer to write straight-line async code to +/// handle a complete client request. However, that code would wind up +/// borrowing the Sec (sometimes mutably) for the duration of async work. It's +/// important to avoid that here in order to avoid deadlock or blocking all +/// operations in pathological conditions (e.g., when writes to the database +/// hang). 
enum SecStep { - /** Start tracking a new saga, either as part of "create" or "resume" */ + /// Start tracking a new saga, either as part of "create" or "resume" SagaInsert(SagaInsertData), - /** A saga has just finished. */ + /// A saga has just finished. SagaDone(SagaDoneData), } -/** Data associated with [`SecStep::SagaInsert`] */ -/* - * TODO-cleanup This could probably be commonized with a struct that makes up - * the body of the CreateSaga message. - */ +/// Data associated with [`SecStep::SagaInsert`] +// TODO-cleanup This could probably be commonized with a struct that makes up +// the body of the CreateSaga message. struct SagaInsertData { log: slog::Logger, saga_id: SagaId, @@ -1418,7 +1331,7 @@ struct SagaInsertData { autostart: bool, } -/** Data associated with [`SecStep::SagaDone`] */ +/// Data associated with [`SecStep::SagaDone`] struct SagaDoneData { saga_id: SagaId, result: SagaResult, diff --git a/src/store.rs b/src/store.rs index 6bdc589..a35e99a 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,6 +1,4 @@ -/*! - * [`SecStore`] trait, related types, and built-in implementations - */ +//! [`SecStore`] trait, related types, and built-in implementations use crate::SagaId; use crate::SagaName; @@ -19,42 +17,34 @@ use std::fmt; /// Correct implementation of these interfaces is critical for crash recovery. #[async_trait] pub trait SecStore: fmt::Debug + Send + Sync { - /** - * Create a record for a newly created saga - * - * Once this step has completed, the saga will be discovered and recovered - * upon startup. Until this step has completed, the saga has not finished - * being created (since it won't be recovered on startup). - */ + /// Create a record for a newly created saga + /// + /// Once this step has completed, the saga will be discovered and recovered + /// upon startup. Until this step has completed, the saga has not finished + /// being created (since it won't be recovered on startup). 
async fn saga_create( &self, create_params: SagaCreateParams, ) -> Result<(), anyhow::Error>; - /** - * Write a record to a saga's persistent log - */ + /// Write a record to a saga's persistent log async fn record_event(&self, event: SagaNodeEvent); - /** - * Update the cached runtime state of the saga - * - * Steno invokes this function when the saga has reached one of the states - * described by [`SagaCachedState`] (like "Done"). This allows consumers to - * persistently record this information for easy access. This step is not - * strictly required for correctness, since the saga log contains all the - * information needed to determine this state. But by recording when a saga - * has finished, for example, the consumer can avoid having to read the - * saga's log altogether when it next starts up since there's no need to - * recover the saga. - */ + /// Update the cached runtime state of the saga + /// + /// Steno invokes this function when the saga has reached one of the states + /// described by [`SagaCachedState`] (like "Done"). This allows consumers + /// to persistently record this information for easy access. This step + /// is not strictly required for correctness, since the saga log + /// contains all the information needed to determine this state. But by + /// recording when a saga has finished, for example, the consumer can + /// avoid having to read the saga's log altogether when it next starts + /// up since there's no need to recover the saga. async fn saga_update(&self, id: SagaId, update: SagaCachedState); } -/** - * Describes what an impl of [`SecStore`] needs to store for a persistent saga - * record. - */ +/// Describes what an impl of [`SecStore`] needs to store for a persistent saga +/// record. 
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] pub struct SagaCreateParams { pub id: SagaId, @@ -67,11 +57,9 @@ pub struct SagaCreateParams { pub state: SagaCachedState, } -/** - * Describes the cacheable state of the saga - * - * See [`SecStore::saga_update`]. - */ +/// Describes the cacheable state of the saga +/// +/// See [`SecStore::saga_update`]. #[derive( Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema, )] @@ -91,12 +79,10 @@ impl fmt::Display for SagaCachedState { impl TryFrom<&str> for SagaCachedState { type Error = anyhow::Error; fn try_from(value: &str) -> Result { - /* - * Round-tripping through serde is a little absurd, but has the benefit - * of always staying in sync with the real definition. (The initial - * serialization is necessary to correctly handle any quotes or the like - * in the input string.) - */ + // Round-tripping through serde is a little absurd, but has the benefit + // of always staying in sync with the real definition. (The initial + // serialization is necessary to correctly handle any quotes or the like + // in the input string.) let json = serde_json::to_string(value).unwrap(); serde_json::from_str(&json).context("parsing saga state") } @@ -112,12 +98,10 @@ impl<'a> From<&'a SagaCachedState> for &'a str { } } -/** - * Implementation of [`SecStore`] that doesn't store any state persistently - * - * Sagas created using this store will not be recovered after the program - * crashes. - */ +/// Implementation of [`SecStore`] that doesn't store any state persistently +/// +/// Sagas created using this store will not be recovered after the program +/// crashes. #[derive(Debug)] pub struct InMemorySecStore {} @@ -133,15 +117,15 @@ impl SecStore for InMemorySecStore { &self, _create_params: SagaCreateParams, ) -> Result<(), anyhow::Error> { - /* Nothing to do. */ + // Nothing to do. Ok(()) } async fn record_event(&self, _event: SagaNodeEvent) { - /* Nothing to do. */ + // Nothing to do. 
} async fn saga_update(&self, _id: SagaId, _update: SagaCachedState) { - /* Nothing to do. */ + // Nothing to do. } } diff --git a/tests/test_smoke.rs b/tests/test_smoke.rs index c3c89e1..30268a8 100644 --- a/tests/test_smoke.rs +++ b/tests/test_smoke.rs @@ -1,7 +1,5 @@ -/*! - * Smoke tests for steno. These aren't close to exhaustive, but tests that it's - * not completely broken. - */ +//! Smoke tests for steno. These aren't close to exhaustive, but tests that +//! it's not completely broken. use expectorate::assert_contents; use std::env::current_exe; @@ -11,10 +9,8 @@ use subprocess::Exec; use subprocess::Redirection; fn example_bin() -> PathBuf { - /* - * This is unfortunate, but it's the best way I know to run one of the - * examples out of our project. - */ + // This is unfortunate, but it's the best way I know to run one of the + // examples out of our project. let mut my_path = current_exe().expect("failed to find test program"); my_path.pop(); assert_eq!(my_path.file_name().unwrap(), "deps"); @@ -77,18 +73,18 @@ fn cmd_run_error() { #[test] fn cmd_run_recover() { - /* Do a normal run and save the log so we can try recovering from it. */ + // Do a normal run and save the log so we can try recovering from it. let log = run_example("recover1", |exec| { exec.arg("run").arg("--dump-to=-").arg("--quiet") }); - /* First, try recovery without having changed anything. */ + // First, try recovery without having changed anything. let recovery_done = run_example("recover2", |exec| { exec.arg("run").arg("--recover-from=-").stdin(log.as_str()) }); assert_contents("tests/test_smoke_run_recover_done.out", &recovery_done); - /* Now try lopping off the last handful of records so there's work to do. */ + // Now try lopping off the last handful of records so there's work to do. 
let mut log_parsed: SagaSerialized = serde_json::from_str(&log).expect("failed to parse generated log"); log_parsed.events.truncate( @@ -107,7 +103,7 @@ fn cmd_run_recover() { #[test] fn cmd_run_recover_unwind() { - /* Do a failed run and save the log so we can try recovering from it. */ + // Do a failed run and save the log so we can try recovering from it. let log = run_example("recover_fail1", |exec| { exec.arg("run") .arg("--dump-to=-") @@ -115,7 +111,7 @@ fn cmd_run_recover_unwind() { .arg("--inject-error=instance_boot") }); - /* First, try recovery without having changed anything. */ + // First, try recovery without having changed anything. let recovery_done = run_example("recover_fail2", |exec| { exec.arg("run").arg("--recover-from=-").stdin(log.as_str()) }); @@ -124,7 +120,7 @@ fn cmd_run_recover_unwind() { &recovery_done, ); - /* Now try lopping off the last handful of records so there's work to do. */ + // Now try lopping off the last handful of records so there's work to do. let mut log_parsed: SagaSerialized = serde_json::from_str(&log).expect("failed to parse generated log"); log_parsed.events.truncate( diff --git a/tests/test_unregistered_action.rs b/tests/test_unregistered_action.rs index 1716c4d..6fd6371 100644 --- a/tests/test_unregistered_action.rs +++ b/tests/test_unregistered_action.rs @@ -58,8 +58,8 @@ async fn unregistered_action() { if let Err(error) = result { assert_eq!( format!("{:#}", error), - "validating saga \"my-saga\": \ - action for node \"my_node\" not registered: \"my_action\"" + "validating saga \"my-saga\": action for node \"my_node\" not \ + registered: \"my_action\"" ); } else { panic!("expected failure to create saga with unregistered action");