From bf72e224c6448f73620cb68d07f710693e928fed Mon Sep 17 00:00:00 2001 From: August Date: Tue, 14 Mar 2023 14:49:17 +0800 Subject: [PATCH] fix: avoid panic when input upstream closed for lookup --- src/stream/src/executor/lookup/sides.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/stream/src/executor/lookup/sides.rs b/src/stream/src/executor/lookup/sides.rs index f050205f3900e..403f985093935 100644 --- a/src/stream/src/executor/lookup/sides.rs +++ b/src/stream/src/executor/lookup/sides.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::Context; use either::Either; use futures::stream::PollNext; use futures::StreamExt; @@ -247,7 +248,7 @@ pub async fn stream_lookup_arrange_prev_epoch( match input .next() .await - .expect("unexpected close of barrier aligner")? + .context("unexpected close of barrier aligner")?? { Either::Left(Message::Watermark(_)) => { todo!("https://github.com/risingwavelabs/risingwave/issues/6042") @@ -298,7 +299,7 @@ pub async fn stream_lookup_arrange_this_epoch( match input .next() .await - .expect("unexpected close of barrier aligner")? + .context("unexpected close of barrier aligner")?? { Either::Left(Message::Chunk(msg)) => { // Should wait until arrangement from this epoch is available. @@ -333,7 +334,7 @@ pub async fn stream_lookup_arrange_this_epoch( match input .next() .await - .expect("unexpected close of barrier aligner")? + .context("unexpected close of barrier aligner")?? { Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg), Either::Left(Message::Barrier(b)) => { @@ -355,7 +356,7 @@ pub async fn stream_lookup_arrange_this_epoch( match input .next() .await - .expect("unexpected close of barrier aligner")? + .context("unexpected close of barrier aligner")?? { Either::Left(_) => unreachable!(), Either::Right(Message::Chunk(chunk)) => {