Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Notices on transactions #2443

Merged
merged 8 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,16 @@ and asserts that we receive the appropriate backend messages. These tests ensure
postgres protocol compatability as well as allowing us to assert the contents of
error and notice messages.

Test cases can be found in `./testdata/pgprototest`.
Test cases can be found in `./testdata/pgprototest` and
`./testdata/pgprototest_glaredb`.

The `pgprototest` directory is for test cases to assert that GlareDB matches
Postgres exactly, and the expected output should be generated from an actual
Postgres instance.

The `pgprototest_glaredb` directory contains test cases that do match Postgres
output exactly either because of an incomplete feature, or differing behavior.
The expected output for these tests need to be hand-crafted.

Tests can be ran with the `pgprototest` command:

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/datafusion_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tracing = { workspace = true }
thiserror.workspace = true
decimal = { path = "../decimal" }
protogen = { path = "../protogen" }
pgrepr = { path = "../pgrepr" }
futures = { workspace = true }
parking_lot = "0.12.1"
bson = "2.7.0"
Expand Down
2 changes: 2 additions & 0 deletions crates/datafusion_ext/src/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use constants::*;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::config::{ConfigExtension, ExtensionOptions};
use datafusion::scalar::ScalarValue;
use pgrepr::notice::NoticeSeverity;
use utils::*;

