Skip to content

Commit

Permalink
log exporter completed
Browse files Browse the repository at this point in the history
  • Loading branch information
destrex271 committed Oct 9, 2024
1 parent 7fe9727 commit 154bbb4
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 162 deletions.
21 changes: 12 additions & 9 deletions exporter/postgresexporter/config.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package postgresexporter

import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

type Config struct {
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
Port string `mapstructure:"port"`
Host string `mapstructure:"host"`
LogsTableName string `mapstructure:"logs_table_name"`
TracesTableName string `mapstructure:"traces_table_name"`
MetricsTableName string `mapstructure:"metrics_table_name"`
TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
Port int `mapstructure:"port"`
Host string `mapstructure:"host"`
LogsTableName string `mapstructure:"logs_table_name"`
TracesTableName string `mapstructure:"traces_table_name"`
MetricsTableName string `mapstructure:"metrics_table_name"`
}
24 changes: 10 additions & 14 deletions exporter/postgresexporter/example/otel-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ receivers:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318

exporters:
postgresexporter:
username: "postgres"
Expand All @@ -14,23 +13,20 @@ exporters:
logs_table_name: "otellogs"
traces_table_name: "oteltraces"
metrics_table_name: "otelmetrics"
port: "5432"
hostname: "localhost"
port: 5432
host: "localhost"
timeout: 1000s
sending_queue:
queue_size: 100
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
debug:

service:
pipelines:
logs:
receivers: [ otlp ]
exporters:
- debug
- postgresexporter
traces:
receivers: [ otlp ]
exporters:
- debug
- postgresexporter
metrics:
receivers: [ otlp ]
exporters:
- debug
Expand Down
206 changes: 127 additions & 79 deletions exporter/postgresexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package postgresexporter
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"strings"
"time"

_ "github.com/lib/pq"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -35,7 +39,53 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
}, nil
}

type LogRecord struct {
Timestamp string
TimestampTime string
TraceId string
SpanId string
TraceFlags int16
SeverityText string
SeverityNumber int16
ServiceName string
Body string
ResourceSchemaUrl string
ResourceAttributes string
ScopeSchemaUrl string
ScopeName string
ScopeVersion string
ScopeAttributes string
LogAttributes string
}

func (e *logsExporter) start(ctx context.Context, _ component.Host) error {
ac, err := e.client.Query("SELECT * FROM " + e.cfg.LogsTableName)
for ac.Next() {
var logRecord LogRecord
err := ac.Scan(
&logRecord.Timestamp,
&logRecord.TimestampTime,
&logRecord.TraceId,
&logRecord.SpanId,
&logRecord.TraceFlags,
&logRecord.SeverityText,
&logRecord.SeverityNumber,
&logRecord.ServiceName,
&logRecord.Body,
&logRecord.ResourceSchemaUrl,
&logRecord.ResourceAttributes,
&logRecord.ScopeSchemaUrl,
&logRecord.ScopeName,
&logRecord.ScopeVersion,
&logRecord.ScopeAttributes,
&logRecord.LogAttributes,
)
if err != nil {
log.Fatalf("Scan failed: %v", err)
}
fmt.Printf("%+v\n", logRecord) // Print the log record
}
log.Println(ac, err)
return createLogsTable(ctx, e.cfg, e.client)
}

Expand All @@ -46,16 +96,17 @@ func (e *logsExporter) shutdown(_ context.Context) error {
return nil
}

func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
func (e *logsExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
e.logger.Info("Received logs", zap.Any("logs", ld))
start := time.Now()

err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {
statement, err := tx.PrepareContext(ctx, e.insertSQL)
if err != nil {
return fmt.Errorf("PrepareContext:%w", err)
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
defer func() {
_ = statement.Close()
}()
defer statement.Close()

var serviceName string

for i := 0; i < ld.ResourceLogs().Len(); i++ {
Expand All @@ -68,11 +119,12 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}

for j := 0; j < logs.ScopeLogs().Len(); j++ {
rs := logs.ScopeLogs().At(j).LogRecords()
scopeURL := logs.ScopeLogs().At(j).SchemaUrl()
scopeName := logs.ScopeLogs().At(j).Scope().Name()
scopeVersion := logs.ScopeLogs().At(j).Scope().Version()
scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes())
scopeLogs := logs.ScopeLogs().At(j)
rs := scopeLogs.LogRecords()
scopeURL := scopeLogs.SchemaUrl()
scopeName := scopeLogs.Scope().Name()
scopeVersion := scopeLogs.Scope().Version()
scopeAttr := attributesToMap(scopeLogs.Scope().Attributes())

for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
Expand Down Expand Up @@ -101,16 +153,18 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
logAttr,
)
if err != nil {
return fmt.Errorf("ExecContext:%w", err)
return fmt.Errorf("failed to execute insert statement: %w", err)
}
}
}
}
return nil
})

duration := time.Since(start)
e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()),
zap.String("cost", duration.String()))

return err
}

Expand All @@ -122,6 +176,8 @@ func newPostgresClient(cfg *Config) (*sql.DB, error) {
if err != nil {
return nil, err
}
r, err := db.Exec("select version();")
log.Println("checkkkk", r)
return db, nil
}

