diff --git a/.changelog/unreleased/features/125-grpc-support-via-ibc-trait.md b/.changelog/unreleased/features/125-grpc-support-via-ibc-trait.md new file mode 100644 index 00000000..3b94b272 --- /dev/null +++ b/.changelog/unreleased/features/125-grpc-support-via-ibc-trait.md @@ -0,0 +1,2 @@ +- Support gRPC services via IBC-rs trait implementation + ([\#125](https://github.com/informalsystems/basecoin-rs/pull/125)) \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 60ab2175..dc976320 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1104,7 +1104,7 @@ dependencies = [ [[package]] name = "ibc" version = "0.44.1" -source = "git+https://github.com/cosmos/ibc-rs.git?rev=6ffe8ce#6ffe8ce26f40b029e176d8fe92dde13467fb3735" +source = "git+https://github.com/cosmos/ibc-rs.git?rev=c17ff9d#c17ff9dba665dbc6a90427a66602a5c4dc83ab89" dependencies = [ "bytes", "derive_more", @@ -1125,6 +1125,7 @@ dependencies = [ "tendermint-light-client-verifier", "tendermint-proto 0.33.0", "time", + "tonic", "tracing", "uint", ] @@ -1132,7 +1133,7 @@ dependencies = [ [[package]] name = "ibc-derive" version = "0.3.0" -source = "git+https://github.com/cosmos/ibc-rs.git?rev=6ffe8ce#6ffe8ce26f40b029e176d8fe92dde13467fb3735" +source = "git+https://github.com/cosmos/ibc-rs.git?rev=c17ff9d#c17ff9dba665dbc6a90427a66602a5c4dc83ab89" dependencies = [ "darling", "proc-macro2", @@ -1142,9 +1143,9 @@ dependencies = [ [[package]] name = "ibc-proto" -version = "0.34.0" +version = "0.34.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6e8625e1aa28e4da4a33505c6469747a9201f14ccd9012e2481313dfbbf9e2f" +checksum = "46349dc9a3baa6c72c4c9e7e137bb3f679ccef2e25d84261624bbebc982be184" dependencies = [ "base64 0.21.2", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 383a3dbd..ac5258e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,8 @@ base64 = { version = "0.21", default-features = false, features = ["alloc"] } displaydoc = { version = "0.2", default-features = false } derive_more = { version = "0.99.17", default-features = false, features = ["from", "into", "display"] } ed25519 = { version = "2.1.0", default-features = false } -ibc = { git = "https://github.com/cosmos/ibc-rs.git", rev= "6ffe8ce" } -ibc-proto = { version = "0.34.0", default-features = false } +ibc = { git = "https://github.com/cosmos/ibc-rs.git", rev= "c17ff9d", features = ["grpc"] } +ibc-proto = { version = "0.34.1", default-features = false } ics23 = { version = "0.10.1", default-features = false } prost = { version = "0.11.6", default-features = false } serde = "1.0" diff --git a/ci/Makefile b/ci/Makefile index 5a50f944..72d3efab 100644 --- a/ci/Makefile +++ b/ci/Makefile @@ -18,7 +18,7 @@ upgrade-client: create-channel bash ~/tests/upgrade-client.sh @echo "Client upgraded" -grpc-service: create-channel +grpc-service: upgrade-client @echo "Testing gRPC services..." bash ~/tests/grpc-service.sh @echo "gRPC services working" diff --git a/ci/tests/grpc-service.sh b/ci/tests/grpc-service.sh index 03eab18e..d0d1e006 100644 --- a/ci/tests/grpc-service.sh +++ b/ci/tests/grpc-service.sh @@ -1,50 +1,190 @@ #!/bin/bash set -euo pipefail -echo "Testing grpc service using grpcurl..." +echo "Testing gRPC service using grpcurl..." # list services via gRPC reflection +echo "List Client services via gRPC reflection." grpcurl -plaintext localhost:9093 list ibc.core.client.v1.Query +echo "List Connection services via gRPC reflection." grpcurl -plaintext localhost:9093 list ibc.core.connection.v1.Query +echo "List Channel services via gRPC reflection." grpcurl -plaintext localhost:9093 list ibc.core.channel.v1.Query # client services +echo "ibc.core.client.v1.Query/ClientState" +grpcurl -plaintext -d @ localhost:9093 ibc.core.client.v1.Query/ClientState <(app: &BaseCoinApp) -> ResponseCommi info!( "Committed height {} with hash({})", state.current_height() - 1, - data.iter().map(|b| format!("{b:02X}")).collect::() + data.iter().fold(String::new(), |mut acc, b| { + // write!-ing into a String can never fail + let _ = write!(acc, "{b:02X}"); + acc + }) ); ResponseCommit { data: data.into(), diff --git a/crates/app/src/modules/ibc/impls.rs b/crates/app/src/modules/ibc/impls.rs index 282afb3a..ab03817f 100644 --- a/crates/app/src/modules/ibc/impls.rs +++ b/crates/app/src/modules/ibc/impls.rs @@ -1,84 +1,81 @@ -use crate::modules::bank::impls::BankBalanceKeeper; -use crate::modules::context::Identifiable; -use crate::modules::context::Module; -use crate::modules::ibc::router::IbcRouter; -use crate::modules::ibc::service::{IbcChannelService, IbcClientService, IbcConnectionService}; -use crate::modules::ibc::transfer::IbcTransferModule; -use crate::types::error::Error as AppError; -use crate::types::QueryResult; - -use basecoin_store::context::ProvableStore; -use basecoin_store::context::Store; -use basecoin_store::impls::SharedStore; -use basecoin_store::types::BinStore; -use basecoin_store::types::Height; -use basecoin_store::types::JsonStore; -use basecoin_store::types::Path; -use basecoin_store::types::ProtobufStore; -use basecoin_store::types::TypedSet; -use basecoin_store::types::TypedStore; - -use ibc::applications::transfer::msgs::transfer::MsgTransfer; -use ibc::applications::transfer::send_transfer; -use ibc::clients::ics07_tendermint::client_state::ClientState as TmClientState; -use ibc::clients::ics07_tendermint::consensus_state::ConsensusState as TmConsensusState; -use ibc::core::dispatch; -use ibc::core::events::IbcEvent; -use ibc::core::ics02_client::consensus_state::ConsensusState; -use ibc::core::ics02_client::error::ClientError; -use ibc::core::ics03_connection::connection::ConnectionEnd; -use ibc::core::ics03_connection::error::ConnectionError; -use ibc::core::ics03_connection::version::Version as ConnectionVersion; -use ibc::core::ics04_channel::channel::ChannelEnd; -use ibc::core::ics04_channel::commitment::AcknowledgementCommitment; -use ibc::core::ics04_channel::commitment::PacketCommitment; -use ibc::core::ics04_channel::error::ChannelError; -use ibc::core::ics04_channel::error::PacketError; -use ibc::core::ics04_channel::packet::Receipt; -use ibc::core::ics04_channel::packet::Sequence; -use ibc::core::ics23_commitment::commitment::CommitmentPrefix; -use ibc::core::ics23_commitment::commitment::CommitmentRoot; -use ibc::core::ics24_host::identifier::ClientId; -use ibc::core::ics24_host::identifier::ConnectionId; -use ibc::core::ics24_host::path::AckPath; -use ibc::core::ics24_host::path::ChannelEndPath; -use ibc::core::ics24_host::path::ClientConnectionPath; -use ibc::core::ics24_host::path::ClientConsensusStatePath; -use ibc::core::ics24_host::path::ClientStatePath; -use ibc::core::ics24_host::path::CommitmentPath; -use ibc::core::ics24_host::path::ConnectionPath; -use ibc::core::ics24_host::path::Path as IbcPath; -use ibc::core::ics24_host::path::ReceiptPath; -use ibc::core::ics24_host::path::SeqAckPath; -use ibc::core::ics24_host::path::SeqRecvPath; -use ibc::core::ics24_host::path::SeqSendPath; -use ibc::core::timestamp::Timestamp; -use ibc::core::ContextError; -use ibc::core::ExecutionContext; -use ibc::core::MsgEnvelope; -use ibc::core::ValidationContext; -use ibc::hosts::tendermint::IBC_QUERY_PATH; -use ibc::Any; -use ibc::Height as IbcHeight; -use ibc::Signer; - -use ibc_proto::ibc::core::channel::v1::query_server::QueryServer as ChannelQueryServer; -use ibc_proto::ibc::core::channel::v1::Channel as RawChannelEnd; -use ibc_proto::ibc::core::client::v1::query_server::QueryServer as ClientQueryServer; -use ibc_proto::ibc::core::connection::v1::query_server::QueryServer as ConnectionQueryServer; -use ibc_proto::ibc::core::connection::v1::ConnectionEnd as RawConnectionEnd; - +use crate::CHAIN_REVISION_NUMBER; +use crate::{ + modules::{ + bank::impls::BankBalanceKeeper, + context::{Identifiable, Module}, + ibc::{router::IbcRouter, transfer::IbcTransferModule}, + upgrade::Upgrade, + }, + types::{error::Error as AppError, QueryResult}, +}; +use basecoin_store::{ + context::{ProvableStore, Store}, + impls::SharedStore, + types::{BinStore, Height, JsonStore, Path, ProtobufStore, TypedSet, TypedStore}, +}; use cosmrs::AccountId; use derive_more::{From, TryInto}; +use ibc::{ + applications::transfer::{msgs::transfer::MsgTransfer, send_transfer}, + clients::ics07_tendermint::{ + client_state::ClientState as TmClientState, + consensus_state::ConsensusState as TmConsensusState, + }, + core::{ + dispatch, + events::IbcEvent, + ics02_client::{consensus_state::ConsensusState, error::ClientError}, + ics03_connection::{ + connection::{ConnectionEnd, IdentifiedConnectionEnd}, + error::ConnectionError, + version::Version as ConnectionVersion, + }, + ics04_channel::{ + channel::{ChannelEnd, IdentifiedChannelEnd}, + commitment::{AcknowledgementCommitment, PacketCommitment}, + error::{ChannelError, PacketError}, + packet::{Receipt, Sequence}, + }, + ics23_commitment::commitment::{CommitmentPrefix, CommitmentRoot}, + ics24_host::{ + identifier::{ClientId, ConnectionId}, + path::{ + AckPath, ChannelEndPath, ClientConnectionPath, ClientConsensusStatePath, + ClientStatePath, CommitmentPath, ConnectionPath, Path as IbcPath, ReceiptPath, + SeqAckPath, SeqRecvPath, SeqSendPath, + }, + }, + timestamp::Timestamp, + ContextError, ExecutionContext, MsgEnvelope, ValidationContext, + }, + hosts::tendermint::IBC_QUERY_PATH, + services::core::{ + channel::ChannelQueryService, + client::ClientQueryService, + connection::ConnectionQueryService, + context::{ProvableContext, QueryContext}, + }, + Height as IbcHeight, Signer, +}; +use ibc_proto::{ + google::protobuf::Any, + ibc::core::{ + channel::v1::{query_server::QueryServer as ChannelQueryServer, Channel as RawChannelEnd}, + client::v1::query_server::QueryServer as ClientQueryServer, + connection::v1::{ + query_server::QueryServer as ConnectionQueryServer, ConnectionEnd as RawConnectionEnd, + }, + }, +}; use prost::Message; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::convert::TryInto; -use std::fmt::Debug; -use std::ops::Deref; -use std::sync::Arc; -use std::time::Duration; - +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, + fmt::Debug, + ops::Deref, + sync::{Arc, RwLock}, + time::Duration, +}; use tendermint::merkle::proof::ProofOp; use tendermint::{abci::Event, block::Header}; use tracing::debug; @@ -91,6 +88,14 @@ pub enum AnyConsensusState { Tendermint(TmConsensusState), } +impl From for Any { + fn from(value: AnyConsensusState) -> Self { + match value { + AnyConsensusState::Tendermint(tm_consensus_state) => tm_consensus_state.into(), + } + } +} + #[derive(Clone)] pub struct Ibc where @@ -113,7 +118,6 @@ where router, } } - pub fn ctx(&self) -> IbcContext { self.ctx.clone() } @@ -147,16 +151,24 @@ where } } - pub fn client_service(&self) -> ClientQueryServer> { - ClientQueryServer::new(IbcClientService::new(self.ctx.store.clone())) + pub fn client_service( + &self, + upgrade_context: &Upgrade, + ) -> ClientQueryServer, Upgrade>> { + ClientQueryServer::new(ClientQueryService::new( + self.ctx.clone(), + upgrade_context.clone(), + )) } - pub fn connection_service(&self) -> ConnectionQueryServer> { - ConnectionQueryServer::new(IbcConnectionService::new(self.ctx.store.clone())) + pub fn connection_service( + &self, + ) -> ConnectionQueryServer>> { + ConnectionQueryServer::new(ConnectionQueryService::new(self.ctx.clone())) } - pub fn channel_service(&self) -> ChannelQueryServer> { - ChannelQueryServer::new(IbcChannelService::new(self.ctx.store.clone())) + pub fn channel_service(&self) -> ChannelQueryServer>> { + ChannelQueryServer::new(ChannelQueryService::new(self.ctx.clone())) } } @@ -251,6 +263,8 @@ where self.ctx .consensus_states + .write() + .expect("lock is poisoined") .insert(header.height.value(), consensus_state); vec![] @@ -288,7 +302,7 @@ where /// Tracks the processed height for client updates client_processed_heights: HashMap<(ClientId, IbcHeight), IbcHeight>, /// Map of host consensus states - consensus_states: HashMap, + consensus_states: Arc>>, /// A typed-store for AnyClientState pub(crate) client_state_store: ProtobufStore, ClientStatePath, TmClientState, Any>, @@ -370,26 +384,16 @@ where type AnyClientState = TmClientState; fn client_state(&self, client_id: &ClientId) -> Result { - let client_state = self + Ok(self .client_state_store .get(Height::Pending, &ClientStatePath(client_id.clone())) .ok_or(ClientError::ClientStateNotFound { client_id: client_id.clone(), - }) - .map_err(ContextError::from)?; - - Ok(client_state) + })?) } fn decode_client_state(&self, client_state: Any) -> Result { - if let Ok(client_state) = TmClientState::try_from(client_state.clone()) { - Ok(client_state) - } else { - Err(ClientError::UnknownClientStateType { - client_state_type: client_state.type_url, - }) - .map_err(ContextError::from) - } + Ok(TmClientState::try_from(client_state.clone())?) } fn consensus_state( @@ -410,7 +414,10 @@ where } fn host_height(&self) -> Result { - IbcHeight::new(0, self.store.current_height()).map_err(ContextError::from) + Ok(IbcHeight::new( + CHAIN_REVISION_NUMBER, + self.store.current_height(), + )?) } fn host_timestamp(&self) -> Result { @@ -423,8 +430,8 @@ where &self, height: &IbcHeight, ) -> Result { - let consensus_state = self - .consensus_states + let consensus_states_binding = self.consensus_states.read().expect("lock is poisoned"); + let consensus_state = consensus_states_binding .get(&height.revision_height()) .ok_or(ClientError::MissingLocalConsensusState { height: *height })?; @@ -436,12 +443,12 @@ where } fn connection_end(&self, conn_id: &ConnectionId) -> Result { - self.connection_end_store + Ok(self + .connection_end_store .get(Height::Pending, &ConnectionPath::new(conn_id)) .ok_or(ConnectionError::ConnectionNotFound { connection_id: conn_id.clone(), - }) - .map_err(ContextError::from) + })?) } fn validate_self_client(&self, _counterparty_client_state: Any) -> Result<(), ContextError> { @@ -619,6 +626,367 @@ where } } +/// Trait to provide proofs in gRPC service blanket implementations. +impl ProvableContext for IbcContext +where + S: ProvableStore + Debug, +{ + /// Returns the proof for the given [`IbcHeight`] and [`Path`] + fn get_proof(&self, height: IbcHeight, path: &IbcPath) -> Option> { + self.store + .get_proof(height.revision_height().into(), &path.clone().into()) + .map(|p| p.encode_to_vec()) + } +} + +/// Trait to complete the gRPC service blanket implementations. +impl QueryContext for IbcContext +where + S: ProvableStore + Debug, +{ + /// Returns the list of all client states. + fn client_states(&self) -> Result, ContextError> { + let path = "clients" + .to_owned() + .try_into() + .map_err(|_| ClientError::Other { + description: "Invalid client state path: clients".into(), + })?; + + self.client_state_store + .get_keys(&path) + .into_iter() + .filter_map(|path| { + if let Ok(IbcPath::ClientState(client_path)) = path.try_into() { + Some(client_path) + } else { + None + } + }) + .map(|client_state_path| { + let client_state = self + .client_state_store + .get(Height::Pending, &client_state_path) + .ok_or_else(|| ClientError::ClientStateNotFound { + client_id: client_state_path.0.clone(), + })?; + Ok((client_state_path.0, client_state)) + }) + .collect() + } + + /// Returns the list of all consensus states of the given client. + fn consensus_states( + &self, + client_id: &ClientId, + ) -> Result, ContextError> { + let path = format!("clients/{}/consensusStates", client_id) + .try_into() + .map_err(|_| ClientError::Other { + description: "Invalid consensus state path".into(), + })?; + + self.consensus_state_store + .get_keys(&path) + .into_iter() + .flat_map(|path| { + if let Ok(IbcPath::ClientConsensusState(consensus_path)) = path.try_into() { + Some(consensus_path) + } else { + None + } + }) + .map(|consensus_path| { + let height = IbcHeight::new(consensus_path.epoch, consensus_path.height)?; + let client_state = self + .consensus_state_store + .get(Height::Pending, &consensus_path) + .ok_or({ + ClientError::ConsensusStateNotFound { + client_id: consensus_path.client_id, + height, + } + })?; + Ok((height, client_state.into())) + }) + .collect() + } + + /// Returns the list of heights at which the consensus state of the given client was updated. + fn consensus_state_heights( + &self, + client_id: &ClientId, + ) -> Result, ContextError> { + let path = format!("clients/{}/consensusStates", client_id) + .try_into() + .map_err(|_| ClientError::Other { + description: "Invalid consensus state path".into(), + })?; + + self.consensus_state_store + .get_keys(&path) + .into_iter() + .flat_map(|path| { + if let Ok(IbcPath::ClientConsensusState(consensus_path)) = path.try_into() { + Some(consensus_path) + } else { + None + } + }) + .map(|consensus_path| Ok(IbcHeight::new(consensus_path.epoch, consensus_path.height)?)) + .collect::, _>>() + } + + /// Connections queries all the IBC connections of a chain. + fn connection_ends(&self) -> Result, ContextError> { + let path = "connections" + .to_owned() + .try_into() + .map_err(|_| ConnectionError::Other { + description: "Invalid connection path: connections".into(), + })?; + + self.connection_end_store + .get_keys(&path) + .into_iter() + .flat_map(|path| { + if let Ok(IbcPath::Connection(connection_path)) = path.try_into() { + Some(connection_path) + } else { + None + } + }) + .map(|connection_path| { + let connection_end = self + .connection_end_store + .get(Height::Pending, &connection_path) + .ok_or_else(|| ConnectionError::ConnectionNotFound { + connection_id: connection_path.0.clone(), + })?; + Ok(IdentifiedConnectionEnd { + connection_id: connection_path.0, + connection_end, + }) + }) + .collect() + } + + /// ClientConnections queries all the connection paths associated with a client. + fn client_connection_ends( + &self, + client_id: &ClientId, + ) -> Result, ContextError> { + let client_connection_path = ClientConnectionPath::new(client_id); + + Ok(self + .connection_ids_store + .get(Height::Pending, &client_connection_path) + .unwrap_or_default()) + } + + /// Channels queries all the IBC channels of a chain. + fn channel_ends(&self) -> Result, ContextError> { + let path = "channelEnds" + .to_owned() + .try_into() + .map_err(|_| ChannelError::Other { + description: "Invalid channel path: channels".into(), + })?; + + self.channel_end_store + .get_keys(&path) + .into_iter() + .flat_map(|path| { + if let Ok(IbcPath::ChannelEnd(channel_path)) = path.try_into() { + Some(channel_path) + } else { + None + } + }) + .map(|channel_path| { + let channel_end = self + .channel_end_store + .get(Height::Pending, &channel_path) + .ok_or_else(|| ChannelError::ChannelNotFound { + port_id: channel_path.0.clone(), + channel_id: channel_path.1.clone(), + })?; + Ok(IdentifiedChannelEnd { + port_id: channel_path.0, + channel_id: channel_path.1, + channel_end, + }) + }) + .collect() + } + + /// PacketCommitments returns all the packet commitments associated with a channel. + fn packet_commitments( + &self, + channel_end_path: &ChannelEndPath, + ) -> Result, ContextError> { + let path = format!( + "commitments/ports/{}/channels/{}/sequences", + channel_end_path.0, channel_end_path.1 + ) + .try_into() + .map_err(|_| PacketError::Other { + description: "Invalid commitment path".into(), + })?; + + Ok(self + .packet_commitment_store + .get_keys(&path) + .into_iter() + .flat_map(|path| { + if let Ok(IbcPath::Commitment(commitment_path)) = path.try_into() { + Some(commitment_path) + } else { + None + } + }) + .filter(|commitment_path| { + if let Some(data) = self + .packet_commitment_store + .get(Height::Pending, commitment_path) + { + !data.into_vec().is_empty() + } else { + false + } + }) + .collect()) + } + + /// PacketAcknowledgements returns all the packet acknowledgements associated with a channel. + /// Returns all the packet acknowledgements if sequences is empty. + fn packet_acknowledgements( + &self, + channel_end_path: &ChannelEndPath, + sequences: impl ExactSizeIterator, + ) -> Result, ContextError> { + let non_empty = |ack_path: &AckPath| -> bool { + if let Some(data) = self.packet_ack_store.get(Height::Pending, ack_path) { + // ack is removed by setting its value to an empty vec + // so ignoring removed acks + !data.into_vec().is_empty() + } else { + false + } + }; + + Ok(if sequences.len() > 0 { + // if sequences is empty, return all the acks + let ack_path_prefix = format!( + "acks/ports/{}/channels/{}/sequences", + channel_end_path.0, channel_end_path.1 + ) + .try_into() + .map_err(|_| PacketError::Other { + description: "Invalid ack path".into(), + })?; + + self.packet_ack_store + .get_keys(&ack_path_prefix) + .into_iter() + .flat_map(|path| { + if let Ok(IbcPath::Ack(ack_path)) = path.try_into() { + Some(ack_path) + } else { + None + } + }) + .filter(non_empty) + .collect() + } else { + sequences + .into_iter() + .map(|seq| AckPath::new(&channel_end_path.0, &channel_end_path.1, seq)) + .filter(non_empty) + .collect() + }) + } + + /// UnreceivedPackets returns all the unreceived IBC packets associated with + /// a channel and sequences. + fn unreceived_packets( + &self, + channel_end_path: &ChannelEndPath, + sequences: impl ExactSizeIterator, + ) -> Result, ContextError> { + // QUESTION. Currently only works for unordered channels; ordered channels + // don't use receipts. However, ibc-go does it this way. Investigate if + // this query only ever makes sense on unordered channels. + + Ok(sequences + .into_iter() + .map(|seq| ReceiptPath::new(&channel_end_path.0, &channel_end_path.1, seq)) + .filter(|receipt_path| { + self.packet_receipt_store + .get(Height::Pending, receipt_path) + .is_none() + }) + .map(|receipts_path| receipts_path.sequence) + .collect()) + } + + /// UnreceivedAcks returns all the unreceived IBC acknowledgements associated with a channel and sequences. + /// Returns all the unreceived acks if sequences is empty. + fn unreceived_acks( + &self, + channel_end_path: &ChannelEndPath, + sequences: impl ExactSizeIterator, + ) -> Result, ContextError> { + let non_empty = |commitment_path: &CommitmentPath| -> bool { + // To check if we received an acknowledgement, we check if we still have the sent packet + // commitment (upon receiving an ack, the sent packet commitment is deleted). + if let Some(data) = self + .packet_commitment_store + .get(Height::Pending, commitment_path) + { + // commitment is removed by setting its value to an empty vec + // so ignoring removed commitments + !data.into_vec().is_empty() + } else { + false + } + }; + + Ok(if sequences.len() > 0 { + // if sequences is empty, return all the acks + let commitment_path_prefix = format!( + "commitments/ports/{}/channels/{}/sequences", + channel_end_path.0, channel_end_path.1 + ) + .try_into() + .map_err(|_| PacketError::Other { + description: "Invalid commitment path".into(), + })?; + + self.packet_commitment_store + .get_keys(&commitment_path_prefix) + .into_iter() + .flat_map(|path| { + if let Ok(IbcPath::Commitment(commitment_path)) = path.try_into() { + Some(commitment_path) + } else { + None + } + }) + .filter(non_empty) + .map(|commitment_path| commitment_path.sequence) + .collect() + } else { + sequences + .into_iter() + .map(|seq| CommitmentPath::new(&channel_end_path.0, &channel_end_path.1, seq)) + .filter(non_empty) + .map(|commitment_path| commitment_path.sequence) + .collect() + }) + } +} + impl ExecutionContext for IbcContext where S: Store + Debug, @@ -626,8 +994,9 @@ where /// Called upon client creation. /// Increases the counter which keeps track of how many clients have been created. /// Should never fail. - fn increase_client_counter(&mut self) { + fn increase_client_counter(&mut self) -> Result<(), ContextError> { self.client_counter += 1; + Ok(()) } /// Called upon successful client update. @@ -694,8 +1063,9 @@ where /// Called upon connection identifier creation (Init or Try process). /// Increases the counter which keeps track of how many connections have been created. /// Should never fail. - fn increase_connection_counter(&mut self) { + fn increase_connection_counter(&mut self) -> Result<(), ContextError> { self.conn_counter += 1; + Ok(()) } fn store_packet_commitment( @@ -792,16 +1162,19 @@ where Ok(()) } - fn increase_channel_counter(&mut self) { + fn increase_channel_counter(&mut self) -> Result<(), ContextError> { self.channel_counter += 1; + Ok(()) } - fn emit_ibc_event(&mut self, event: IbcEvent) { + fn emit_ibc_event(&mut self, event: IbcEvent) -> Result<(), ContextError> { self.events.push(event); + Ok(()) } - fn log_message(&mut self, message: String) { + fn log_message(&mut self, message: String) -> Result<(), ContextError> { self.logs.push(message); + Ok(()) } fn get_client_execution_context(&mut self) -> &mut Self::E { diff --git a/crates/app/src/modules/ibc/mod.rs b/crates/app/src/modules/ibc/mod.rs index 82a7e91b..193a939a 100644 --- a/crates/app/src/modules/ibc/mod.rs +++ b/crates/app/src/modules/ibc/mod.rs @@ -2,7 +2,6 @@ pub mod client_contexts; pub mod error; pub mod impls; mod router; -pub mod service; pub mod transfer; pub use impls::AnyConsensusState; diff --git a/crates/app/src/modules/ibc/service.rs b/crates/app/src/modules/ibc/service.rs deleted file mode 100644 index 6eeb3409..00000000 --- a/crates/app/src/modules/ibc/service.rs +++ /dev/null @@ -1,689 +0,0 @@ -use crate::CHAIN_REVISION_NUMBER; - -use basecoin_store::context::{ProvableStore, Store}; -use basecoin_store::impls::SharedStore; -use basecoin_store::types::BinStore; -use basecoin_store::types::Height; -use basecoin_store::types::JsonStore; -use basecoin_store::types::Path; -use basecoin_store::types::ProtobufStore; -use basecoin_store::types::TypedSet; -use basecoin_store::types::TypedStore; - -use ibc::clients::ics07_tendermint::client_state::ClientState as TmClientState; -use ibc::clients::ics07_tendermint::consensus_state::ConsensusState as TmConsensusState; -use ibc::core::ics03_connection::connection::ConnectionEnd; -use ibc::core::ics03_connection::connection::IdentifiedConnectionEnd; -use ibc::core::ics04_channel::channel::ChannelEnd; -use ibc::core::ics04_channel::channel::IdentifiedChannelEnd; -use ibc::core::ics04_channel::commitment::AcknowledgementCommitment; -use ibc::core::ics04_channel::commitment::PacketCommitment; -use ibc::core::ics04_channel::packet::Sequence; -use ibc::core::ics24_host::identifier::ChannelId; -use ibc::core::ics24_host::identifier::ConnectionId; -use ibc::core::ics24_host::identifier::PortId; -use ibc::core::ics24_host::path::AckPath; -use ibc::core::ics24_host::path::ChannelEndPath; -use ibc::core::ics24_host::path::ClientConnectionPath; -use ibc::core::ics24_host::path::ClientConsensusStatePath; -use ibc::core::ics24_host::path::ClientStatePath; -use ibc::core::ics24_host::path::CommitmentPath; -use ibc::core::ics24_host::path::ConnectionPath; -use ibc::core::ics24_host::path::Path as IbcPath; -use ibc::core::ics24_host::path::ReceiptPath; - -use ibc_proto::{ - google::protobuf::Any, - ibc::core::{ - channel::v1::{ - query_server::Query as ChannelQuery, Channel as RawChannelEnd, - IdentifiedChannel as RawIdentifiedChannel, PacketState, QueryChannelClientStateRequest, - QueryChannelClientStateResponse, QueryChannelConsensusStateRequest, - QueryChannelConsensusStateResponse, QueryChannelRequest, QueryChannelResponse, - QueryChannelsRequest, QueryChannelsResponse, QueryConnectionChannelsRequest, - QueryConnectionChannelsResponse, QueryNextSequenceReceiveRequest, - QueryNextSequenceReceiveResponse, QueryNextSequenceSendRequest, - QueryNextSequenceSendResponse, QueryPacketAcknowledgementRequest, - QueryPacketAcknowledgementResponse, QueryPacketAcknowledgementsRequest, - QueryPacketAcknowledgementsResponse, QueryPacketCommitmentRequest, - QueryPacketCommitmentResponse, QueryPacketCommitmentsRequest, - QueryPacketCommitmentsResponse, QueryPacketReceiptRequest, QueryPacketReceiptResponse, - QueryUnreceivedAcksRequest, QueryUnreceivedAcksResponse, QueryUnreceivedPacketsRequest, - QueryUnreceivedPacketsResponse, - }, - client::v1::{ - query_server::Query as ClientQuery, ConsensusStateWithHeight, Height as RawHeight, - IdentifiedClientState, QueryClientParamsRequest, QueryClientParamsResponse, - QueryClientStateRequest, QueryClientStateResponse, QueryClientStatesRequest, - QueryClientStatesResponse, QueryClientStatusRequest, QueryClientStatusResponse, - QueryConsensusStateHeightsRequest, QueryConsensusStateHeightsResponse, - QueryConsensusStateRequest, QueryConsensusStateResponse, QueryConsensusStatesRequest, - QueryConsensusStatesResponse, QueryUpgradedClientStateRequest, - QueryUpgradedClientStateResponse, QueryUpgradedConsensusStateRequest, - QueryUpgradedConsensusStateResponse, - }, - connection::v1::{ - query_server::Query as ConnectionQuery, ConnectionEnd as RawConnectionEnd, - IdentifiedConnection as RawIdentifiedConnection, QueryClientConnectionsRequest, - QueryClientConnectionsResponse, QueryConnectionClientStateRequest, - QueryConnectionClientStateResponse, QueryConnectionConsensusStateRequest, - QueryConnectionConsensusStateResponse, QueryConnectionParamsRequest, - QueryConnectionParamsResponse, QueryConnectionRequest, QueryConnectionResponse, - QueryConnectionsRequest, QueryConnectionsResponse, - }, - }, -}; - -use std::str::FromStr; -use tonic::{Request, Response, Status}; -use tracing::trace; - -pub struct IbcClientService { - client_state_store: ProtobufStore, ClientStatePath, TmClientState, Any>, - consensus_state_store: - ProtobufStore, ClientConsensusStatePath, TmConsensusState, Any>, -} - -impl IbcClientService { - pub fn new(store: SharedStore) -> Self { - Self { - client_state_store: TypedStore::new(store.clone()), - consensus_state_store: TypedStore::new(store), - } - } -} - -#[tonic::async_trait] -impl ClientQuery for IbcClientService { - async fn client_state( - &self, - _request: Request, - ) -> Result, Status> { - unimplemented!() - } - - async fn client_states( - &self, - request: Request, - ) -> Result, Status> { - trace!("Got client states request: {:?}", request); - - let path = "clients" - .to_owned() - .try_into() - .map_err(|e| Status::invalid_argument(format!("{e}")))?; - - let client_state_paths = |path: Path| -> Option { - match path.try_into() { - Ok(IbcPath::ClientState(p)) => Some(p), - _ => None, - } - }; - - let identified_client_state = |path: ClientStatePath| { - let client_state = self.client_state_store.get(Height::Pending, &path).unwrap(); - IdentifiedClientState { - client_id: path.0.to_string(), - client_state: Some(client_state.into()), - } - }; - - let keys = self.client_state_store.get_keys(&path); - let client_states = keys - .into_iter() - .filter_map(client_state_paths) - .map(identified_client_state) - .collect(); - - Ok(Response::new(QueryClientStatesResponse { - client_states, - pagination: None, // TODO(hu55a1n1): add pagination support - })) - } - - async fn consensus_state( - &self, - _request: Request, - ) -> Result, Status> { - unimplemented!() - } - - async fn consensus_states( - &self, - request: Request, - ) -> Result, Status> { - trace!("Got consensus states request: {:?}", request); - - let path = format!("clients/{}/consensusStates", request.get_ref().client_id) - .try_into() - .map_err(|e| Status::invalid_argument(format!("{e}")))?; - - let keys = self.consensus_state_store.get_keys(&path); - let consensus_states = keys - .into_iter() - .map(|path| { - if let Ok(IbcPath::ClientConsensusState(path)) = path.try_into() { - let consensus_state = self.consensus_state_store.get(Height::Pending, &path); - ConsensusStateWithHeight { - height: Some(RawHeight { - revision_number: path.epoch, - revision_height: path.height, - }), - consensus_state: consensus_state.map(|cs| cs.into()), - } - } else { - panic!("unexpected path") // safety - store paths are assumed to be well-formed - } - }) - .collect(); - - Ok(Response::new(QueryConsensusStatesResponse { - consensus_states, - pagination: None, // TODO(hu55a1n1): add pagination support - })) - } - - async fn consensus_state_heights( - &self, - _request: Request, - ) -> Result, Status> { - unimplemented!() - } - - async fn client_status( - &self, - _request: Request, - ) -> Result, Status> { - unimplemented!() - } - - async fn client_params( - &self, - _request: Request, - ) -> Result, Status> { - unimplemented!() - } - - async fn upgraded_client_state( - &self, - _request: Request, - ) -> Result, Status> { - unimplemented!() - } - - async fn upgraded_consensus_state( - &self, - _request: Request, - ) -> Result, Status> { - unimplemented!() - } -} - -pub struct IbcConnectionService { - connection_end_store: - ProtobufStore, ConnectionPath, ConnectionEnd, RawConnectionEnd>, - connection_ids_store: JsonStore, ClientConnectionPath, Vec>, -} - -impl IbcConnectionService { - pub fn new(store: SharedStore) -> Self { - Self { - connection_end_store: TypedStore::new(store.clone()), - connection_ids_store: TypedStore::new(store), - } - } -} - -#[tonic::async_trait] -impl ConnectionQuery for IbcConnectionService { - async fn connection( - &self, - request: Request, - ) -> Result, Status> { - let conn_id = ConnectionId::from_str(&request.get_ref().connection_id) - .map_err(|_| Status::invalid_argument("invalid connection id"))?; - let conn = self - .connection_end_store - .get(Height::Pending, &ConnectionPath::new(&conn_id)); - Ok(Response::new(QueryConnectionResponse { - connection: conn.map(|c| c.into()), - proof: vec![], - proof_height: None, - })) - } - - async fn connections( - &self, - _request: Request, - ) -> Result, Status> { - let connection_path_prefix: Path = String::from("connections") - .try_into() - .expect("'connections' expected to be a valid Path"); - - let connection_paths = self.connection_end_store.get_keys(&connection_path_prefix); - - let identified_connections: Vec = connection_paths - .into_iter() - .map(|path| match path.try_into() { - Ok(IbcPath::Connection(connections_path)) => { - let connection_end = self - .connection_end_store - .get(Height::Pending, &connections_path) - .unwrap(); - IdentifiedConnectionEnd::new(connections_path.0, connection_end).into() - } - _ => panic!("unexpected path"), - }) - .collect(); - - Ok(Response::new(QueryConnectionsResponse { - connections: identified_connections, - pagination: None, - height: None, - })) - } - - async fn client_connections( - &self, - request: Request, - ) -> Result, Status> { - trace!("Got client connections request: {:?}", request); - - let client_id = request - .get_ref() - .client_id - .parse() - .map_err(|e| Status::invalid_argument(format!("{e}")))?; - let path = ClientConnectionPath::new(&client_id); - let connection_ids = self - .connection_ids_store - .get(Height::Pending, &path) - .unwrap_or_default(); - let connection_paths = connection_ids - .into_iter() - .map(|conn_id| conn_id.to_string()) - .collect(); - - Ok(Response::new(QueryClientConnectionsResponse { - connection_paths, - // Note: proofs aren't being used by hermes currently - proof: vec![], - proof_height: None, - })) - } - - async fn connection_client_state( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - - async fn connection_consensus_state( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - - async fn connection_params( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } -} - -pub struct IbcChannelService { - channel_end_store: ProtobufStore, ChannelEndPath, ChannelEnd, RawChannelEnd>, - packet_commitment_store: BinStore, CommitmentPath, PacketCommitment>, - packet_ack_store: BinStore, AckPath, AcknowledgementCommitment>, - packet_receipt_store: TypedSet, ReceiptPath>, -} - -impl IbcChannelService { - pub fn new(store: SharedStore) -> Self { - Self { - channel_end_store: TypedStore::new(store.clone()), - packet_commitment_store: TypedStore::new(store.clone()), - packet_ack_store: TypedStore::new(store.clone()), - packet_receipt_store: TypedStore::new(store), - } - } -} - -#[tonic::async_trait] -impl ChannelQuery for IbcChannelService { - async fn channel( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let port_id = PortId::from_str(&request.port_id) - .map_err(|_| Status::invalid_argument("invalid port id"))?; - let channel_id = ChannelId::from_str(&request.channel_id) - .map_err(|_| Status::invalid_argument("invalid channel id"))?; - - let channel = self - .channel_end_store - .get(Height::Pending, &ChannelEndPath(port_id, channel_id)) - .map(|channel_end| channel_end.into()); - - Ok(Response::new(QueryChannelResponse { - channel, - proof: vec![], - proof_height: None, - })) - } - /// Channels queries all the IBC channels of a chain. - async fn channels( - &self, - _request: Request, - ) -> Result, Status> { - let channel_path_prefix: Path = String::from("channelEnds/ports") - .try_into() - .expect("'channelEnds/ports' expected to be a valid Path"); - - let channel_paths = self.channel_end_store.get_keys(&channel_path_prefix); - let identified_channels: Vec = channel_paths - .into_iter() - .map(|path| match path.try_into() { - Ok(IbcPath::ChannelEnd(channels_path)) => { - let channel_end = self - .channel_end_store - .get(Height::Pending, &channels_path) - .expect("channel path returned by get_keys() had no associated channel"); - IdentifiedChannelEnd::new(channels_path.0, channels_path.1, channel_end).into() - } - _ => panic!("unexpected path"), - }) - .collect(); - - Ok(Response::new(QueryChannelsResponse { - channels: identified_channels, - pagination: None, - height: Some(RawHeight { - revision_number: CHAIN_REVISION_NUMBER, - revision_height: self.channel_end_store.current_height(), - }), - })) - } - /// ConnectionChannels queries all the channels associated with a connection - /// end. - async fn connection_channels( - &self, - request: Request, - ) -> Result, Status> { - let conn_id = ConnectionId::from_str(&request.get_ref().connection) - .map_err(|_| Status::invalid_argument("invalid connection id"))?; - - let path = "channelEnds" - .to_owned() - .try_into() - .expect("'commitments/ports' expected to be a valid Path"); - - let keys = self.channel_end_store.get_keys(&path); - let channels = keys - .into_iter() - .filter_map(|path| { - if let Ok(IbcPath::ChannelEnd(path)) = path.try_into() { - let channel_end = self.channel_end_store.get(Height::Pending, &path)?; - if channel_end.connection_hops.first() == Some(&conn_id) { - return Some(IdentifiedChannelEnd::new(path.0, path.1, channel_end).into()); - } - } - - None - }) - .collect(); - - Ok(Response::new(QueryConnectionChannelsResponse { - channels, - pagination: None, - height: Some(RawHeight { - revision_number: CHAIN_REVISION_NUMBER, - revision_height: self.channel_end_store.current_height(), - }), - })) - } - /// ChannelClientState queries for the client state for the channel associated - /// with the provided channel identifiers. - async fn channel_client_state( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - /// ChannelConsensusState queries for the consensus state for the channel - /// associated with the provided channel identifiers. - async fn channel_consensus_state( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - /// PacketCommitment queries a stored packet commitment hash. - async fn packet_commitment( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - /// PacketCommitments returns all the packet commitments hashes associated - /// with a channel. - async fn packet_commitments( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let port_id = PortId::from_str(&request.port_id) - .map_err(|_| Status::invalid_argument("invalid port id"))?; - let channel_id = ChannelId::from_str(&request.channel_id) - .map_err(|_| Status::invalid_argument("invalid channel id"))?; - - let commitment_paths = { - let prefix: Path = String::from("commitments/ports") - .try_into() - .expect("'commitments/ports' expected to be a valid Path"); - self.packet_commitment_store.get_keys(&prefix) - }; - - let matching_commitment_paths = |path: Path| -> Option { - match path.try_into() { - Ok(IbcPath::Commitment(p)) - if p.port_id == port_id && p.channel_id == channel_id => - { - Some(p) - } - _ => None, - } - }; - - let packet_state = |path: CommitmentPath| -> Option { - let commitment = self - .packet_commitment_store - .get(Height::Pending, &path) - .unwrap(); - let data = commitment.into_vec(); - (!data.is_empty()).then(|| PacketState { - port_id: path.port_id.to_string(), - channel_id: path.channel_id.to_string(), - sequence: path.sequence.into(), - data, - }) - }; - - let packet_states: Vec = commitment_paths - .into_iter() - .filter_map(matching_commitment_paths) - .filter_map(packet_state) - .collect(); - - Ok(Response::new(QueryPacketCommitmentsResponse { - commitments: packet_states, - pagination: None, - height: Some(RawHeight { - revision_number: CHAIN_REVISION_NUMBER, - revision_height: self.packet_commitment_store.current_height(), - }), - })) - } - - /// PacketReceipt queries if a given packet sequence has been received on the - /// queried chain - async fn packet_receipt( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - - /// PacketAcknowledgement queries a stored packet acknowledgement hash. - async fn packet_acknowledgement( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - - /// PacketAcknowledgements returns all the packet acknowledgements associated - /// with a channel. - async fn packet_acknowledgements( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let port_id = PortId::from_str(&request.port_id) - .map_err(|_| Status::invalid_argument("invalid port id"))?; - let channel_id = ChannelId::from_str(&request.channel_id) - .map_err(|_| Status::invalid_argument("invalid channel id"))?; - - let ack_paths = { - let prefix: Path = String::from("acks/ports") - .try_into() - .expect("'acks/ports' expected to be a valid Path"); - self.packet_ack_store.get_keys(&prefix) - }; - - let matching_ack_paths = |path: Path| -> Option { - match path.try_into() { - Ok(IbcPath::Ack(p)) if p.port_id == port_id && p.channel_id == channel_id => { - Some(p) - } - _ => None, - } - }; - - let packet_state = |path: AckPath| -> Option { - let commitment = self.packet_ack_store.get(Height::Pending, &path).unwrap(); - let data = commitment.into_vec(); - (!data.is_empty()).then(|| PacketState { - port_id: path.port_id.to_string(), - channel_id: path.channel_id.to_string(), - sequence: path.sequence.into(), - data, - }) - }; - - let packet_states: Vec = ack_paths - .into_iter() - .filter_map(matching_ack_paths) - .filter_map(packet_state) - .collect(); - - Ok(Response::new(QueryPacketAcknowledgementsResponse { - acknowledgements: packet_states, - pagination: None, - height: Some(RawHeight { - revision_number: CHAIN_REVISION_NUMBER, - revision_height: self.packet_ack_store.current_height(), - }), - })) - } - - /// UnreceivedPackets returns all the unreceived IBC packets associated with - /// a channel and sequences. - /// - /// QUESTION. Currently only works for unordered channels; ordered channels - /// don't use receipts. However, ibc-go does it this way. Investigate if - /// this query only ever makes sense on unordered channels. - async fn unreceived_packets( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let port_id = PortId::from_str(&request.port_id) - .map_err(|_| Status::invalid_argument("invalid port id"))?; - let channel_id = ChannelId::from_str(&request.channel_id) - .map_err(|_| Status::invalid_argument("invalid channel id"))?; - let sequences_to_check: Vec = request.packet_commitment_sequences; - - let unreceived_sequences: Vec = sequences_to_check - .into_iter() - .filter(|seq| { - let receipts_path = ReceiptPath::new(&port_id, &channel_id, Sequence::from(*seq)); - self.packet_receipt_store - .get(Height::Pending, &receipts_path) - .is_none() - }) - .collect(); - - Ok(Response::new(QueryUnreceivedPacketsResponse { - sequences: unreceived_sequences, - height: Some(RawHeight { - revision_number: CHAIN_REVISION_NUMBER, - revision_height: self.packet_receipt_store.current_height(), - }), - })) - } - - /// UnreceivedAcks returns all the unreceived IBC acknowledgements associated - /// with a channel and sequences. - async fn unreceived_acks( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let port_id = PortId::from_str(&request.port_id) - .map_err(|_| Status::invalid_argument("invalid port id"))?; - let channel_id = ChannelId::from_str(&request.channel_id) - .map_err(|_| Status::invalid_argument("invalid channel id"))?; - let sequences_to_check: Vec = request.packet_ack_sequences; - - let unreceived_sequences: Vec = sequences_to_check - .into_iter() - .filter(|seq| { - // To check if we received an acknowledgement, we check if we still have the sent packet - // commitment (upon receiving an ack, the sent packet commitment is deleted). - let commitments_path = - CommitmentPath::new(&port_id, &channel_id, Sequence::from(*seq)); - self.packet_commitment_store - .get(Height::Pending, &commitments_path) - .is_some() - }) - .collect(); - - Ok(Response::new(QueryUnreceivedAcksResponse { - sequences: unreceived_sequences, - height: Some(RawHeight { - revision_number: CHAIN_REVISION_NUMBER, - revision_height: self.packet_commitment_store.current_height(), - }), - })) - } - - async fn next_sequence_send( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - - /// NextSequenceReceive returns the next receive sequence for a given channel. - async fn next_sequence_receive( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } -} diff --git a/crates/app/src/runner.rs b/crates/app/src/runner.rs index 28d1852d..8c173fc5 100644 --- a/crates/app/src/runner.rs +++ b/crates/app/src/runner.rs @@ -44,7 +44,7 @@ pub async fn default_app_runner(server_cfg: ServerConfig) { // instantiate gRPC services for each module let auth_service = auth.service(); let bank_service = bank.service(); - let ibc_client_service = ibc.client_service(); + let ibc_client_service = ibc.client_service(&upgrade); let ibc_conn_service = ibc.connection_service(); let ibc_channel_service = ibc.channel_service(); let governance_service = governance.service(); diff --git a/crates/store/src/impls/in_memory.rs b/crates/store/src/impls/in_memory.rs index f89706e6..f1e34c83 100644 --- a/crates/store/src/impls/in_memory.rs +++ b/crates/store/src/impls/in_memory.rs @@ -79,12 +79,8 @@ impl Store for InMemoryStore { self.pending .get_keys() .into_iter() - .filter_map(|key| { - key.as_bytes() - .as_ref() - .starts_with(key_prefix.as_ref()) - .then(|| key.clone()) - }) + .filter(|&key| key.as_bytes().as_ref().starts_with(key_prefix.as_ref())) + .cloned() .collect() } } diff --git a/crates/store/src/impls/revertible.rs b/crates/store/src/impls/revertible.rs index 38c04497..34aec72b 100644 --- a/crates/store/src/impls/revertible.rs +++ b/crates/store/src/impls/revertible.rs @@ -95,6 +95,8 @@ where match op { RevertOp::Delete(path) => self.delete(&path), RevertOp::Set(path, value) => { + // FIXME: potential non-termination + // self.set() may insert a new op into the op_log self.set(path, value).unwrap(); // safety - reset failures are unrecoverable } }