Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State machines: Add Executor trait #705

Merged
merged 4 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,56 @@ use microservices::esb;

use crate::bus::{BusMsg, ServiceBus};
use crate::Endpoints;
use crate::LogStyle;
use crate::ServiceId;

pub trait StateMachineExecutor<
Runtime: esb::Handler<ServiceBus>,
Error: std::error::Error,
T: StateMachine<Runtime, Error>,
> where
esb::Error<ServiceId>: From<<Runtime as esb::Handler<ServiceBus>>::Error>,
{
fn execute(
runtime: &mut Runtime,
endpoints: &mut Endpoints,
source: ServiceId,
request: BusMsg,
sm: T,
) -> Result<Option<T>, Error> {
let event = Event::with(endpoints, runtime.identity(), source, request);
let sm_display = sm.to_string();
let sm_name = sm.name();
if let Some(new_sm) = sm.next(event, runtime)? {
let new_sm_display = new_sm.to_string();
// relegate state transitions staying the same to debug
if new_sm_display == sm_display {
debug!(
"{} state self transition {}",
sm_name,
new_sm.bright_green_bold()
);
} else {
info!(
"{} state transition {} -> {}",
sm_name,
sm_display.red_bold(),
new_sm.bright_green_bold()
);
}
Ok(Some(new_sm))
} else {
info!(
"{} state machine ended {} -> {}",
sm_name,
sm_display.red_bold(),
"End".to_string().bright_green_bold()
);
Ok(None)
}
}
}

/// State machine used by runtimes for managing complex asynchronous workflows
pub trait StateMachine<Runtime: esb::Handler<ServiceBus>, Error: std::error::Error>:
std::fmt::Display
Expand All @@ -15,6 +63,8 @@ where
fn next(self, event: Event, runtime: &mut Runtime) -> Result<Option<Self>, Error>
where
Self: Sized;

fn name(&self) -> String;
}

