feat(postgres sink): Add postgres sink #21248
base: master
Conversation
// TODO: If a single item of the batch fails, the whole batch will fail its insert.
// Is this intended behaviour?
sqlx::query(&format!(
    "INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
The table configuration can be a victim of SQL injection, but in my opinion we shouldn't try to prevent that kind of attack at this level; the user should be responsible for ensuring that there is no SQL injection in the config... The databend sink works like this.
Hm, I suppose sqlx does not support parameterized table names? Does the query builder help here? If none of the above works, then we can leave it as is.
I don't think that could help in this case. See this statement about sqlx's query builder.
And we cannot use a variable bind (the $ syntax) in postgres for table names, as prepared statements are bound to a query plan, which cannot change if the target table changes.
I think this is the best way to do it... sqlx does not check for SQL injection.
Got it. Is it just the table config param currently? If so, we can add a validation check during config verification. This becomes more complicated if table becomes a template (per the other comment).
Edit: In that case we might be able to validate per event, or we could just add a notice in the docs to communicate that this sink isn't trying to be smart about security.
> we could just add a notice in the docs to communicate that this sink isn't trying to be smart about security.

Yes, I think we should do this, at least for now... For example, the Clickhouse and Databend Vector sinks do not try to protect users from SQL injection.
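For what it's worth, if we ever do want a config-time check, it could look roughly like this (a sketch only, not wired into the actual config code; the allowed character set and the helper name are assumptions):

```rust
/// Hypothetical helper: accept only plain (optionally schema-qualified)
/// identifiers for the `table` option and reject anything that would need
/// quoting, so the value can be interpolated into the INSERT statement.
fn validate_table_name(table: &str) -> Result<(), String> {
    let part_ok = |part: &str| {
        !part.is_empty()
            && !part.chars().next().unwrap().is_ascii_digit()
            && part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
    };
    if !table.is_empty() && table.split('.').all(part_ok) {
        Ok(())
    } else {
        Err(format!("`{table}` is not a plain postgres identifier"))
    }
}
```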
pub endpoint: String,

/// The table that data is inserted into.
pub table: String,
Should we make the table templatable, like the clickhouse sink? That would complicate the code a little bit (with KeyPartitioner and so on). If yes, I would like some guidance about it if possible.
It is a nice feature but not a must-have; we can do this incrementally. Once we have finalized the rest of the comments we can come back to this if you are motivated to add this feature.
Okay!
good for docs
Hi @jorgehermo9, thank you for this sizable contribution! On a high level, it looks great. I did a first review and left some comments. Don't hesitate to follow up, happy to discuss details.
src/sinks/postgres/config.rs
/// The table that data is inserted into.
pub table: String,

/// The postgres connection pool size.
Here it would be useful to explain what this pool is used for. Maybe a link to relevant docs would suffice.
Done in 21f3ad3. Do you think it is enough?
I also have doubts about using a connection pool. Can the event batches be executed in parallel for this sink? I don't know the specifics of Vector's internals...
vector/src/sinks/postgres/sink.rs, line 24 in 21f3ad3:
.batched(self.batch_settings.as_byte_size_config())
If the batches of events can be processed in parallel, then a connection pool is beneficial. If the batches are processed sequentially, then we should use a single postgres connection, as a pool would not make sense.
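For reference, the two options would look roughly like this with sqlx (a sketch only; `endpoint` and `pool_size` here stand in for the sink's config values):

```rust
use sqlx::{postgres::PgPoolOptions, Connection, PgConnection};

// Sketch comparing the two connection strategies discussed above.
async fn connect_examples(endpoint: &str, pool_size: u32) -> Result<(), sqlx::Error> {
    // A pool is worthwhile if event batches can be flushed concurrently.
    let _pool = PgPoolOptions::new()
        .max_connections(pool_size)
        .connect(endpoint)
        .await?;

    // A single connection is enough if batches are always processed sequentially.
    let _conn = PgConnection::connect(endpoint).await?;

    Ok(())
}
```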
Thank you very much for the review @pront! I'm kinda busy these days but I will revisit this as soon as I can :)
@@ -67,6 +67,9 @@ pub mod mock;

pub mod stats;

#[cfg(test)]
Is this cfg gate okay? I don't see any feature flag related to "any integration test". I would like to compile this module only if any (and not necessarily all) of the integration-test feature flags behind the all-integration-tests feature flag are specified.
We can technically write....
#[cfg(any(
test,
feature = "amqp-integration-tests",
feature = "appsignal-integration-tests",
feature = "aws-integration-tests",
feature = "axiom-integration-tests",
feature = "azure-integration-tests",
feature = "chronicle-integration-tests",
feature = "clickhouse-integration-tests",
feature = "databend-integration-tests",
feature = "datadog-agent-integration-tests",
feature = "datadog-logs-integration-tests",
feature = "datadog-metrics-integration-tests",
feature = "datadog-traces-integration-tests",
feature = "dnstap-integration-tests",
feature = "docker-logs-integration-tests",
feature = "es-integration-tests",
feature = "eventstoredb_metrics-integration-tests",
feature = "fluent-integration-tests",
feature = "gcp-cloud-storage-integration-tests",
feature = "gcp-integration-tests",
feature = "gcp-pubsub-integration-tests",
feature = "greptimedb-integration-tests",
feature = "http-client-integration-tests",
feature = "humio-integration-tests",
feature = "influxdb-integration-tests",
feature = "kafka-integration-tests",
feature = "logstash-integration-tests",
feature = "loki-integration-tests",
feature = "mongodb_metrics-integration-tests",
feature = "nats-integration-tests",
feature = "nginx-integration-tests",
feature = "opentelemetry-integration-tests",
feature = "postgresql_metrics-integration-tests",
feature = "prometheus-integration-tests",
feature = "pulsar-integration-tests",
feature = "redis-integration-tests",
feature = "splunk-integration-tests",
feature = "webhdfs-integration-tests"
))]
I don't know if there's a more elegant way to achieve the above.
yeah 😄 I fear that we will forget to keep that list synchronized with the integration-test feature flags in Cargo.toml... If you prefer to specify that, I'm okay with including it in this PR, but as there is not much code in that module as of right now... maybe it is okay to just leave #[cfg(test)]?
Hi @pront, I have more time available these days and will work on this PR. I still have to test how this sink behaves with metrics and traces, and I'll enable them if they work ok. Will update about that soon. I think I answered all of your comments, but correct me if I'm wrong. I will also think about adding more integration tests (and if traces & metrics work, I'll add integration tests for those too).
Yes, I was talking about those two specifically. I agree to not plan in advance; we can look into that in future PRs.
🎉 Awesome, I will prioritize reviewing this PR.
@pront I don't know how to properly generate traces in Vector... Is there any minimal configuration example to get traces, similar to what I did with metrics? I can try to ingest traces within integration tests, but I'd like to do manual testing as I did with metrics & logs.
}

#[derive(Debug, Serialize, Deserialize, FromRow)]
struct TestCounterMetric {
I had to create this struct to derive FromRow so we can read from the sqlx query result. I didn't find any other alternative...
I tried to deserialize the sqlx result into a serde_json::Value and then let metric: Metric = serde_json::from_value(value), but it seems that serde_json::Value does not implement sqlx::FromRow...
This is the only workaround that I could get working.
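The pattern I ended up with is roughly this (a sketch; the column and field names here are illustrative, the real struct mirrors the test table):

```rust
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, PgPool};

// Illustrative row struct: deriving `FromRow` lets us read the query result
// directly, since `serde_json::Value` does not implement `sqlx::FromRow`.
#[derive(Debug, Serialize, Deserialize, FromRow)]
struct TestCounterMetric {
    name: String,
    namespace: Option<String>,
    counter_value: f64,
}

async fn read_metric(pool: &PgPool, table: &str) -> Result<TestCounterMetric, sqlx::Error> {
    sqlx::query_as::<_, TestCounterMetric>(&format!("SELECT * FROM {table}"))
        .fetch_one(pool)
        .await
}
```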
Added support for metrics and traces in 1979627. Left a few comments.
}

#[tokio::test]
async fn insert_metric() {
I just added this one test for metrics ingestion. Do you think it is enough, or should we think of more test cases?
I just used one type of metric (an absolute counter) because I didn't want to bloat these tests... Should I test with more types (histograms, gauges...) or not?
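If we do add more cases, each one would only be a small constructor on top of the existing test, e.g. (assuming the usual constructors re-exported through vector_lib::event; the metric name is illustrative):

```rust
use vector_lib::event::{Event, Metric, MetricKind, MetricValue};

// Illustrative extra test input: an absolute gauge, to exercise serialization
// of a `MetricValue` variant other than the counter used in `insert_metric`.
fn gauge_event() -> Event {
    Metric::new(
        "test_gauge",
        MetricKind::Absolute,
        MetricValue::Gauge { value: 12.5 },
    )
    .into()
}
```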
type Response = PostgresResponse;

fn is_retriable_error(&self, _error: &Self::Error) -> bool {
    // TODO: Implement this
I don't know which types of errors Vector should consider retryable... Query timeouts?
sqlx::Error does have an Io variant (https://docs.rs/sqlx/latest/sqlx/enum.Error.html#variant.Io) and a pool timeout variant (https://docs.rs/sqlx/latest/sqlx/enum.Error.html#variant.PoolTimedOut); maybe we can retry in those two cases exclusively?
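i.e. roughly the following, assuming the service error type stays sqlx::Error:

```rust
// Sketch of the retry policy suggested above: treat I/O failures and pool
// timeouts as transient and retriable, everything else as permanent.
fn is_retriable(error: &sqlx::Error) -> bool {
    matches!(error, sqlx::Error::Io(_) | sqlx::Error::PoolTimedOut)
}
```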
@@ -231,6 +234,10 @@ pub fn temp_dir() -> PathBuf {
    path.join(dir_name)
}

pub fn temp_table() -> String {
Thoughts about the name of this function?
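For context, the function just needs to produce a unique table name per test run; a purely illustrative body (the real implementation may differ) could be:

```rust
use rand::{distributions::Alphanumeric, thread_rng, Rng};

// Hypothetical implementation: a random, lowercase, identifier-safe table
// name so concurrent integration tests don't collide.
pub fn temp_table() -> String {
    let suffix: String = thread_rng()
        .sample_iter(&Alphanumeric)
        .take(8)
        .map(char::from)
        .collect();
    format!("test_{}", suffix.to_lowercase())
}
```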
.unwrap();
let create_table_sql = format!(
    "CREATE TABLE {table} (agent_version TEXT, env TEXT, error_tps BIGINT, host TEXT,
    language TEXT, spans trace_span[], target_tps BIGINT, trace_id BIGINT)"
I typed the spans column (the TestTraceSpan struct) instead of declaring a jsonb[] column. I wanted to test insertions into postgres composite types (https://www.postgresql.org/docs/current/rowtypes.html).
Is it okay to leave the test of this feature (inserting into composite types) in the traces tests, or should I create another test to explicitly cover it...? For example, it would be a test similar to insert_single_event, but with a field that is a composite type (instead of payload: serde_json::Value), in a very similar way to what is done here with the trace_span postgres type.
I think it is okay to just test composite-type insertion here... But maybe we should separate concerns, use spans JSONB[] here, and create a simpler test for composite-type insertion?
I don't know if I'm explaining this well :/
#[derive(Debug, Serialize, Deserialize, sqlx::Type, FromRow)]
#[sqlx(type_name = "trace_span")]
struct TestTraceSpan {
Created this type because of https://github.com/vectordotdev/vector/pull/21248/files#r1948104371
Resolve this conversation once read!
fn create_span(resource: &str) -> ObjectMap {
    ObjectMap::from([
        ("service".into(), Value::from("a_service")),
        ("name".into(), Value::from("a_name")),
        ("resource".into(), Value::from(resource)),
        ("type".into(), Value::from("a_type")),
        ("trace_id".into(), Value::Integer(123)),
        ("span_id".into(), Value::Integer(456)),
        ("parent_id".into(), Value::Integer(789)),
        ("start".into(), Value::from(timestamp())),
        ("duration".into(), Value::Integer(1_000_000)),
        ("error".into(), Value::Integer(404)),
        (
            "meta".into(),
            Value::Object(ObjectMap::from([
                ("foo".into(), Value::from("bar")),
                ("bar".into(), Value::from("baz")),
            ])),
        ),
        (
            "metrics".into(),
            Value::Object(ObjectMap::from([
                ("a_metric".into(), Value::Float(NotNan::new(0.577).unwrap())),
                ("_top_level".into(), Value::Float(NotNan::new(1.0).unwrap())),
            ])),
        ),
    ])
}

pub fn create_trace(resource: &str) -> TraceEvent {
    let mut t = TraceEvent::default();
    t.insert(event_path!("language"), "a_language");
    t.insert(event_path!("agent_version"), "1.23456");
    t.insert(event_path!("host"), "a_host");
    t.insert(event_path!("env"), "an_env");
    t.insert(event_path!("trace_id"), Value::Integer(123));
    t.insert(event_path!("target_tps"), Value::Integer(10));
    t.insert(event_path!("error_tps"), Value::Integer(5));
    t.insert(
        event_path!("spans"),
        Value::Array(vec![Value::from(create_span(resource))]),
    );
    t
}
I copied those two utility methods from vector/src/sinks/datadog/traces/tests.rs (line 93 in 9593581, pub fn simple_trace_event(resource: String) -> TraceEvent), except for the start field in the create_span function: we should use a microsecond-resolution timestamp due to a postgres limitation.
Should I move them to another module such as test_util and reuse them in those two places? I didn't see utility methods for event (metric, traces or logs) creation; it was always done ad hoc in each test/IT test module... That's the reason why I copied them.
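The start timestamp tweak mentioned above amounts to something like this (a sketch of the idea, assuming chrono's DateTime helpers):

```rust
use chrono::{DateTime, Utc};

// Postgres timestamps only have microsecond resolution, so truncate the
// nanosecond part before inserting/comparing in the trace tests.
fn timestamp() -> DateTime<Utc> {
    let now = Utc::now();
    DateTime::from_timestamp_micros(now.timestamp_micros()).expect("timestamp in range")
}
```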
Derived from e9eb688: I manually tested composite type insertion with this config:

[sources.demo_logs]
type = "demo_logs"
format = "apache_common"

[transforms.structured]
type = "remap"
inputs = ["demo_logs"]
source = """
. = {
  "message": .message,
  "structured_field": {
    "string": "value",
    "integer": 1,
    "float": 1.1,
    "boolean": true,
  }
}
"""

[sinks.postgres]
type = "postgres"
inputs = ["structured"]
endpoint = "postgres://postgres:password123@localhost/test"
table = "structured_table"

and these SQL statements:

CREATE TYPE structured AS (string TEXT, integer BIGINT, float real, boolean BOOLEAN);
CREATE TABLE structured_table(message text, structured_field structured);

We can insert pg's composite types (records, https://www.postgresql.org/docs/current/rowtypes.html) from nested fields, instead of having to use jsonb. I would like to document an example of this in the docs as well.
Closes #15765
This PR is not 100% ready on my side and there will likely be a few things wrong, but I had a few questions and wanted to know if the direction seems right... So I would like an initial round of review if possible.
I tested the sink and it seems to be working, but I lack a lot of knowledge about Vector's internals and I'm not sure if the implementation is okay.
I took a lot of inspiration from the databend and clickhouse sinks, but left a few questions as TODOs in the source. I found this sink a bit different from the others, as the others use the request_builder machinery and encode the payload into bytes (as most of the sinks are HTTP based)... But I didn't think that fitted well in this case, as with the sqlx API I should wrap the events with the sqlx::types::Json type and that will do all the encoding with serde internally.
If someone wants to manually test it, I used this Vector config:
Run a postgres server with
podman run -e POSTGRES_PASSWORD=postgres -p 5432:5432 docker.io/postgres
and execute the following with psql -h localhost -U postgres:
Then execute \c test and, finally:
And then, you will see the logs in that table: