Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clickhouseexporter: update table schema #12664

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/ClickHouse/clickhouse-go v1.5.4 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.2.0 // indirect
github.com/DataDog/agent-payload/v5 v5.0.26 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.38.0-rc.3.0.20220804102556-2fec6abdb5f7 // indirect
github.com/DataDog/datadog-agent/pkg/otlp/model v0.38.0-rc.3.0.20220804102556-2fec6abdb5f7 // indirect
Expand Down Expand Up @@ -105,7 +105,6 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/checkpoint-restore/go-criu/v5 v5.3.0 // indirect
github.com/cilium/ebpf v0.8.1 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/cloudfoundry-incubator/uaago v0.0.0-20190307164349-8136b7bbe76e // indirect
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc // indirect
Expand Down Expand Up @@ -459,6 +458,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/paulmach/orb v0.7.1 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
Expand All @@ -482,6 +482,7 @@ require (
github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.3.1 // indirect
github.com/shirou/gopsutil/v3 v3.22.7 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/signalfx/com_signalfx_metrics_protobuf v0.0.3 // indirect
github.com/signalfx/gohistogram v0.0.0-20160107210732-1ccfd2ff5083 // indirect
github.com/signalfx/golib/v3 v3.3.37 // indirect
Expand Down
20 changes: 17 additions & 3 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 36 additions & 42 deletions exporter/clickhouseexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"time"

_ "github.com/ClickHouse/clickhouse-go" // For register database driver.
hanjm marked this conversation as resolved.
Show resolved Hide resolved
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
)

type clickhouseExporter struct {
Expand Down Expand Up @@ -73,27 +74,30 @@ func (e *clickhouseExporter) pushLogsData(ctx context.Context, ld plog.Logs) err
defer func() {
_ = statement.Close()
}()
var serviceName string
for i := 0; i < ld.ResourceLogs().Len(); i++ {
logs := ld.ResourceLogs().At(i)
res := logs.Resource()
resourceKeys, resourceValues := attributesToSlice(res.Attributes())
resAttr := attributesToMap(res.Attributes())
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
serviceName = v.StringVal()
}
for j := 0; j < logs.ScopeLogs().Len(); j++ {
rs := logs.ScopeLogs().At(j).LogRecords()
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
attrKeys, attrValues := attributesToSlice(r.Attributes())
logAttr := attributesToMap(r.Attributes())
_, err = statement.ExecContext(ctx,
r.Timestamp().AsTime(),
r.TraceID().HexString(),
r.SpanID().HexString(),
r.Flags(),
r.SeverityText(),
r.SeverityNumber(),
int32(r.SeverityNumber()),
serviceName,
r.Body().AsString(),
resourceKeys,
resourceValues,
attrKeys,
attrValues,
resAttr,
logAttr,
)
if err != nil {
return fmt.Errorf("ExecContext:%w", err)
Expand All @@ -104,53 +108,45 @@ func (e *clickhouseExporter) pushLogsData(ctx context.Context, ld plog.Logs) err
return nil
})
duration := time.Since(start)
e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()),
e.logger.Info("insert logs", zap.Int("records", ld.LogRecordCount()),
zap.String("cost", duration.String()))
return err
}

func attributesToSlice(attributes pcommon.Map) ([]string, []string) {
keys := make([]string, 0, attributes.Len())
values := make([]string, 0, attributes.Len())
func attributesToMap(attributes pcommon.Map) map[string]string {
m := make(map[string]string, attributes.Len())
attributes.Range(func(k string, v pcommon.Value) bool {
keys = append(keys, formatKey(k))
values = append(values, v.AsString())
m[k] = v.StringVal()
return true
})
return keys, values
}

func formatKey(k string) string {
return strings.ReplaceAll(k, ".", "_")
return m
}

const (
// language=ClickHouse SQL
createLogsTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
Timestamp DateTime CODEC(Delta, ZSTD(1)),
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
TraceFlags UInt32,
TraceFlags UInt32 CODEC(ZSTD(1)),
SeverityText LowCardinality(String) CODEC(ZSTD(1)),
SeverityNumber Int32,
SeverityNumber Int32 CODEC(ZSTD(1)),
ServiceName LowCardinality(String) CODEC(ZSTD(1)),
Body String CODEC(ZSTD(1)),
ResourceAttributes Nested
(
Key LowCardinality(String),
Value String
) CODEC(ZSTD(1)),
LogAttributes Nested
(
Key LowCardinality(String),
Value String
) CODEC(ZSTD(1)),
INDEX idx_attr_keys ResourceAttributes.Key TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX idx_res_keys LogAttributes.Key TYPE bloom_filter(0.01) GRANULARITY 64
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
LogAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
) ENGINE MergeTree()
%s
PARTITION BY toDate(Timestamp)
ORDER BY (toUnixTimestamp(Timestamp));
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId)
SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
`
// language=ClickHouse SQL
insertLogsSQLTemplate = `INSERT INTO %s (
Expand All @@ -160,11 +156,10 @@ ORDER BY (toUnixTimestamp(Timestamp));
TraceFlags,
SeverityText,
SeverityNumber,
ServiceName,
Body,
ResourceAttributes.Key,
ResourceAttributes.Value,
LogAttributes.Key,
LogAttributes.Value
ResourceAttributes,
LogAttributes
) VALUES (
?,
?,
Expand All @@ -175,7 +170,6 @@ ORDER BY (toUnixTimestamp(Timestamp));
?,
?,
?,
?,
?
)`
)
Expand All @@ -194,7 +188,7 @@ func newClickhouseClient(cfg *Config) (*sql.DB, error) {
if cfg.TTLDays > 0 {
query = fmt.Sprintf(createLogsTableSQL,
cfg.LogsTableName,
fmt.Sprintf(`TTL Timestamp + INTERVAL %d DAY`, cfg.TTLDays))
fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, cfg.TTLDays))
}
if _, err := db.Exec(query); err != nil {
return nil, fmt.Errorf("exec create table sql: %w", err)
Expand Down
8 changes: 6 additions & 2 deletions exporter/clickhouseexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ require (
require go.uber.org/multierr v1.8.0

require (
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/ClickHouse/clickhouse-go/v2 v2.2.0
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector/pdata v0.57.2
go.opentelemetry.io/collector/semconv v0.57.2
)

require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf v1.4.2 // indirect
github.com/kr/pretty v0.3.0 // indirect
Expand All @@ -32,9 +33,12 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/paulmach/orb v0.7.1 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/metric v0.31.0 // indirect
Expand Down
Loading