Skip to content

Commit

Permalink
[exporter/clickhouse] Implement consume log logic (#8028)
Browse files Browse the repository at this point in the history
* [exporter/clickhouse] Implement consume log logic

Signed-off-by: Jimmie Han <hanjinming@outlook.com>
  • Loading branch information
hanjm authored Mar 8, 2022
1 parent eeb8135 commit 8de9d21
Show file tree
Hide file tree
Showing 21 changed files with 778 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- `spanmetricsprocessor`: Dropping the condition to replace _ with key_ as __ label is reserved and _ is not (#8057)
- `podmanreceiver`: Add container.runtime attribute to container metrics (#8262)
- `dockerstatsreceiver`: Add container.runtime attribute to container metrics (#8261)
- `clickhouseexporter`: Implement consume log logic. (#9705)

### 🛑 Breaking changes 🛑

Expand Down
2 changes: 2 additions & 0 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/ClickHouse/clickhouse-go v1.5.4 // indirect
github.com/DataDog/agent-payload/v5 v5.0.16 // indirect
github.com/DataDog/datadog-agent/pkg/quantile v0.33.1 // indirect
github.com/DataDog/datadog-agent/pkg/trace/exportable v0.0.0-20201016145401-4646cf596b02 // indirect
Expand Down Expand Up @@ -68,6 +69,7 @@ require (
github.com/checkpoint-restore/go-criu/v5 v5.3.0 // indirect
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
github.com/cilium/ebpf v0.7.0 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/cloudfoundry-incubator/uaago v0.0.0-20190307164349-8136b7bbe76e // indirect
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490 // indirect
Expand Down
6 changes: 6 additions & 0 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions exporter/clickhouseexporter/Makefile
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
include ../../Makefile.Common

local-run-example:
cd ../../ && GOOS=linux go build -o ./local/otelcontribcol ./cmd/otelcontribcol
cd example && docker-compose up -d
recreate-otel-collector:
cd ../../ && GOOS=linux go build -o ./local/otelcontribcol ./cmd/otelcontribcol
cd example && docker-compose up --build otel-collector
64 changes: 32 additions & 32 deletions exporter/clickhouseexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ Support time-series graph, table and logs.
```clickhouse
/* get error count about my service last 1 hour.*/
SELECT count(*)
FROM logs
WHERE SeverityText="ERROR" AND timestamp >= NOW() - INTERVAL 1 HOUR;
/* find error log.*/
FROM otel_logs
WHERE SeverityText='ERROR' AND Timestamp >= NOW() - INTERVAL 1 HOUR;
/* find log.*/
SELECT *
FROM logs
WHERE SeverityText="ERROR" AND timestamp >= NOW() - INTERVAL 1 HOUR;
FROM otel_logs
WHERE Timestamp >= NOW() - INTERVAL 1 HOUR;
/* find log with specific attribute .*/
SELECT Body
FROM logs
WHERE Attributes.value[indexOf(Attributes.key, 'http.method')] = 'post' AND timestamp >= NOW() - INTERVAL 1 HOUR;
FROM otel_logs
WHERE LogAttributes.Value[indexOf(LogAttributes.Key, 'http_method')] = 'post' AND Timestamp >= NOW() - INTERVAL 1 HOUR;
```

## Configuration options
Expand All @@ -47,7 +47,8 @@ The following settings are required:

The following settings can be optionally configured:

- `ttl_days` (default=0): The data time-to-live in days, 0 means no ttl.
- `ttl_days` (defaul t= 0): The data time-to-live in days, 0 means no ttl.
- `logs_table_name` (default = otel_logs): The table name for logs.
- `timeout` (default = 5s): The timeout for every attempt to send data to the backend.
- `sending_queue`
- `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping data.
Expand Down Expand Up @@ -86,29 +87,28 @@ service:
## Schema
```clickhouse
CREATE TABLE IF NOT EXISTS logs (
timestamp DateTime CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
TraceFlags Int64,
SeverityText LowCardinality(String) CODEC(ZSTD(1)),
SeverityNumber Int64,
Name LowCardinality(String) CODEC(ZSTD(1)),
Body String CODEC(ZSTD(1)),
Attributes Nested
(
key LowCardinality(String),
value String
) CODEC(ZSTD(1)),
Resource Nested
(
key LowCardinality(String),
value String
) CODEC(ZSTD(1)),
INDEX idx_attr_keys Attributes.key TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX idx_res_keys Resource.key TYPE bloom_filter(0.01) GRANULARITY 64
CREATE TABLE IF NOT EXISTS otel_logs (
Timestamp DateTime CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
TraceFlags UInt32,
SeverityText LowCardinality(String) CODEC(ZSTD(1)),
SeverityNumber Int32,
Body String CODEC(ZSTD(1)),
ResourceAttributes Nested
(
Key LowCardinality(String),
Value String
) CODEC(ZSTD(1)),
LogAttributes Nested
(
Key LowCardinality(String),
Value String
) CODEC(ZSTD(1)),
INDEX idx_attr_keys ResourceAttributes.Key TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX idx_res_keys LogAttributes.Key TYPE bloom_filter(0.01) GRANULARITY 64
) ENGINE MergeTree()
TTL timestamp + INTERVAL 3 DAY
PARTITION BY toDate(timestamp)
ORDER BY (Name, -toUnixTimestamp(timestamp))
TTL Timestamp + INTERVAL 3 DAY
PARTITION BY toDate(Timestamp)
ORDER BY (toUnixTimestamp(Timestamp));
```
2 changes: 2 additions & 0 deletions exporter/clickhouseexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
// For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
// For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
DSN string `mapstructure:"dsn"`
// LogsTableName is the table name for logs. default is `otel_logs`.
LogsTableName string `mapstructure:"logs_table_name"`
// TTLDays is The data time-to-live in days, 0 means no ttl.
TTLDays uint `mapstructure:"ttl_days"`
}
Expand Down
74 changes: 74 additions & 0 deletions exporter/clickhouseexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clickhouseexporter

import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/service/servicetest"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)

factory := NewFactory()
factories.Exporters[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Exporters), 2)

defaultCfg := factory.CreateDefaultConfig()
defaultCfg.(*Config).DSN = "tcp://127.0.0.1:9000?database=default"
r0 := cfg.Exporters[config.NewComponentID(typeStr)]
assert.Equal(t, r0, defaultCfg)

r1 := cfg.Exporters[config.NewComponentIDWithName(typeStr, "full")].(*Config)
assert.Equal(t, r1, &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "full")),
DSN: "tcp://127.0.0.1:9000?database=default",
TTLDays: 3,
LogsTableName: "otel_logs",
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 5 * time.Second,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 300 * time.Second,
},
QueueSettings: QueueSettings{
QueueSize: 100,
},
})
}

func withDefaultConfig(fns ...func(*Config)) *Config {
cfg := createDefaultConfig().(*Config)
for _, fn := range fns {
fn(cfg)
}
return cfg
}
12 changes: 12 additions & 0 deletions exporter/clickhouseexporter/example/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM alpine:latest as certs
RUN apk --update add ca-certificates

FROM scratch

ARG USER_UID=10001
USER ${USER_UID}

COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY ./otelcontribcol /otelcontribcol
ENTRYPOINT ["/otelcontribcol"]
EXPOSE 4317 55680 55679
17 changes: 17 additions & 0 deletions exporter/clickhouseexporter/example/datasource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# config file version
apiVersion: 1

datasources:
- name: ClickHouse-official
type: grafana-clickhouse-datasource
jsonData:
defaultDatabase: default
port: 9000
server: clickhouse
username:
tlsSkipVerify: true
secureJsonData:
password:
- name: ClickHouse-vertamedia
type: vertamedia-clickhouse-datasource
url: http://clickhouse:8123
69 changes: 69 additions & 0 deletions exporter/clickhouseexporter/example/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
version: "3"

networks:
otel-clickhouse:

services:
otel-collector:
build:
context: ../../../local
dockerfile: ../exporter/clickhouseexporter/example/Dockerfile
# Uncomment the next line to use a preexisting image
# image: otelcontribcol:latest
container_name: otel
command:
- "--config=/etc/otel-collector-config.yml"
- "--set=service.telemetry.logs.level=DEBUG"
volumes:
- ./otel-collector-config.yml:/etc/otel-collector-config.yml
ports:
- "1888:1888" # pprof extension
- "8888:8888" # Prometheus metrics exposed by the collector
- "13133:13133" # health_check extension
- "55679:55679" # zpages extension
- "24224:24224" # fluentforwarder
- "24224:24224/udp" # fluentforwarder
depends_on:
- clickhouse
networks:
- otel-clickhouse

clickhouse:
image: clickhouse/clickhouse-server:latest
ports:
- "9000:9000"
- "8123:8123"
networks:
- otel-clickhouse

grafana:
image: grafana/grafana:latest
volumes:
- ./grafana.ini:/etc/grafana/grafana.ini
- ./datasource.yaml:/etc/grafana/provisioning/datasources/datasource.yaml
environment:
GF_INSTALL_PLUGINS: grafana-clickhouse-datasource,vertamedia-clickhouse-datasource
GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: vertamedia-clickhouse-datasource
ports:
- "3000:3000"
networks:
- otel-clickhouse

# Log generator
flog:
image: mingrammer/flog:0.4.3
# Output fake log in JSON format
command: [ "--format=json", "--loop", "--delay=0.0001ms"]
networks:
- otel-clickhouse
depends_on:
- otel-collector
logging:
driver: fluentd
options:
mode: non-blocking
# Allow time for otel-collector to spin up, then forward fluentd logs to the fluentforwarder receiver.
fluentd-async-connect: "true"
# Use nanosecond precision
fluentd-sub-second-precision: "true"
stop_signal: SIGKILL
7 changes: 7 additions & 0 deletions exporter/clickhouseexporter/example/grafana.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[auth]
disable_login_form = true

[auth.anonymous]
enabled = true
org_name = Main Org.
org_role = Admin
39 changes: 39 additions & 0 deletions exporter/clickhouseexporter/example/otel-collector-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
receivers:
fluentforward:
endpoint: 0.0.0.0:24224

processors:
batch:
send_batch_size: 100000
timeout: 2s
memory_limiter:
check_interval: 2s
limit_mib: 1800
spike_limit_mib: 500

exporters:
clickhouse:
dsn: tcp://clickhouse:9000?database=default
ttl_days: 3
timeout: 5s
sending_queue:
queue_size: 100
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
extensions:
health_check:
pprof:
zpages:
memory_ballast:
size_mib: 1000

service:
extensions: [pprof, zpages, health_check]
pipelines:
logs:
receivers: [ fluentforward ]
processors: [ memory_limiter, batch ]
exporters: [ clickhouse ]
Loading

0 comments on commit 8de9d21

Please sign in to comment.