/// Event changing state machine state, consisting of a certain P2P or RPC `request` sent from some
Expand Down
86 changes: 9 additions & 77 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use crate::bus::msg::{self, Msg};
use crate::bus::rpc::NodeInfo;
use crate::bus::sync::SyncMsg;
use crate::bus::{BusMsg, List, ServiceBus};
use crate::event::{Event, StateMachine};
use crate::farcasterd::syncer_state_machine::SyncerStateMachine;
use crate::farcasterd::trade_state_machine::TradeStateMachine;
use crate::event::StateMachineExecutor;
use crate::farcasterd::syncer_state_machine::{SyncerStateMachine, SyncerStateMachineExecutor};
use crate::farcasterd::trade_state_machine::{TradeStateMachine, TradeStateMachineExecutor};
use crate::farcasterd::Opts;
use crate::syncerd::{Event as SyncerEvent, SweepSuccess, TaskId};
use crate::{
Expand Down Expand Up @@ -389,7 +389,8 @@ impl Runtime {
.drain(..)
.collect::<Vec<TradeStateMachine>>();
for tsm in moved_trade_state_machines.drain(..) {
if let Some(new_tsm) = self.execute_trade_state_machine(
if let Some(new_tsm) = TradeStateMachineExecutor::execute(
self,
endpoints,
source.clone(),
request.clone(),
Expand All @@ -403,7 +404,8 @@ impl Runtime {
.drain()
.collect::<Vec<(TaskId, SyncerStateMachine)>>();
for (task_id, ssm) in moved_syncer_state_machines.drain(..) {
if let Some(new_ssm) = self.execute_syncer_state_machine(
if let Some(new_ssm) = SyncerStateMachineExecutor::execute(
self,
endpoints,
source.clone(),
request.clone(),
Expand Down Expand Up @@ -1003,7 +1005,7 @@ impl Runtime {
self.match_request_to_trade_state_machine(request.clone(), source.clone())?
{
if let Some(new_tsm) =
self.execute_trade_state_machine(endpoints, source, request, tsm)?
TradeStateMachineExecutor::execute(self, endpoints, source, request, tsm)?
{
self.trade_state_machines.push(new_tsm);
}
Expand All @@ -1012,7 +1014,7 @@ impl Runtime {
self.match_request_to_syncer_state_machine(request.clone(), source.clone())?
{
if let Some(new_ssm) =
self.execute_syncer_state_machine(endpoints, source, request, ssm)?
SyncerStateMachineExecutor::execute(self, endpoints, source, request, ssm)?
{
if let Some(task_id) = new_ssm.task_id() {
self.syncer_state_machines.insert(task_id, new_ssm);
Expand All @@ -1027,76 +1029,6 @@ impl Runtime {
}
}

fn execute_syncer_state_machine(
&mut self,
endpoints: &mut Endpoints,
source: ServiceId,
request: BusMsg,
ssm: SyncerStateMachine,
) -> Result<Option<SyncerStateMachine>, Error> {
let event = Event::with(endpoints, self.identity(), source, request);
let ssm_display = ssm.to_string();
if let Some(new_ssm) = ssm.next(event, self)? {
let new_ssm_display = new_ssm.to_string();
// relegate state transitions staying the same to debug
if new_ssm_display == ssm_display {
debug!(
"Syncer state self transition {}",
new_ssm.bright_green_bold()
);
} else {
info!(
"Syncer state transition {} -> {}",
ssm_display.red_bold(),
new_ssm.bright_green_bold()
);
}
Ok(Some(new_ssm))
} else {
info!(
"Syncer state machine ended {} -> {}",
ssm_display.red_bold(),
"End".to_string().bright_green_bold()
);
Ok(None)
}
}

fn execute_trade_state_machine(
&mut self,
endpoints: &mut Endpoints,
source: ServiceId,
request: BusMsg,
tsm: TradeStateMachine,
) -> Result<Option<TradeStateMachine>, Error> {
let event = Event::with(endpoints, self.identity(), source, request);
let tsm_display = tsm.to_string();
if let Some(new_tsm) = tsm.next(event, self)? {
let new_tsm_display = new_tsm.to_string();
// relegate state transitions staying the same to debug
if new_tsm_display == tsm_display {
debug!(
"Trade state self transition {}",
new_tsm.bright_green_bold()
);
} else {
info!(
"Trade state transition {} -> {}",
tsm_display.red_bold(),
new_tsm.bright_green_bold()
);
}
Ok(Some(new_tsm))
} else {
info!(
"Trade state machine ended {} -> {}",
tsm_display.red_bold(),
"End".to_string().bright_green_bold()
);
Ok(None)
}
}

pub fn listen(&mut self, bind_addr: InetSocketAddr) -> Result<NodeId, Error> {
self.services_ready()?;
let (peer_secret_key, peer_public_key) = self.peer_keys_ready()?;
Expand Down
17 changes: 15 additions & 2 deletions src/farcasterd/syncer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
bus::sync::SyncMsg,
bus::BusMsg,
error::Error,
event::{Event, StateMachine},
event::{Event, StateMachine, StateMachineExecutor},
syncerd::{Event as SyncerEvent, SweepAddress, SweepAddressAddendum, Task, TaskId},
ServiceId,
};
Expand Down Expand Up @@ -78,8 +78,15 @@ impl StateMachine<Runtime, Error> for SyncerStateMachine {
}
}
}

fn name(&self) -> String {
"Syncer".to_string()
}
}

pub struct SyncerStateMachineExecutor {}
impl StateMachineExecutor<Runtime, Error, SyncerStateMachine> for SyncerStateMachineExecutor {}

impl SyncerStateMachine {
pub fn task_id(&self) -> Option<TaskId> {
match self {
Expand Down Expand Up @@ -173,7 +180,13 @@ fn attempt_transition_to_awaiting_syncer_or_awaiting_syncer_request(
}
}

_ => Ok(None),
req => {
warn!(
"Request {} from {} invalid for state start - invalidating.",
req, event.source
);
Ok(None)
}
}
}

Expand Down
54 changes: 20 additions & 34 deletions src/farcasterd/trade_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::LogStyle;
use crate::{
bus::{BusMsg, Outcome},
error::Error,
event::{Event, StateMachine},
event::{Event, StateMachine, StateMachineExecutor},
ServiceId,
};
use bitcoin::hashes::hex::ToHex;
Expand Down Expand Up @@ -188,8 +188,15 @@ impl StateMachine<Runtime, Error> for TradeStateMachine {
}
}
}

fn name(&self) -> String {
"Syncer".to_string()
}
}

pub struct TradeStateMachineExecutor {}
impl StateMachineExecutor<Runtime, Error, TradeStateMachine> for TradeStateMachineExecutor {}

impl TradeStateMachine {
pub fn open_offer(&self) -> Option<PublicOffer> {
if let TradeStateMachine::MakeOffer(MakeOffer { public_offer, .. }) = self {
Expand Down Expand Up @@ -345,17 +352,10 @@ fn attempt_transition_to_make_offer(
}
}
req => {
if let BusMsg::Ctl(Ctl::Hello) = req {
trace!(
"BusMsg {} invalid for state start maker - invalidating.",
req
);
} else {
warn!(
"BusMsg {} invalid for state start maker - invalidating.",
req
);
}
warn!(
"Request {} from {} invalid for state start maker - invalidating.",
req, event.source
);
Ok(None)
}
}
Expand Down Expand Up @@ -441,17 +441,10 @@ fn attempt_transition_to_take_offer(
}
}
req => {
if let BusMsg::Ctl(Ctl::Hello) = req {
trace!(
"BusMsg {} invalid for state start restore - invalidating.",
req
);
} else {
warn!(
"BusMsg {} invalid for state start restore - invalidating.",
req
);
}
warn!(
"Request {} from {} invalid for state start restore - invalidating.",
req, event.source,
);
Ok(None)
}
}
Expand Down Expand Up @@ -529,17 +522,10 @@ fn attempt_transition_to_restoring_swapd(
})))
}
req => {
if let BusMsg::Ctl(Ctl::Hello) = req {
trace!(
"BusMsg {} invalid for state start restore - invalidating.",
req
);
} else {
warn!(
"BusMsg {} invalid for state start restore - invalidating.",
req
);
}
warn!(
"Request {} from {} invalid for state start restore - invalidating.",
req, event.source,
);
Ok(None)
}
}
Expand Down