diff --git a/modules/hyperclient/hyperclient.d.ts b/modules/hyperclient/hyperclient.d.ts index 51596b77b..ac70d7072 100644 --- a/modules/hyperclient/hyperclient.d.ts +++ b/modules/hyperclient/hyperclient.d.ts @@ -185,10 +185,10 @@ export interface MessageDispatched { Dispatched: bigint } -// The possible initial states of a timeout (Post request or response) stream +// The possible initial states of a timeout (request or response) stream export type TimeoutStreamState = "Pending" | DestinationFinalizedState | HyperbridgeVerifiedState | HyperbridgeFinalizedState; -// The possible initial states of a message status (Post request or response) stream +// The possible initial states of a message status (request or response) stream export type MessageStatusStreamState = MessageDispatched | SourceFinalizedState | HyperbridgeVerifiedState | HyperbridgeFinalizedState; // The possible states of an inflight request @@ -337,12 +337,15 @@ export class HyperClient { ): Promise>; /** - * Return the status of a get request as a `ReadableStream` + * Return the status of a get request as a `ReadableStream`. If the stream terminates abruptly, + * perhaps as a result of some error, it can be resumed given some initial state. * @param {IGetRequest} request + * @param {MessageStatusStreamState} state * @returns {Promise>} */ get_request_status_stream( request: IGetRequest, + state: MessageStatusStreamState, ): Promise>; /** diff --git a/modules/hyperclient/package.json b/modules/hyperclient/package.json index 241ca96bc..036178bc8 100644 --- a/modules/hyperclient/package.json +++ b/modules/hyperclient/package.json @@ -1,7 +1,7 @@ { "name": "@polytope-labs/hyperclient", "description": "The hyperclient is a library for managing (in-flight) ISMP requests", - "version": "0.6.2", + "version": "0.6.5", "author": "Polytope Labs (hello@polytope.technology)", "license": "Apache-2.0", "bugs": { diff --git a/modules/hyperclient/src/internals/post_request.rs b/modules/hyperclient/src/internals/post_request.rs index 6d6414ef6..b3a5527ac 100644 --- a/modules/hyperclient/src/internals/post_request.rs +++ b/modules/hyperclient/src/internals/post_request.rs @@ -658,74 +658,76 @@ pub async fn timeout_post_request_stream( let lambda = || async { match state { TimeoutStreamState::Pending => { - let relayer = hyperbridge_client.query_request_receipt(hash).await?; - if relayer != H160::zero() { - let height = hyperbridge_client - .query_latest_state_machine_height(dest_client.state_machine_id()) - .await?; - - let state_commitment = hyperbridge_client - .query_state_machine_commitment(StateMachineHeight { - id: dest_client.state_machine_id(), - height, - }) - .await?; + let height = hyperbridge_client + .query_latest_state_machine_height(dest_client.state_machine_id()) + .await?; - if state_commitment.timestamp > post.timeout().as_secs() { - // early return if the destination has already finalized the height - return Ok(Some(( - Ok(TimeoutStatus::DestinationFinalized { - meta: Default::default(), - }), - TimeoutStreamState::DestinationFinalized(height), - ))); - } + let state_commitment = hyperbridge_client + .query_state_machine_commitment(StateMachineHeight { + id: dest_client.state_machine_id(), + height, + }) + .await?; - let mut stream = hyperbridge_client - .state_machine_update_notification(dest_client.state_machine_id()) - .await?; - let mut valid_proof_height = None; - while let Some(event) = stream.next().await { - match event { - Ok(ev) => { - let state_machine_height = StateMachineHeight { - id: ev.event.state_machine_id, - height: ev.event.latest_height, - }; - let commitment = hyperbridge_client - .query_state_machine_commitment(state_machine_height) - .await?; - if commitment.timestamp > post.timeout().as_secs() { - valid_proof_height = Some(ev); - break; - } - }, - Err(e) => - return Ok(Some(( - Err(anyhow!( - "Encountered error in time out stream {e:?}" - )), - state, - ))), - } + if state_commitment.timestamp > post.timeout().as_secs() { + // early return if the destination has already finalized the height + return Ok(Some(( + Ok(TimeoutStatus::DestinationFinalized { + meta: Default::default(), + }), + TimeoutStreamState::DestinationFinalized(height), + ))); + } + + let mut stream = hyperbridge_client + .state_machine_update_notification(dest_client.state_machine_id()) + .await?; + let mut valid_proof_height = None; + while let Some(event) = stream.next().await { + match event { + Ok(ev) => { + let state_machine_height = StateMachineHeight { + id: ev.event.state_machine_id, + height: ev.event.latest_height, + }; + let commitment = hyperbridge_client + .query_state_machine_commitment(state_machine_height) + .await?; + if commitment.timestamp > post.timeout().as_secs() { + valid_proof_height = Some(ev); + break; + } + }, + Err(e) => + return Ok(Some(( + Err(anyhow!("Encountered error in time out stream {e:?}")), + state, + ))), } - Ok(valid_proof_height.map(|ev| { - ( - Ok(TimeoutStatus::DestinationFinalized { meta: ev.meta }), - TimeoutStreamState::DestinationFinalized( - ev.event.latest_height, - ), - ) - })) - } else { - let height = hyperbridge_client.query_latest_block_height().await?; - Ok(Some(( - Ok(TimeoutStatus::HyperbridgeVerified { meta: Default::default() }), - TimeoutStreamState::HyperbridgeVerified(height), - ))) } + Ok(valid_proof_height.map(|ev| { + ( + Ok(TimeoutStatus::DestinationFinalized { meta: ev.meta }), + TimeoutStreamState::DestinationFinalized(ev.event.latest_height), + ) + })) }, TimeoutStreamState::DestinationFinalized(proof_height) => { + let relayer = hyperbridge_client.query_request_receipt(hash).await?; + if relayer == H160::zero() { + // request was never delivered + let latest_height = + hyperbridge_client.client.rpc().header(None).await?.ok_or_else( + || anyhow!("Failed to query latest hyperbridge height!"), + )?; + return Ok(Some(( + Ok(TimeoutStatus::HyperbridgeVerified { meta: Default::default() }), + TimeoutStreamState::HyperbridgeVerified( + latest_height.number.into(), + ), + ))) + } + let storage_key = dest_client.request_receipt_full_key(hash); let proof = dest_client.query_state_proof(proof_height, vec![storage_key]).await?; diff --git a/modules/ismp/pallets/rpc/src/lib.rs b/modules/ismp/pallets/rpc/src/lib.rs index 94e94b313..8a91f04cc 100644 --- a/modules/ismp/pallets/rpc/src/lib.rs +++ b/modules/ismp/pallets/rpc/src/lib.rs @@ -314,7 +314,7 @@ where .map_err(|_| runtime_error_into_rpc_error("Error accessing state backend"))?; let child_root = state .storage(child_info.prefixed_storage_key().as_slice()) - .map_err(|_| runtime_error_into_rpc_error("Error reading child trie root"))? + .map_err(|err| runtime_error_into_rpc_error(format!("Storage Read Error: {err:?}")))? .map(|r| { let mut hash = <::Hashing as Hash>::Output::default(); @@ -323,7 +323,7 @@ where hash }) - .ok_or_else(|| runtime_error_into_rpc_error("Error reading child trie root"))?; + .ok_or_else(|| runtime_error_into_rpc_error("Child trie root storage returned None"))?; let db = storage_proof.into_memory_db::<::Hashing>(); diff --git a/tesseract/messaging/src/lib.rs b/tesseract/messaging/src/lib.rs index 306f389ff..896102f8b 100644 --- a/tesseract/messaging/src/lib.rs +++ b/tesseract/messaging/src/lib.rs @@ -477,6 +477,7 @@ async fn fee_accumulation