From d86e71626f532de2f37bb580dbaf611d90dbb076 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Sun, 1 Dec 2024 09:08:33 -0800 Subject: [PATCH 1/8] create uds server and create base packet server class --- .../internal/transport/packet_server.go | 64 +++++++++++++++++++ .../internal/transport/transport.go | 7 +- .../internal/transport/udp_server.go | 60 ++--------------- .../internal/transport/uds_server.go | 46 +++++++++++++ receiver/statsdreceiver/receiver.go | 12 +++- 5 files changed, 129 insertions(+), 60 deletions(-) create mode 100644 receiver/statsdreceiver/internal/transport/packet_server.go create mode 100644 receiver/statsdreceiver/internal/transport/uds_server.go diff --git a/receiver/statsdreceiver/internal/transport/packet_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go new file mode 100644 index 000000000000..d34fd7ad0676 --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/packet_server.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import ( + "errors" + "net" + + "go.opentelemetry.io/collector/consumer" +) + +type packetServer struct { + packetConn net.PacketConn + transport Transport +} + +// ListenAndServe starts the server ready to receive metrics. +func (u *packetServer) ListenAndServe( + nextConsumer consumer.Metrics, + reporter Reporter, + transferChan chan<- Metric, +) error { + if nextConsumer == nil || reporter == nil { + return errNilListenAndServeParameters + } + + buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) + for { + n, addr, err := u.packetConn.ReadFrom(buf) + if n > 0 { + u.handlePacket(n, buf, addr, transferChan) + } + if err != nil { + reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", + u.transport, + u.packetConn.LocalAddr(), + err) + var netErr net.Error + if errors.As(err, &netErr) { + if netErr.Timeout() { + continue + } + } + return err + } + } +} + +// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. +func (u *packetServer) handlePacket( + numBytes int, + data []byte, + addr net.Addr, + transferChan chan<- Metric, +) { + splitPacket := NewSplitBytes(data[:numBytes], '\n') + for splitPacket.Next() { + chunk := splitPacket.Chunk() + if len(chunk) > 0 { + transferChan <- Metric{string(chunk), addr} + } + } +} diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index c065e30c746f..5e9113abcb87 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -21,6 +21,7 @@ const ( TCP Transport = "tcp" TCP4 Transport = "tcp4" TCP6 Transport = "tcp6" + UDS Transport = "unixgram" ) // NewTransport creates a Transport based on the transport string or returns an empty Transport. @@ -31,6 +32,8 @@ func NewTransport(ts string) Transport { return trans case TCP, TCP4, TCP6: return trans + case UDS: + return trans } return Transport("") } @@ -38,7 +41,7 @@ func NewTransport(ts string) Transport { // String casts the transport to a String if the Transport is supported. Return an empty Transport overwise. func (trans Transport) String() string { switch trans { - case UDP, UDP4, UDP6, TCP, TCP4, TCP6: + case UDP, UDP4, UDP6, TCP, TCP4, TCP6, UDS: return string(trans) } return "" @@ -47,7 +50,7 @@ func (trans Transport) String() string { // IsPacketTransport returns true if the transport is packet based. func (trans Transport) IsPacketTransport() bool { switch trans { - case UDP, UDP4, UDP6: + case UDP, UDP4, UDP6, UDS: return true } return false diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 3ad113c80654..823949b7ca4a 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -4,16 +4,12 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" import ( - "errors" "fmt" "net" - - "go.opentelemetry.io/collector/consumer" ) type udpServer struct { - packetConn net.PacketConn - transport Transport + packetServer } // Ensure that Server is implemented on UDP Server. @@ -31,60 +27,14 @@ func NewUDPServer(transport Transport, address string) (Server, error) { } return &udpServer{ - packetConn: conn, - transport: transport, + packetServer: packetServer{ + packetConn: conn, + transport: transport, + }, }, nil } -// ListenAndServe starts the server ready to receive metrics. -func (u *udpServer) ListenAndServe( - nextConsumer consumer.Metrics, - reporter Reporter, - transferChan chan<- Metric, -) error { - if nextConsumer == nil || reporter == nil { - return errNilListenAndServeParameters - } - - buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) - for { - n, addr, err := u.packetConn.ReadFrom(buf) - if n > 0 { - u.handlePacket(n, buf, addr, transferChan) - } - if err != nil { - reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", - u.transport, - u.packetConn.LocalAddr(), - err) - var netErr net.Error - if errors.As(err, &netErr) { - if netErr.Timeout() { - continue - } - } - return err - } - } -} - // Close closes the server. func (u *udpServer) Close() error { return u.packetConn.Close() } - -// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. -func (u *udpServer) handlePacket( - numBytes int, - data []byte, - addr net.Addr, - transferChan chan<- Metric, -) { - splitPacket := NewSplitBytes(data[:numBytes], '\n') - for splitPacket.Next() { - chunk := splitPacket.Chunk() - if len(chunk) > 0 { - transferChan <- Metric{string(chunk), addr} - } - } -} diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go new file mode 100644 index 000000000000..2e28b12ba785 --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/uds_server.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import ( + "fmt" + "net" + "os" +) + +type udsServer struct { + packetServer +} + +// Ensure that Server is implemented on UDS Server. +var _ (Server) = (*udsServer)(nil) + +// NewUDSServer creates a transport.Server using Unixgram as its transport. +func NewUDSServer(transport Transport, socketPath string) (Server, error) { + if !transport.IsPacketTransport() { + return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport) + } + + if _, err := os.Stat(socketPath); err == nil { + os.Remove(socketPath) + } + + conn, err := net.ListenPacket(transport.String(), socketPath) + if err != nil { + return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) + } + + return &udsServer{ + packetServer: packetServer{ + packetConn: conn, + transport: transport, + }, + }, nil +} + +// Close closes the server. +func (u *udsServer) Close() error { + os.Remove(u.packetConn.LocalAddr().String()) + return u.packetConn.Close() +} diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 81e11a67b1e5..f5114d6c66dc 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -46,8 +46,14 @@ func newReceiver( config Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { + trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) + if config.NetAddr.Endpoint == "" { - config.NetAddr.Endpoint = "localhost:8125" + if trans == transport.UDS { + config.NetAddr.Endpoint = "/var/run/statsd-receiver.sock" + } else { + config.NetAddr.Endpoint = "localhost:8125" + } } rep, err := newReporter(set) @@ -55,7 +61,6 @@ func newReceiver( return nil, err } - trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ LongLivedCtx: true, ReceiverID: set.ID, @@ -80,13 +85,14 @@ func newReceiver( } func buildTransportServer(config Config) (transport.Server, error) { - // TODO: Add unix socket transport implementations trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) switch trans { case transport.UDP, transport.UDP4, transport.UDP6: return transport.NewUDPServer(trans, config.NetAddr.Endpoint) case transport.TCP, transport.TCP4, transport.TCP6: return transport.NewTCPServer(trans, config.NetAddr.Endpoint) + case transport.UDS: + return transport.NewUDSServer(trans, config.NetAddr.Endpoint) } return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport)) From 98c7c347e6e7521489d897f563c420d2f33668f4 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Sun, 1 Dec 2024 10:35:44 -0800 Subject: [PATCH 2/8] add test --- .../internal/transport/client/client.go | 9 +++++++++ .../internal/transport/packet_server.go | 20 +++++++++++++++++++ receiver/statsdreceiver/receiver_test.go | 18 +++++++++++++++++ 3 files changed, 47 insertions(+) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 8b9fd7f06ba3..ee1aab425cf9 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -51,6 +51,15 @@ func (s *StatsD) connect() error { if err != nil { return err } + case "unixgram": + unixAddr, err := net.ResolveUnixAddr(s.transport, s.address) + if err != nil { + return err + } + s.conn, err = net.DialUnix(s.transport, nil, unixAddr) + if err != nil { + return err + } default: return fmt.Errorf("unknown/unsupported transport: %s", s.transport) } diff --git a/receiver/statsdreceiver/internal/transport/packet_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go index d34fd7ad0676..c1fbe2b7f946 100644 --- a/receiver/statsdreceiver/internal/transport/packet_server.go +++ b/receiver/statsdreceiver/internal/transport/packet_server.go @@ -28,6 +28,13 @@ func (u *packetServer) ListenAndServe( buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) for { n, addr, err := u.packetConn.ReadFrom(buf) + if addr == nil && u.transport == UDS { + addr = &udsAddr{ + network: u.transport.String(), + address: u.packetConn.LocalAddr().String(), + } + } + if n > 0 { u.handlePacket(n, buf, addr, transferChan) } @@ -62,3 +69,16 @@ func (u *packetServer) handlePacket( } } } + +type udsAddr struct { + network string + address string +} + +func (u *udsAddr) Network() string { + return u.network +} + +func (u *udsAddr) String() string { + return u.address +} diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index 11a12d9d951e..abc425ac2628 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -106,6 +106,24 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { return c }, }, + { + name: "UDS server with 4s interval", + addr: "/tmp/statsd_test.sock", + configFn: func() *Config { + return &Config{ + NetAddr: confignet.AddrConfig{ + Endpoint: "/tmp/statsd_test.sock", + Transport: confignet.TransportTypeUnixgram, + }, + AggregationInterval: 4 * time.Second, + } + }, + clientFn: func(t *testing.T, addr string) *client.StatsD { + c, err := client.NewStatsD("unixgram", addr) + require.NoError(t, err) + return c + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From d26455fcab46a4ac2d0b013aad5bb7ec436f26fe Mon Sep 17 00:00:00 2001 From: Michael Li Date: Sun, 1 Dec 2024 10:42:18 -0800 Subject: [PATCH 3/8] add changelog --- .chloggen/statsdreceiver-uds.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/statsdreceiver-uds.yaml diff --git a/.chloggen/statsdreceiver-uds.yaml b/.chloggen/statsdreceiver-uds.yaml new file mode 100644 index 000000000000..b6307d452d6b --- /dev/null +++ b/.chloggen/statsdreceiver-uds.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: statsdreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add UDS support to statsdreceiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21385] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 395854a2acd89825add9b59e3e04ba9045712744 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Sun, 1 Dec 2024 13:08:11 -0800 Subject: [PATCH 4/8] handle errors properly when preparing socket --- .../internal/transport/uds_server.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go index 2e28b12ba785..7ecf63e10083 100644 --- a/receiver/statsdreceiver/internal/transport/uds_server.go +++ b/receiver/statsdreceiver/internal/transport/uds_server.go @@ -22,8 +22,8 @@ func NewUDSServer(transport Transport, socketPath string) (Server, error) { return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport) } - if _, err := os.Stat(socketPath); err == nil { - os.Remove(socketPath) + if err := prepareSocket(socketPath); err != nil { + return nil, err } conn, err := net.ListenPacket(transport.String(), socketPath) @@ -44,3 +44,17 @@ func (u *udsServer) Close() error { os.Remove(u.packetConn.LocalAddr().String()) return u.packetConn.Close() } + +func prepareSocket(socketPath string) error { + if _, err := os.Stat(socketPath); err == nil { + // File exists, remove it + if err := os.Remove(socketPath); err != nil { + return fmt.Errorf("failed to remove existing socket file: %w", err) + } + } else if !os.IsNotExist(err) { + // Return any error that's not "file does not exist" + return fmt.Errorf("failed to stat socket file: %w", err) + } + + return nil +} From 046096ec4609219311d4f1a00cf4c491f4508120 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Thu, 5 Dec 2024 22:58:00 -0800 Subject: [PATCH 5/8] lint --- receiver/statsdreceiver/internal/transport/uds_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go index 7ecf63e10083..7884b8627917 100644 --- a/receiver/statsdreceiver/internal/transport/uds_server.go +++ b/receiver/statsdreceiver/internal/transport/uds_server.go @@ -48,7 +48,7 @@ func (u *udsServer) Close() error { func prepareSocket(socketPath string) error { if _, err := os.Stat(socketPath); err == nil { // File exists, remove it - if err := os.Remove(socketPath); err != nil { + if err = os.Remove(socketPath); err != nil { return fmt.Errorf("failed to remove existing socket file: %w", err) } } else if !os.IsNotExist(err) { From 388bc467259fc35d2696bff7b1f2f08fee8a826c Mon Sep 17 00:00:00 2001 From: Michael Li Date: Wed, 11 Dec 2024 19:12:12 -0800 Subject: [PATCH 6/8] add documentation --- receiver/statsdreceiver/README.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md index 48130dfd35cd..3d682174264c 100644 --- a/receiver/statsdreceiver/README.md +++ b/receiver/statsdreceiver/README.md @@ -19,12 +19,13 @@ Use case: it does not support horizontal pool of collectors. Desired work case i ## Configuration -The following settings are required: - -- `endpoint` (default = `localhost:8125`): Address and port to listen on. +The Following settings are optional: +- `endpoint`: Address and port to listen on. + - For `udp` and `tcp` based `transport`, this config will default to `localhost:8125` + - For `unixgram` `transport`, this config will default to `/var/run/statsd-receiver.sock` -The Following settings are optional: +- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](/internal/transport/transport.go). - `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server) From 365d90d5a3391f80f637cff23a3cec6c959aa72d Mon Sep 17 00:00:00 2001 From: Michael Li Date: Thu, 12 Dec 2024 22:40:29 -0800 Subject: [PATCH 7/8] remove socket deletion --- .../internal/transport/uds_server.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go index 7884b8627917..9e019624e85d 100644 --- a/receiver/statsdreceiver/internal/transport/uds_server.go +++ b/receiver/statsdreceiver/internal/transport/uds_server.go @@ -22,10 +22,6 @@ func NewUDSServer(transport Transport, socketPath string) (Server, error) { return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport) } - if err := prepareSocket(socketPath); err != nil { - return nil, err - } - conn, err := net.ListenPacket(transport.String(), socketPath) if err != nil { return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) @@ -44,17 +40,3 @@ func (u *udsServer) Close() error { os.Remove(u.packetConn.LocalAddr().String()) return u.packetConn.Close() } - -func prepareSocket(socketPath string) error { - if _, err := os.Stat(socketPath); err == nil { - // File exists, remove it - if err = os.Remove(socketPath); err != nil { - return fmt.Errorf("failed to remove existing socket file: %w", err) - } - } else if !os.IsNotExist(err) { - // Return any error that's not "file does not exist" - return fmt.Errorf("failed to stat socket file: %w", err) - } - - return nil -} From 9cb473b3b8b7b22bd9e78e67d4e7c094d24d28ff Mon Sep 17 00:00:00 2001 From: Michael Li Date: Fri, 13 Dec 2024 08:59:21 -0800 Subject: [PATCH 8/8] Update README.md link --- receiver/statsdreceiver/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md index 3d682174264c..c302cde0eda1 100644 --- a/receiver/statsdreceiver/README.md +++ b/receiver/statsdreceiver/README.md @@ -25,7 +25,7 @@ The Following settings are optional: - For `udp` and `tcp` based `transport`, this config will default to `localhost:8125` - For `unixgram` `transport`, this config will default to `/var/run/statsd-receiver.sock` -- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](/internal/transport/transport.go). +- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/statsdreceiver/internal/transport/transport.go). - `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)