diff --git a/.chloggen/clickhouseexporter-traces.yaml b/.chloggen/clickhouseexporter-traces.yaml
new file mode 100644
index 000000000000..b0feaef6b2a6
--- /dev/null
+++ b/.chloggen/clickhouseexporter-traces.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: clickhouseexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Add support for exporting OTLP traces to ClickHouse"
+
+# One or more tracking issues related to the change
+issues: [8028]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
\ No newline at end of file
diff --git a/exporter/clickhouseexporter/Makefile b/exporter/clickhouseexporter/Makefile
index d3d428b54024..b25a796464ef 100644
--- a/exporter/clickhouseexporter/Makefile
+++ b/exporter/clickhouseexporter/Makefile
@@ -5,4 +5,4 @@ local-run-example:
 	cd example && docker-compose up -d
 recreate-otel-collector:
 	cd ../../ && GOOS=linux go build -o ./local/otelcontribcol ./cmd/otelcontribcol
-	cd example && docker-compose up --build otel-collector
+	cd example && docker-compose up --build otelcollector
diff --git a/exporter/clickhouseexporter/README.md b/exporter/clickhouseexporter/README.md
index ee995a5b8d4e..c91a9c40ac44 100644
--- a/exporter/clickhouseexporter/README.md
+++ b/exporter/clickhouseexporter/README.md
@@ -1,31 +1,40 @@
 # ClickHouse Exporter
+| Status                   |              |
+| ------------------------ |--------------|
+| Stability                | [alpha]      |
+| Supported pipeline types | traces, logs |
+| Distributions            | [contrib]    |
-| Status                   |           |
-| ------------------------ |-----------|
-| Stability                | [alpha]   |
-| Supported pipeline types | logs      |
-| Distributions            | [contrib] |
-
-This exporter supports sending OpenTelemetry logs to [ClickHouse](https://clickhouse.com/). It will also support spans and metrics in the future.
-> ClickHouse is an open-source, high performance columnar OLAP database management system for real-time analytics using SQL.
-> Throughput can be measured in rows per second or megabytes per second.
-> If the data is placed in the page cache, a query that is not too complex is processed on modern hardware at a speed of approximately 2-10 GB/s of uncompressed data on a single server.
+This exporter supports sending OpenTelemetry logs and spans to [ClickHouse](https://clickhouse.com/).
+> ClickHouse is an open-source, high performance columnar OLAP database management system for real-time analytics using
+> SQL.
+> Throughput can be measured in rows per second or megabytes per second.
+> If the data is placed in the page cache, a query that is not too complex is processed on modern hardware at a speed of
+> approximately 2-10 GB/s of uncompressed data on a single server.
 > If 10 bytes of columns are extracted, the speed is expected to be around 100-200 million rows per second.
 Note:
-Always add [batch-processor](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor) to collector pipeline, as [ClickHouse document says:](https://clickhouse.com/docs/en/introduction/performance/#performance-when-inserting-data)
-> We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s.
+Always
+add the [batch-processor](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor) to
+the collector pipeline, as the
+[ClickHouse documentation says](https://clickhouse.com/docs/en/introduction/performance/#performance-when-inserting-data):
+> We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When
+> inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s.

 ## Use Cases

 1. Use the [Grafana Clickhouse datasource](https://grafana.com/grafana/plugins/grafana-clickhouse-datasource/) or
-[vertamedia-clickhouse-datasource](https://grafana.com/grafana/plugins/vertamedia-clickhouse-datasource/) to make dashboard.
-Support time-series graph, table and logs.
+   [vertamedia-clickhouse-datasource](https://grafana.com/grafana/plugins/vertamedia-clickhouse-datasource/) to build
+   dashboards.
+   Supports time-series graphs, tables, and logs.
 2. Analyze logs via powerful ClickHouse SQL.

+### Logs
+
 - Get a log severity count time series.
+
 ```clickhouse
 SELECT toDateTime(toStartOfInterval(Timestamp, INTERVAL 60 second)) as time, SeverityText, count() as count
 FROM otel_logs
 WHERE time >= NOW() - INTERVAL 1 HOUR
 GROUP BY SeverityText, time
 ORDER BY time;
 ```
+
 - Find any logs.
+
 ```clickhouse
 SELECT Timestamp as log_time, Body
-FROM otel_logs 
+FROM otel_logs
 WHERE Timestamp >= NOW() - INTERVAL 1 HOUR
 Limit 100;
 ```
+
 - Find logs for a specific service.
+
 ```clickhouse
 SELECT Timestamp as log_time, Body
-FROM otel_logs 
-WHERE ServiceName = 'clickhouse-exporter' AND Timestamp >= NOW() - INTERVAL 1 HOUR
+FROM otel_logs
+WHERE ServiceName = 'clickhouse-exporter'
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
 Limit 100;
 ```
+
 - Find logs with a specific attribute.
+
 ```clickhouse
 SELECT Timestamp as log_time, Body
-FROM otel_logs 
-WHERE LogAttributes['container_name'] = '/example_flog_1' AND Timestamp >= NOW() - INTERVAL 1 HOUR
+FROM otel_logs
+WHERE LogAttributes['container_name'] = '/example_flog_1'
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
 Limit 100;
 ```
+
 - Find logs whose body contains a string token.
+
 ```clickhouse
 SELECT Timestamp as log_time, Body
-FROM otel_logs 
-WHERE hasToken(Body, 'http') AND Timestamp >= NOW() - INTERVAL 1 HOUR
+FROM otel_logs
+WHERE hasToken(Body, 'http')
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
 Limit 100;
 ```
+
 - Find logs whose body contains a string.
+
 ```clickhouse
 SELECT Timestamp as log_time, Body
-FROM otel_logs 
-WHERE Body like '%http%' AND Timestamp >= NOW() - INTERVAL 1 HOUR
+FROM otel_logs
+WHERE Body like '%http%'
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
 Limit 100;
 ```
+
 - Find logs whose body matches a regexp.
+
 ```clickhouse
 SELECT Timestamp as log_time, Body
-FROM otel_logs 
-WHERE match(Body, 'http') AND Timestamp >= NOW() - INTERVAL 1 HOUR
+FROM otel_logs
+WHERE match(Body, 'http')
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
 Limit 100;
 ```
+
 - Find logs by JSON-extracting the body.
+
 ```clickhouse
 SELECT Timestamp as log_time, Body
-FROM otel_logs 
-WHERE JSONExtractFloat(Body, 'bytes')>1000 AND Timestamp >= NOW() - INTERVAL 1 HOUR
+FROM otel_logs
+WHERE JSONExtractFloat(Body, 'bytes') > 1000
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
+Limit 100;
+```
+
+### Traces
+
+- Find spans with a specific attribute.
+
+```clickhouse
+SELECT Timestamp as log_time,
+       TraceId,
+       SpanId,
+       ParentSpanId,
+       SpanName,
+       SpanKind,
+       ServiceName,
+       Duration,
+       StatusCode,
+       StatusMessage,
+       toString(SpanAttributes),
+       toString(ResourceAttributes),
+       toString(Events.Name),
+       toString(Links.TraceId)
+FROM otel_traces
+WHERE ServiceName = 'clickhouse-exporter'
+  AND SpanAttributes['peer.service'] = 'tracegen-server'
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
+Limit 100;
+```
+
+- Find traces by TraceId (using the time primary index and the TraceId skip index).
+
+```clickhouse
+WITH
+    '391dae938234560b16bb63f51501cb6f' as trace_id,
+    (SELECT min(Start) FROM otel_traces_trace_id_ts WHERE TraceId = trace_id) as start,
+    (SELECT max(End) + 1 FROM otel_traces_trace_id_ts WHERE TraceId = trace_id) as end
+SELECT Timestamp as log_time,
+       TraceId,
+       SpanId,
+       ParentSpanId,
+       SpanName,
+       SpanKind,
+       ServiceName,
+       Duration,
+       StatusCode,
+       StatusMessage,
+       toString(SpanAttributes),
+       toString(ResourceAttributes),
+       toString(Events.Name),
+       toString(Links.TraceId)
+FROM otel_traces
+WHERE TraceId = trace_id
+  AND Timestamp >= start
+  AND Timestamp <= end
+Limit 100;
+```
+
+- Find error spans.
+
+```clickhouse
+SELECT Timestamp as log_time,
+       TraceId,
+       SpanId,
+       ParentSpanId,
+       SpanName,
+       SpanKind,
+       ServiceName,
+       Duration,
+       StatusCode,
+       StatusMessage,
+       toString(SpanAttributes),
+       toString(ResourceAttributes),
+       toString(Events.Name),
+       toString(Links.TraceId)
+FROM otel_traces
+WHERE ServiceName = 'clickhouse-exporter'
+  AND StatusCode = 'STATUS_CODE_ERROR'
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
+Limit 100;
+```
+
+- Find slow spans (`Duration` is stored in nanoseconds, so `1 * 1e9` is one second).
+
+```clickhouse
+SELECT Timestamp as log_time,
+       TraceId,
+       SpanId,
+       ParentSpanId,
+       SpanName,
+       SpanKind,
+       ServiceName,
+       Duration,
+       StatusCode,
+       StatusMessage,
+       toString(SpanAttributes),
+       toString(ResourceAttributes),
+       toString(Events.Name),
+       toString(Links.TraceId)
+FROM otel_traces
+WHERE ServiceName = 'clickhouse-exporter'
+  AND Duration > 1 * 1e9
+  AND Timestamp >= NOW() - INTERVAL 1 HOUR
+Limit 100;
+```
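The slow-spans query compares `Duration` against `1 * 1e9` because the exporter writes the span duration in nanoseconds (see `pushTraceData` in `exporter_traces.go` later in this diff). A minimal reference sketch of that conversion — the standalone helper name here is illustrative, not part of this diff:

```go
package main

import (
	"fmt"
	"time"
)

// durationNanos mirrors how the exporter derives the Duration column:
// the difference between a span's end and start timestamps, in nanoseconds.
func durationNanos(start, end time.Time) int64 {
	return end.Sub(start).Nanoseconds()
}

func main() {
	start := time.Now()
	end := start.Add(1500 * time.Millisecond)
	// A 1.5s span stores Duration = 1500000000, so `Duration > 1 * 1e9`
	// selects spans slower than one second.
	fmt.Println(durationNanos(start, end))
}
```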
 ## Performance Guide

-A single clickhouse instance with 32 CPU cores and 128 GB RAM can handle around 20 TB (20 Billion) logs per day,
-the data compression ratio is 7 ~ 11, the compressed data store in disk is 1.8 TB ~ 2.85 TB,
-add more clickhouse node to cluster can increase linearly.
+A single ClickHouse instance with 32 CPU cores and 128 GB RAM can handle around 20 TB (20 billion rows) of logs per day.
+With a compression ratio of 7 ~ 11, that is 1.8 TB ~ 2.85 TB of compressed data on disk.
+Adding more ClickHouse nodes to the cluster scales capacity linearly.

-The otel-collector with `otlp receiver/batch processor/clickhouse tcp exporter` can process
-around 40k/s logs entry per CPU cores, add more collector node can increase linearly.
+The otel-collector with `otlp receiver/batch processor/clickhouse tcp exporter` can process
+around 40k log entries per second per CPU core; adding more collector nodes scales throughput linearly.

 ## Configuration options

 The following settings are required:

-- `dsn` (no default): The ClickHouse server DSN (Data Source Name), for example `tcp://127.0.0.1:9000?username=user&password=qwerty&database=default`
-  For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
-  For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
+- `dsn` (no default): The ClickHouse server DSN (Data Source Name), for
+  example `tcp://user:password@127.0.0.1:9000/default`.
+  The database named in the DSN path is created on startup if it does not already exist.
+  For the tcp protocol see [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
+  For the http protocol see
+  [ClickHouse/clickhouse-go#http-support-experimental](https://github.com/ClickHouse/clickhouse-go/tree/main#http-support-experimental).

 The following settings can be optionally configured:

-- `ttl_days` (defaul t= 0): The data time-to-live in days, 0 means no ttl.
+- `ttl_days` (default = 7): The data time-to-live in days; 0 means no TTL.
 - `logs_table_name` (default = otel_logs): The table name for logs.
+- `traces_table_name` (default = otel_traces): The table name for traces.
 - `timeout` (default = 5s): The timeout for every attempt to send data to the backend.
 - `sending_queue`
   - `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping data.
 - `retry_on_failure`
   - `enabled` (default = true)
-  - `initial_interval` (default = 5s): The Time to wait after the first failure before retrying; ignored if `enabled` is `false`
+  - `initial_interval` (default = 5s): The time to wait after the first failure before retrying; ignored if `enabled`
+    is `false`
   - `max_interval` (default = 30s): The upper bound on backoff; ignored if `enabled` is `false`
-  - `max_elapsed_time` (default = 300s): The maximum amount of time spent trying to send a batch; ignored if `enabled` is `false`
+  - `max_elapsed_time` (default = 300s): The maximum amount of time spent trying to send a batch; ignored if `enabled`
+    is `false`
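Only the path component of the DSN selects the database. A minimal sketch of the parsing rule implemented by `parseDSNDatabase` in `config.go` below (the standalone wrapper name here is illustrative):

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// databaseFromDSN mirrors parseDSNDatabase in config.go: the database name
// is the DSN's URL path with the leading slash stripped.
func databaseFromDSN(dsn string) (string, error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return "default", err
	}
	return strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	db, _ := databaseFromDSN("tcp://user:password@127.0.0.1:9000/otel")
	fmt.Println(db) // otel
}
```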
 ## Example

@@ -124,8 +262,10 @@ processors:
     send_batch_size: 100000
 exporters:
   clickhouse:
-    dsn: tcp://127.0.0.1:9000/default
+    dsn: tcp://127.0.0.1:9000/otel
     ttl_days: 3
+    logs_table_name: otel_logs
+    traces_table_name: otel_traces
     timeout: 5s
     retry_on_failure:
       enabled: true
@@ -135,26 +275,28 @@ exporters:
 service:
   pipelines:
     logs:
-      receivers: [examplereceiver]
-      processors: [batch]
-      exporters: [clickhouse]
+      receivers: [ examplereceiver ]
+      processors: [ batch ]
+      exporters: [ clickhouse ]
 ```

 ## Schema

+### Logs
+
 ```clickhouse
 CREATE TABLE otel_logs
 (
-    `Timestamp` DateTime64(9) CODEC(Delta, ZSTD(1)),
-    `TraceId` String CODEC(ZSTD(1)),
-    `SpanId` String CODEC(ZSTD(1)),
-    `TraceFlags` UInt32 CODEC(ZSTD(1)),
-    `SeverityText` LowCardinality(String) CODEC(ZSTD(1)),
-    `SeverityNumber` Int32 CODEC(ZSTD(1)),
-    `ServiceName` LowCardinality(String) CODEC(ZSTD(1)),
-    `Body` String CODEC(ZSTD(1)),
-    `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
-    `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+    `Timestamp` DateTime64(9) CODEC (Delta, ZSTD(1)),
+    `TraceId` String CODEC (ZSTD(1)),
+    `SpanId` String CODEC (ZSTD(1)),
+    `TraceFlags` UInt32 CODEC (ZSTD(1)),
+    `SeverityText` LowCardinality(String) CODEC (ZSTD(1)),
+    `SeverityNumber` Int32 CODEC (ZSTD(1)),
+    `ServiceName` LowCardinality(String) CODEC (ZSTD(1)),
+    `Body` String CODEC (ZSTD(1)),
+    `ResourceAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `LogAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
     INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
     INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
     INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
@@ -169,5 +311,70 @@ CREATE TABLE otel_logs
 SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;
 ```

+### Traces
+
+```clickhouse
+CREATE TABLE otel.otel_traces
+(
+    `Timestamp` DateTime64(9) CODEC (Delta(8), ZSTD(1)),
+    `TraceId` String CODEC (ZSTD(1)),
+    `SpanId` String CODEC (ZSTD(1)),
+    `ParentSpanId` String CODEC (ZSTD(1)),
+    `TraceState` String CODEC (ZSTD(1)),
+    `SpanName` LowCardinality(String) CODEC (ZSTD(1)),
+    `SpanKind` LowCardinality(String) CODEC (ZSTD(1)),
+    `ServiceName` LowCardinality(String) CODEC (ZSTD(1)),
+    `ResourceAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `SpanAttributes` Map(LowCardinality(String), String) CODEC (ZSTD(1)),
+    `Duration` Int64 CODEC (ZSTD(1)),
+    `StatusCode` LowCardinality(String) CODEC (ZSTD(1)),
+    `StatusMessage` String CODEC (ZSTD(1)),
+    `Events` Nested (
+        `Timestamp` DateTime64(9),
+        `Name` LowCardinality(String),
+        `Attributes` Map(LowCardinality(String), String)
+    ) CODEC(ZSTD(1)),
+    `Links` Nested (
+        `TraceId` String,
+        `SpanId` String,
+        `TraceState` String,
+        `Attributes` Map(LowCardinality(String), String)
+    ) CODEC(ZSTD(1)),
+    INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
+    INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+    INDEX idx_duration Duration TYPE minmax GRANULARITY 1
+)
+    ENGINE = MergeTree
+        PARTITION BY toDate(Timestamp)
+        ORDER BY (ServiceName, SpanName, toUnixTimestamp(Timestamp), TraceId)
+        TTL toDateTime(Timestamp) + toIntervalDay(3)
+        SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;
+
+CREATE TABLE otel_traces_trace_id_ts
+(
+    `TraceId` String CODEC (ZSTD(1)),
+    `Start` DateTime CODEC (ZSTD(1)),
+    `End` DateTime CODEC (ZSTD(1)),
+    INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1
+)
+    ENGINE = MergeTree
+        ORDER BY (TraceId, toUnixTimestamp(Start))
+        TTL toDateTime(Start) + toIntervalDay(3)
+        SETTINGS index_granularity = 8192;
+
+CREATE MATERIALIZED VIEW otel.otel_traces_trace_id_ts_mv TO otel.otel_traces_trace_id_ts
+AS
+SELECT TraceId,
+       min(toDateTime(Timestamp)) AS Start,
+       max(toDateTime(Timestamp)) AS End
+FROM otel.otel_traces
+WHERE TraceId != ''
+GROUP BY TraceId;
+```
+
 [alpha]:https://github.com/open-telemetry/opentelemetry-collector#alpha
+
 [contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
diff --git a/exporter/clickhouseexporter/config.go b/exporter/clickhouseexporter/config.go
index 3d193a02e711..365175fc7c51 100644
--- a/exporter/clickhouseexporter/config.go
+++ b/exporter/clickhouseexporter/config.go
@@ -16,6 +16,9 @@ package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"

 import (
 	"errors"
+	"fmt"
+	"net/url"
+	"strings"

 	"go.opentelemetry.io/collector/config"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
@@ -37,6 +40,8 @@ type Config struct {
 	DSN string `mapstructure:"dsn"`
 	// LogsTableName is the table name for logs. default is `otel_logs`.
 	LogsTableName string `mapstructure:"logs_table_name"`
+	// TracesTableName is the table name for traces. default is `otel_traces`.
+	TracesTableName string `mapstructure:"traces_table_name"`
 	// TTLDays is the data time-to-live in days, 0 means no ttl.
 	TTLDays uint `mapstructure:"ttl_days"`
 }

@@ -56,9 +61,23 @@ func (cfg *Config) Validate() (err error) {
 	if cfg.DSN == "" {
 		err = multierr.Append(err, errConfigNoDSN)
 	}
+	_, e := parseDSNDatabase(cfg.DSN)
+	if e != nil {
+		err = multierr.Append(err, fmt.Errorf("invalid dsn format: %w", e))
+	}
 	return err
 }

+const defaultDatabase = "default"
+
+func parseDSNDatabase(dsn string) (string, error) {
+	u, err := url.Parse(dsn)
+	if err != nil {
+		return defaultDatabase, err
+	}
+	return strings.TrimPrefix(u.Path, "/"), nil
+}
+
 func (cfg *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
 	return exporterhelper.QueueSettings{
 		Enabled: true,
diff --git a/exporter/clickhouseexporter/config_test.go b/exporter/clickhouseexporter/config_test.go
index 06789e3fa75c..4921ab55340d 100644
--- a/exporter/clickhouseexporter/config_test.go
+++ b/exporter/clickhouseexporter/config_test.go
@@ -40,16 +40,17 @@ func TestLoadConfig(t *testing.T) {
 	assert.Equal(t, len(cfg.Exporters), 2)

 	defaultCfg := factory.CreateDefaultConfig()
-	defaultCfg.(*Config).DSN = "tcp://127.0.0.1:9000?database=default"
+	defaultCfg.(*Config).DSN = defaultDSN
 	r0 := cfg.Exporters[config.NewComponentID(typeStr)]
 	assert.Equal(t, r0, defaultCfg)

 	r1 := cfg.Exporters[config.NewComponentIDWithName(typeStr, "full")].(*Config)
 	assert.Equal(t, r1, &Config{
 		ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "full")),
-		DSN:              "tcp://127.0.0.1:9000?database=default",
+		DSN:              defaultDSN,
 		TTLDays:          3,
 		LogsTableName:    "otel_logs",
+		TracesTableName:  "otel_traces",
 		TimeoutSettings: exporterhelper.TimeoutSettings{
 			Timeout: 5 * time.Second,
 		},
@@ -72,3 +73,7 @@ func withDefaultConfig(fns ...func(*Config)) *Config {
 	}
 	return cfg
 }
+
+const (
+	defaultDSN = "tcp://127.0.0.1:9000/otel"
+)
diff --git a/exporter/clickhouseexporter/example/Dockerfile b/exporter/clickhouseexporter/example/Dockerfile
index a8b446186482..bd50d0e56d78 100644
--- a/exporter/clickhouseexporter/example/Dockerfile
+++ b/exporter/clickhouseexporter/example/Dockerfile
@@ -1,7 +1,7 @@
 FROM alpine:latest as certs
 RUN apk --update add ca-certificates

-FROM scratch
+FROM golang:latest

 ARG USER_UID=10001
 USER ${USER_UID}
diff --git a/exporter/clickhouseexporter/example/datasource.yaml b/exporter/clickhouseexporter/example/datasource.yaml
index afb8c492073b..44410655f87e 100644
--- a/exporter/clickhouseexporter/example/datasource.yaml
+++ b/exporter/clickhouseexporter/example/datasource.yaml
@@ -5,7 +5,7 @@ datasources:
   - name: ClickHouse-official
     type: grafana-clickhouse-datasource
     jsonData:
-      defaultDatabase: default
+      defaultDatabase: otel
       port: 9000
       server: clickhouse
       username:
diff --git a/exporter/clickhouseexporter/example/docker-compose.yml b/exporter/clickhouseexporter/example/docker-compose.yml
index 04fbbeeb7665..3160a3b7404a 100644
--- a/exporter/clickhouseexporter/example/docker-compose.yml
+++ b/exporter/clickhouseexporter/example/docker-compose.yml
@@ -4,7 +4,7 @@ networks:
   otel-clickhouse:

 services:
-  otel-collector:
+  otelcollector:
     build:
       context: ../../../local
       dockerfile: ../exporter/clickhouseexporter/example/Dockerfile
@@ -17,8 +17,8 @@ services:
     volumes:
       - ./otel-collector-config.yml:/etc/otel-collector-config.yml
     ports:
+      - "4317:4317" # otlp receiver
       - "1888:1888" # pprof extension
-      - "8888:8888" # Prometheus metrics exposed by the collector
       - "13133:13133" # health_check extension
       - "55679:55679" # zpages extension
       - "24224:24224" # fluentforwarder
@@ -45,7 +45,7 @@ services:
       GF_INSTALL_PLUGINS: grafana-clickhouse-datasource,vertamedia-clickhouse-datasource
       GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: vertamedia-clickhouse-datasource
     ports:
-      - "3000:3000"
+      - "3001:3000"
     networks:
       - otel-clickhouse

@@ -53,11 +53,11 @@ services:
   flog:
     image: mingrammer/flog:0.4.3
     # Output fake log in JSON format
-    command: [ "--format=json", "--loop", "--delay=0.0001ms"]
+    command: [ "--format=json", "--loop", "--delay=100ms"]
     networks:
      - otel-clickhouse
     depends_on:
-      - otel-collector
+      - otelcollector
     logging:
       driver: fluentd
       options:
@@ -66,4 +66,18 @@ services:
         fluentd-async-connect: "true"
         # Use nanosecond precision
         fluentd-sub-second-precision: "true"
-    stop_signal: SIGKILL
\ No newline at end of file
+    stop_signal: SIGKILL
+
+  # Traces generator
+  tracegen:
+    build:
+      context: ../../../cmd/tracegen/
+    command:
+      - --otlp-endpoint=otelcollector:4317
+      - --otlp-insecure
+      - --rate=100
+      - --duration=10000h
+    networks:
+      - otel-clickhouse
+    depends_on:
+      - otelcollector
diff --git a/exporter/clickhouseexporter/example/otel-collector-config.yml b/exporter/clickhouseexporter/example/otel-collector-config.yml
index 7d9660322831..d7c086ffa097 100644
--- a/exporter/clickhouseexporter/example/otel-collector-config.yml
+++ b/exporter/clickhouseexporter/example/otel-collector-config.yml
@@ -1,7 +1,10 @@
 receivers:
   fluentforward:
     endpoint: 0.0.0.0:24224
-
+  otlp:
+    protocols:
+      grpc:
+        endpoint: 0.0.0.0:4317
 processors:
   batch:
     send_batch_size: 100000
@@ -11,17 +14,19 @@ processors:
     limit_mib: 1800
     spike_limit_mib: 500
   resourcedetection/system:
-    detectors: ["system"]
+    detectors: [ "system" ]
     system:
-      hostname_sources: ["os"]
+      hostname_sources: [ "os" ]
   resource:
     attributes:
       - key: service.name
-        value: "clickhouse-exporter"
+        value: "serviceName"
         action: upsert
 exporters:
   clickhouse:
-    dsn: tcp://clickhouse:9000/default
+    dsn: tcp://clickhouse:9000/otel
+    logs_table_name: otel_logs
+    traces_table_name: otel_traces
     ttl_days: 3
     timeout: 10s
     sending_queue:
@@ -39,9 +44,13 @@ extensions:
     size_mib: 1000

 service:
-  extensions: [pprof, zpages, health_check]
+  extensions: [ pprof, zpages, health_check ]
   pipelines:
     logs:
-      receivers: [ fluentforward ]
+      receivers: [ fluentforward, otlp ]
+      processors: [ memory_limiter, resourcedetection/system, resource, batch ]
+      exporters: [ clickhouse ]
+    traces:
+      receivers: [ otlp ]
       processors: [ memory_limiter, resourcedetection/system, resource, batch ]
       exporters: [ clickhouse ]
diff --git a/exporter/clickhouseexporter/exporter.go b/exporter/clickhouseexporter/exporter_logs.go
similarity index 76%
rename from exporter/clickhouseexporter/exporter.go
rename to exporter/clickhouseexporter/exporter_logs.go
index 763eb1fe365e..7bc4ab56f719 100644
--- a/exporter/clickhouseexporter/exporter.go
+++ b/exporter/clickhouseexporter/exporter_logs.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+	"strings"
 	"time"

 	_ "github.com/ClickHouse/clickhouse-go/v2" // registers the database driver
@@ -27,16 +28,17 @@ import (
 	"go.uber.org/zap"
 )

-type clickhouseExporter struct {
-	client        *sql.DB
-	insertLogsSQL string
+type logsExporter struct {
+	client    *sql.DB
+	insertSQL string

 	logger *zap.Logger
 	cfg    *Config
 }

-func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseExporter, error) {
-	if err := cfg.Validate(); err != nil {
+func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
+	if err := createDatabase(cfg); err != nil {
 		return nil, err
 	}

@@ -45,39 +47,41 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseExporter, error) {
 		return nil, err
 	}

-	insertLogsSQL := renderInsertLogsSQL(cfg)
+	if err = createLogsTable(cfg, client); err != nil {
+		return nil, err
+	}

-	return &clickhouseExporter{
-		client:        client,
-		insertLogsSQL: insertLogsSQL,
-		logger:        logger,
-		cfg:           cfg,
+	return &logsExporter{
+		client:    client,
+		insertSQL: renderInsertLogsSQL(cfg),
+		logger:    logger,
+		cfg:       cfg,
 	}, nil
 }

 // Shutdown will shutdown the exporter.
-func (e *clickhouseExporter) Shutdown(_ context.Context) error {
+func (e *logsExporter) Shutdown(_ context.Context) error {
 	if e.client != nil {
 		return e.client.Close()
 	}
 	return nil
 }

-func (e *clickhouseExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
+func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
 	start := time.Now()
 	err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {
-		statement, err := tx.PrepareContext(ctx, e.insertLogsSQL)
+		statement, err := tx.PrepareContext(ctx, e.insertSQL)
 		if err != nil {
 			return fmt.Errorf("PrepareContext:%w", err)
 		}
 		defer func() {
 			_ = statement.Close()
 		}()
 		for i := 0; i < ld.ResourceLogs().Len(); i++ {
 			logs := ld.ResourceLogs().At(i)
 			res := logs.Resource()
 			resAttr := attributesToMap(res.Attributes())
 			// serviceName is scoped per resource so a resource without
 			// service.name does not inherit the previous resource's value.
 			var serviceName string
 			if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
 				serviceName = v.Str()
 			}
@@ -177,22 +181,44 @@ var driverName = "clickhouse" // for testing

 // newClickhouseClient creates a clickhouse client.
 func newClickhouseClient(cfg *Config) (*sql.DB, error) {
-	// use empty database to create database
-	db, err := sql.Open(driverName, cfg.DSN)
+	return sql.Open(driverName, cfg.DSN)
+}
+
+func createDatabase(cfg *Config) error {
+	database, _ := parseDSNDatabase(cfg.DSN)
+	if database == defaultDatabase {
+		return nil
+	}
+	// use the default database to create the new database
+	dsnUseDefaultDatabase := strings.Replace(cfg.DSN, database, defaultDatabase, 1)
+	db, err := sql.Open(driverName, dsnUseDefaultDatabase)
 	if err != nil {
-		return nil, fmt.Errorf("sql.Open:%w", err)
+		return fmt.Errorf("sql.Open:%w", err)
 	}
-	// create table
-	query := fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName, "")
-	if cfg.TTLDays > 0 {
-		query = fmt.Sprintf(createLogsTableSQL,
-			cfg.LogsTableName,
-			fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, cfg.TTLDays))
+	defer func() {
+		_ = db.Close()
+	}()
+	query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database)
+	_, err = db.Exec(query)
+	if err != nil {
+		return fmt.Errorf("create database:%w", err)
+	}
+	return nil
+}
+
+func createLogsTable(cfg *Config, db *sql.DB) error {
+	if _, err := db.Exec(renderCreateLogsTableSQL(cfg)); err != nil {
+		return fmt.Errorf("exec create logs table sql: %w", err)
 	}
-	if _, err := db.Exec(query); err != nil {
-		return nil, fmt.Errorf("exec create table sql: %w", err)
+	return nil
+}
+
+func renderCreateLogsTableSQL(cfg *Config) string {
+	var ttlExpr string
+	if cfg.TTLDays > 0 {
+		ttlExpr = fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, cfg.TTLDays)
 	}
-	return db, nil
+	return fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName, ttlExpr)
 }

 func renderInsertLogsSQL(cfg *Config) string {
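For reference, the TTL clause rendered into the DDL above behaves as in this standalone sketch (the free-standing function name is illustrative; the real code inlines this logic in `renderCreateLogsTableSQL`):

```go
package main

import "fmt"

// renderTTL mirrors the TTL handling in renderCreateLogsTableSQL: an empty
// clause when ttlDays is 0, otherwise a toIntervalDay expression that is
// substituted into the CREATE TABLE template.
func renderTTL(ttlDays uint) string {
	if ttlDays == 0 {
		return ""
	}
	return fmt.Sprintf("TTL toDateTime(Timestamp) + toIntervalDay(%d)", ttlDays)
}

func main() {
	fmt.Printf("%q\n", renderTTL(0)) // ""
	fmt.Println(renderTTL(3))        // TTL toDateTime(Timestamp) + toIntervalDay(3)
}
```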
diff --git a/exporter/clickhouseexporter/exporter_test.go b/exporter/clickhouseexporter/exporter_logs_test.go
similarity index 79%
rename from exporter/clickhouseexporter/exporter_test.go
rename to exporter/clickhouseexporter/exporter_logs_test.go
index d29c037ed55c..8956d44ca21b 100644
--- a/exporter/clickhouseexporter/exporter_test.go
+++ b/exporter/clickhouseexporter/exporter_logs_test.go
@@ -26,20 +26,21 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
+	conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
 )

-func TestExporter_New(t *testing.T) {
-	type validate func(*testing.T, *clickhouseExporter, error)
+func TestLogsExporter_New(t *testing.T) {
+	type validate func(*testing.T, *logsExporter, error)

-	_ = func(t *testing.T, exporter *clickhouseExporter, err error) {
+	_ = func(t *testing.T, exporter *logsExporter, err error) {
 		require.Nil(t, err)
 		require.NotNil(t, exporter)
 	}

-	failWith := func(want error) validate {
-		return func(t *testing.T, exporter *clickhouseExporter, err error) {
+	_ = func(want error) validate {
+		return func(t *testing.T, exporter *logsExporter, err error) {
 			require.Nil(t, exporter)
 			require.NotNil(t, err)
 			if !errors.Is(err, want) {
@@ -48,8 +49,8 @@ func TestLogsExporter_New(t *testing.T) {
 		}
 	}

-	_ = func(msg string) validate {
-		return func(t *testing.T, exporter *clickhouseExporter, err error) {
+	failWithMsg := func(msg string) validate {
+		return func(t *testing.T, exporter *logsExporter, err error) {
 			require.Nil(t, exporter)
 			require.NotNil(t, err)
 			require.Contains(t, err.Error(), msg)
@@ -62,14 +63,14 @@ func TestLogsExporter_New(t *testing.T) {
 	}{
 		"no dsn": {
 			config: withDefaultConfig(),
-			want:   failWith(errConfigNoDSN),
+			want:   failWithMsg("dial tcp: missing address"),
 		},
 	}

 	for name, test := range tests {
 		t.Run(name, func(t *testing.T) {
-			exporter, err := newExporter(zap.NewNop(), test.config)
+			exporter, err := newLogsExporter(zap.NewNop(), test.config)
 			if exporter != nil {
 				defer func() {
 					require.NoError(t, exporter.Shutdown(context.TODO()))
@@ -81,10 +82,6 @@ func TestLogsExporter_New(t *testing.T) {
 	}
 }

-const (
-	defaultDSN = "tcp://127.0.0.1:9000?database=default"
-)
-
 func TestExporter_pushLogsData(t *testing.T) {
 	t.Run("push success", func(t *testing.T) {
 		var items int
@@ -96,7 +93,7 @@ func TestExporter_pushLogsData(t *testing.T) {
 			return nil
 		})

-		exporter := newTestExporter(t, defaultDSN)
+		exporter := newTestLogsExporter(t, defaultDSN)
 		mustPushLogsData(t, exporter, simpleLogs(1))
 		mustPushLogsData(t, exporter, simpleLogs(2))

@@ -104,8 +101,8 @@ func TestExporter_pushLogsData(t *testing.T) {
 	})
 }

-func newTestExporter(t *testing.T, dsn string, fns ...func(*Config)) *clickhouseExporter {
-	exporter, err := newExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
+func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter {
+	exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
 	require.NoError(t, err)

 	t.Cleanup(func() { _ = exporter.Shutdown(context.TODO()) })
@@ -130,21 +127,19 @@ func simpleLogs(count int) plog.Logs {
 	for i := 0; i < count; i++ {
 		r := sl.LogRecords().AppendEmpty()
 		r.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
-		r.Attributes().PutStr("k", "v")
+		r.Attributes().PutStr(conventions.AttributeServiceName, "v")
 	}
 	return logs
 }

-func mustPushLogsData(t *testing.T, exporter *clickhouseExporter, ld plog.Logs) {
+func mustPushLogsData(t *testing.T, exporter *logsExporter, ld plog.Logs) {
 	err := exporter.pushLogsData(context.TODO(), ld)
 	require.NoError(t, err)
 }

-const testDriverName = "clickhouse-test"
-
-func initClickhouseTestServer(_ *testing.T, recorder recorder) {
-	driverName = testDriverName
-	sql.Register(testDriverName, &testClickhouseDriver{
+func initClickhouseTestServer(t *testing.T, recorder recorder) {
+	driverName = t.Name()
+	sql.Register(t.Name(), &testClickhouseDriver{
 		recorder: recorder,
 	})
 }
diff --git a/exporter/clickhouseexporter/exporter_traces.go b/exporter/clickhouseexporter/exporter_traces.go
new file mode 100644
index 000000000000..50cf4434f9e5
--- /dev/null
+++ b/exporter/clickhouseexporter/exporter_traces.go
@@ -0,0 +1,312 @@
+// Copyright 2020, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"strings"
+	"time"
+
+	_ "github.com/ClickHouse/clickhouse-go/v2" // registers the database driver
+ "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + "go.uber.org/zap" +) + +type tracesExporter struct { + client *sql.DB + insertSQL string + + logger *zap.Logger + cfg *Config +} + +func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) { + + if err := createDatabase(cfg); err != nil { + return nil, err + } + + client, err := newClickhouseClient(cfg) + if err != nil { + return nil, err + } + + if err = createTracesTable(cfg, client); err != nil { + return nil, err + } + + return &tracesExporter{ + client: client, + insertSQL: renderInsertTracesSQL(cfg), + logger: logger, + cfg: cfg, + }, nil +} + +// Shutdown will shutdown the exporter. +func (e *tracesExporter) Shutdown(_ context.Context) error { + if e.client != nil { + return e.client.Close() + } + return nil +} + +func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { + start := time.Now() + err := doWithTx(ctx, e.client, func(tx *sql.Tx) error { + statement, err := tx.PrepareContext(ctx, e.insertSQL) + if err != nil { + return fmt.Errorf("PrepareContext:%w", err) + } + defer func() { + _ = statement.Close() + }() + for i := 0; i < td.ResourceSpans().Len(); i++ { + spans := td.ResourceSpans().At(i) + res := spans.Resource() + resAttr := attributesToMap(res.Attributes()) + var serviceName string + if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { + serviceName = v.Str() + } + for j := 0; j < spans.ScopeSpans().Len(); j++ { + rs := spans.ScopeSpans().At(j).Spans() + for k := 0; k < rs.Len(); k++ { + r := rs.At(k) + spanAttr := attributesToMap(r.Attributes()) + status := r.Status() + eventTimes, eventNames, eventAttrs := convertEvents(r.Events()) + linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links()) + _, err = statement.ExecContext(ctx, + r.StartTimestamp().AsTime(), + r.TraceID().HexString(), + r.SpanID().HexString(), + r.ParentSpanID().HexString(), + r.TraceState().AsRaw(), + r.Name(), + r.Kind().String(), + serviceName, + resAttr, + spanAttr, + r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime()).Nanoseconds(), + status.Code().String(), + status.Message(), + eventTimes, + eventNames, + eventAttrs, + linksTraceIDs, + linksSpanIDs, + linksTraceStates, + linksAttrs, + ) + if err != nil { + return fmt.Errorf("ExecContext:%w", err) + } + } + } + } + return nil + }) + duration := time.Since(start) + e.logger.Info("insert traces", zap.Int("records", td.SpanCount()), + zap.String("cost", duration.String())) + return err +} + +func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) { + var ( + times []time.Time + names []string + attrs []map[string]string + ) + for i := 0; i < events.Len(); i++ { + event := events.At(i) + times = append(times, event.Timestamp().AsTime()) + names = append(names, event.Name()) + attrs = append(attrs, attributesToMap(event.Attributes())) + } + return times, names, attrs +} + +func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) { + var ( + traceIDs []string + spanIDs []string + states []string + attrs []map[string]string + ) + for i := 0; i < links.Len(); i++ { + link := links.At(i) + traceIDs = append(traceIDs, link.TraceID().HexString()) + spanIDs = append(spanIDs, link.SpanID().HexString()) + states = append(states, link.TraceState().AsRaw()) + attrs = append(attrs, attributesToMap(link.Attributes())) + } + return traceIDs, spanIDs, states, attrs +} + 
+const (
+	// language=ClickHouse SQL
+	createTracesTableSQL = `
+CREATE TABLE IF NOT EXISTS %s (
+     Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
+     TraceId String CODEC(ZSTD(1)),
+     SpanId String CODEC(ZSTD(1)),
+     ParentSpanId String CODEC(ZSTD(1)),
+     TraceState String CODEC(ZSTD(1)),
+     SpanName LowCardinality(String) CODEC(ZSTD(1)),
+     SpanKind LowCardinality(String) CODEC(ZSTD(1)),
+     ServiceName LowCardinality(String) CODEC(ZSTD(1)),
+     ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+     SpanAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+     Duration Int64 CODEC(ZSTD(1)),
+     StatusCode LowCardinality(String) CODEC(ZSTD(1)),
+     StatusMessage String CODEC(ZSTD(1)),
+     Events Nested (
+         Timestamp DateTime64(9),
+         Name LowCardinality(String),
+         Attributes Map(LowCardinality(String), String)
+     ) CODEC(ZSTD(1)),
+     Links Nested (
+         TraceId String,
+         SpanId String,
+         TraceState String,
+         Attributes Map(LowCardinality(String), String)
+     ) CODEC(ZSTD(1)),
+     INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
+     INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+     INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+     INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+     INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
+     INDEX idx_duration Duration TYPE minmax GRANULARITY 1
+) ENGINE MergeTree()
+%s
+PARTITION BY toDate(Timestamp)
+ORDER BY (ServiceName, SpanName, toUnixTimestamp(Timestamp), TraceId)
+SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
+`
+	// language=ClickHouse SQL
+	insertTracesSQLTemplate = `INSERT INTO %s (
+                        Timestamp,
+                        TraceId,
+                        SpanId,
+                        ParentSpanId,
+                        TraceState,
+                        SpanName,
+                        SpanKind,
+                        ServiceName,
+                        ResourceAttributes,
+                        SpanAttributes,
+                        Duration,
+                        StatusCode,
+                        StatusMessage,
+                        Events.Timestamp,
+                        Events.Name,
+                        Events.Attributes,
+                        Links.TraceId,
+                        Links.SpanId,
+                        Links.TraceState,
+                        Links.Attributes
+                        ) VALUES (
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?,
+                                  ?
+                                  )`
+)
+
+const (
+	createTraceIDTsTableSQL = `
+CREATE TABLE IF NOT EXISTS %s_trace_id_ts (
+     TraceId String CODEC(ZSTD(1)),
+     Start DateTime CODEC(ZSTD(1)),
+     End DateTime CODEC(ZSTD(1)),
+     INDEX idx_trace_id TraceId TYPE bloom_filter(0.01) GRANULARITY 1
+) ENGINE MergeTree()
+%s
+ORDER BY (TraceId, toUnixTimestamp(Start))
+SETTINGS index_granularity=8192;
+`
+	createTraceIDTsMaterializedViewSQL = `
+CREATE MATERIALIZED VIEW IF NOT EXISTS %s_trace_id_ts_mv
+TO %s.%s_trace_id_ts
+AS SELECT
+TraceId,
+min(toDateTime(Timestamp)) as Start,
+max(toDateTime(Timestamp)) as End
+FROM
+%s.%s
+WHERE TraceId != ''
+GROUP BY TraceId;
+`
+)
+
+func createTracesTable(cfg *Config, db *sql.DB) error {
+	if _, err := db.Exec(renderCreateTracesTableSQL(cfg)); err != nil {
+		return fmt.Errorf("exec create traces table sql: %w", err)
+	}
+	if _, err := db.Exec(renderCreateTraceIDTsTableSQL(cfg)); err != nil {
+		return fmt.Errorf("exec create traceIDTs table sql: %w", err)
+	}
+	if _, err := db.Exec(renderTraceIDTsMaterializedViewSQL(cfg)); err != nil {
+		return fmt.Errorf("exec create traceIDTs view sql: %w", err)
+	}
+	return nil
+}
+
+func renderInsertTracesSQL(cfg *Config) string {
+	return fmt.Sprintf(strings.ReplaceAll(insertTracesSQLTemplate, "'", "`"), cfg.TracesTableName)
+}
+
+func renderCreateTracesTableSQL(cfg *Config) string {
+	var ttlExpr string
+	if cfg.TTLDays > 0 {
+		ttlExpr = fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, cfg.TTLDays)
+	}
+	return fmt.Sprintf(createTracesTableSQL, cfg.TracesTableName, ttlExpr)
+}
+
+func renderCreateTraceIDTsTableSQL(cfg *Config) string {
+	var ttlExpr string
+	if cfg.TTLDays > 0 {
+		ttlExpr = fmt.Sprintf(`TTL toDateTime(Start) + toIntervalDay(%d)`, cfg.TTLDays)
+	}
+	return fmt.Sprintf(createTraceIDTsTableSQL, cfg.TracesTableName, ttlExpr)
+}
+
+func renderTraceIDTsMaterializedViewSQL(cfg *Config) string {
+	database, _ := parseDSNDatabase(cfg.DSN)
+	return fmt.Sprintf(createTraceIDTsMaterializedViewSQL, cfg.TracesTableName,
+		database, cfg.TracesTableName, database, cfg.TracesTableName)
+}
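`Events` and `Links` are ClickHouse `Nested` columns, so `convertEvents` and `convertLinks` above bind each span's events and links as parallel slices of equal length, one slice per sub-column. A small illustrative sketch of that shape (the struct here is hypothetical, for exposition only):

```go
package main

import (
	"fmt"
	"time"
)

// A ClickHouse Nested column is inserted as parallel arrays: the i-th
// element of each slice describes the i-th event row of the span.
type nestedEvents struct {
	Timestamps []time.Time
	Names      []string
	Attributes []map[string]string
}

func main() {
	ev := nestedEvents{
		Timestamps: []time.Time{time.Now(), time.Now()},
		Names:      []string{"event1", "event2"},
		Attributes: []map[string]string{{"k": "v"}, {}},
	}
	// Events.Timestamp[i], Events.Name[i] and Events.Attributes[i]
	// all belong to the same event; the slices must stay aligned.
	fmt.Println(len(ev.Timestamps) == len(ev.Names))
}
```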
diff --git a/exporter/clickhouseexporter/exporter_traces_test.go b/exporter/clickhouseexporter/exporter_traces_test.go
new file mode 100644
index 000000000000..7f303d735921
--- /dev/null
+++ b/exporter/clickhouseexporter/exporter_traces_test.go
@@ -0,0 +1,79 @@
+// Copyright 2020, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clickhouseexporter
+
+import (
+	"context"
+	"database/sql/driver"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+	conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
+	"go.uber.org/zap/zaptest"
+)
+
+func TestExporter_pushTracesData(t *testing.T) {
+	t.Run("push success", func(t *testing.T) {
+		var items int
+		initClickhouseTestServer(t, func(query string, values []driver.Value) error {
+			t.Logf("%d, values:%+v", items, values)
+			if strings.HasPrefix(query, "INSERT") {
+				items++
+			}
+			return nil
+		})
+
+		exporter := newTestTracesExporter(t, defaultDSN)
+		mustPushTracesData(t, exporter, simpleTraces(1))
+		mustPushTracesData(t, exporter, simpleTraces(2))
+
+		require.Equal(t, 3, items)
+	})
+}
+
+func newTestTracesExporter(t *testing.T, dsn string, fns ...func(*Config)) *tracesExporter {
+	exporter, err := newTracesExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
+	require.NoError(t, err)
+
+	t.Cleanup(func() { _ = exporter.Shutdown(context.TODO()) })
+	return exporter
+}
+
+func simpleTraces(count int) ptrace.Traces {
+	traces := ptrace.NewTraces()
+	rs := traces.ResourceSpans().AppendEmpty()
+	ss := rs.ScopeSpans().AppendEmpty()
+	for i := 0; i < count; i++ {
+		s := ss.Spans().AppendEmpty()
+		s.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now()))
+		s.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now()))
+		s.Attributes().PutStr(conventions.AttributeServiceName, "v")
+		event := s.Events().AppendEmpty()
+		event.SetName("event1")
+		event.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
+		link := s.Links().AppendEmpty()
+		link.Attributes().PutStr("k", "v")
+	}
+	return traces
+}
+
+func mustPushTracesData(t *testing.T, exporter *tracesExporter, td ptrace.Traces) {
+	err := exporter.pushTraceData(context.TODO(), td)
+	require.NoError(t, err)
+}
diff --git a/exporter/clickhouseexporter/factory.go b/exporter/clickhouseexporter/factory.go
index c9c1b41f856c..b4d48867efc2 100644
--- a/exporter/clickhouseexporter/factory.go
+++ b/exporter/clickhouseexporter/factory.go
@@ -36,6 +36,7 @@ func NewFactory() component.ExporterFactory {
 		typeStr,
 		createDefaultConfig,
 		component.WithLogsExporter(createLogsExporter, stability),
+		component.WithTracesExporter(createTracesExporter, stability),
 	)
 }

@@ -46,6 +47,8 @@ func createDefaultConfig() config.Exporter {
 		QueueSettings:   QueueSettings{QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize},
 		RetrySettings:   exporterhelper.NewDefaultRetrySettings(),
 		LogsTableName:   "otel_logs",
+		TracesTableName: "otel_traces",
+		TTLDays:         7,
 	}
 }

@@ -57,7 +60,7 @@ func createLogsExporter(
 	cfg config.Exporter,
 ) (component.LogsExporter, error) {
 	c := cfg.(*Config)
-	exporter, err := newExporter(set.Logger, c)
+	exporter, err := newLogsExporter(set.Logger, c)
 	if err != nil {
 		return nil, fmt.Errorf("cannot configure clickhouse logs exporter: %w", err)
 	}
@@ -73,3 +76,28 @@ func createLogsExporter(
 		exporterhelper.WithRetry(c.RetrySettings),
 	)
 }
+
+// createTracesExporter creates a new exporter for traces.
+// Traces are inserted directly into ClickHouse.
+func createTracesExporter(
+	ctx context.Context,
+	set component.ExporterCreateSettings,
+	cfg config.Exporter,
+) (component.TracesExporter, error) {
+	c := cfg.(*Config)
+	exporter, err := newTracesExporter(set.Logger, c)
+	if err != nil {
+		return nil, fmt.Errorf("cannot configure clickhouse traces exporter: %w", err)
+	}
+
+	return exporterhelper.NewTracesExporter(
+		ctx,
+		set,
+		cfg,
+		exporter.pushTraceData,
+		exporterhelper.WithShutdown(exporter.Shutdown),
+		exporterhelper.WithTimeout(c.TimeoutSettings),
+		exporterhelper.WithQueue(c.enforcedQueueSettings()),
+		exporterhelper.WithRetry(c.RetrySettings),
+	)
+}
diff --git a/exporter/clickhouseexporter/factory_test.go b/exporter/clickhouseexporter/factory_test.go
index 435eb9eef4e0..9107e749460e 100644
--- a/exporter/clickhouseexporter/factory_test.go
+++ b/exporter/clickhouseexporter/factory_test.go
@@ -44,18 +44,23 @@ func TestFactory_CreateLogsExporter(t *testing.T) {
 	require.NoError(t, exporter.Shutdown(context.TODO()))
 }

-func TestFactory_CreateMetricsExporter_Fail(t *testing.T) {
+func TestFactory_CreateTracesExporter(t *testing.T) {
 	factory := NewFactory()
-	cfg := factory.CreateDefaultConfig()
+	cfg := withDefaultConfig(func(cfg *Config) {
+		cfg.DSN = defaultDSN
+	})
 	params := componenttest.NewNopExporterCreateSettings()
-	_, err := factory.CreateMetricsExporter(context.Background(), params, cfg)
-	require.Error(t, err, "expected an error when creating a traces exporter")
+	exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg)
+	require.NoError(t, err)
+	require.NotNil(t, exporter)
+
+	require.NoError(t, exporter.Shutdown(context.TODO()))
 }

-func TestFactory_CreateTracesExporter_Fail(t *testing.T) {
+func TestFactory_CreateMetricsExporter_Fail(t *testing.T) {
 	factory := NewFactory()
 	cfg := factory.CreateDefaultConfig()
 	params := componenttest.NewNopExporterCreateSettings()
-	_, err := factory.CreateTracesExporter(context.Background(), params, cfg)
+	_, err := factory.CreateMetricsExporter(context.Background(), params, cfg)
 	require.Error(t, err, "expected an error when creating a metrics exporter")
 }
diff --git a/exporter/clickhouseexporter/testdata/config.yaml b/exporter/clickhouseexporter/testdata/config.yaml
index 9ee7280a0fe8..8c0347f2c7aa 100644
--- a/exporter/clickhouseexporter/testdata/config.yaml
+++ b/exporter/clickhouseexporter/testdata/config.yaml
@@ -6,10 +6,12 @@ processors:

 exporters:
   clickhouse:
-    dsn: tcp://127.0.0.1:9000?database=default
+    dsn: tcp://127.0.0.1:9000/otel
   clickhouse/full:
-    dsn: tcp://127.0.0.1:9000?database=default
+    dsn: tcp://127.0.0.1:9000/otel
     ttl_days: 3
+    logs_table_name: otel_logs
+    traces_table_name: otel_traces
     timeout: 5s
     retry_on_failure:
      enabled: true
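As a usage sketch (not part of this diff), the two-phase trace lookup from the README can be driven from Go via `database/sql` and the same clickhouse driver; the endpoint and trace ID below are placeholders:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/ClickHouse/clickhouse-go/v2" // registers the "clickhouse" driver
)

func main() {
	ctx := context.Background()
	db, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000/otel")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	traceID := "391dae938234560b16bb63f51501cb6f"

	// Phase 1: use the small otel_traces_trace_id_ts table (kept fresh by the
	// materialized view) to find the trace's time window.
	var start, end time.Time
	err = db.QueryRowContext(ctx,
		"SELECT min(Start), max(End) FROM otel_traces_trace_id_ts WHERE TraceId = ?",
		traceID).Scan(&start, &end)
	if err != nil {
		log.Fatal(err)
	}

	// Phase 2: fetch spans from otel_traces constrained to that window, so the
	// time-based primary index can prune parts.
	rows, err := db.QueryContext(ctx,
		`SELECT SpanId, SpanName, Duration FROM otel_traces
		 WHERE TraceId = ? AND Timestamp >= ? AND Timestamp <= ?`,
		traceID, start, end.Add(time.Second))
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var spanID, name string
		var duration int64
		if err := rows.Scan(&spanID, &name, &duration); err != nil {
			log.Fatal(err)
		}
		fmt.Println(spanID, name, time.Duration(duration))
	}
}
```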