From f138b6c0af8448d372efbfe376dc665bd424279c Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Dec 2024 14:57:53 +0800 Subject: [PATCH] feat: support retry Signed-off-by: Eric Fu --- README.md | 12 ++ sqllogictest/src/parser.rs | 188 ++++++++++++++++++++++++++------ sqllogictest/src/runner.rs | 48 +++++++- tests/retry/query_retry.slt | 16 +++ tests/retry/statement_retry.slt | 5 + 5 files changed, 230 insertions(+), 39 deletions(-) create mode 100644 tests/retry/query_retry.slt create mode 100644 tests/retry/statement_retry.slt diff --git a/README.md b/README.md index 58e0ae9..7032b49 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,18 @@ echo $USER xxchan ``` +### Extension: Retry + +```text +query I retry 3 backoff 5s +SELECT id FROM test; +---- +1 + +statement ok retry 3 backoff 5s +UPDATE test SET id = 1; +``` + ### Extension: Environment variable substitution in query and statement It needs to be enabled by adding `control substitution on` to the test file. diff --git a/sqllogictest/src/parser.rs b/sqllogictest/src/parser.rs index 065fdf9..9e0b2ce 100644 --- a/sqllogictest/src/parser.rs +++ b/sqllogictest/src/parser.rs @@ -67,6 +67,15 @@ impl Location { } } +/// Configuration for retry behavior +#[derive(Debug, Clone, PartialEq)] +pub struct RetryConfig { + /// Number of retry attempts + pub attempts: usize, + /// Duration to wait between retries + pub backoff: Duration, +} + /// Expectation for a statement. #[derive(Debug, Clone, PartialEq)] pub enum StatementExpect { @@ -86,7 +95,6 @@ pub enum QueryExpect { types: Vec, sort_mode: Option, result_mode: Option, - label: Option, results: Vec, }, /// Query should fail with the given error message. @@ -100,7 +108,6 @@ impl QueryExpect { types: Vec::new(), sort_mode: None, result_mode: None, - label: None, results: Vec::new(), } } @@ -125,6 +132,8 @@ pub enum Record { /// The SQL command. sql: String, expected: StatementExpect, + /// Optional retry configuration + retry: Option, }, /// A query is an SQL command from which we expect to receive results. The result set might be /// empty. @@ -135,6 +144,8 @@ pub enum Record { /// The SQL command. sql: String, expected: QueryExpect, + /// Optional retry configuration + retry: Option, }, /// A system command is an external command that is to be executed by the shell. Currently it /// must succeed and the output is ignored. @@ -208,6 +219,7 @@ impl std::fmt::Display for Record { connection: _, sql, expected, + retry, } => { write!(f, "statement ")?; match expected { @@ -215,6 +227,14 @@ impl std::fmt::Display for Record { StatementExpect::Count(cnt) => write!(f, "count {cnt}")?, StatementExpect::Error(err) => err.fmt_inline(f)?, } + if let Some(retry) = retry { + write!( + f, + " retry {} backoff {}", + retry.attempts, + humantime::format_duration(retry.backoff) + )?; + } writeln!(f)?; // statement always end with a blank line writeln!(f, "{sql}")?; @@ -230,36 +250,37 @@ impl std::fmt::Display for Record { connection: _, sql, expected, + retry, } => { write!(f, "query ")?; match expected { QueryExpect::Results { - types, - sort_mode, - label, - .. + types, sort_mode, .. } => { write!(f, "{}", types.iter().map(|c| c.to_char()).join(""))?; if let Some(sort_mode) = sort_mode { write!(f, " {}", sort_mode.as_str())?; } - if let Some(label) = label { - write!(f, " {label}")?; - } } QueryExpect::Error(err) => err.fmt_inline(f)?, } + if let Some(retry) = retry { + write!( + f, + " retry {} backoff {}", + retry.attempts, + humantime::format_duration(retry.backoff) + )?; + } writeln!(f)?; writeln!(f, "{sql}")?; match expected { QueryExpect::Results { results, .. } => { write!(f, "{}", RESULTS_DELIMITER)?; - for result in results { write!(f, "\n{result}")?; } - // query always ends with a blank line writeln!(f)? } QueryExpect::Error(err) => err.fmt_multiline(f)?, @@ -622,6 +643,8 @@ pub enum ParseErrorKind { InvalidErrorMessage(String), #[error("duplicated error messages after error` and under `----`")] DuplicatedErrorMessage, + #[error("invalid retry config: {0:?}")] + InvalidRetryConfig(String), #[error("statement should have no result, use `query` instead")] StatementHasResults, #[error("invalid duration: {0:?}")] @@ -730,22 +753,28 @@ fn parse_inner(loc: &Location, script: &str) -> Result { - let mut expected = match res { - ["ok"] => StatementExpect::Ok, - ["error", tokens @ ..] => { - let error = ExpectedError::parse_inline_tokens(tokens) + let (mut expected, res) = match res { + ["ok", retry @ ..] => (StatementExpect::Ok, retry), + ["error", err_tokens @ ..] => { + // NOTE: `statement error` can't be used with `retry` now because all the + // tokens after `error` are treated as error message. + let error = ExpectedError::parse_inline_tokens(err_tokens) .map_err(|e| e.at(loc.clone()))?; - StatementExpect::Error(error) + (StatementExpect::Error(error), &[][..]) } - ["count", count_str] => { + ["count", count_str, retry @ ..] => { let count = count_str.parse::().map_err(|_| { ParseErrorKind::InvalidNumber((*count_str).into()).at(loc.clone()) })?; - StatementExpect::Count(count) + (StatementExpect::Count(count), retry) } _ => return Err(ParseErrorKind::InvalidLine(line.into()).at(loc)), }; + + let retry = parse_retry_config(res).map_err(|e| e.at(loc.clone()))?; + let (sql, has_results) = parse_lines(&mut lines, &loc, Some(RESULTS_DELIMITER))?; + if has_results { if let StatementExpect::Error(e) = &mut expected { // If no inline error message is specified, it might be a multiline error. @@ -765,14 +794,17 @@ fn parse_inner(loc: &Location, script: &str) -> Result { - let mut expected = match res { - ["error", tokens @ ..] => { - let error = ExpectedError::parse_inline_tokens(tokens) + let (mut expected, res) = match res { + ["error", err_tokens @ ..] => { + // NOTE: `query error` can't be used with `retry` now because all the tokens + // after `error` are treated as error message. + let error = ExpectedError::parse_inline_tokens(err_tokens) .map_err(|e| e.at(loc.clone()))?; - QueryExpect::Error(error) + (QueryExpect::Error(error), &[][..]) } [type_str, res @ ..] => { let types = type_str @@ -782,23 +814,24 @@ fn parse_inner(loc: &Location, script: &str) -> Result QueryExpect::empty_results(), + [] => (QueryExpect::empty_results(), &[][..]), }; + let retry = parse_retry_config(res).map_err(|e| e.at(loc.clone()))?; + // The SQL for the query is found on second and subsequent lines of the record // up to first line of the form "----" or until the end of the record. let (sql, has_result) = parse_lines(&mut lines, &loc, Some(RESULTS_DELIMITER))?; @@ -830,6 +863,7 @@ fn parse_inner(loc: &Location, script: &str) -> Result { @@ -982,6 +1016,77 @@ fn parse_multiline_error<'a>( ExpectedError::Multiline(parse_multiple_result(lines)) } +/// Parse retry configuration from tokens +/// +/// The retry configuration is optional and can be specified as: +/// +/// ```text +/// ... retry 3 backoff 1s +/// ``` +fn parse_retry_config(tokens: &[&str]) -> Result, ParseErrorKind> { + if tokens.is_empty() { + return Ok(None); + } + + let mut iter = tokens.iter().peekable(); + + // Check if we have retry clause + match iter.next() { + Some(&"retry") => {} + Some(token) => return Err(ParseErrorKind::UnexpectedToken(token.to_string())), + None => return Ok(None), + } + + // Parse number of attempts + let attempts = match iter.next() { + Some(attempts_str) => attempts_str + .parse::() + .map_err(|_| ParseErrorKind::InvalidNumber(attempts_str.to_string()))?, + None => { + return Err(ParseErrorKind::InvalidRetryConfig( + "expected a positive number of attempts".to_string(), + )) + } + }; + + if attempts == 0 { + return Err(ParseErrorKind::InvalidRetryConfig( + "attempt must be greater than 0".to_string(), + )); + } + + // Expect "backoff" keyword + match iter.next() { + Some(&"backoff") => {} + Some(token) => return Err(ParseErrorKind::UnexpectedToken(token.to_string())), + None => { + return Err(ParseErrorKind::InvalidRetryConfig( + "expected keyword backoff".to_string(), + )) + } + } + + // Parse backoff duration + let duration_str = match iter.next() { + Some(s) => s, + None => { + return Err(ParseErrorKind::InvalidRetryConfig( + "expected backoff duration".to_string(), + )) + } + }; + + let backoff = humantime::parse_duration(duration_str) + .map_err(|_| ParseErrorKind::InvalidDuration(duration_str.to_string()))?; + + // No more tokens should be present + if iter.next().is_some() { + return Err(ParseErrorKind::UnexpectedToken("extra tokens".to_string())); + } + + Ok(Some(RetryConfig { attempts, backoff })) +} + #[cfg(test)] mod tests { use std::io::Write; @@ -1092,6 +1197,7 @@ select * from foo; connection: Connection::Default, sql: "select * from foo;".to_string(), expected: QueryExpect::empty_results(), + retry: None, }] ); } @@ -1186,4 +1292,14 @@ select * from foo; } } } + + #[test] + fn test_statement_retry() { + parse_roundtrip::("../tests/retry/statement_retry.slt") + } + + #[test] + fn test_query_retry() { + parse_roundtrip::("../tests/retry/query_retry.slt") + } } diff --git a/sqllogictest/src/runner.rs b/sqllogictest/src/runner.rs index f6815a6..88d2b59 100644 --- a/sqllogictest/src/runner.rs +++ b/sqllogictest/src/runner.rs @@ -596,6 +596,7 @@ impl> Runner { // compare result in run_async expected: _, loc: _, + retry: _, } => { let sql = match self.may_substitute(sql, true) { Ok(sql) => sql, @@ -743,6 +744,7 @@ impl> Runner { // compare result in run_async expected, loc: _, + retry: _, } => { let sql = match self.may_substitute(sql, true) { Ok(sql) => sql, @@ -879,6 +881,36 @@ impl> Runner { pub async fn run_async( &mut self, record: Record, + ) -> Result, TestError> { + let retry = match &record { + Record::Statement { retry, .. } => retry.clone(), + Record::Query { retry, .. } => retry.clone(), + _ => None, + }; + if retry.is_none() { + return self.run_async_no_retry(record).await; + } + + // Retry for `retry.attempts` times. The parser ensures that `retry.attempts` must > 0. + let retry = retry.unwrap(); + let mut last_error = None; + for _ in 0..retry.attempts { + let result = self.run_async_no_retry(record.clone()).await; + if result.is_ok() { + return result; + } + tracing::warn!(target:"sqllogictest::retry", backoff = ?retry.backoff, error = ?result, "retrying"); + D::sleep(retry.backoff).await; + last_error = result.err(); + } + + Err(last_error.unwrap()) + } + + /// Run a single record without retry. + async fn run_async_no_retry( + &mut self, + record: Record, ) -> Result, TestError> { let result = self.apply_record(record.clone()).await; @@ -941,6 +973,7 @@ impl> Runner { conditions: _, sql, expected, + retry: _, }, RecordOutput::Statement { count, error }, ) => match (error, expected) { @@ -989,6 +1022,7 @@ impl> Runner { connection: _, sql, expected, + retry: _, }, RecordOutput::Query { types, rows, error }, ) => { @@ -1451,6 +1485,7 @@ pub fn update_record_with_output( conditions, connection, expected: mut expected @ (StatementExpect::Ok | StatementExpect::Count(_)), + retry, }, RecordOutput::Query { error: None, rows, .. @@ -1473,6 +1508,7 @@ pub fn update_record_with_output( conditions, connection, expected, + retry, }) } // query, statement @@ -1483,6 +1519,7 @@ pub fn update_record_with_output( conditions, connection, expected: _, + retry, }, RecordOutput::Statement { error: None, count }, ) => Some(Record::Statement { @@ -1491,6 +1528,7 @@ pub fn update_record_with_output( conditions, connection, expected: StatementExpect::Count(*count), + retry, }), // statement, statement ( @@ -1500,6 +1538,7 @@ pub fn update_record_with_output( connection, sql, expected, + retry, }, RecordOutput::Statement { count, error }, ) => match (error, expected) { @@ -1513,6 +1552,7 @@ pub fn update_record_with_output( StatementExpect::Count(_) => StatementExpect::Count(*count), StatementExpect::Error(_) | StatementExpect::Ok => StatementExpect::Ok, }, + retry, }), // Error match (Some(e), StatementExpect::Error(expected_error)) @@ -1535,6 +1575,7 @@ pub fn update_record_with_output( loc, conditions, connection, + retry, }) } }, @@ -1546,6 +1587,7 @@ pub fn update_record_with_output( connection, sql, expected, + retry, }, RecordOutput::Query { types, rows, error }, ) => match (error, expected) { @@ -1570,6 +1612,7 @@ pub fn update_record_with_output( loc, conditions, connection, + retry, }) } (None, expected) => { @@ -1597,7 +1640,7 @@ pub fn update_record_with_output( expected: match expected { QueryExpect::Results { sort_mode, - label, + result_mode, .. } => QueryExpect::Results { @@ -1605,16 +1648,15 @@ pub fn update_record_with_output( types, sort_mode, result_mode, - label, }, QueryExpect::Error(_) => QueryExpect::Results { results, types, sort_mode: None, result_mode: None, - label: None, }, }, + retry, }) } }, diff --git a/tests/retry/query_retry.slt b/tests/retry/query_retry.slt new file mode 100644 index 0000000..0c030ab --- /dev/null +++ b/tests/retry/query_retry.slt @@ -0,0 +1,16 @@ +query I retry 3 backoff 5s +SELECT id FROM test; +---- +1 + +query I rowsort retry 2 backoff 1s +SELECT id FROM test ORDER BY random(); +---- +1 +2 +3 + +query I retry 1 backoff 500ms +SELECT id FROM test; +---- +1 \ No newline at end of file diff --git a/tests/retry/statement_retry.slt b/tests/retry/statement_retry.slt new file mode 100644 index 0000000..6559ce6 --- /dev/null +++ b/tests/retry/statement_retry.slt @@ -0,0 +1,5 @@ +statement ok retry 3 backoff 5s +INSERT INTO test VALUES (1); + +statement count 5 retry 2 backoff 1s +UPDATE test SET value = value + 1; \ No newline at end of file