Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pagination for channels cli and reorg the query_txs() #827

Merged
merged 9 commits into from
Apr 16, 2021
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@

- [ibc-relayer]
- Change the default for client creation to allow governance recovery in case of expiration or misbehaviour. ([#785])

### BUG FIXES

- [ibc-relayer]
- Fix pagination in gRPC query for clients ([#811])

### BREAKING CHANGES

> Nothing yet.


[#785]: https://github.com/informalsystems/ibc-rs/issues/785
[#811]: https://github.com/informalsystems/ibc-rs/issues/811


## v0.2.0
*April 14th, 2021*
Expand Down
1 change: 1 addition & 0 deletions modules/src/ics02_client/client_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl ConsensusState for AnyConsensusState {
}
}

/// Query request for a single client event, identified by `event_id`, for `client_id`.
#[derive(Clone, Debug)]
pub struct QueryClientEventRequest {
pub height: crate::Height,
Expand Down
3 changes: 2 additions & 1 deletion modules/src/ics04_channel/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ impl std::fmt::Display for State {
}
}

/// Used for queries and not yet standardized in channel's query.proto
/// Used to query a packet event, identified by `event_id`, for specific channel and sequences.
/// The query is preformed for the chain context at `height`.
#[derive(Clone, Debug)]
pub struct QueryPacketEventDataRequest {
pub event_id: IbcEventType,
Expand Down
4 changes: 3 additions & 1 deletion relayer-cli/src/commands/query/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ impl Runnable for QueryChannelsCmd {
let rt = Arc::new(TokioRuntime::new().unwrap());
let chain = CosmosSdkChain::bootstrap(chain_config.clone(), rt).unwrap();

let req = QueryChannelsRequest { pagination: None };
let req = QueryChannelsRequest {
pagination: ibc_proto::cosmos::base::query::pagination::all(),
};

let res = chain.query_channels(req);

Expand Down
214 changes: 101 additions & 113 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use crate::light_client::tendermint::LightClient as TmLightClient;
use crate::light_client::LightClient;

use super::Chain;
use tendermint_rpc::endpoint::tx_search::ResultTx;

// TODO size this properly
const DEFAULT_MAX_GAS: u64 = 300000;
Expand Down Expand Up @@ -954,61 +955,88 @@ impl Chain for CosmosSdkChain {
Ok(Sequence::from(response.next_sequence_receive))
}

/// Queries the packet data for all packets with sequences included in the request.
/// Note - there is no way to format the query such that it asks for Tx-es with either
/// sequence (the query conditions can only be AND-ed)
/// There is a possibility to include "<=" and ">=" conditions but it doesn't work with
/// string attributes (sequence is emmitted as a string).
/// Therefore, here we perform one tx_search for each query. Alternatively, a single query
/// for all packets could be performed but it would return all packets ever sent.
/// This function queries transactions for events matching certain criteria.
/// 1. Client Update request - returns a vector with at most one update client event
/// 2. Packet event request - returns at most one packet event for each sequence specified
/// in the request.
/// Note - there is no way to format the packet query such that it asks for Tx-es with either
/// sequence (the query conditions can only be AND-ed).
/// There is a possibility to include "<=" and ">=" conditions but it doesn't work with
/// string attributes (sequence is emmitted as a string).
/// Therefore, for packets we perform one tx_search for each sequence.
/// Alternatively, a single query for all packets could be performed but it would return all
/// packets ever sent.
fn query_txs(&self, request: QueryTxRequest) -> Result<Vec<IbcEvent>, Error> {
crate::time!("query_txs");

match request {
QueryTxRequest::Packet(request) => {
crate::time!("query_txs");
crate::time!("query_txs: query packet events");

let mut result: Vec<IbcEvent> = vec![];

for seq in &request.sequences {
// query all Tx-es that include events related to packet with given port, channel and sequence
// query first (and only) Tx that includes the event specified in the query request
let response = self
.block_on(self.rpc_client.tx_search(
packet_query(&request, *seq),
false,
1,
1,
1, // get only the first Tx matching the query
Order::Ascending,
))
.unwrap(); // todo

let mut events =
packet_from_tx_search_response(self.id(), &request, *seq, response)
.map_or(vec![], |v| vec![v]);

result.append(&mut events);
.map_err(|e| Kind::Grpc.context(e))?;

assert!(
response.txs.len() <= 1,
romac marked this conversation as resolved.
Show resolved Hide resolved
"packet_from_tx_search_response: unexpected number of txs"
);

if let Some(event) = packet_from_tx_search_response(
self.id(),
&request,
*seq,
response.txs[0].clone(),
) {
result.push(event);
}
}
Ok(result)
}

QueryTxRequest::Client(request) => {
let response = self
crate::time!("query_txs: single client update event");

// query the first Tx that includes the event matching the client request
// Note: it is possible to have multiple Tx-es for same client and consensus height.
// In this case it must be true that the client updates were performed with tha
// same header as the first one, otherwise a subsequent transaction would have
// failed on chain. Therefore only one Tx is of interest and current API returns
// the first one.
let mut response = self
.block_on(self.rpc_client.tx_search(
header_query(&request),
false,
1,
1,
1, // get only the first Tx matching the query
Order::Ascending,
))
.unwrap(); // todo
.map_err(|e| Kind::Grpc.context(e))?;

if response.txs.is_empty() {
return Ok(vec![]);
}

let events = update_client_from_tx_search_response(self.id(), &request, response);
// the response must include a single Tx as specified in the query.
assert!(
response.txs.len() <= 1,
"packet_from_tx_search_response: unexpected number of txs"
);

let tx = response.txs.remove(0);
let event = update_client_from_tx_search_response(self.id(), &request, tx);

Ok(events)
Ok(event.into_iter().collect())
}
}
}
Expand Down Expand Up @@ -1259,109 +1287,69 @@ fn packet_from_tx_search_response(
chain_id: &ChainId,
request: &QueryPacketEventDataRequest,
seq: Sequence,
mut response: tendermint_rpc::endpoint::tx_search::Response,
response: ResultTx,
) -> Option<IbcEvent> {
assert!(
response.txs.len() <= 1,
"packet_from_tx_search_response: unexpected number of txs"
);
if let Some(r) = response.txs.pop() {
let height = ICSHeight::new(chain_id.version(), u64::from(r.height));
if height > request.height {
return None;
}

let mut matching = Vec::new();
for e in r.tx_result.events {
if e.type_str != request.event_id.as_str() {
continue;
}
let height = ICSHeight::new(chain_id.version(), u64::from(response.height));
if request.height != ICSHeight::zero() && height > request.height {
return None;
}

let res = ChannelEvents::try_from_tx(&e);
if res.is_none() {
continue;
}
let event = res.unwrap();
let packet = match &event {
response
.tx_result
.events
.into_iter()
.filter(|abci_event| abci_event.type_str == request.event_id.as_str())
.flat_map(|abci_event| ChannelEvents::try_from_tx(&abci_event))
.find(|event| {
romac marked this conversation as resolved.
Show resolved Hide resolved
let packet = match event {
IbcEvent::SendPacket(send_ev) => Some(&send_ev.packet),
IbcEvent::WriteAcknowledgement(ack_ev) => Some(&ack_ev.packet),
_ => None,
};

if packet.is_none() {
continue;
}

let packet = packet.unwrap();
if packet.source_port != request.source_port_id
|| packet.source_channel != request.source_channel_id
|| packet.destination_port != request.destination_port_id
|| packet.destination_channel != request.destination_channel_id
|| packet.sequence != seq
{
continue;
}

matching.push(event);
}

assert_eq!(
matching.len(),
1,
"packet_from_tx_search_response: unexpected number of matching packets"
);
matching.pop()
} else {
None
}
packet.map_or(false, |packet| {
packet.source_port == request.source_port_id
&& packet.source_channel == request.source_channel_id
&& packet.destination_port == request.destination_port_id
&& packet.destination_channel == request.destination_channel_id
&& packet.sequence == seq
})
})
}

// Extract all update client events for the requested client and height from the query_txs RPC response.
// Extracts from the Tx the update client event for the requested client and height.
// Note: in the Tx, there may have been multiple events, some of them may be
// for update of other clients that are not relevant to the request.
// For example, if we're querying for a transaction that includes the update for client X at
// consensus height H, it is possible that the transaction also includes an update client
// for client Y at consensus height H'. This is the reason the code iterates all event fields in the
// returned Tx to retrieve the relevant ones.
// Returns `None` if no matching event was found.
fn update_client_from_tx_search_response(
chain_id: &ChainId,
request: &QueryClientEventRequest,
response: tendermint_rpc::endpoint::tx_search::Response,
) -> Vec<IbcEvent> {
crate::time!("update_client_from_tx_search_response");

let mut matching = Vec::new();

for r in response.txs {
let height = ICSHeight::new(chain_id.version(), u64::from(r.height));
if request.height != ICSHeight::zero() && height > request.height {
return vec![];
}

for e in r.tx_result.events {
if e.type_str != request.event_id.as_str() {
continue;
}

let res = ClientEvents::try_from_tx(&e);
if res.is_none() {
continue;
}
let event = res.unwrap();
let update = match &event {
IbcEvent::UpdateClient(update) => Some(update),
_ => None,
};

if update.is_none() {
continue;
}

let update = update.unwrap();
if update.common.client_id != request.client_id
|| update.common.consensus_height != request.consensus_height
{
continue;
}

matching.push(event);
}
response: ResultTx,
) -> Option<IbcEvent> {
let height = ICSHeight::new(chain_id.version(), u64::from(response.height));
if request.height != ICSHeight::zero() && height > request.height {
return None;
}
matching

response
.tx_result
.events
.into_iter()
.filter(|event| event.type_str == request.event_id.as_str())
.flat_map(|event| ClientEvents::try_from_tx(&event))
.flat_map(|event| match event {
IbcEvent::UpdateClient(update) => Some(update),
_ => None,
})
.find(|update| {
romac marked this conversation as resolved.
Show resolved Hide resolved
update.common.client_id == request.client_id
&& update.common.consensus_height == request.consensus_height
})
.map(IbcEvent::UpdateClient)
}

/// Perform a generic `abci_query`, and return the corresponding deserialized response data.
Expand Down Expand Up @@ -1401,7 +1389,7 @@ async fn abci_query(

let response = QueryResponse {
value: response.value,
proof: convert_tm_to_ics_merkle_proof(raw_proof_ops).unwrap(),
proof: convert_tm_to_ics_merkle_proof(raw_proof_ops).unwrap(), // FIXME
height: response.height,
};

Expand Down