feat(postgres sink): Add postgres sink #21248

Open · wants to merge 43 commits into base: master

Conversation


@jorgehermo9 jorgehermo9 commented Sep 9, 2024

Closes #15765

This PR is not 100% ready on my side and there are likely a few things wrong, but I had a few questions and wanted to know if the direction seems right, so I would like an initial round of review if possible.

I tested the sink and it seems to be working, but I lack a lot of knowledge about Vector's internals and I'm not sure if the implementation is okay.

I took a lot of inspiration from the databend and clickhouse sinks, but left a few questions as TODOs in the source. I found this sink a bit different from the others, as they use the request_builder approach and encode the payload into bytes (since most sinks are HTTP based). I didn't think that fit well in this case, because with the sqlx API I should wrap the events with the sqlx::types::Json type, which handles all the encoding with serde internally.
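As a rough sketch of that idea (using plain serde_json::Value events for illustration; the real sink works with Vector's own event types), the whole batch can be bound as a single JSONB parameter:

// Minimal sketch, not the PR's actual code: serialize the batch as one JSONB
// array parameter and let Postgres expand it into rows of the target table.
async fn insert_batch(
    pool: &sqlx::PgPool,
    table: &str,
    events: &[serde_json::Value],
) -> Result<(), sqlx::Error> {
    sqlx::query(&format!(
        "INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
    ))
    .bind(sqlx::types::Json(events))
    .execute(pool)
    .await?;
    Ok(())
}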

If someone wants to manually test it, I used this Vector config:

[sources.demo_logs]
type = "demo_logs"
format = "apache_common"

[transforms.payload]
type = "remap"
inputs = ["demo_logs"]
source = """
.payload = .
"""

[sinks.postgres]
type = "postgres"
inputs = ["payload"]
endpoint = "postgres://postgres:postgres@localhost/test"
table = "test"

Run a postgres server with podman run -e POSTGRES_PASSWORD=postgres -p 5432:5432 docker.io/postgres

and execute the following with psql -h localhost -U postgres:

CREATE DATABASE test;

then execute \c test
and last:

CREATE TABLE test (message TEXT, timestamp TIMESTAMP WITH TIME ZONE, payload JSONB);

And then, you will see logs in that table:

[screenshot: logs inserted into the test table]

@jorgehermo9 jorgehermo9 requested a review from a team as a code owner September 9, 2024 22:33
@github-actions github-actions bot added the domain: sinks and domain: ci labels Sep 9, 2024
@jorgehermo9 jorgehermo9 requested review from a team as code owners September 9, 2024 22:38
@github-actions github-actions bot added the domain: external docs label Sep 9, 2024
// TODO: If a single item of the batch fails, the whole batch will fail its insert.
// Is this intended behaviour?
sqlx::query(&format!(
"INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
Contributor Author

The table configuration can be a victim of SQL injection, but in my opinion we shouldn't try to prevent that kind of attack at this level; the user should be responsible for ensuring that there is no SQL injection in the config... The databend sink works like this.

Member

Hm, I suppose sqlx does not support parameterized table names? Does the query builder help here? If none of the above works, then we can leave as is.

Contributor Author

I don't think that could help in this case. See this statement about sqlx's query builder.

And we cannot use a variable bind ($ syntax) in postgres for table names, as prepared statements are bound to a query plan, which cannot change if the target table changes.

I think this is the best way to do it... sqlx does not check for SQL injection.

Member
@pront pront Jan 3, 2025

Got it. Is it just the table config param currently? If so, we can add a validation check when building the config. This becomes more complicated if table becomes a template (per the other comment).

Edit: In that case we might be able to validate per event, or we could just add a notice in the docs to communicate that this sink isn't trying to be smart about security.
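For illustration, a minimal sketch of what such a build-time check could look like (the helper name and the identifier rule here are assumptions, not code from this PR):

// Hypothetical config-build check: only accept plain (optionally schema-qualified)
// identifiers for `table`, so the format!-interpolated query above cannot be
// abused through the config value.
fn validate_table_name(table: &str) -> Result<(), String> {
    let is_valid = !table.is_empty()
        && table
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '.');
    if is_valid {
        Ok(())
    } else {
        Err(format!("invalid table name: {table}"))
    }
}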

Contributor Author
@jorgehermo9 jorgehermo9 Feb 7, 2025

we could just add a notice in the docs to communicate that this sink isn't trying to be smart about security.

Yes, I think we should do this, at least for now... For example, the Clickhouse and Databend Vector sinks do not try to protect users from SQL injection.

pub endpoint: String,

/// The table that data is inserted into.
pub table: String,
Contributor Author

Should we make the table templatable, like the clickhouse sink? That would complicate the code a little bit (with KeyPartitioner and so on). If yes, I would like some guidance about it if possible.

Member

It is a nice feature but not a must-have; we can do this incrementally. Once we finalize the rest of the comments, we can come back to this if you are motivated to add this feature.

Contributor Author

Okay!

Contributor
@aliciascott aliciascott left a comment

good for docs

@pront pront self-assigned this Oct 15, 2024
Member
@pront pront left a comment

Hi @jorgehermo9, thank you for this sizable contribution! On a high level, it looks great. I did a first review and left some comments. Don't hesitate to follow up, happy to discuss details.

scripts/integration/postgres/test.yaml — review thread resolved (outdated)
src/sinks/postgres/config.rs — review thread resolved

/// The table that data is inserted into.
pub table: String,

/// The postgres connection pool size.
Member

Here it would be useful to explain what this pool is used for. Maybe a link to relevant docs would suffice.

Contributor Author

done in 21f3ad3. Do you think it is enough?

I also have doubts about using a connection pool. Can the event batches be executed in parallel for this sink? I don't know the specifics of Vector's internals...

.batched(self.batch_settings.as_byte_size_config())

If the batches of events can be processed in parallel, then a connection pool is beneficial. If the batches are processed sequentially, then we should use a single postgres connection, as a connection pool does not make sense in that case.
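For context, a minimal sketch of how the pool could be built from the config (the endpoint and pool_size parameters here mirror the options discussed above and are assumptions, not the PR's exact code):

use sqlx::postgres::PgPoolOptions;

// Hypothetical sketch: build a connection pool sized by the sink's
// `pool_size` option and pointed at the configured `endpoint`.
async fn build_pool(endpoint: &str, pool_size: u32) -> Result<sqlx::PgPool, sqlx::Error> {
    PgPoolOptions::new()
        .max_connections(pool_size)
        .connect(endpoint)
        .await
}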

src/sinks/postgres/integration_tests.rs — review threads resolved (outdated)
@jorgehermo9
Contributor Author

Thank you very much for the review @pront! I'm kinda busy these days but I will revisit this as soon as I can :)

@github-actions github-actions bot added the domain: sources label Feb 7, 2025
@@ -67,6 +67,9 @@ pub mod mock;

pub mod stats;

#[cfg(test)]
Contributor Author

Is this cfg gate okay? I don't see any feature flag related to "any integration test". I would like to compile this module only if any (and not necessarily all) of the integration test feature flags behind the all-integration-tests feature flag are specified...

Member

We can technically write....

#[cfg(any(
    test,
    feature = "amqp-integration-tests",
    feature = "appsignal-integration-tests",
    feature = "aws-integration-tests",
    feature = "axiom-integration-tests",
    feature = "azure-integration-tests",
    feature = "chronicle-integration-tests",
    feature = "clickhouse-integration-tests",
    feature = "databend-integration-tests",
    feature = "datadog-agent-integration-tests",
    feature = "datadog-logs-integration-tests",
    feature = "datadog-metrics-integration-tests",
    feature = "datadog-traces-integration-tests",
    feature = "dnstap-integration-tests",
    feature = "docker-logs-integration-tests",
    feature = "es-integration-tests",
    feature = "eventstoredb_metrics-integration-tests",
    feature = "fluent-integration-tests",
    feature = "gcp-cloud-storage-integration-tests",
    feature = "gcp-integration-tests",
    feature = "gcp-pubsub-integration-tests",
    feature = "greptimedb-integration-tests",
    feature = "http-client-integration-tests",
    feature = "humio-integration-tests",
    feature = "influxdb-integration-tests",
    feature = "kafka-integration-tests",
    feature = "logstash-integration-tests",
    feature = "loki-integration-tests",
    feature = "mongodb_metrics-integration-tests",
    feature = "nats-integration-tests",
    feature = "nginx-integration-tests",
    feature = "opentelemetry-integration-tests",
    feature = "postgresql_metrics-integration-tests",
    feature = "prometheus-integration-tests",
    feature = "pulsar-integration-tests",
    feature = "redis-integration-tests",
    feature = "splunk-integration-tests",
    feature = "webhdfs-integration-tests"
))]

I don't know if there's a more elegant way to achieve the above.

Contributor Author

Yeah 😄 I fear that we will forget to keep that list synchronized with the integration test feature flags in Cargo.toml... If you prefer to specify that, I'm okay with including it in this PR, but as there is not much code in that module right now... maybe it is okay to just leave #[cfg(test)]?

@jorgehermo9
Contributor Author

jorgehermo9 commented Feb 7, 2025

Hi @pront, I have more time available these days and will work on this PR. I still have to test how this sink behaves with metrics and traces and enable them if they work ok. Will update about that soon. I think I answered all of your comments, but correct me if I'm wrong.

I will also think about adding more integration tests (and if traces & metrics work, I'll add integration tests for those too).

Are we talking about timescaledb and risingwave? Are there more? I am not familiar with all the nuances and I don't think we can plan for all extensions in advance. The proposed flavour (or flavor) property can be added incrementally without breaking the behavior of the sink.

Yes, I was talking about those two specifically. I agree not to plan in advance; we can look into that in future PRs.

@pront
Member

pront commented Feb 7, 2025

Hi @pront, I have more time available these days and will work on this PR. I still have to test how this sink behaves with metrics and traces and enable them if they work ok. Will update about that soon. I think I answered all of your comments, but correct me if I'm wrong.

🎉 Awesome, I will prioritize reviewing this PR.

@jorgehermo9
Contributor Author

jorgehermo9 commented Feb 8, 2025

I tried with metrics (I still have to test with traces)

with this config

[sources.internal_metrics]
type = "internal_metrics"

[sinks.postgres]
type = "postgres"
inputs = ["internal_metrics"]
endpoint = "postgres://postgres:password123@localhost/test"
table = "metrics"

and these commands to spin up a local postgres and connect to it:

docker run -e POSTGRES_PASSWORD=password123 -p 5432:5432 docker.io/postgres
psql -h localhost -U postgres

and these SQL statements inside psql:

CREATE DATABASE test;
\c test
CREATE TABLE metrics(name TEXT, namespace TEXT, tags JSONB, timestamp TIMESTAMP WITH TIME ZONE, kind TEXT, counter JSONB, gauge JSONB, aggregated_histogram JSONB);

The table is populated with the given metrics and we can query it:
[screenshot: metrics rows returned by querying the table]

I had to create a table with the union of the fields of all metric types (counter JSONB, gauge JSONB, aggregated_histogram JSONB), but once the table parameter is templatable, we could have different postgres tables per metric type (one for counters, another for gauges, etc.) and that would be very interesting!

this looks very promising ;)

I will update once I do more tests!

@jorgehermo9
Contributor Author

jorgehermo9 commented Feb 8, 2025

@pront I don't know how to properly generate traces in Vector... Is there any minimal configuration example to get traces, similar to what I did with metrics?

I can try to ingest traces within integration tests, but I'd like to do some manual testing as I did with metrics & logs.

EDIT: added traces tests in c525a45 and e9eb688

}

#[derive(Debug, Serialize, Deserialize, FromRow)]
struct TestCounterMetric {
Contributor Author

I had to create this struct to derive FromRow so we can read from the sqlx query result. Didn't find any other alternative...

I tried to deserialize the sqlx result into a serde_json::Value and then do let metric: Metric = serde_json::from_value(value), but it seems that serde_json::Value does not implement sqlx::FromRow...
This is the only workaround that I could get working.
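For reference, a rough sketch of the workaround described here (the field and column names are assumptions based on the metrics table above, not the PR's exact test code):

use serde::{Deserialize, Serialize};
use sqlx::types::Json;
use sqlx::FromRow;

// Hypothetical sketch: a dedicated struct deriving FromRow so rows can be read
// back from the sqlx query result and compared against the inserted metric.
#[derive(Debug, Serialize, Deserialize, FromRow)]
struct TestCounterMetric {
    name: String,
    namespace: String,
    tags: Json<serde_json::Value>,
    counter: Json<serde_json::Value>,
}

async fn read_metrics(
    pool: &sqlx::PgPool,
    table: &str,
) -> Result<Vec<TestCounterMetric>, sqlx::Error> {
    sqlx::query_as::<_, TestCounterMetric>(&format!(
        "SELECT name, namespace, tags, counter FROM {table}"
    ))
    .fetch_all(pool)
    .await
}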

@jorgehermo9
Contributor Author

Added support for metrics and traces in 1979627. Left a few TODOs for the review and I'm ready for another review round!

}

#[tokio::test]
async fn insert_metric() {
Contributor Author

I just added this one test for metrics ingestion. Do you think it is enough or should we think of more test cases?

I just used one type of metric (absolute counter), because I didn't want to bloat these tests... Should I test with more types (histogram, gauges...) or not?

type Response = PostgresResponse;

fn is_retriable_error(&self, _error: &Self::Error) -> bool {
// TODO: Implement this
Contributor Author

I don't know what types of errors Vector should consider retryable... Query timeout?

sqlx::Error does have an Io variant https://docs.rs/sqlx/latest/sqlx/enum.Error.html#variant.Io and a pool timeout variant https://docs.rs/sqlx/latest/sqlx/enum.Error.html#variant.PoolTimedOut, maybe we can retry in those two cases exclusively?
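Something like this, perhaps (just a sketch of the idea, not necessarily the final variant list):

// Sketch: treat connection-level failures as transient and retry them;
// everything else (e.g. constraint violations) is considered permanent.
fn is_retriable_error(error: &sqlx::Error) -> bool {
    matches!(error, sqlx::Error::Io(_) | sqlx::Error::PoolTimedOut)
}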

@@ -231,6 +234,10 @@ pub fn temp_dir() -> PathBuf {
path.join(dir_name)
}

pub fn temp_table() -> String {
Contributor Author
@jorgehermo9 jorgehermo9 Feb 9, 2025

Thoughts about the name of this function?
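For illustration, a sketch of what a helper like this could look like (illustrative only; the actual implementation in the diff is not shown here):

use rand::{distributions::Alphanumeric, thread_rng, Rng};

// Illustrative sketch: generate a unique lowercase table name so concurrent
// integration tests do not collide on the same table.
pub fn temp_table() -> String {
    let suffix: String = thread_rng()
        .sample_iter(&Alphanumeric)
        .take(8)
        .map(char::from)
        .collect::<String>()
        .to_lowercase();
    format!("test_{suffix}")
}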

.unwrap();
let create_table_sql = format!(
"CREATE TABLE {table} (agent_version TEXT, env TEXT, error_tps BIGINT, host TEXT,
language TEXT, spans trace_span[], target_tps BIGINT, trace_id BIGINT)"
Contributor Author

I typed the span (TestTraceSpan struct) instead of declaring a jsonb[] column. I wanted to test insertions into postgres composite types (https://www.postgresql.org/docs/current/rowtypes.html).

Is it okay to leave the test of this feature (inserting into composite types) in the traces tests, or should I create another test to explicitly test this...? For example, it would be a test similar to insert_single_event, but with a field that is a composite type (instead of payload: serde_json::Value), in a very similar way to what is done here with the trace_span postgres type.

I think it is okay to just test composite type insertion here... But maybe we should separate concerns, use spans JSONB[] here, and create a simpler test for composite type insertion?

I don't know if I'm explaining this well :/


#[derive(Debug, Serialize, Deserialize, sqlx::Type, FromRow)]
#[sqlx(type_name = "trace_span")]
struct TestTraceSpan {
Contributor Author

Created this type because of https://github.com/vectordotdev/vector/pull/21248/files#r1948104371

Resolve this conversation once read!

Comment on lines +67 to +110
fn create_span(resource: &str) -> ObjectMap {
    ObjectMap::from([
        ("service".into(), Value::from("a_service")),
        ("name".into(), Value::from("a_name")),
        ("resource".into(), Value::from(resource)),
        ("type".into(), Value::from("a_type")),
        ("trace_id".into(), Value::Integer(123)),
        ("span_id".into(), Value::Integer(456)),
        ("parent_id".into(), Value::Integer(789)),
        ("start".into(), Value::from(timestamp())),
        ("duration".into(), Value::Integer(1_000_000)),
        ("error".into(), Value::Integer(404)),
        (
            "meta".into(),
            Value::Object(ObjectMap::from([
                ("foo".into(), Value::from("bar")),
                ("bar".into(), Value::from("baz")),
            ])),
        ),
        (
            "metrics".into(),
            Value::Object(ObjectMap::from([
                ("a_metric".into(), Value::Float(NotNan::new(0.577).unwrap())),
                ("_top_level".into(), Value::Float(NotNan::new(1.0).unwrap())),
            ])),
        ),
    ])
}

pub fn create_trace(resource: &str) -> TraceEvent {
    let mut t = TraceEvent::default();
    t.insert(event_path!("language"), "a_language");
    t.insert(event_path!("agent_version"), "1.23456");
    t.insert(event_path!("host"), "a_host");
    t.insert(event_path!("env"), "an_env");
    t.insert(event_path!("trace_id"), Value::Integer(123));
    t.insert(event_path!("target_tps"), Value::Integer(10));
    t.insert(event_path!("error_tps"), Value::Integer(5));
    t.insert(
        event_path!("spans"),
        Value::Array(vec![Value::from(create_span(resource))]),
    );
    t
}
Contributor Author

copied those two utility methods from

pub fn simple_trace_event(resource: String) -> TraceEvent {
(and modified them a little bit; see the timestamp set in the start field in the create_span function, where we should use a microsecond-resolution timestamp due to a postgres limitation).

Should I move them to another module such as test_util and reuse them in those two places? I didn't see utility methods for event (metric, trace or log) creation and it was always done ad hoc in each test/integration test module... That's the reason why I copied them.
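As an aside, the microsecond truncation mentioned above could look roughly like this (a sketch assuming chrono's SubsecRound trait; the helper name mirrors the timestamp() call in create_span and is not the exact code in the PR):

use chrono::{DateTime, SubsecRound, Utc};

// Sketch: Postgres timestamps only carry microsecond precision, so truncate
// the test timestamp before insertion to keep round-trip comparisons exact.
fn timestamp() -> DateTime<Utc> {
    Utc::now().trunc_subsecs(6)
}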

@jorgehermo9
Contributor Author

jorgehermo9 commented Feb 9, 2025

Following on from e9eb688, I manually tested composite type insertion

with this config

[sources.demo_logs]
type = "demo_logs"
format = "apache_common"


[transforms.structured]
type = "remap"
inputs = ["demo_logs"]
source = """
. = {
    "message": .message,
    "structured_field": {
        "string": "value",
        "integer": 1,
        "float": 1.1,
        "boolean": true,
    }
}
"""

[sinks.postgres]
type = "postgres"
inputs = ["structured"]
endpoint = "postgres://postgres:password123@localhost/test"
table = "structured_table"

and these SQL statements:

CREATE TYPE structured AS (string TEXT, integer BIGINT, float real, boolean BOOLEAN);
CREATE TABLE structured_table(message text, structured_field structured);

We can insert pg's composite types (records, https://www.postgresql.org/docs/current/rowtypes.html) from nested fields, instead of having to use JSONB for those fields.

[screenshot: rows in structured_table with the composite structured_field column populated]

I would also like to document an example of this in the website docs! This looks great 😄

Labels: domain: ci, domain: external docs, domain: sinks, domain: sources, meta: awaiting author
Projects: None yet
Successfully merging this pull request may close these issues: New sink: postgres
3 participants