Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
KirilMihaylov committed Feb 26, 2025
1 parent d00e0c9 commit 09df03f
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 88 deletions.
2 changes: 1 addition & 1 deletion applications/alarms-dispatcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use anyhow::{Context as _, Result};
use tokio::sync::Mutex;

use ::task::{spawn_new, spawn_restarting, Run, RunnableState, Task};
use chain_ops::{
node,
signer::{Gas, Signer},
Expand All @@ -20,7 +21,6 @@ use environment::ReadFromVar as _;
use protocol_watcher::Command;
use service::supervisor::configuration::Service;
use supervisor::supervisor;
use ::task::{spawn_new, spawn_restarting, Run, RunnableState, Task};
use task_set::TaskSet;
use tx::{NoExpiration, TxPackage};

Expand Down
2 changes: 1 addition & 1 deletion applications/alarms-dispatcher/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use cosmrs::{
use serde::{Deserialize, Serialize};
use tokio::{sync::oneshot, time::sleep};

use ::tx::{NoExpiration, TxPackage};
use chain_ops::{node::QueryTx, signer::Gas, tx};
use channel::unbounded;
use contract::{CheckedContract, GeneralizedOracle};
use task::{Run, RunnableState};
use ::tx::{NoExpiration, TxPackage};

macro_rules! log {
($macro:ident![$self:expr]($($body:tt)+)) => {
Expand Down
2 changes: 1 addition & 1 deletion applications/market-data-feeder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::{
time::{sleep, Instant},
};

use ::task::{spawn_new, spawn_restarting, RunnableState, Task};
use chain_ops::{
node::{self, QueryTx},
signer::Gas,
Expand All @@ -29,7 +30,6 @@ use environment::ReadFromVar as _;
use protocol_watcher::Command;
use service::supervisor::configuration::Service;
use supervisor::supervisor;
use ::task::{spawn_new, spawn_restarting, RunnableState, Task};
use task_set::TaskSet;
use tx::{TimeBasedExpiration, TxPackage};

Expand Down
12 changes: 6 additions & 6 deletions applications/market-data-feeder/src/state/price_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use chain_ops::node;
use chain_ops::node::QueryTx;
use contract::{Admin, CheckedContract};
use std::sync::Arc;
use tokio::sync::Mutex;
use cosmrs::Gas;
use std::collections::BTreeMap;
use chain_ops::node;
use std::sync::Arc;
use std::time::Duration;
use cosmrs::Gas;
use chain_ops::node::QueryTx;
use tokio::sync::Mutex;

#[derive(Clone)]
#[must_use]
Expand All @@ -17,4 +17,4 @@ pub struct State {
pub hard_gas_limit: Gas,
pub query_tx: QueryTx,
pub timeout_duration: Duration,
}
}
2 changes: 1 addition & 1 deletion applications/market-data-feeder/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::{
time::{interval, sleep, timeout, Instant, MissedTickBehavior},
};

use ::tx::{TimeBasedExpiration, TxPackage};
use chain_ops::{
node,
tx::{self, ExecuteTemplate},
Expand All @@ -25,7 +26,6 @@ use defer::Defer;
use dex::provider::{Amount, Base, CurrencyPair, Decimal, Dex, Quote};
use task::RunnableState;
use task_set::TaskSet;
use ::tx::{TimeBasedExpiration, TxPackage};

use crate::oracle::Oracle;

Expand Down
132 changes: 54 additions & 78 deletions libraries/supervisor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,53 +49,31 @@ where

let mut terminate_signal = pin!(ctrl_c());

let terminate_signal_sent = {
let mut polled = false;

poll_fn(|ctx| {
if polled {
Poll::Ready(Ok(false))
} else {
polled = true;

terminate_signal
.as_mut()
.poll(ctx)
.map(|result| result.map(|()| true))
}
})
.await?
};

if terminate_signal_sent {
return Ok(state);
}

loop {
state = match join_or_receive_or_terminate(
state = match await_with_action_receiver(
&mut tasks,
&mut rx,
terminate_signal.as_mut(),
)
.await
{
JoinOrReceiveOrTerminate::Received(action_result) => {
AwaitWithActionResult::Received(action_result) => {
action_handler(&mut tasks, state, action_result).await?
},
JoinOrReceiveOrTerminate::ReceiverClosed => {
AwaitWithActionResult::ReceiverClosed => {
break;
},
JoinOrReceiveOrTerminate::Joined(id, result) => {
AwaitWithActionResult::Joined(id, result) => {
let Err(()) = log_errors(result) else {
continue;
};

on_error_exit(&mut tasks, state, id).await?
},
JoinOrReceiveOrTerminate::JoinSetEmpty => {
AwaitWithActionResult::JoinSetEmpty => {
return Ok(state);
},
JoinOrReceiveOrTerminate::Shutdown(result) => {
AwaitWithActionResult::Shutdown(result) => {
return result.map(|()| state);
},
};
Expand All @@ -106,20 +84,23 @@ where
drop(action_handler);

loop {
state = match join_or_terminate(&mut tasks, terminate_signal.as_mut())
.await
state = match await_without_action_receiver(
&mut tasks,
terminate_signal.as_mut(),
)
.await
{
JoinOrTerminate::Joined(id, result) => {
AwaitWithoutActionResult::Joined(id, result) => {
let Err(()) = log_errors(result) else {
continue;
};

on_error_exit(&mut tasks, state, id).await?
},
JoinOrTerminate::JoinSetEmpty => {
AwaitWithoutActionResult::JoinSetEmpty => {
break Ok(state);
},
JoinOrTerminate::Shutdown(result) => {
AwaitWithoutActionResult::Shutdown(result) => {
break result.map(|()| state);
},
};
Expand All @@ -138,11 +119,11 @@ fn log_errors(result: Result<Result<()>, JoinError>) -> Result<(), ()> {
})
}

async fn join_or_receive_or_terminate<Id, Receiver, TerminateSignal>(
async fn await_with_action_receiver<Id, Receiver, TerminateSignal>(
tasks: &mut TaskSet<Id, Result<()>>,
rx: &mut Receiver,
mut terminate_signal: Pin<&mut TerminateSignal>,
) -> JoinOrReceiveOrTerminate<Receiver::Value, Id>
) -> AwaitWithActionResult<Receiver::Value, Id>
where
Id: Unpin,
Receiver: channel::Receiver,
Expand All @@ -152,75 +133,70 @@ where

let mut receive_action = pin!(rx.recv());

poll_fn({
move |ctx| {
if let Poll::Ready(receive_result) =
receive_action.as_mut().poll(ctx)
{
Poll::Ready(match receive_result {
Ok(received) => {
JoinOrReceiveOrTerminate::Received(received)
},
Err(Closed {}) => JoinOrReceiveOrTerminate::ReceiverClosed,
})
} else if let Poll::Ready(join_result) =
join_next_task.as_mut().poll(ctx)
{
Poll::Ready(join_result.map_or(
const { JoinOrReceiveOrTerminate::JoinSetEmpty },
|(id, result)| JoinOrReceiveOrTerminate::Joined(id, result),
))
} else {
terminate_signal
.as_mut()
.poll(ctx)
.map_err(Into::into)
.map(JoinOrReceiveOrTerminate::Shutdown)
poll_fn(move |ctx| {
if let Poll::Ready(receive_result) = receive_action.as_mut().poll(ctx) {
Poll::Ready(match receive_result {
Ok(received) => AwaitWithActionResult::Received(received),
Err(Closed {}) => AwaitWithActionResult::ReceiverClosed,
})
} else if let Poll::Ready(join_result) =
join_next_task.as_mut().poll(ctx)
{
Poll::Ready(match join_result {
Some((id, result)) => AwaitWithActionResult::Joined(id, result),
None => AwaitWithActionResult::JoinSetEmpty,
})
} else {
match terminate_signal.as_mut().poll(ctx) {
Poll::Ready(Ok(())) => const {
Poll::Ready(AwaitWithActionResult::Shutdown(Ok(())))
},
Poll::Ready(Err(error)) => Poll::Ready(
AwaitWithActionResult::Shutdown(Err(error.into())),
),
Poll::Pending => const { Poll::Pending },
}
}
})
.await
}

async fn join_or_terminate<Id, TerminateSignal>(
async fn await_without_action_receiver<Id, TerminateSignal>(
tasks: &mut TaskSet<Id, Result<()>>,
mut terminate_signal: Pin<&mut TerminateSignal>,
) -> JoinOrTerminate<Id>
) -> AwaitWithoutActionResult<Id>
where
Id: Unpin,
TerminateSignal: Future<Output = Result<(), io::Error>>,
{
let mut join_next_task = pin!(tasks.join_next());

poll_fn({
move |ctx| {
if let Poll::Ready(join_result) = join_next_task.as_mut().poll(ctx)
{
Poll::Ready(join_result.map_or(
const { JoinOrTerminate::JoinSetEmpty },
|(id, result)| JoinOrTerminate::Joined(id, result),
))
} else {
terminate_signal
.as_mut()
.poll(ctx)
.map_err(Into::into)
.map(JoinOrTerminate::Shutdown)
}
poll_fn(move |ctx| {
if let Poll::Ready(join_result) = join_next_task.as_mut().poll(ctx) {
Poll::Ready(join_result.map_or(
const { AwaitWithoutActionResult::JoinSetEmpty },
|(id, result)| AwaitWithoutActionResult::Joined(id, result),
))
} else {
terminate_signal
.as_mut()
.poll(ctx)
.map_err(Into::into)
.map(AwaitWithoutActionResult::Shutdown)
}
})
.await
}

enum JoinOrReceiveOrTerminate<Value, Id> {
enum AwaitWithActionResult<Value, Id> {
Received(Value),
ReceiverClosed,
Joined(Id, Result<Result<()>, JoinError>),
JoinSetEmpty,
Shutdown(Result<()>),
}

enum JoinOrTerminate<Id> {
enum AwaitWithoutActionResult<Id> {
Joined(Id, Result<Result<()>, JoinError>),
JoinSetEmpty,
Shutdown(Result<()>),
Expand Down

0 comments on commit 09df03f

Please sign in to comment.