diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 6ca04582..00072ba4 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -1,19 +1,30 @@ use pallas::network::{ facades::NodeClient, - miniprotocols::{chainsync, localstate, Point, MAINNET_MAGIC}, + miniprotocols::{chainsync, localstate::queries_v16, Point, PRE_PRODUCTION_MAGIC}, }; use tracing::info; async fn do_localstate_query(client: &mut NodeClient) { - client.statequery().acquire(None).await.unwrap(); + let client = client.statequery(); - let result = client - .statequery() - .query(localstate::queries::Request::GetSystemStart) + client.acquire(None).await.unwrap(); + + let result = queries_v16::get_chain_point(client).await.unwrap(); + info!("result: {:?}", result); + + let result = queries_v16::get_system_start(client).await.unwrap(); + info!("result: {:?}", result); + + let era = queries_v16::get_current_era(client).await.unwrap(); + info!("result: {:?}", era); + + let result = queries_v16::get_block_epoch_number(client, era) .await .unwrap(); - info!("system start result: {:?}", result); + info!("result: {:?}", result); + + client.send_release().await.unwrap(); } async fn do_chainsync(client: &mut NodeClient) { @@ -43,6 +54,10 @@ async fn do_chainsync(client: &mut NodeClient) { } } +// change the following to match the Cardano node socket in your local +// environment +const SOCKET_PATH: &str = "/tmp/node.socket"; + #[cfg(target_family = "unix")] #[tokio::main] async fn main() { @@ -55,15 +70,7 @@ async fn main() { // we connect to the unix socket of the local node. Make sure you have the right // path for your environment - let socket_path = "/tmp/node.socket"; - - // we connect to the unix socket of the local node and perform a handshake query - let version_table = NodeClient::handshake_query(socket_path, MAINNET_MAGIC) - .await - .unwrap(); - info!("handshake query result: {:?}", version_table); - - let mut client = NodeClient::connect(socket_path, MAINNET_MAGIC) + let mut client = NodeClient::connect(SOCKET_PATH, PRE_PRODUCTION_MAGIC) .await .unwrap(); @@ -75,7 +82,6 @@ async fn main() { } #[cfg(not(target_family = "unix"))] - fn main() { panic!("can't use n2c unix socket on non-unix systems"); } diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 2cd8b5dd..491fdbd8 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -282,7 +282,7 @@ impl NodeServer { plexer_handle, version: ver, chainsync: server_cs, - statequery: server_sq + statequery: server_sq, }) } else { plexer_handle.abort(); diff --git a/pallas-network/src/miniprotocols/localstate/client.rs b/pallas-network/src/miniprotocols/localstate/client.rs index f14d79cb..34438d86 100644 --- a/pallas-network/src/miniprotocols/localstate/client.rs +++ b/pallas-network/src/miniprotocols/localstate/client.rs @@ -1,11 +1,8 @@ +use pallas_codec::utils::AnyCbor; use std::fmt::Debug; - -use pallas_codec::Fragment; - -use std::marker::PhantomData; use thiserror::*; -use super::{AcquireFailure, Message, Query, State}; +use super::{AcquireFailure, Message, State}; use crate::miniprotocols::Point; use crate::multiplexer; @@ -13,16 +10,25 @@ use crate::multiplexer; pub enum ClientError { #[error("attempted to receive message while agency is ours")] AgencyIsOurs, + #[error("attempted to send message while agency is theirs")] AgencyIsTheirs, + #[error("inbound message is not valid for current state")] InvalidInbound, + #[error("outbound message is not valid for current state")] InvalidOutbound, + #[error("failure acquiring point, not found")] AcquirePointNotFound, + #[error("failure acquiring point, too old")] AcquirePointTooOld, + + #[error("failure decoding CBOR data")] + InvalidCbor(pallas_codec::minicbor::decode::Error), + #[error("error while sending or receiving data through the channel")] Plexer(multiplexer::Error), } @@ -36,22 +42,11 @@ impl From for ClientError { } } -pub struct GenericClient(State, multiplexer::ChannelBuffer, PhantomData) -where - Q: Query, - Message: Fragment; +pub struct GenericClient(State, multiplexer::ChannelBuffer); -impl GenericClient -where - Q: Query, - Message: Fragment, -{ +impl GenericClient { pub fn new(channel: multiplexer::AgentChannel) -> Self { - Self( - State::Idle, - multiplexer::ChannelBuffer::new(channel), - PhantomData {}, - ) + Self(State::Idle, multiplexer::ChannelBuffer::new(channel)) } pub fn state(&self) -> &State { @@ -87,7 +82,7 @@ where } } - fn assert_outbound_state(&self, msg: &Message) -> Result<(), ClientError> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), ClientError> { match (&self.0, msg) { (State::Idle, Message::Acquire(_)) => Ok(()), (State::Idle, Message::Done) => Ok(()), @@ -98,7 +93,7 @@ where } } - fn assert_inbound_state(&self, msg: &Message) -> Result<(), ClientError> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), ClientError> { match (&self.0, msg) { (State::Acquiring, Message::Acquired) => Ok(()), (State::Acquiring, Message::Failure(_)) => Ok(()), @@ -107,15 +102,18 @@ where } } - pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> { + pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; - self.1.send_msg_chunks(msg).await.map_err(ClientError::Plexer)?; + self.1 + .send_msg_chunks(msg) + .await + .map_err(ClientError::Plexer)?; Ok(()) } - pub async fn recv_message(&mut self) -> Result, ClientError> { + pub async fn recv_message(&mut self) -> Result { self.assert_agency_is_theirs()?; let msg = self.1.recv_full_msg().await.map_err(ClientError::Plexer)?; self.assert_inbound_state(&msg)?; @@ -124,7 +122,7 @@ where } pub async fn send_acquire(&mut self, point: Option) -> Result<(), ClientError> { - let msg = Message::::Acquire(point); + let msg = Message::Acquire(point); self.send_message(&msg).await?; self.0 = State::Acquiring; @@ -132,7 +130,7 @@ where } pub async fn send_reacquire(&mut self, point: Option) -> Result<(), ClientError> { - let msg = Message::::ReAcquire(point); + let msg = Message::ReAcquire(point); self.send_message(&msg).await?; self.0 = State::Acquiring; @@ -140,7 +138,7 @@ where } pub async fn send_release(&mut self) -> Result<(), ClientError> { - let msg = Message::::Release; + let msg = Message::Release; self.send_message(&msg).await?; self.0 = State::Idle; @@ -148,7 +146,7 @@ where } pub async fn send_done(&mut self) -> Result<(), ClientError> { - let msg = Message::::Done; + let msg = Message::Done; self.send_message(&msg).await?; self.0 = State::Done; @@ -174,28 +172,38 @@ where self.recv_while_acquiring().await } - pub async fn send_query(&mut self, request: Q::Request) -> Result<(), ClientError> { - let msg = Message::::Query(request); + pub async fn send_query(&mut self, request: AnyCbor) -> Result { + let msg = Message::Query(request); self.send_message(&msg).await?; self.0 = State::Querying; - Ok(()) + Ok(msg) } - pub async fn recv_while_querying(&mut self) -> Result { + pub async fn recv_while_querying(&mut self) -> Result { match self.recv_message().await? { - Message::Result(x) => { + Message::Result(result) => { self.0 = State::Acquired; - Ok(x) + Ok(result) } _ => Err(ClientError::InvalidInbound), } } - pub async fn query(&mut self, request: Q::Request) -> Result { + pub async fn query_any(&mut self, request: AnyCbor) -> Result { self.send_query(request).await?; self.recv_while_querying().await } + + pub async fn query(&mut self, request: Q) -> Result + where + Q: pallas_codec::minicbor::Encode<()>, + for<'b> R: pallas_codec::minicbor::Decode<'b, ()>, + { + let request = AnyCbor::from_encode(request); + let response = self.query_any(request).await?; + response.into_decode().map_err(ClientError::InvalidCbor) + } } -pub type Client = GenericClient; +pub type Client = GenericClient; diff --git a/pallas-network/src/miniprotocols/localstate/codec.rs b/pallas-network/src/miniprotocols/localstate/codec.rs index 7b747eb0..7673004f 100644 --- a/pallas-network/src/miniprotocols/localstate/codec.rs +++ b/pallas-network/src/miniprotocols/localstate/codec.rs @@ -1,6 +1,6 @@ use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder}; -use super::{AcquireFailure, Message, Query}; +use super::{AcquireFailure, Message}; impl Encode<()> for AcquireFailure { fn encode( @@ -36,12 +36,7 @@ impl<'b> Decode<'b, ()> for AcquireFailure { } } -impl Encode<()> for Message -where - Q: Query, - Q::Request: Encode<()>, - Q::Response: Encode<()>, -{ +impl Encode<()> for Message { fn encode( &self, e: &mut Encoder, @@ -97,12 +92,7 @@ where } } -impl<'b, Q> Decode<'b, ()> for Message -where - Q: Query, - Q::Request: Decode<'b, ()>, - Q::Response: Decode<'b, ()>, -{ +impl<'b> Decode<'b, ()> for Message { fn decode( d: &mut pallas_codec::minicbor::Decoder<'b>, _ctx: &mut (), diff --git a/pallas-network/src/miniprotocols/localstate/mod.rs b/pallas-network/src/miniprotocols/localstate/mod.rs index 1e478a40..2485b17b 100644 --- a/pallas-network/src/miniprotocols/localstate/mod.rs +++ b/pallas-network/src/miniprotocols/localstate/mod.rs @@ -1,9 +1,10 @@ mod client; mod codec; mod protocol; -pub mod queries; mod server; +pub mod queries_v16; + pub use client::*; pub use codec::*; pub use protocol::*; diff --git a/pallas-network/src/miniprotocols/localstate/protocol.rs b/pallas-network/src/miniprotocols/localstate/protocol.rs index 1c82106d..1d162c27 100644 --- a/pallas-network/src/miniprotocols/localstate/protocol.rs +++ b/pallas-network/src/miniprotocols/localstate/protocol.rs @@ -1,5 +1,7 @@ use std::fmt::Debug; +use pallas_codec::utils::AnyCbor; + use crate::miniprotocols::Point; #[derive(Debug, PartialEq, Eq, Clone)] @@ -17,18 +19,13 @@ pub enum AcquireFailure { PointNotOnChain, } -pub trait Query: Debug { - type Request: Clone + Debug; - type Response: Clone + Debug; -} - #[derive(Debug)] -pub enum Message { +pub enum Message { Acquire(Option), Failure(AcquireFailure), Acquired, - Query(Q::Request), - Result(Q::Response), + Query(AnyCbor), + Result(AnyCbor), ReAcquire(Option), Release, Done, diff --git a/pallas-network/src/miniprotocols/localstate/queries.rs b/pallas-network/src/miniprotocols/localstate/queries.rs deleted file mode 100644 index 7cae029d..00000000 --- a/pallas-network/src/miniprotocols/localstate/queries.rs +++ /dev/null @@ -1,293 +0,0 @@ -use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; - -use super::Query; - -// https://github.com/input-output-hk/ouroboros-consensus/blob/main/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Query.hs -#[derive(Debug, Clone, PartialEq)] -#[repr(u16)] -pub enum BlockQuery { - GetLedgerTip, - GetEpochNo, - // GetNonMyopicMemberRewards(()), - GetCurrentPParams, - GetProposedPParamsUpdates, - GetStakeDistribution, - // GetUTxOByAddress(()), - // GetUTxOWhole, (Response too large for now) - // DebugEpochState, (Response too large for now) - // GetCBOR(()), - // GetFilteredDelegationsAndRewardAccounts(()), - GetGenesisConfig, - // DebugNewEpochState, (Response too large for now) - DebugChainDepState, - GetRewardProvenance, - // GetUTxOByTxIn(()), - GetStakePools, - // GetStakePoolParams(()), - GetRewardInfoPools, - // GetPoolState(()), - // GetStakeSnapshots(()), - // GetPoolDistr(()), - // GetStakeDelegDeposits(()), - // GetConstitutionHash, -} - -impl Encode<()> for BlockQuery { - fn encode( - &self, - e: &mut Encoder, - _ctx: &mut (), - ) -> Result<(), encode::Error> { - e.array(2)?; - e.u16(0)?; - e.array(2)?; - /* - TODO: Think this is era or something? First fetch era with - [3, [0, [2, [1]]]], then use it here? - */ - e.u16(5)?; - match self { - BlockQuery::GetLedgerTip => { - e.array(1)?; - e.u16(0)?; - } - BlockQuery::GetEpochNo => { - e.array(1)?; - e.u16(1)?; - } - // BlockQuery::GetNonMyopicMemberRewards(()) => { - // e.array(X)?; - // e.u16(2)?; - // } - BlockQuery::GetCurrentPParams => { - e.array(1)?; - e.u16(3)?; - } - BlockQuery::GetProposedPParamsUpdates => { - e.array(1)?; - e.u16(4)?; - } - BlockQuery::GetStakeDistribution => { - e.array(1)?; - e.u16(5)?; - } - // BlockQuery::GetUTxOByAddress(()) => { - // e.array(X)?; - // e.u16(6)?; - // } - // BlockQuery::GetUTxOWhole => { - // e.array(1)?; - // e.u16(7)?; - // } - // BlockQuery::DebugEpochState => { - // e.array(1)?; - // e.u16(8)?; - // } - // BlockQuery::GetCBOR(()) => { - // e.array(X)?; - // e.u16(9)?; - // } - // BlockQuery::GetFilteredDelegationsAndRewardAccounts(()) => { - // e.array(X)?; - // e.u16(10)?; - // } - BlockQuery::GetGenesisConfig => { - e.array(1)?; - e.u16(11)?; - } - // BlockQuery::DebugNewEpochState => { - // e.array(1)?; - // e.u16(12)?; - // } - BlockQuery::DebugChainDepState => { - e.array(1)?; - e.u16(13)?; - } - BlockQuery::GetRewardProvenance => { - e.array(1)?; - e.u16(14)?; - } - // BlockQuery::GetUTxOByTxIn(()) => { - // e.array(X)?; - // e.u16(15)?; - // } - BlockQuery::GetStakePools => { - e.array(1)?; - e.u16(16)?; - } - // BlockQuery::GetStakePoolParams(()) => { - // e.array(X)?; - // e.u16(17)?; - // } - BlockQuery::GetRewardInfoPools => { - e.array(1)?; - e.u16(18)?; - } - // BlockQuery::GetPoolState(()) => { - // e.array(X)?; - // e.u16(19)?; - // } - // BlockQuery::GetStakeSnapshots(()) => { - // e.array(X)?; - // e.u16(20)?; - // } - // BlockQuery::GetPoolDistr(()) => { - // e.array(X)?; - // e.u16(21)?; - // } - // BlockQuery::GetStakeDelegDeposits(()) => { - // e.array(X)?; - // e.u16(22)?; - // } - // BlockQuery::GetConstitutionHash => { - // e.array(1)?; - // e.u16(23)?; - // } - } - Ok(()) - } -} - -impl<'b> Decode<'b, ()> for BlockQuery { - fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - d.array()?; - d.u16()?; - d.array()?; - d.u16()?; - d.array()?; - - match d.u16()? { - 0 => Ok(Self::GetLedgerTip), - 1 => Ok(Self::GetEpochNo), - // 2 => Ok(Self::GetNonMyopicMemberRewards(())), - 3 => Ok(Self::GetCurrentPParams), - 4 => Ok(Self::GetProposedPParamsUpdates), - 5 => Ok(Self::GetStakeDistribution), - // 6 => Ok(Self::GetUTxOByAddress(())), - // 7 => Ok(Self::GetUTxOWhole), - // 8 => Ok(Self::DebugEpochState), - // 9 => Ok(Self::GetCBOR(())), - // 10 => Ok(Self::GetFilteredDelegationsAndRewardAccounts(())), - 11 => Ok(Self::GetGenesisConfig), - // 12 => Ok(Self::DebugNewEpochState), - 13 => Ok(Self::DebugChainDepState), - 14 => Ok(Self::GetRewardProvenance), - // 15 => Ok(Self::GetUTxOByTxIn(())), - 16 => Ok(Self::GetStakePools), - // 17 => Ok(Self::GetStakePoolParams(())), - 18 => Ok(Self::GetRewardInfoPools), - // 19 => Ok(Self::GetPoolState(())), - // 20 => Ok(Self::GetStakeSnapshots(())), - // 21 => Ok(Self::GetPoolDistr(())), - // 22 => Ok(Self::GetStakeDelegDeposits(())), - // 23 => Ok(Self::GetConstitutionHash), - _ => unreachable!(), - } - } -} - -#[derive(Debug, Clone, PartialEq)] -pub enum Request { - BlockQuery(BlockQuery), - GetSystemStart, - GetChainBlockNo, - GetChainPoint, -} - -impl Encode<()> for Request { - fn encode( - &self, - e: &mut Encoder, - _ctx: &mut (), - ) -> Result<(), encode::Error> { - match self { - Self::BlockQuery(q) => { - e.array(2)?; - e.u16(0)?; - e.encode(q)?; - - Ok(()) - } - Self::GetSystemStart => { - e.array(1)?; - e.u16(1)?; - Ok(()) - } - Self::GetChainBlockNo => { - e.array(1)?; - e.u16(2)?; - Ok(()) - } - Self::GetChainPoint => { - e.array(1)?; - e.u16(3)?; - Ok(()) - } - } - } -} - -impl<'b> Decode<'b, ()> for Request { - fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - let size = match d.array()? { - Some(l) => l, - None => return Err(decode::Error::message("unexpected indefinite len list")), - }; - - let tag = d.u16()?; - - match (size, tag) { - (2, 0) => Ok(Self::BlockQuery(d.decode()?)), - (1, 1) => Ok(Self::GetSystemStart), - (1, 2) => Ok(Self::GetChainBlockNo), - (1, 3) => Ok(Self::GetChainPoint), - _ => { - return Err(decode::Error::message( - "invalid (size, tag) for lsq request", - )) - } - } - } -} - -#[derive(Debug, Clone, PartialEq)] -pub struct GenericResponse(Vec); - -impl GenericResponse { - /// "bytes" must be valid CBOR - pub fn new(bytes: Vec) -> Self { - Self(bytes) - } -} - -impl Encode<()> for GenericResponse { - fn encode( - &self, - e: &mut Encoder, - _ctx: &mut (), - ) -> Result<(), encode::Error> { - e.writer_mut() - .write_all(&self.0) - .map_err(|e| encode::Error::write(e)) - } -} - -impl<'b> Decode<'b, ()> for GenericResponse { - fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { - let start = d.position(); - d.skip()?; - let end = d.position(); - let slice = &d.input()[start..end]; - let vec = slice.to_vec(); - Ok(GenericResponse(vec)) - } -} - -/// Queries available as of N2C V16 -#[derive(Debug, Clone)] -pub struct QueryV16 {} - -impl Query for QueryV16 { - type Request = Request; - type Response = GenericResponse; -} diff --git a/pallas-network/src/miniprotocols/localstate/queries_v16/codec.rs b/pallas-network/src/miniprotocols/localstate/queries_v16/codec.rs new file mode 100644 index 00000000..f5be36b0 --- /dev/null +++ b/pallas-network/src/miniprotocols/localstate/queries_v16/codec.rs @@ -0,0 +1,248 @@ +use super::*; +use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder}; + +impl Encode<()> for BlockQuery { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + BlockQuery::GetLedgerTip => { + e.array(1)?; + e.u16(0)?; + } + BlockQuery::GetEpochNo => { + e.array(1)?; + e.u16(1)?; + } + BlockQuery::GetNonMyopicMemberRewards(x) => { + e.array(2)?; + e.u16(2)?; + e.encode(x)?; + } + BlockQuery::GetCurrentPParams => { + e.array(1)?; + e.u16(3)?; + } + BlockQuery::GetProposedPParamsUpdates => { + e.array(1)?; + e.u16(4)?; + } + BlockQuery::GetStakeDistribution => { + e.array(1)?; + e.u16(5)?; + } + BlockQuery::GetUTxOByAddress(x) => { + e.array(2)?; + e.u16(6)?; + e.encode(x)?; + } + BlockQuery::GetUTxOWhole => { + e.encode((7,))?; + } + BlockQuery::DebugEpochState => { + e.array(1)?; + e.u16(8)?; + } + BlockQuery::GetCBOR(x) => { + e.array(2)?; + e.u16(9)?; + e.encode(x)?; + } + BlockQuery::GetFilteredDelegationsAndRewardAccounts(x) => { + e.array(2)?; + e.u16(10)?; + e.encode(x)?; + } + BlockQuery::GetGenesisConfig => { + e.array(1)?; + e.u16(11)?; + } + BlockQuery::DebugNewEpochState => { + e.array(1)?; + e.u16(12)?; + } + BlockQuery::DebugChainDepState => { + e.array(1)?; + e.u16(13)?; + } + BlockQuery::GetRewardProvenance => { + e.array(1)?; + e.u16(14)?; + } + BlockQuery::GetUTxOByTxIn(_) => { + e.array(2)?; + e.u16(15)?; + e.encode(2)?; + } + BlockQuery::GetStakePools => { + e.array(1)?; + e.u16(16)?; + } + BlockQuery::GetStakePoolParams(x) => { + e.array(2)?; + e.u16(17)?; + e.encode(x)?; + } + BlockQuery::GetRewardInfoPools => { + e.array(1)?; + e.u16(18)?; + } + BlockQuery::GetPoolState(x) => { + e.array(2)?; + e.u16(19)?; + e.encode(x)?; + } + BlockQuery::GetStakeSnapshots(x) => { + e.array(2)?; + e.u16(20)?; + e.encode(x)?; + } + BlockQuery::GetPoolDistr(x) => { + e.array(2)?; + e.u16(21)?; + e.encode(x)?; + } + BlockQuery::GetStakeDelegDeposits(x) => { + e.array(2)?; + e.u16(22)?; + e.encode(x)?; + } + BlockQuery::GetConstitutionHash => { + e.array(1)?; + e.u16(23)?; + } + } + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for BlockQuery { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + d.array()?; + + match d.u16()? { + 0 => Ok(Self::GetLedgerTip), + 1 => Ok(Self::GetEpochNo), + // 2 => Ok(Self::GetNonMyopicMemberRewards(())), + 3 => Ok(Self::GetCurrentPParams), + 4 => Ok(Self::GetProposedPParamsUpdates), + 5 => Ok(Self::GetStakeDistribution), + // 6 => Ok(Self::GetUTxOByAddress(())), + // 7 => Ok(Self::GetUTxOWhole), + // 8 => Ok(Self::DebugEpochState), + // 9 => Ok(Self::GetCBOR(())), + // 10 => Ok(Self::GetFilteredDelegationsAndRewardAccounts(())), + 11 => Ok(Self::GetGenesisConfig), + // 12 => Ok(Self::DebugNewEpochState), + 13 => Ok(Self::DebugChainDepState), + 14 => Ok(Self::GetRewardProvenance), + // 15 => Ok(Self::GetUTxOByTxIn(())), + 16 => Ok(Self::GetStakePools), + // 17 => Ok(Self::GetStakePoolParams(())), + 18 => Ok(Self::GetRewardInfoPools), + // 19 => Ok(Self::GetPoolState(())), + // 20 => Ok(Self::GetStakeSnapshots(())), + // 21 => Ok(Self::GetPoolDistr(())), + // 22 => Ok(Self::GetStakeDelegDeposits(())), + // 23 => Ok(Self::GetConstitutionHash), + _ => unreachable!(), + } + } +} + +impl Encode<()> for HardForkQuery { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + HardForkQuery::GetInterpreter => { + e.encode((0,))?; + } + HardForkQuery::GetCurrentEra => { + e.encode((1,))?; + } + } + + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for HardForkQuery { + fn decode(_d: &mut Decoder<'b>, _: &mut ()) -> Result { + todo!() + } +} + +impl Encode<()> for LedgerQuery { + fn encode( + &self, + e: &mut Encoder, + _: &mut (), + ) -> Result<(), encode::Error> { + match self { + LedgerQuery::BlockQuery(era, q) => { + e.encode((0, (era, q)))?; + } + LedgerQuery::HardForkQuery(q) => { + e.encode((2, q))?; + } + } + + Ok(()) + } +} + +impl<'b> Decode<'b, ()> for LedgerQuery { + fn decode(_d: &mut Decoder<'b>, _: &mut ()) -> Result { + todo!() + } +} + +impl Encode<()> for Request { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + Self::LedgerQuery(q) => { + e.encode((0, q))?; + Ok(()) + } + Self::GetSystemStart => { + e.array(1)?; + e.u16(1)?; + Ok(()) + } + Self::GetChainBlockNo => { + e.array(1)?; + e.u16(2)?; + Ok(()) + } + Self::GetChainPoint => { + e.array(1)?; + e.u16(3)?; + Ok(()) + } + } + } +} + +impl<'b> Decode<'b, ()> for Request { + fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result { + d.array()?; + let tag = d.u16()?; + + match tag { + 0 => Ok(Self::LedgerQuery(d.decode()?)), + 1 => Ok(Self::GetSystemStart), + 2 => Ok(Self::GetChainBlockNo), + 3 => Ok(Self::GetChainPoint), + _ => Err(decode::Error::message("invalid tag")), + } + } +} diff --git a/pallas-network/src/miniprotocols/localstate/queries_v16/mod.rs b/pallas-network/src/miniprotocols/localstate/queries_v16/mod.rs new file mode 100644 index 00000000..68f3dd64 --- /dev/null +++ b/pallas-network/src/miniprotocols/localstate/queries_v16/mod.rs @@ -0,0 +1,113 @@ +// TODO: this should move to pallas::ledger crate at some point + +// required for derive attrs to work +use pallas_codec::minicbor; + +use pallas_codec::{ + minicbor::{Decode, Encode}, + utils::AnyCbor, +}; + +use crate::miniprotocols::Point; + +use super::{Client, ClientError}; + +mod codec; + +// https://github.com/input-output-hk/ouroboros-consensus/blob/main/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/Query.hs +#[derive(Debug, Clone, PartialEq)] +#[repr(u16)] +pub enum BlockQuery { + GetLedgerTip, + GetEpochNo, + GetNonMyopicMemberRewards(AnyCbor), + GetCurrentPParams, + GetProposedPParamsUpdates, + GetStakeDistribution, + GetUTxOByAddress(AnyCbor), + GetUTxOWhole, + DebugEpochState, + GetCBOR(AnyCbor), + GetFilteredDelegationsAndRewardAccounts(AnyCbor), + GetGenesisConfig, + DebugNewEpochState, + DebugChainDepState, + GetRewardProvenance, + GetUTxOByTxIn(AnyCbor), + GetStakePools, + GetStakePoolParams(AnyCbor), + GetRewardInfoPools, + GetPoolState(AnyCbor), + GetStakeSnapshots(AnyCbor), + GetPoolDistr(AnyCbor), + GetStakeDelegDeposits(AnyCbor), + GetConstitutionHash, +} + +#[derive(Debug, Clone, PartialEq)] +#[repr(u16)] +pub enum HardForkQuery { + GetInterpreter, + GetCurrentEra, +} + +pub type Proto = u16; +pub type Era = u16; + +#[derive(Debug, Clone, PartialEq)] +pub enum LedgerQuery { + BlockQuery(Era, BlockQuery), + HardForkQuery(HardForkQuery), +} + +#[derive(Debug, Clone, PartialEq)] +pub enum Request { + LedgerQuery(LedgerQuery), + GetSystemStart, + GetChainBlockNo, + GetChainPoint, +} + +#[derive(Debug, Encode, Decode, PartialEq)] +pub struct SystemStart { + #[n(0)] + pub year: u32, + + #[n(1)] + pub day_of_year: u32, + + #[n(2)] + pub picoseconds_of_day: u64, +} + +pub async fn get_chain_point(client: &mut Client) -> Result { + let query = Request::GetChainPoint; + let result = client.query(query).await?; + + Ok(result) +} + +pub async fn get_current_era(client: &mut Client) -> Result { + let query = HardForkQuery::GetCurrentEra; + let query = LedgerQuery::HardForkQuery(query); + let query = Request::LedgerQuery(query); + let result = client.query(query).await?; + + Ok(result) +} + +pub async fn get_system_start(client: &mut Client) -> Result { + let query = Request::GetSystemStart; + let result = client.query(query).await?; + + Ok(result) +} + +pub async fn get_block_epoch_number(client: &mut Client, era: u16) -> Result { + let query = BlockQuery::GetEpochNo; + let query = LedgerQuery::BlockQuery(era, query); + let query = Request::LedgerQuery(query); + let (result,): (_,) = client.query(query).await?; + + Ok(result) +} diff --git a/pallas-network/src/miniprotocols/localstate/server.rs b/pallas-network/src/miniprotocols/localstate/server.rs index d3ceba2e..db5ed473 100644 --- a/pallas-network/src/miniprotocols/localstate/server.rs +++ b/pallas-network/src/miniprotocols/localstate/server.rs @@ -1,11 +1,8 @@ +use pallas_codec::utils::AnyCbor; use std::fmt::Debug; - -use pallas_codec::Fragment; - -use std::marker::PhantomData; use thiserror::*; -use super::{AcquireFailure, Message, Query, State}; +use super::{AcquireFailure, Message, State}; use crate::miniprotocols::Point; use crate::multiplexer; @@ -28,28 +25,17 @@ pub struct ClientAcquireRequest(pub Option); /// Request received from the client when in the Acquired state #[derive(Debug)] -pub enum ClientQueryRequest { +pub enum ClientQueryRequest { ReAcquire(Option), - Query(Q::Request), + Query(AnyCbor), Release, } -pub struct GenericServer(State, multiplexer::ChannelBuffer, PhantomData) -where - Q: Query, - Message: Fragment; +pub struct GenericServer(State, multiplexer::ChannelBuffer); -impl GenericServer -where - Q: Query, - Message: Fragment, -{ +impl GenericServer { pub fn new(channel: multiplexer::AgentChannel) -> Self { - Self( - State::Idle, - multiplexer::ChannelBuffer::new(channel), - PhantomData {}, - ) + Self(State::Idle, multiplexer::ChannelBuffer::new(channel)) } pub fn state(&self) -> &State { @@ -84,7 +70,7 @@ where } } - fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_outbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Acquiring, Message::Acquired) => Ok(()), (State::Acquiring, Message::Failure(_)) => Ok(()), @@ -93,7 +79,7 @@ where } } - fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { + fn assert_inbound_state(&self, msg: &Message) -> Result<(), Error> { match (&self.0, msg) { (State::Idle, Message::Acquire(_)) => Ok(()), (State::Idle, Message::Done) => Ok(()), @@ -104,7 +90,7 @@ where } } - pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> { + pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> { self.assert_agency_is_ours()?; self.assert_outbound_state(msg)?; self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?; @@ -112,7 +98,7 @@ where Ok(()) } - pub async fn recv_message(&mut self) -> Result, Error> { + pub async fn recv_message(&mut self) -> Result { self.assert_agency_is_theirs()?; let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?; self.assert_inbound_state(&msg)?; @@ -121,7 +107,7 @@ where } pub async fn send_failure(&mut self, reason: AcquireFailure) -> Result<(), Error> { - let msg = Message::::Failure(reason); + let msg = Message::Failure(reason); self.send_message(&msg).await?; self.0 = State::Idle; @@ -129,15 +115,15 @@ where } pub async fn send_acquired(&mut self) -> Result<(), Error> { - let msg = Message::::Acquired; + let msg = Message::Acquired; self.send_message(&msg).await?; self.0 = State::Acquired; Ok(()) } - pub async fn send_result(&mut self, response: Q::Response) -> Result<(), Error> { - let msg = Message::::Result(response); + pub async fn send_result(&mut self, response: AnyCbor) -> Result<(), Error> { + let msg = Message::Result(response); self.send_message(&msg).await?; self.0 = State::Acquired; @@ -162,7 +148,7 @@ where } } - pub async fn recv_while_acquired(&mut self) -> Result, Error> { + pub async fn recv_while_acquired(&mut self) -> Result { match self.recv_message().await? { Message::ReAcquire(point) => { self.0 = State::Acquiring; @@ -181,4 +167,4 @@ where } } -pub type Server = GenericServer; +pub type Server = GenericServer; diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs index ad5cb341..d37e2b8a 100644 --- a/pallas-network/tests/protocols.rs +++ b/pallas-network/tests/protocols.rs @@ -2,19 +2,21 @@ use std::fs; use std::net::{Ipv4Addr, SocketAddrV4}; use std::time::Duration; +use pallas_codec::utils::AnyCbor; use pallas_network::facades::{NodeClient, PeerClient, PeerServer}; use pallas_network::miniprotocols::blockfetch::BlockRequest; +use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip}; use pallas_network::miniprotocols::handshake::n2c; use pallas_network::miniprotocols::handshake::n2n::VersionData; -use pallas_network::miniprotocols::localstate::queries::{GenericResponse, Request}; use pallas_network::miniprotocols::localstate::{ClientAcquireRequest, ClientQueryRequest}; -use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip}; use pallas_network::miniprotocols::{ blockfetch, chainsync::{self, NextResponse}, Point, }; -use pallas_network::miniprotocols::{handshake, localstate}; +use pallas_network::miniprotocols::{ + handshake, localstate, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, +}; use pallas_network::multiplexer::{Bearer, Plexer}; use std::path::Path; use tokio::net::{TcpListener, UnixListener}; @@ -255,150 +257,6 @@ pub async fn blockfetch_server_and_client_happy_path() { _ = tokio::join!(client, server); } -#[tokio::test] -#[ignore] -pub async fn local_state_query_server_and_client_happy_path() { - let server = tokio::spawn({ - async move { - // server setup - let socket_path = Path::new("node.socket"); - - if socket_path.exists() { - fs::remove_file(&socket_path).unwrap(); - } - - let unix_listener = UnixListener::bind(socket_path).unwrap(); - - let (bearer, _) = Bearer::accept_unix(&unix_listener).await.unwrap(); - - let mut server_plexer = Plexer::new(bearer); - - let mut server_hs: handshake::Server = - handshake::Server::new(server_plexer.subscribe_server(0)); - - let mut server_sq: localstate::Server = - localstate::Server::new(server_plexer.subscribe_server(7)); - - tokio::spawn(async move { server_plexer.run().await }); - - server_hs.receive_proposed_versions().await.unwrap(); - server_hs - .accept_version(10, n2c::VersionData::new(0, Some(false))) - .await - .unwrap(); - - // server receives range from client, sends blocks - - let ClientAcquireRequest(maybe_point) = - server_sq.recv_while_idle().await.unwrap().unwrap(); - - assert_eq!(maybe_point, Some(Point::Origin)); - assert_eq!(*server_sq.state(), localstate::State::Acquiring); - - // server_bf.send_block_range(bodies).await.unwrap(); - - server_sq.send_acquired().await.unwrap(); - - assert_eq!(*server_sq.state(), localstate::State::Acquired); - - // server receives query from client - - let query = match server_sq.recv_while_acquired().await.unwrap() { - ClientQueryRequest::Query(q) => q, - x => panic!("unexpected message from client: {x:?}"), - }; - - assert_eq!( - query, - Request::BlockQuery(localstate::queries::BlockQuery::GetStakePools) - ); - - assert_eq!(*server_sq.state(), localstate::State::Querying); - - server_sq - .send_result(GenericResponse::new(hex::decode("82011A008BD423").unwrap())) - .await - .unwrap(); - - assert_eq!(*server_sq.state(), localstate::State::Acquired); - - // server receives reaquire from the client - - let maybe_point = match server_sq.recv_while_acquired().await.unwrap() { - ClientQueryRequest::ReAcquire(p) => p, - x => panic!("unexpected message from client: {x:?}"), - }; - - assert_eq!(maybe_point, Some(Point::Specific(1337, vec![1, 2, 3]))); - assert_eq!(*server_sq.state(), localstate::State::Acquiring); - - server_sq.send_acquired().await.unwrap(); - - // server receives release from the client - - match server_sq.recv_while_acquired().await.unwrap() { - ClientQueryRequest::Release => (), - x => panic!("unexpected message from client: {x:?}"), - }; - - assert!(server_sq.recv_while_idle().await.unwrap().is_none()); - - assert_eq!(*server_sq.state(), localstate::State::Done); - } - }); - - let client = tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(1)).await; - - // client setup - - let socket_path = "node.socket"; - - let mut client_to_server_conn = NodeClient::connect(socket_path, 0).await.unwrap(); - - let client_sq = client_to_server_conn.statequery(); - - // client sends acquire - - client_sq.send_acquire(Some(Point::Origin)).await.unwrap(); - - client_sq.recv_while_acquiring().await.unwrap(); - - assert_eq!(*client_sq.state(), localstate::State::Acquired); - - // client sends a BlockQuery - - client_sq - .send_query(Request::BlockQuery( - localstate::queries::BlockQuery::GetStakePools, - )) - .await - .unwrap(); - - let resp = client_sq.recv_while_querying().await.unwrap(); - - assert_eq!( - resp, - GenericResponse::new(hex::decode("82011A008BD423").unwrap()) - ); - - // client sends a ReAquire - - client_sq - .send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3]))) - .await - .unwrap(); - - client_sq.recv_while_acquiring().await.unwrap(); - - client_sq.send_release().await.unwrap(); - - client_sq.send_done().await.unwrap(); - }); - - _ = tokio::join!(client, server); -} - #[tokio::test] #[ignore] pub async fn chainsync_server_and_client_happy_path_n2n() { @@ -596,4 +454,142 @@ pub async fn chainsync_server_and_client_happy_path_n2n() { }); _ = tokio::join!(client, server); -} \ No newline at end of file +} + +#[tokio::test] +#[ignore] +pub async fn local_state_query_server_and_client_happy_path() { + let server = tokio::spawn({ + async move { + // server setup + let socket_path = Path::new("node.socket"); + + if socket_path.exists() { + fs::remove_file(&socket_path).unwrap(); + } + + let unix_listener = UnixListener::bind(socket_path).unwrap(); + + let mut server = pallas_network::facades::NodeServer::accept(&unix_listener, 0) + .await + .unwrap(); + + // wait for acquire request from client + + let maybe_acquire = server.statequery().recv_while_idle().await.unwrap(); + + assert!(maybe_acquire.is_some()); + assert_eq!(*server.statequery().state(), localstate::State::Acquiring); + + server.statequery().send_acquired().await.unwrap(); + + assert_eq!(*server.statequery().state(), localstate::State::Acquired); + + // server receives query from client + + let query: localstate::queries_v16::Request = + match server.statequery().recv_while_acquired().await.unwrap() { + ClientQueryRequest::Query(q) => q.into_decode().unwrap(), + x => panic!("unexpected message from client: {x:?}"), + }; + + assert_eq!(query, localstate::queries_v16::Request::GetSystemStart); + assert_eq!(*server.statequery().state(), localstate::State::Querying); + + let result = AnyCbor::from_encode(localstate::queries_v16::SystemStart { + year: 2020, + day_of_year: 1, + picoseconds_of_day: 999999999, + }); + + server.statequery().send_result(result).await.unwrap(); + + assert_eq!(*server.statequery().state(), localstate::State::Acquired); + + // server receives re-acquire from the client + + let maybe_point = match server.statequery().recv_while_acquired().await.unwrap() { + ClientQueryRequest::ReAcquire(p) => p, + x => panic!("unexpected message from client: {x:?}"), + }; + + assert_eq!(maybe_point, Some(Point::Specific(1337, vec![1, 2, 3]))); + assert_eq!(*server.statequery().state(), localstate::State::Acquiring); + + server.statequery().send_acquired().await.unwrap(); + + // server receives release from the client + + match server.statequery().recv_while_acquired().await.unwrap() { + ClientQueryRequest::Release => (), + x => panic!("unexpected message from client: {x:?}"), + }; + + let next_request = server.statequery().recv_while_idle().await.unwrap(); + + assert!(next_request.is_none()); + assert_eq!(*server.statequery().state(), localstate::State::Done); + } + }); + + let client = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + // client setup + + let socket_path = "node.socket"; + + let mut client = NodeClient::connect(socket_path, 0).await.unwrap(); + + // client sends acquire + + client + .statequery() + .send_acquire(Some(Point::Origin)) + .await + .unwrap(); + + client.statequery().recv_while_acquiring().await.unwrap(); + + assert_eq!(*client.statequery().state(), localstate::State::Acquired); + + // client sends a BlockQuery + + let request = AnyCbor::from_encode(localstate::queries_v16::Request::GetSystemStart); + + client.statequery().send_query(request).await.unwrap(); + + let result: localstate::queries_v16::SystemStart = client + .statequery() + .recv_while_querying() + .await + .unwrap() + .into_decode() + .unwrap(); + + assert_eq!( + result, + localstate::queries_v16::SystemStart { + year: 2020, + day_of_year: 1, + picoseconds_of_day: 999999999, + } + ); + + // client sends a ReAquire + + client + .statequery() + .send_reacquire(Some(Point::Specific(1337, vec![1, 2, 3]))) + .await + .unwrap(); + + client.statequery().recv_while_acquiring().await.unwrap(); + + client.statequery().send_release().await.unwrap(); + + client.statequery().send_done().await.unwrap(); + }); + + _ = tokio::join!(client, server); +}