diff --git a/rust-arroyo/src/processing/strategies/reduce.rs b/rust-arroyo/src/processing/strategies/reduce.rs index d738b40a..8c86e39f 100644 --- a/rust-arroyo/src/processing/strategies/reduce.rs +++ b/rust-arroyo/src/processing/strategies/reduce.rs @@ -94,20 +94,25 @@ impl ProcessingStrategy for Reduce) -> Result, StrategyError> { let deadline = timeout.map(Deadline::new); - if self.message_carried_over.is_some() { - while self.message_carried_over.is_some() { - let next_commit = self.next_step.poll()?; - self.commit_request_carried_over = - merge_commit_request(self.commit_request_carried_over.take(), next_commit); - self.flush(true)?; - - if deadline.map_or(false, |d| d.has_elapsed()) { - tracing::warn!("Timeout reached while waiting for tasks to finish"); - break; - } + + loop { + if deadline.map_or(false, |d| d.has_elapsed()) { + tracing::warn!( + "Timeout {:?} reached while waiting for tasks to finish", + timeout + ); + break; } - } else { + + let next_commit = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), next_commit); + self.flush(true)?; + + if self.message_carried_over.is_none() { + break; + } } let next_commit = self.next_step.join(deadline.map(|d| d.remaining()))?;