Expand All @@ -135,7 +191,7 @@ func createLogsTable(ctx context.Context, cfg *Config, db *sql.DB) error {

// SQL rendering functions below
func renderCreateLogsTableSQL(cfg *Config) string {
return fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName)
return strings.ReplaceAll(createLogsTableSQL, "{{TABLE_NAME}}", cfg.LogsTableName)
}

func renderInsertLogsSQL(cfg *Config) string {
Expand All @@ -144,71 +200,56 @@ func renderInsertLogsSQL(cfg *Config) string {

const (
createLogsTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
"Timestamp" TIMESTAMP(9) NOT NULL,
"TimestampTime" TIMESTAMP DEFAULT "Timestamp",
"TraceId" TEXT,
"SpanId" TEXT,
"TraceFlags" SMALLINT,
"SeverityText" TEXT,
"SeverityNumber" SMALLINT,
"ServiceName" TEXT,
"Body" TEXT,
"ResourceSchemaUrl" TEXT,
"ResourceAttributes" JSONB,
"ScopeSchemaUrl" TEXT,
"ScopeName" TEXT,
"ScopeVersion" TEXT,
"ScopeAttributes" JSONB,
"LogAttributes" JSONB,
PRIMARY KEY ("ServiceName", "TimestampTime"),
INDEX idx_trace_id ("TraceId"),
INDEX idx_res_attr_key (("ResourceAttributes"->>'key')),
INDEX idx_res_attr_value (("ResourceAttributes"->>'value')),
INDEX idx_scope_attr_key (("ScopeAttributes"->>'key')),
INDEX idx_scope_attr_value (("ScopeAttributes"->>'value')),
INDEX idx_log_attr_key (("LogAttributes"->>'key')),
INDEX idx_log_attr_value (("LogAttributes"->>'value')),
INDEX idx_body ("Body")
);
`

insertLogsSQLTemplate = `
INSERT INTO %s (
"Timestamp",
"TraceId",
"SpanId",
"TraceFlags",
"SeverityText",
"SeverityNumber",
"ServiceName",
"Body",
"ResourceSchemaUrl",
"ResourceAttributes",
"ScopeSchemaUrl",
"ScopeName",
"ScopeVersion",
"ScopeAttributes",
"LogAttributes"
) VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?,
?::jsonb,
?,
?,
?,
?::jsonb,
?::jsonb
);
`
CREATE TABLE IF NOT EXISTS {{TABLE_NAME}} (
"Timestamp" TIMESTAMP(9) NOT NULL,
"TimestampTime" TIMESTAMP NOT NULL DEFAULT NOW(),
"TraceId" TEXT,
"SpanId" TEXT,
"TraceFlags" SMALLINT,
"SeverityText" TEXT,
"SeverityNumber" SMALLINT,
"ServiceName" TEXT,
"Body" TEXT,
"ResourceSchemaUrl" TEXT,
"ResourceAttributes" JSONB,
"ScopeSchemaUrl" TEXT,
"ScopeName" TEXT,
"ScopeVersion" TEXT,
"ScopeAttributes" JSONB,
"LogAttributes" JSONB,
PRIMARY KEY ("ServiceName", "TimestampTime")
);
CREATE INDEX IF NOT EXISTS idx_trace_id ON {{TABLE_NAME}} ("TraceId");
CREATE INDEX IF NOT EXISTS idx_res_attr_key ON {{TABLE_NAME}} (("ResourceAttributes"->>'key'));
CREATE INDEX IF NOT EXISTS idx_res_attr_value ON {{TABLE_NAME}} (("ResourceAttributes"->>'value'));
CREATE INDEX IF NOT EXISTS idx_scope_attr_key ON {{TABLE_NAME}} (("ScopeAttributes"->>'key'));
CREATE INDEX IF NOT EXISTS idx_scope_attr_value ON {{TABLE_NAME}} (("ScopeAttributes"->>'value'));
CREATE INDEX IF NOT EXISTS idx_log_attr_key ON {{TABLE_NAME}} (("LogAttributes"->>'key'));
CREATE INDEX IF NOT EXISTS idx_log_attr_value ON {{TABLE_NAME}} (("LogAttributes"->>'value'));
CREATE INDEX IF NOT EXISTS idx_body ON {{TABLE_NAME}} ("Body");
`

insertLogsSQLTemplate = `INSERT INTO %s (
"Timestamp",
"TraceId",
"SpanId",
"TraceFlags",
"SeverityText",
"SeverityNumber",
"ServiceName",
"Body",
"ResourceSchemaUrl",
"ResourceAttributes",
"ScopeSchemaUrl",
"ScopeName",
"ScopeVersion",
"ScopeAttributes",
"LogAttributes"
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
)`
)

func doWithTx(_ context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
Expand All @@ -225,11 +266,18 @@ func doWithTx(_ context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
return tx.Commit()
}

func attributesToMap(attributes pcommon.Map) map[string]string {
func attributesToMap(attributes pcommon.Map) string {
m := make(map[string]string, attributes.Len())
attributes.Range(func(k string, v pcommon.Value) bool {
m[k] = v.AsString()
return true
})
return m

jsonData, err := json.Marshal(m)
if err != nil {
// Handle error more gracefully
return "{}" // or any appropriate fallback
}

return string(jsonData) // Return the JSON string
}
Loading

0 comments on commit 154bbb4

Please sign in to comment.