From efd50177e5612468d0c22f86de1dde514ff10803 Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Sat, 21 Dec 2024 00:51:18 +0800 Subject: [PATCH 1/2] refactor: json array to json line --- exporter/dorisexporter/exporter_common.go | 19 ++++++++++++++++++- .../dorisexporter/exporter_common_test.go | 9 +++++++++ exporter/dorisexporter/exporter_logs.go | 2 +- exporter/dorisexporter/exporter_traces.go | 2 +- .../metrics_exponential_histogram.go | 3 +-- exporter/dorisexporter/metrics_gauge.go | 3 +-- exporter/dorisexporter/metrics_histogram.go | 3 +-- exporter/dorisexporter/metrics_sum.go | 3 +-- exporter/dorisexporter/metrics_summary.go | 3 +-- 9 files changed, 34 insertions(+), 13 deletions(-) diff --git a/exporter/dorisexporter/exporter_common.go b/exporter/dorisexporter/exporter_common.go index 7c889c3d42f8..cc474fe47d5e 100644 --- a/exporter/dorisexporter/exporter_common.go +++ b/exporter/dorisexporter/exporter_common.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "database/sql" + "encoding/json" "fmt" "net/http" "time" @@ -81,7 +82,7 @@ func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []by req.Header.Set("format", "json") req.Header.Set("Expect", "100-continue") - req.Header.Set("strip_outer_array", "true") + req.Header.Set("read_json_by_line", "true") if cfg.ClientConfig.Timeout != 0 { req.Header.Set("timeout", fmt.Sprintf("%d", cfg.ClientConfig.Timeout/time.Second)) } @@ -118,3 +119,19 @@ func createAndUseDatabase(ctx context.Context, conn *sql.DB, cfg *Config) error _, err = conn.ExecContext(ctx, "USE "+cfg.Database) return err } + +type metric interface { + dMetricGauge | dMetricSum | dMetricHistogram | dMetricExponentialHistogram | dMetricSummary +} + +func toJsonLines[T dLog | dTrace | metric](data []*T) ([]byte, error) { + buf := &bytes.Buffer{} + enc := json.NewEncoder(buf) + for _, d := range data { + err := enc.Encode(d) + if err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} diff --git a/exporter/dorisexporter/exporter_common_test.go b/exporter/dorisexporter/exporter_common_test.go index baea71b36bb9..afd6c83d2aee 100644 --- a/exporter/dorisexporter/exporter_common_test.go +++ b/exporter/dorisexporter/exporter_common_test.go @@ -5,6 +5,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect import ( "net" + "strings" "testing" "time" @@ -60,3 +61,11 @@ func findRandomPort() (int, error) { return port, nil } + +func TestToJsonLines(t *testing.T) { + logs, err := toJsonLines([]*dLog{ + {}, {}, + }) + require.NoError(t, err) + require.Len(t, strings.Split(string(logs), "\n"), 2+1) +} diff --git a/exporter/dorisexporter/exporter_logs.go b/exporter/dorisexporter/exporter_logs.go index d3022e0c6290..8a81937c616f 100644 --- a/exporter/dorisexporter/exporter_logs.go +++ b/exporter/dorisexporter/exporter_logs.go @@ -122,7 +122,7 @@ func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error { } func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) error { - marshal, err := json.Marshal(logs) + marshal, err := toJsonLines(logs) if err != nil { return err } diff --git a/exporter/dorisexporter/exporter_traces.go b/exporter/dorisexporter/exporter_traces.go index 40793c613605..c59db592fb73 100644 --- a/exporter/dorisexporter/exporter_traces.go +++ b/exporter/dorisexporter/exporter_traces.go @@ -180,7 +180,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er } func (e *tracesExporter) pushTraceDataInternal(ctx context.Context, traces []*dTrace) error { - marshal, err := json.Marshal(traces) + marshal, err := toJsonLines(traces) if err != nil { return err } diff --git a/exporter/dorisexporter/metrics_exponential_histogram.go b/exporter/dorisexporter/metrics_exponential_histogram.go index 1498546f3229..561fbbd37aa3 100644 --- a/exporter/dorisexporter/metrics_exponential_histogram.go +++ b/exporter/dorisexporter/metrics_exponential_histogram.go @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect import ( _ "embed" - "encoding/json" "fmt" "go.opentelemetry.io/collector/pdata/pmetric" @@ -118,5 +117,5 @@ func (m *metricModelExponentialHistogram) size() int { } func (m *metricModelExponentialHistogram) bytes() ([]byte, error) { - return json.Marshal(m.data) + return toJsonLines(m.data) } diff --git a/exporter/dorisexporter/metrics_gauge.go b/exporter/dorisexporter/metrics_gauge.go index a5cf1b9388dd..c27bc4142245 100644 --- a/exporter/dorisexporter/metrics_gauge.go +++ b/exporter/dorisexporter/metrics_gauge.go @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect import ( _ "embed" - "encoding/json" "fmt" "go.opentelemetry.io/collector/pdata/pmetric" @@ -84,5 +83,5 @@ func (m *metricModelGauge) size() int { } func (m *metricModelGauge) bytes() ([]byte, error) { - return json.Marshal(m.data) + return toJsonLines(m.data) } diff --git a/exporter/dorisexporter/metrics_histogram.go b/exporter/dorisexporter/metrics_histogram.go index 18d1b3f3afdb..4166dd34b8f0 100644 --- a/exporter/dorisexporter/metrics_histogram.go +++ b/exporter/dorisexporter/metrics_histogram.go @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect import ( _ "embed" - "encoding/json" "fmt" "go.opentelemetry.io/collector/pdata/pmetric" @@ -108,5 +107,5 @@ func (m *metricModelHistogram) size() int { } func (m *metricModelHistogram) bytes() ([]byte, error) { - return json.Marshal(m.data) + return toJsonLines(m.data) } diff --git a/exporter/dorisexporter/metrics_sum.go b/exporter/dorisexporter/metrics_sum.go index 56c66ba86419..d5281f8be92a 100644 --- a/exporter/dorisexporter/metrics_sum.go +++ b/exporter/dorisexporter/metrics_sum.go @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect import ( _ "embed" - "encoding/json" "fmt" "go.opentelemetry.io/collector/pdata/pmetric" @@ -88,5 +87,5 @@ func (m *metricModelSum) size() int { } func (m *metricModelSum) bytes() ([]byte, error) { - return json.Marshal(m.data) + return toJsonLines(m.data) } diff --git a/exporter/dorisexporter/metrics_summary.go b/exporter/dorisexporter/metrics_summary.go index 499d56fbcb47..19683f1c30a1 100644 --- a/exporter/dorisexporter/metrics_summary.go +++ b/exporter/dorisexporter/metrics_summary.go @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect import ( _ "embed" - "encoding/json" "fmt" "go.opentelemetry.io/collector/pdata/pmetric" @@ -89,5 +88,5 @@ func (m *metricModelSummary) size() int { } func (m *metricModelSummary) bytes() ([]byte, error) { - return json.Marshal(m.data) + return toJsonLines(m.data) } From aedacbc4b4c4f443f08b289f08bbbac5f7e7ddaa Mon Sep 17 00:00:00 2001 From: composer <2789706336@qq.com> Date: Sat, 21 Dec 2024 00:54:24 +0800 Subject: [PATCH 2/2] chore: chlog --- .chloggen/json_line.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/json_line.yaml diff --git a/.chloggen/json_line.yaml b/.chloggen/json_line.yaml new file mode 100644 index 000000000000..42f3e351c65d --- /dev/null +++ b/.chloggen/json_line.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: dorisexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: send json lines to doris rather than json array + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36896] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user]