Skip to content

Commit

Permalink
proxy: pass neon options in deep object format (#6068)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>
  • Loading branch information
prepor and conradludgate authored Dec 8, 2023
1 parent e640bc7 commit df1f8e1
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 46 deletions.
33 changes: 23 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pin-project-lite = "0.2"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"
regex = "1.4"
regex = "1.10.2"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_19"] }
reqwest-middleware = "0.2.0"
Expand Down
9 changes: 3 additions & 6 deletions proxy/src/auth/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::{
auth::password_hack::parse_endpoint_param,
error::UserFacingError,
proxy::{neon_options, NUM_CONNECTION_ACCEPTED_BY_SNI},
proxy::{neon_options_str, NUM_CONNECTION_ACCEPTED_BY_SNI},
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
Expand Down Expand Up @@ -140,7 +140,7 @@ impl ClientCredentials {
let cache_key = format!(
"{}{}",
project.as_deref().unwrap_or(""),
neon_options(params).unwrap_or("".to_string())
neon_options_str(params)
)
.into();

Expand Down Expand Up @@ -406,10 +406,7 @@ mod tests {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let creds = ClientCredentials::parse(&options, sni, common_names, peer_addr)?;
assert_eq!(creds.project.as_deref(), Some("project"));
assert_eq!(
creds.cache_key,
"projectneon_endpoint_type:read_write neon_lsn:0/2"
);
assert_eq!(creds.cache_key, "projectendpoint_type:read_write lsn:0/2");

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/compute.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError,
error::UserFacingError, proxy::is_neon_param,
error::UserFacingError, proxy::neon_option,
};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
Expand Down Expand Up @@ -275,7 +275,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| parse_endpoint_param(opt).is_none() && !is_neon_param(opt))
.filter(|opt| parse_endpoint_param(opt).is_none() && neon_option(opt).is_none())
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();

Expand Down
13 changes: 12 additions & 1 deletion proxy/src/console/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,18 @@ pub struct ConsoleReqExtra<'a> {
pub session_id: uuid::Uuid,
/// Name of client application, if set.
pub application_name: Option<&'a str>,
pub options: Option<&'a str>,
pub options: Vec<(String, String)>,
}

impl<'a> ConsoleReqExtra<'a> {
// https://swagger.io/docs/specification/serialization/ DeepObject format
// paramName[prop1]=value1&paramName[prop2]=value2&....
pub fn options_as_deep_object(&self) -> Vec<(String, String)> {
self.options
.iter()
.map(|(k, v)| (format!("options[{}]", k), v.to_string()))
.collect()
}
}

/// Auth secret which is managed by the cloud.
Expand Down
13 changes: 9 additions & 4 deletions proxy/src/console/provider/neon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Api {
) -> Result<NodeInfo, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
async {
let request = self
let mut request_builder = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", &request_id)
Expand All @@ -115,9 +115,14 @@ impl Api {
.query(&[
("application_name", extra.application_name),
("project", Some(&creds.endpoint)),
("options", extra.options),
])
.build()?;
]);

request_builder = if extra.options.is_empty() {
request_builder
} else {
request_builder.query(&extra.options_as_deep_object())
};
let request = request_builder.build()?;

info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
Expand Down
37 changes: 19 additions & 18 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -968,12 +968,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
allow_self_signed_compute,
} = self;

let console_options = neon_options(params);

let extra = console::ConsoleReqExtra {
session_id, // aka this connection's id
application_name: params.get("application_name"),
options: console_options.as_deref(),
options: neon_options(params),
};

let mut latency_timer = LatencyTimer::new(mode.protocol_label());
Expand Down Expand Up @@ -1033,26 +1031,29 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
}
}

pub fn neon_options(params: &StartupMessageParams) -> Option<String> {
pub fn neon_options(params: &StartupMessageParams) -> Vec<(String, String)> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| is_neon_param(opt))
.sorted() // we sort it to use as cache key
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();

// Don't even bother with empty options.
if options.is_empty() {
return None;
match params.options_raw() {
Some(options) => options.filter_map(neon_option).collect(),
None => vec![],
}
}

Some(options)
pub fn neon_options_str(params: &StartupMessageParams) -> String {
#[allow(unstable_name_collisions)]
neon_options(params)
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.sorted() // we sort it to use as cache key
.intersperse(" ".to_owned())
.collect()
}

pub fn is_neon_param(bytes: &str) -> bool {
pub fn neon_option(bytes: &str) -> Option<(String, String)> {
static RE: OnceCell<Regex> = OnceCell::new();
RE.get_or_init(|| Regex::new(r"^neon_\w+:").unwrap());
let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").unwrap());

RE.get().unwrap().is_match(bytes)
let cap = re.captures(bytes)?;
let (_, [k, v]) = cap.extract();
Some((k.to_owned(), v.to_owned()))
}
2 changes: 1 addition & 1 deletion proxy/src/proxy/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ fn helper_create_connect_info(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("TEST"),
options: None,
options: vec![],
};
let creds = auth::BackendType::Test(mechanism);
(cache, extra, creds)
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/serverless/conn_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ async fn connect_to_compute(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
options: console_options.as_deref(),
options: console_options,
};
// TODO(anna): this is a bit hacky way, consider using console notification listener.
if !config.disable_ip_check_for_http {
Expand Down
6 changes: 4 additions & 2 deletions workspace_hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ num-traits = { version = "0.2", features = ["i128"] }
prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
regex-syntax = { version = "0.7" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "multipart", "rustls-tls", "stream"] }
ring = { version = "0.16", features = ["std"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
Expand Down Expand Up @@ -90,7 +91,8 @@ memchr = { version = "2" }
nom = { version = "7" }
prost = { version = "0.11" }
regex = { version = "1" }
regex-syntax = { version = "0.7" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] }
Expand Down

1 comment on commit df1f8e1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2216 tests run: 2126 passed, 0 failed, 90 skipped (full report)


Flaky tests (2)

Postgres 16

Postgres 14

  • test_statvfs_pressure_usage: release

Code coverage (full report)

  • functions: 54.8% (9367 of 17080 functions)
  • lines: 82.0% (54387 of 66301 lines)

The comment gets automatically updated with the latest test results
df1f8e1 at 2023-12-08T19:39:49.997Z :recycle:

Please sign in to comment.