Skip to content

Commit

Permalink
feat: support retry
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Fu <fuyufjh@gmail.com>
  • Loading branch information
fuyufjh committed Dec 26, 2024
1 parent ac188cb commit f138b6c
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 39 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ echo $USER
xxchan
```

### Extension: Retry

```text
query I retry 3 backoff 5s
SELECT id FROM test;
----
1
statement ok retry 3 backoff 5s
UPDATE test SET id = 1;
```

### Extension: Environment variable substitution in query and statement

It needs to be enabled by adding `control substitution on` to the test file.
Expand Down
188 changes: 152 additions & 36 deletions sqllogictest/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ impl Location {
}
}

/// Configuration for retry behavior
#[derive(Debug, Clone, PartialEq)]
pub struct RetryConfig {
/// Number of retry attempts
pub attempts: usize,
/// Duration to wait between retries
pub backoff: Duration,
}

/// Expectation for a statement.
#[derive(Debug, Clone, PartialEq)]
pub enum StatementExpect {
Expand All @@ -86,7 +95,6 @@ pub enum QueryExpect<T: ColumnType> {
types: Vec<T>,
sort_mode: Option<SortMode>,
result_mode: Option<ResultMode>,
label: Option<String>,
results: Vec<String>,
},
/// Query should fail with the given error message.
Expand All @@ -100,7 +108,6 @@ impl<T: ColumnType> QueryExpect<T> {
types: Vec::new(),
sort_mode: None,
result_mode: None,
label: None,
results: Vec::new(),
}
}
Expand All @@ -125,6 +132,8 @@ pub enum Record<T: ColumnType> {
/// The SQL command.
sql: String,
expected: StatementExpect,
/// Optional retry configuration
retry: Option<RetryConfig>,
},
/// A query is an SQL command from which we expect to receive results. The result set might be
/// empty.
Expand All @@ -135,6 +144,8 @@ pub enum Record<T: ColumnType> {
/// The SQL command.
sql: String,
expected: QueryExpect<T>,
/// Optional retry configuration
retry: Option<RetryConfig>,
},
/// A system command is an external command that is to be executed by the shell. Currently it
/// must succeed and the output is ignored.
Expand Down Expand Up @@ -208,13 +219,22 @@ impl<T: ColumnType> std::fmt::Display for Record<T> {
connection: _,
sql,
expected,
retry,
} => {
write!(f, "statement ")?;
match expected {
StatementExpect::Ok => write!(f, "ok")?,
StatementExpect::Count(cnt) => write!(f, "count {cnt}")?,
StatementExpect::Error(err) => err.fmt_inline(f)?,
}
if let Some(retry) = retry {
write!(
f,
" retry {} backoff {}",
retry.attempts,
humantime::format_duration(retry.backoff)
)?;
}
writeln!(f)?;
// statement always end with a blank line
writeln!(f, "{sql}")?;
Expand All @@ -230,36 +250,37 @@ impl<T: ColumnType> std::fmt::Display for Record<T> {
connection: _,
sql,
expected,
retry,
} => {
write!(f, "query ")?;
match expected {
QueryExpect::Results {
types,
sort_mode,
label,
..
types, sort_mode, ..
} => {
write!(f, "{}", types.iter().map(|c| c.to_char()).join(""))?;
if let Some(sort_mode) = sort_mode {
write!(f, " {}", sort_mode.as_str())?;
}
if let Some(label) = label {
write!(f, " {label}")?;
}
}
QueryExpect::Error(err) => err.fmt_inline(f)?,
}
if let Some(retry) = retry {
write!(
f,
" retry {} backoff {}",
retry.attempts,
humantime::format_duration(retry.backoff)
)?;
}
writeln!(f)?;
writeln!(f, "{sql}")?;

match expected {
QueryExpect::Results { results, .. } => {
write!(f, "{}", RESULTS_DELIMITER)?;

for result in results {
write!(f, "\n{result}")?;
}
// query always ends with a blank line
writeln!(f)?
}
QueryExpect::Error(err) => err.fmt_multiline(f)?,
Expand Down Expand Up @@ -622,6 +643,8 @@ pub enum ParseErrorKind {
InvalidErrorMessage(String),
#[error("duplicated error messages after error` and under `----`")]
DuplicatedErrorMessage,
#[error("invalid retry config: {0:?}")]
InvalidRetryConfig(String),
#[error("statement should have no result, use `query` instead")]
StatementHasResults,
#[error("invalid duration: {0:?}")]
Expand Down Expand Up @@ -730,22 +753,28 @@ fn parse_inner<T: ColumnType>(loc: &Location, script: &str) -> Result<Vec<Record
records.push(Record::Connection(conn));
}
["statement", res @ ..] => {
let mut expected = match res {
["ok"] => StatementExpect::Ok,
["error", tokens @ ..] => {
let error = ExpectedError::parse_inline_tokens(tokens)
let (mut expected, res) = match res {
["ok", retry @ ..] => (StatementExpect::Ok, retry),
["error", err_tokens @ ..] => {
// NOTE: `statement error` can't be used with `retry` now because all the
// tokens after `error` are treated as error message.
let error = ExpectedError::parse_inline_tokens(err_tokens)
.map_err(|e| e.at(loc.clone()))?;
StatementExpect::Error(error)
(StatementExpect::Error(error), &[][..])
}
["count", count_str] => {
["count", count_str, retry @ ..] => {
let count = count_str.parse::<u64>().map_err(|_| {
ParseErrorKind::InvalidNumber((*count_str).into()).at(loc.clone())
})?;
StatementExpect::Count(count)
(StatementExpect::Count(count), retry)
}
_ => return Err(ParseErrorKind::InvalidLine(line.into()).at(loc)),
};

let retry = parse_retry_config(res).map_err(|e| e.at(loc.clone()))?;

let (sql, has_results) = parse_lines(&mut lines, &loc, Some(RESULTS_DELIMITER))?;

if has_results {
if let StatementExpect::Error(e) = &mut expected {
// If no inline error message is specified, it might be a multiline error.
Expand All @@ -765,14 +794,17 @@ fn parse_inner<T: ColumnType>(loc: &Location, script: &str) -> Result<Vec<Record
connection: std::mem::take(&mut connection),
sql,
expected,
retry,
});
}
["query", res @ ..] => {
let mut expected = match res {
["error", tokens @ ..] => {
let error = ExpectedError::parse_inline_tokens(tokens)
let (mut expected, res) = match res {
["error", err_tokens @ ..] => {
// NOTE: `query error` can't be used with `retry` now because all the tokens
// after `error` are treated as error message.
let error = ExpectedError::parse_inline_tokens(err_tokens)
.map_err(|e| e.at(loc.clone()))?;
QueryExpect::Error(error)
(QueryExpect::Error(error), &[][..])
}
[type_str, res @ ..] => {
let types = type_str
Expand All @@ -782,23 +814,24 @@ fn parse_inner<T: ColumnType>(loc: &Location, script: &str) -> Result<Vec<Record
.ok_or_else(|| ParseErrorKind::InvalidType(ch).at(loc.clone()))
})
.try_collect()?;
let sort_mode = res
.first()
.map(|&s| SortMode::try_from_str(s))
.transpose()
.map_err(|e| e.at(loc.clone()))?;
let label = res.get(1).map(|s| s.to_string());
QueryExpect::Results {
types,
sort_mode,
result_mode: None,
label,
results: Vec::new(),
}
let sort_mode = res.first().and_then(|&s| SortMode::try_from_str(s).ok()); // Could be `retry`

let retry_start = if sort_mode.is_some() { 1 } else { 0 };
(
QueryExpect::Results {
types,
sort_mode,
result_mode: None,
results: Vec::new(),
},
&res[retry_start..],
)
}
[] => QueryExpect::empty_results(),
[] => (QueryExpect::empty_results(), &[][..]),
};

let retry = parse_retry_config(res).map_err(|e| e.at(loc.clone()))?;

// The SQL for the query is found on second and subsequent lines of the record
// up to first line of the form "----" or until the end of the record.
let (sql, has_result) = parse_lines(&mut lines, &loc, Some(RESULTS_DELIMITER))?;
Expand Down Expand Up @@ -830,6 +863,7 @@ fn parse_inner<T: ColumnType>(loc: &Location, script: &str) -> Result<Vec<Record
connection: std::mem::take(&mut connection),
sql,
expected,
retry,
});
}
["system", "ok"] => {
Expand Down Expand Up @@ -982,6 +1016,77 @@ fn parse_multiline_error<'a>(
ExpectedError::Multiline(parse_multiple_result(lines))
}

/// Parse retry configuration from tokens
///
/// The retry configuration is optional and can be specified as:
///
/// ```text
/// ... retry 3 backoff 1s
/// ```
fn parse_retry_config(tokens: &[&str]) -> Result<Option<RetryConfig>, ParseErrorKind> {
if tokens.is_empty() {
return Ok(None);
}

let mut iter = tokens.iter().peekable();

// Check if we have retry clause
match iter.next() {
Some(&"retry") => {}
Some(token) => return Err(ParseErrorKind::UnexpectedToken(token.to_string())),
None => return Ok(None),
}

// Parse number of attempts
let attempts = match iter.next() {
Some(attempts_str) => attempts_str
.parse::<usize>()
.map_err(|_| ParseErrorKind::InvalidNumber(attempts_str.to_string()))?,
None => {
return Err(ParseErrorKind::InvalidRetryConfig(
"expected a positive number of attempts".to_string(),
))
}
};

if attempts == 0 {
return Err(ParseErrorKind::InvalidRetryConfig(
"attempt must be greater than 0".to_string(),
));
}

// Expect "backoff" keyword
match iter.next() {
Some(&"backoff") => {}
Some(token) => return Err(ParseErrorKind::UnexpectedToken(token.to_string())),
None => {
return Err(ParseErrorKind::InvalidRetryConfig(
"expected keyword backoff".to_string(),
))
}
}

// Parse backoff duration
let duration_str = match iter.next() {
Some(s) => s,
None => {
return Err(ParseErrorKind::InvalidRetryConfig(
"expected backoff duration".to_string(),
))
}
};

let backoff = humantime::parse_duration(duration_str)
.map_err(|_| ParseErrorKind::InvalidDuration(duration_str.to_string()))?;

// No more tokens should be present
if iter.next().is_some() {
return Err(ParseErrorKind::UnexpectedToken("extra tokens".to_string()));
}

Ok(Some(RetryConfig { attempts, backoff }))
}

#[cfg(test)]
mod tests {
use std::io::Write;
Expand Down Expand Up @@ -1092,6 +1197,7 @@ select * from foo;
connection: Connection::Default,
sql: "select * from foo;".to_string(),
expected: QueryExpect::empty_results(),
retry: None,
}]
);
}
Expand Down Expand Up @@ -1186,4 +1292,14 @@ select * from foo;
}
}
}

#[test]
fn test_statement_retry() {
parse_roundtrip::<DefaultColumnType>("../tests/retry/statement_retry.slt")
}

#[test]
fn test_query_retry() {
parse_roundtrip::<DefaultColumnType>("../tests/retry/query_retry.slt")
}
}
Loading

0 comments on commit f138b6c

Please sign in to comment.