diff --git a/.chloggen/telemetrygen-logs-http.yaml b/.chloggen/telemetrygen-logs-http.yaml new file mode 100644 index 000000000000..602ced835c2f --- /dev/null +++ b/.chloggen/telemetrygen-logs-http.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/telemetrygen + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for --otlp-http for telemetrygen logs + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [18867] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/cmd/telemetrygen/config_test.go b/cmd/telemetrygen/config_test.go index 3a67a0a6ed54..2eabfd7b373a 100644 --- a/cmd/telemetrygen/config_test.go +++ b/cmd/telemetrygen/config_test.go @@ -11,8 +11,8 @@ import ( // TestConfig_HTTPPath verifies that the HTTPPath configuration defaults are correctly set for each sub-command. func TestConfig_HTTPPath(t *testing.T) { - t.Run("LogsConfigEmptyDefaultUrlPath", func(t *testing.T) { - assert.Equal(t, "", logsCfg.HTTPPath) + t.Run("LogsConfigValidDefaultUrlPath", func(t *testing.T) { + assert.Equal(t, "/v1/logs", logsCfg.HTTPPath) }) t.Run("MetricsConfigValidDefaultUrlPath", func(t *testing.T) { diff --git a/cmd/telemetrygen/internal/logs/config.go b/cmd/telemetrygen/internal/logs/config.go index a071e7f86ce1..e6fdfac534b5 100644 --- a/cmd/telemetrygen/internal/logs/config.go +++ b/cmd/telemetrygen/internal/logs/config.go @@ -19,6 +19,9 @@ type Config struct { // Flags registers config flags. func (c *Config) Flags(fs *pflag.FlagSet) { c.CommonFlags(fs) + + fs.StringVar(&c.HTTPPath, "otlp-http-url-path", "/v1/logs", "Which URL path to write to") + fs.IntVar(&c.NumLogs, "logs", 1, "Number of logs to generate in each worker (ignored if duration is provided)") fs.StringVar(&c.Body, "body", "the message", "Body of the log") } diff --git a/cmd/telemetrygen/internal/logs/exporter.go b/cmd/telemetrygen/internal/logs/exporter.go new file mode 100644 index 000000000000..b8b3601e50af --- /dev/null +++ b/cmd/telemetrygen/internal/logs/exporter.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logs + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type exporter interface { + export(plog.Logs) error +} + +func newExporter(ctx context.Context, cfg *Config) (exporter, error) { + if cfg.UseHTTP { + return &httpClientExporter{ + client: http.DefaultClient, + cfg: cfg, + }, nil + } + + if !cfg.Insecure { + return nil, fmt.Errorf("'telemetrygen logs' only supports insecure gRPC") + } + // only support grpc in insecure mode + clientConn, err := grpc.DialContext(ctx, cfg.Endpoint(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + return &gRPCClientExporter{client: plogotlp.NewGRPCClient(clientConn)}, nil +} + +type gRPCClientExporter struct { + client plogotlp.GRPCClient +} + +func (e *gRPCClientExporter) export(logs plog.Logs) error { + req := plogotlp.NewExportRequestFromLogs(logs) + if _, err := e.client.Export(context.Background(), req); err != nil { + return err + } + return nil +} + +type httpClientExporter struct { + client *http.Client + cfg *Config +} + +func (e *httpClientExporter) export(logs plog.Logs) error { + scheme := "https" + if e.cfg.Insecure { + scheme = "http" + } + path := e.cfg.HTTPPath + url := fmt.Sprintf("%s://%s%s", scheme, e.cfg.Endpoint(), path) + + req := plogotlp.NewExportRequestFromLogs(logs) + body, err := req.MarshalProto() + if err != nil { + return fmt.Errorf("failed to marshal logs to protobuf: %w", err) + } + + httpReq, err := http.NewRequestWithContext(context.Background(), "POST", url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("failed to create logs HTTP request: %w", err) + } + for k, v := range e.cfg.Headers { + httpReq.Header.Set(k, v) + } + httpReq.Header.Set("Content-Type", "application/x-protobuf") + resp, err := e.client.Do(httpReq) + if err != nil { + return fmt.Errorf("failed to execute logs HTTP request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + var respData bytes.Buffer + _, _ = io.Copy(&respData, resp.Body) + return fmt.Errorf("log request failed with status %s (%s)", resp.Status, respData.String()) + } + + return nil +} diff --git a/cmd/telemetrygen/internal/logs/logs.go b/cmd/telemetrygen/internal/logs/logs.go index 777fd26bbcf0..656e367ee3dc 100644 --- a/cmd/telemetrygen/internal/logs/logs.go +++ b/cmd/telemetrygen/internal/logs/logs.go @@ -10,34 +10,14 @@ import ( "sync/atomic" "time" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.uber.org/zap" "golang.org/x/time/rate" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common" ) -type exporter interface { - export(plog.Logs) error -} - -type gRPCClientExporter struct { - client plogotlp.GRPCClient -} - -func (e *gRPCClientExporter) export(logs plog.Logs) error { - req := plogotlp.NewExportRequestFromLogs(logs) - if _, err := e.client.Export(context.Background(), req); err != nil { - return err - } - return nil -} - // Start starts the log telemetry generator func Start(cfg *Config) error { logger, err := common.CreateLogger(cfg.SkipSettingGRPCLogger) @@ -45,24 +25,12 @@ func Start(cfg *Config) error { return err } - if cfg.UseHTTP { - return fmt.Errorf("http is not supported by 'telemetrygen logs'") - } - - if !cfg.Insecure { - return fmt.Errorf("'telemetrygen logs' only supports insecure gRPC") - } - - // only support grpc in insecure mode - clientConn, err := grpc.DialContext(context.TODO(), cfg.Endpoint(), grpc.WithTransportCredentials(insecure.NewCredentials())) + e, err := newExporter(context.Background(), cfg) if err != nil { return err } - exporter := &gRPCClientExporter{ - client: plogotlp.NewGRPCClient(clientConn), - } - if err = Run(cfg, exporter, logger); err != nil { + if err = Run(cfg, e, logger); err != nil { logger.Error("failed to stop the exporter", zap.Error(err)) return err }