Skip to content

Commit

Permalink
Merge branch 'main' into tyler/kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
MovieStoreGuy authored Sep 17, 2024
2 parents fd5aeed + 7e8ebd7 commit ca0fc7d
Show file tree
Hide file tree
Showing 29 changed files with 645 additions and 93 deletions.
27 changes: 27 additions & 0 deletions .chloggen/doris-logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dorisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: logs implementation

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33479]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
22 changes: 22 additions & 0 deletions .chloggen/rm-copies-deltatorateprocessor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatorateprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove unnecessary data copies.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35165]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
22 changes: 22 additions & 0 deletions .chloggen/rm-copy-transformprocessor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove unnecessary data copy when transform sum to/from gauge

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35177]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ internal/tools/ @open-teleme

pkg/batchperresourceattr/ @open-telemetry/collector-contrib-approvers @atoulme @dmitryax
pkg/batchpersignal/ @open-telemetry/collector-contrib-approvers @jpkrohling
pkg/experimentalmetricmetadata/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick
pkg/datadog/ @open-telemetry/collector-contrib-approvers @mx-psi @dineshg13 @liustanley @songy23 @mackjmr @ankitpatel96
pkg/experimentalmetricmetadata/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick
pkg/golden/ @open-telemetry/collector-contrib-approvers @djaglowski @atoulme
pkg/kafka/topic/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy
pkg/ottl/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @kentquirk @bogdandrutu @evan-bradley
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ body:
- internal/tools
- pkg/batchperresourceattr
- pkg/batchpersignal
- pkg/datadog
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ body:
- internal/tools
- pkg/batchperresourceattr
- pkg/batchpersignal
- pkg/datadog
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ body:
- internal/tools
- pkg/batchperresourceattr
- pkg/batchpersignal
- pkg/datadog
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ body:
- internal/tools
- pkg/batchperresourceattr
- pkg/batchpersignal
- pkg/datadog
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/build-and-test-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ jobs:
- name: Ensure required ports in the dynamic range are available
run: |
& ${{ github.workspace }}\.github\workflows\scripts\win-required-ports.ps1
- name: Build shared test tools
# If component tests share Makefile targets they need to be added here to avoid
# concurrent component tests clashing when building such targets. This applies
# specifically to Windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34691
run: make "$(${PWD} -replace '\\', '/')/.tools/gotestsum"
- name: Run Unit tests
run: make -j2 gotest GROUP=${{ matrix.group }}
windows-unittest:
Expand Down
17 changes: 9 additions & 8 deletions exporter/coralogixexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ exporters:
Depending on your region, you might need to use a different domain. Here are the available domains:
| Region | Domain |
|---------|---------------------------------|
| USA1 | `coralogix.us` |
| USA2 | `cx498.coralogix.com` |
| APAC1 | `coralogix.in` |
| APAC2 | `coralogixsg.com` |
| EUROPE1 | `coralogix.com` |
| EUROPE2 | `eu2.coralogix.com` |
| Region | Domain |
|---------|-------------------------|
| USA1 | `coralogix.us` |
| USA2 | `cx498.coralogix.com` |
| APAC1 | `coralogix.in` |
| APAC2 | `coralogixsg.com` |
| APAC3 | `ap3.coralogix.com` |
| EUROPE1 | `coralogix.com` |
| EUROPE2 | `eu2.coralogix.com` |

Additionally, Coralogix supports AWS PrivateLink, which provides private connectivity between virtual private clouds (VPCs), supported AWS services, and your on-premises networks without exposing your traffic to the public internet.

Expand Down
19 changes: 19 additions & 0 deletions exporter/dorisexporter/exporter_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"net"
"testing"
"time"

Expand Down Expand Up @@ -51,3 +52,21 @@ func TestStreamLoadUrl(t *testing.T) {
url := streamLoadURL("http://doris:8030", "otel", "otel_logs")
require.Equal(t, "http://doris:8030/api/otel/otel_logs/_stream_load", url)
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

if err != nil {
return 0, err
}

port := l.Addr().(*net.TCPAddr).Port

err = l.Close()

if err != nil {
return 0, err
}

return port, nil
}
159 changes: 159 additions & 0 deletions exporter/dorisexporter/exporter_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"context"
_ "embed" // for SQL file embedding
"encoding/json"
"fmt"
"io"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
)

//go:embed sql/logs_ddl.sql
var logsDDL string

// dLog Log to Doris
type dLog struct {
ServiceName string `json:"service_name"`
Timestamp string `json:"timestamp"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
SeverityNumber int32 `json:"severity_number"`
SeverityText string `json:"severity_text"`
Body string `json:"body"`
ResourceAttributes map[string]any `json:"resource_attributes"`
LogAttributes map[string]any `json:"log_attributes"`
ScopeName string `json:"scope_name"`
ScopeVersion string `json:"scope_version"`
}

type logsExporter struct {
*commonExporter
}

func newLogsExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *logsExporter {
return &logsExporter{
commonExporter: newExporter(logger, cfg, set),
}
}

func (e *logsExporter) start(ctx context.Context, host component.Host) error {
client, err := createDorisHTTPClient(ctx, e.cfg, host, e.TelemetrySettings)
if err != nil {
return err
}
e.client = client

if !e.cfg.CreateSchema {
return nil
}

conn, err := createDorisMySQLClient(e.cfg)
if err != nil {
return err
}
defer conn.Close()

err = createAndUseDatabase(ctx, conn, e.cfg)
if err != nil {
return err
}

ddl := fmt.Sprintf(logsDDL, e.cfg.Table.Logs, e.cfg.propertiesStr())
_, err = conn.ExecContext(ctx, ddl)
return err
}

func (e *logsExporter) shutdown(_ context.Context) error {
if e.client != nil {
e.client.CloseIdleConnections()
}
return nil
}

func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
logs := make([]*dLog, 0, ld.LogRecordCount())

for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLogs := ld.ResourceLogs().At(i)
resource := resourceLogs.Resource()
resourceAttributes := resource.Attributes()
serviceName := ""
v, ok := resourceAttributes.Get(semconv.AttributeServiceName)
if ok {
serviceName = v.AsString()
}

for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ {
scopeLogs := resourceLogs.ScopeLogs().At(j)

for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)

log := &dLog{
ServiceName: serviceName,
Timestamp: e.formatTime(logRecord.Timestamp().AsTime()),
TraceID: traceutil.TraceIDToHexOrEmptyString(logRecord.TraceID()),
SpanID: traceutil.SpanIDToHexOrEmptyString(logRecord.SpanID()),
SeverityNumber: int32(logRecord.SeverityNumber()),
SeverityText: logRecord.SeverityText(),
Body: logRecord.Body().AsString(),
ResourceAttributes: resourceAttributes.AsRaw(),
LogAttributes: logRecord.Attributes().AsRaw(),
ScopeName: scopeLogs.Scope().Name(),
ScopeVersion: scopeLogs.Scope().Version(),
}

logs = append(logs, log)
}

}

}

return e.pushLogDataInternal(ctx, logs)
}

func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) error {
marshal, err := json.Marshal(logs)
if err != nil {
return err
}

req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Logs, marshal)
if err != nil {
return err
}

res, err := e.client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return err
}

response := streamLoadResponse{}
err = json.Unmarshal(body, &response)
if err != nil {
return err
}

if !response.success() {
return fmt.Errorf("failed to push log data: %s", response.Message)
}

return nil
}
Loading

0 comments on commit ca0fc7d

Please sign in to comment.