use datafusion::variable::{VarProvider, VarType};
Expand Down Expand Up @@ -79,6 +80,7 @@ impl SessionVars {
datestyle: String,
transaction_isolation: String,
search_path: Vec<String>,
client_min_messages: NoticeSeverity,
enable_debug_datasources: bool,
force_catalog_refresh: bool,
glaredb_version: String,
Expand Down
10 changes: 10 additions & 0 deletions crates/datafusion_ext/src/vars/constants.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;

use pgrepr::notice::NoticeSeverity;

// TODO: Decide proper postgres version to spoof/support
pub(super) const SERVER_VERSION: ServerVar<str> = ServerVar {
name: "server_version",
Expand Down Expand Up @@ -74,6 +76,14 @@ pub(super) static SEARCH_PATH: Lazy<ServerVar<[String]>> = Lazy::new(|| ServerVa
description: "Search path for schemas",
});

pub(super) const CLIENT_MIN_MESSAGES: ServerVar<NoticeSeverity> = ServerVar {
name: "client_min_messages",
value: &NoticeSeverity::Notice,
group: "postgres",
user_configurable: true,
description: "Controls which messages are sent to the client, defaults NOTICE",
};

pub(super) static GLAREDB_VERSION_OWNED: Lazy<String> =
Lazy::new(|| format!("v{}", env!("CARGO_PKG_VERSION")));
pub(super) static GLAREDB_VERSION: Lazy<ServerVar<str>> = Lazy::new(|| ServerVar {
Expand Down
7 changes: 7 additions & 0 deletions crates/datafusion_ext/src/vars/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::config::ConfigEntry;
use datafusion::error::Result;
use datafusion::variable::VarType;
use pgrepr::notice::NoticeSeverity;
use std::borrow::Borrow;

use super::constants::*;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub struct SessionVarsInner {
pub datestyle: SessionVar<str>,
pub transaction_isolation: SessionVar<str>,
pub search_path: SessionVar<[String]>,
pub client_min_messages: SessionVar<NoticeSeverity>,
pub enable_debug_datasources: SessionVar<bool>,
pub force_catalog_refresh: SessionVar<bool>,
pub glaredb_version: SessionVar<str>,
Expand Down Expand Up @@ -82,6 +84,8 @@ impl SessionVarsInner {
Ok(&self.transaction_isolation)
} else if name.eq_ignore_ascii_case(SEARCH_PATH.name) {
Ok(&self.search_path)
} else if name.eq_ignore_ascii_case(CLIENT_MIN_MESSAGES.name) {
Ok(&self.client_min_messages)
} else if name.eq_ignore_ascii_case(ENABLE_DEBUG_DATASOURCES.name) {
Ok(&self.enable_debug_datasources)
} else if name.eq_ignore_ascii_case(FORCE_CATALOG_REFRESH.name) {
Expand Down Expand Up @@ -139,6 +143,8 @@ impl SessionVarsInner {
self.transaction_isolation.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(SEARCH_PATH.name) {
self.search_path.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(CLIENT_MIN_MESSAGES.name) {
self.client_min_messages.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(ENABLE_DEBUG_DATASOURCES.name) {
self.enable_debug_datasources.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(FORCE_CATALOG_REFRESH.name) {
Expand Down Expand Up @@ -214,6 +220,7 @@ impl Default for SessionVarsInner {
datestyle: SessionVar::new(&DATESTYLE),
transaction_isolation: SessionVar::new(&TRANSACTION_ISOLATION),
search_path: SessionVar::new(&SEARCH_PATH),
client_min_messages: SessionVar::new(&CLIENT_MIN_MESSAGES),
enable_debug_datasources: SessionVar::new(&ENABLE_DEBUG_DATASOURCES),
force_catalog_refresh: SessionVar::new(&FORCE_CATALOG_REFRESH),
glaredb_version: SessionVar::new(&GLAREDB_VERSION),
Expand Down
13 changes: 13 additions & 0 deletions crates/datafusion_ext/src/vars/value.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use pgrepr::notice::NoticeSeverity;

use super::*;

pub trait Value: ToOwned + std::fmt::Debug {
fn try_parse(s: &str) -> Option<Self::Owned>;
fn format(&self) -> String;
Expand Down Expand Up @@ -111,3 +114,13 @@ impl Value for Dialect {
}
}
}

impl Value for NoticeSeverity {
fn try_parse(s: &str) -> Option<NoticeSeverity> {
NoticeSeverity::from_str(s).ok()
}

fn format(&self) -> String {
self.to_string()
}
}
20 changes: 20 additions & 0 deletions crates/glaredb/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use pgrepr::format::Format;
use pgrepr::notice::NoticeSeverity;
use reedline::{FileBackedHistory, Reedline, Signal};
use std::collections::HashMap;

Expand Down Expand Up @@ -160,6 +161,25 @@ impl LocalSession {
Ok(_) => {}
Err(e) => println!("Error: {e}"),
};

// Print out notices as needed.
//
// Note this isn't being called in the above `execute`
// function since that can be called in a
// non-interactive fashion which and having notice
// messages interspersed with the output would be
// annoying.
for notice in self.sess.take_notices() {
eprintln!(
"{}: {}",
match notice.severity {
s @ (NoticeSeverity::Warning | NoticeSeverity::Error) =>
s.to_string().red(),
other => other.to_string().blue(),
},
notice.message
);
}
}
},
Ok(Signal::CtrlD) => break,
Expand Down
4 changes: 3 additions & 1 deletion crates/pgprototest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ mod proto;
#[clap(about = "Data driven postgres protocol testing", long_about = None)]
struct Cli {
/// The directory containing the test files.
///
/// Maybe specified multiple times to run tests from multiple directories.
#[clap(long)]
dir: String,
dir: Vec<String>,
/// Address of the postgres compatible server.
#[clap(long)]
addr: String,
Expand Down
14 changes: 10 additions & 4 deletions crates/pgprototest/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,15 @@ impl TryFrom<(char, Message)> for SerializedMessage {
.collect()?,
})?,
),
Message::NoticeResponse(_msg) => {
("NoticeResponse", serde_json::to_string(&NoticeResponse {})?)
}
Message::NoticeResponse(msg) => (
"NoticeResponse",
serde_json::to_string(&ErrorResponse {
fields: msg
.fields()
.map(|field| Ok(field.value().to_string()))
.collect()?,
})?,
),
_ => return Err(anyhow!("unhandle message, type identifier: {}", id)),
};
Ok(SerializedMessage {
Expand Down Expand Up @@ -177,5 +183,5 @@ pub struct ErrorResponse {

#[derive(Serialize)]
pub struct NoticeResponse {
// TODO: Fill me in. Currently we don't assert notices.
pub fields: Vec<String>,
}
34 changes: 19 additions & 15 deletions crates/pgprototest/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,33 @@ use std::time::{Duration, Instant};
/// Each file will open a unique connection, with each test case in that file
/// being ran sequentially using that connection.
pub fn walk(
dir: String,
dirs: Vec<String>,
addr: String,
options: HashMap<String, String>,
password: Option<String>,
timeout: Duration,
verbose: bool,
) {
datadriven::walk(&dir, |file| {
let mut conn = PgConn::connect(&addr, &options, &password, timeout).unwrap();
file.run(|testcase| {
if verbose {
println!();
println!("--- TESTCASE ({}) ---", testcase.directive);
println!("{}", testcase.input);
}
for dir in dirs {
datadriven::walk(&dir, |file| {
let mut conn = PgConn::connect(&addr, &options, &password, timeout).unwrap();
file.run(|testcase| {
if verbose {
println!();
println!("--- TESTCASE ({}) ---", testcase.directive);
println!("{}", testcase.input);
}

match testcase.directive.as_str() {
"send" => run_send(&mut conn, &testcase.args, &testcase.input, verbose),
"until" => run_until(&mut conn, &testcase.args, &testcase.input, timeout, verbose),
unknown => panic!("unknown directive: {}", unknown),
}
match testcase.directive.as_str() {
"send" => run_send(&mut conn, &testcase.args, &testcase.input, verbose),
"until" => {
run_until(&mut conn, &testcase.args, &testcase.input, timeout, verbose)
}
unknown => panic!("unknown directive: {}", unknown),
}
});
});
});
}
}

/// Run a "send" directive.
Expand Down
3 changes: 3 additions & 0 deletions crates/pgrepr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum PgReprError {

#[error("Internal error: {0}")]
InternalError(String),

#[error("{0}")]
String(String),
tychoish marked this conversation as resolved.
Show resolved Hide resolved
}

pub type Result<T, E = PgReprError> = std::result::Result<T, E>;
1 change: 1 addition & 0 deletions crates/pgrepr/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod error;
pub mod format;
pub mod notice;
pub mod oid;
pub mod reader;
pub mod scalar;
Expand Down
Loading
Loading