From d86e71626f532de2f37bb580dbaf611d90dbb076 Mon Sep 17 00:00:00 2001
From: Michael Li
Date: Sun, 1 Dec 2024 09:08:33 -0800
Subject: [PATCH 1/8] create uds server and create base packet server class
---
.../internal/transport/packet_server.go | 64 +++++++++++++++++++
.../internal/transport/transport.go | 7 +-
.../internal/transport/udp_server.go | 60 ++---------------
.../internal/transport/uds_server.go | 46 +++++++++++++
receiver/statsdreceiver/receiver.go | 12 +++-
5 files changed, 129 insertions(+), 60 deletions(-)
create mode 100644 receiver/statsdreceiver/internal/transport/packet_server.go
create mode 100644 receiver/statsdreceiver/internal/transport/uds_server.go
diff --git a/receiver/statsdreceiver/internal/transport/packet_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go
new file mode 100644
index 000000000000..d34fd7ad0676
--- /dev/null
+++ b/receiver/statsdreceiver/internal/transport/packet_server.go
@@ -0,0 +1,64 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
+
+import (
+ "errors"
+ "net"
+
+ "go.opentelemetry.io/collector/consumer"
+)
+
+type packetServer struct {
+ packetConn net.PacketConn
+ transport Transport
+}
+
+// ListenAndServe starts the server ready to receive metrics.
+func (u *packetServer) ListenAndServe(
+ nextConsumer consumer.Metrics,
+ reporter Reporter,
+ transferChan chan<- Metric,
+) error {
+ if nextConsumer == nil || reporter == nil {
+ return errNilListenAndServeParameters
+ }
+
+ buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
+ for {
+ n, addr, err := u.packetConn.ReadFrom(buf)
+ if n > 0 {
+ u.handlePacket(n, buf, addr, transferChan)
+ }
+ if err != nil {
+ reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
+ u.transport,
+ u.packetConn.LocalAddr(),
+ err)
+ var netErr net.Error
+ if errors.As(err, &netErr) {
+ if netErr.Timeout() {
+ continue
+ }
+ }
+ return err
+ }
+ }
+}
+
+// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
+func (u *packetServer) handlePacket(
+ numBytes int,
+ data []byte,
+ addr net.Addr,
+ transferChan chan<- Metric,
+) {
+ splitPacket := NewSplitBytes(data[:numBytes], '\n')
+ for splitPacket.Next() {
+ chunk := splitPacket.Chunk()
+ if len(chunk) > 0 {
+ transferChan <- Metric{string(chunk), addr}
+ }
+ }
+}
diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go
index c065e30c746f..5e9113abcb87 100644
--- a/receiver/statsdreceiver/internal/transport/transport.go
+++ b/receiver/statsdreceiver/internal/transport/transport.go
@@ -21,6 +21,7 @@ const (
TCP Transport = "tcp"
TCP4 Transport = "tcp4"
TCP6 Transport = "tcp6"
+ UDS Transport = "unixgram"
)
// NewTransport creates a Transport based on the transport string or returns an empty Transport.
@@ -31,6 +32,8 @@ func NewTransport(ts string) Transport {
return trans
case TCP, TCP4, TCP6:
return trans
+ case UDS:
+ return trans
}
return Transport("")
}
@@ -38,7 +41,7 @@ func NewTransport(ts string) Transport {
// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise.
func (trans Transport) String() string {
switch trans {
- case UDP, UDP4, UDP6, TCP, TCP4, TCP6:
+ case UDP, UDP4, UDP6, TCP, TCP4, TCP6, UDS:
return string(trans)
}
return ""
@@ -47,7 +50,7 @@ func (trans Transport) String() string {
// IsPacketTransport returns true if the transport is packet based.
func (trans Transport) IsPacketTransport() bool {
switch trans {
- case UDP, UDP4, UDP6:
+ case UDP, UDP4, UDP6, UDS:
return true
}
return false
diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go
index 3ad113c80654..823949b7ca4a 100644
--- a/receiver/statsdreceiver/internal/transport/udp_server.go
+++ b/receiver/statsdreceiver/internal/transport/udp_server.go
@@ -4,16 +4,12 @@
package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
import (
- "errors"
"fmt"
"net"
-
- "go.opentelemetry.io/collector/consumer"
)
type udpServer struct {
- packetConn net.PacketConn
- transport Transport
+ packetServer
}
// Ensure that Server is implemented on UDP Server.
@@ -31,60 +27,14 @@ func NewUDPServer(transport Transport, address string) (Server, error) {
}
return &udpServer{
- packetConn: conn,
- transport: transport,
+ packetServer: packetServer{
+ packetConn: conn,
+ transport: transport,
+ },
}, nil
}
-// ListenAndServe starts the server ready to receive metrics.
-func (u *udpServer) ListenAndServe(
- nextConsumer consumer.Metrics,
- reporter Reporter,
- transferChan chan<- Metric,
-) error {
- if nextConsumer == nil || reporter == nil {
- return errNilListenAndServeParameters
- }
-
- buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
- for {
- n, addr, err := u.packetConn.ReadFrom(buf)
- if n > 0 {
- u.handlePacket(n, buf, addr, transferChan)
- }
- if err != nil {
- reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
- u.transport,
- u.packetConn.LocalAddr(),
- err)
- var netErr net.Error
- if errors.As(err, &netErr) {
- if netErr.Timeout() {
- continue
- }
- }
- return err
- }
- }
-}
-
// Close closes the server.
func (u *udpServer) Close() error {
return u.packetConn.Close()
}
-
-// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
-func (u *udpServer) handlePacket(
- numBytes int,
- data []byte,
- addr net.Addr,
- transferChan chan<- Metric,
-) {
- splitPacket := NewSplitBytes(data[:numBytes], '\n')
- for splitPacket.Next() {
- chunk := splitPacket.Chunk()
- if len(chunk) > 0 {
- transferChan <- Metric{string(chunk), addr}
- }
- }
-}
diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go
new file mode 100644
index 000000000000..2e28b12ba785
--- /dev/null
+++ b/receiver/statsdreceiver/internal/transport/uds_server.go
@@ -0,0 +1,46 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
+
+import (
+ "fmt"
+ "net"
+ "os"
+)
+
+type udsServer struct {
+ packetServer
+}
+
+// Ensure that Server is implemented on UDS Server.
+var _ (Server) = (*udsServer)(nil)
+
+// NewUDSServer creates a transport.Server using Unixgram as its transport.
+func NewUDSServer(transport Transport, socketPath string) (Server, error) {
+ if !transport.IsPacketTransport() {
+ return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport)
+ }
+
+ if _, err := os.Stat(socketPath); err == nil {
+ os.Remove(socketPath)
+ }
+
+ conn, err := net.ListenPacket(transport.String(), socketPath)
+ if err != nil {
+ return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err)
+ }
+
+ return &udsServer{
+ packetServer: packetServer{
+ packetConn: conn,
+ transport: transport,
+ },
+ }, nil
+}
+
+// Close closes the server.
+func (u *udsServer) Close() error {
+ os.Remove(u.packetConn.LocalAddr().String())
+ return u.packetConn.Close()
+}
diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go
index 81e11a67b1e5..f5114d6c66dc 100644
--- a/receiver/statsdreceiver/receiver.go
+++ b/receiver/statsdreceiver/receiver.go
@@ -46,8 +46,14 @@ func newReceiver(
config Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
+ trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
+
if config.NetAddr.Endpoint == "" {
- config.NetAddr.Endpoint = "localhost:8125"
+ if trans == transport.UDS {
+ config.NetAddr.Endpoint = "/var/run/statsd-receiver.sock"
+ } else {
+ config.NetAddr.Endpoint = "localhost:8125"
+ }
}
rep, err := newReporter(set)
@@ -55,7 +61,6 @@ func newReceiver(
return nil, err
}
- trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
LongLivedCtx: true,
ReceiverID: set.ID,
@@ -80,13 +85,14 @@ func newReceiver(
}
func buildTransportServer(config Config) (transport.Server, error) {
- // TODO: Add unix socket transport implementations
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
switch trans {
case transport.UDP, transport.UDP4, transport.UDP6:
return transport.NewUDPServer(trans, config.NetAddr.Endpoint)
case transport.TCP, transport.TCP4, transport.TCP6:
return transport.NewTCPServer(trans, config.NetAddr.Endpoint)
+ case transport.UDS:
+ return transport.NewUDSServer(trans, config.NetAddr.Endpoint)
}
return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport))
From 98c7c347e6e7521489d897f563c420d2f33668f4 Mon Sep 17 00:00:00 2001
From: Michael Li
Date: Sun, 1 Dec 2024 10:35:44 -0800
Subject: [PATCH 2/8] add test
---
.../internal/transport/client/client.go | 9 +++++++++
.../internal/transport/packet_server.go | 20 +++++++++++++++++++
receiver/statsdreceiver/receiver_test.go | 18 +++++++++++++++++
3 files changed, 47 insertions(+)
diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go
index 8b9fd7f06ba3..ee1aab425cf9 100644
--- a/receiver/statsdreceiver/internal/transport/client/client.go
+++ b/receiver/statsdreceiver/internal/transport/client/client.go
@@ -51,6 +51,15 @@ func (s *StatsD) connect() error {
if err != nil {
return err
}
+ case "unixgram":
+ unixAddr, err := net.ResolveUnixAddr(s.transport, s.address)
+ if err != nil {
+ return err
+ }
+ s.conn, err = net.DialUnix(s.transport, nil, unixAddr)
+ if err != nil {
+ return err
+ }
default:
return fmt.Errorf("unknown/unsupported transport: %s", s.transport)
}
diff --git a/receiver/statsdreceiver/internal/transport/packet_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go
index d34fd7ad0676..c1fbe2b7f946 100644
--- a/receiver/statsdreceiver/internal/transport/packet_server.go
+++ b/receiver/statsdreceiver/internal/transport/packet_server.go
@@ -28,6 +28,13 @@ func (u *packetServer) ListenAndServe(
buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
for {
n, addr, err := u.packetConn.ReadFrom(buf)
+ if addr == nil && u.transport == UDS {
+ addr = &udsAddr{
+ network: u.transport.String(),
+ address: u.packetConn.LocalAddr().String(),
+ }
+ }
+
if n > 0 {
u.handlePacket(n, buf, addr, transferChan)
}
@@ -62,3 +69,16 @@ func (u *packetServer) handlePacket(
}
}
}
+
+type udsAddr struct {
+ network string
+ address string
+}
+
+func (u *udsAddr) Network() string {
+ return u.network
+}
+
+func (u *udsAddr) String() string {
+ return u.address
+}
diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go
index 11a12d9d951e..abc425ac2628 100644
--- a/receiver/statsdreceiver/receiver_test.go
+++ b/receiver/statsdreceiver/receiver_test.go
@@ -106,6 +106,24 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) {
return c
},
},
+ {
+ name: "UDS server with 4s interval",
+ addr: "/tmp/statsd_test.sock",
+ configFn: func() *Config {
+ return &Config{
+ NetAddr: confignet.AddrConfig{
+ Endpoint: "/tmp/statsd_test.sock",
+ Transport: confignet.TransportTypeUnixgram,
+ },
+ AggregationInterval: 4 * time.Second,
+ }
+ },
+ clientFn: func(t *testing.T, addr string) *client.StatsD {
+ c, err := client.NewStatsD("unixgram", addr)
+ require.NoError(t, err)
+ return c
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
From d26455fcab46a4ac2d0b013aad5bb7ec436f26fe Mon Sep 17 00:00:00 2001
From: Michael Li
Date: Sun, 1 Dec 2024 10:42:18 -0800
Subject: [PATCH 3/8] add changelog
---
.chloggen/statsdreceiver-uds.yaml | 27 +++++++++++++++++++++++++++
1 file changed, 27 insertions(+)
create mode 100644 .chloggen/statsdreceiver-uds.yaml
diff --git a/.chloggen/statsdreceiver-uds.yaml b/.chloggen/statsdreceiver-uds.yaml
new file mode 100644
index 000000000000..b6307d452d6b
--- /dev/null
+++ b/.chloggen/statsdreceiver-uds.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: statsdreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add UDS support to statsdreceiver
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [21385]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
From 395854a2acd89825add9b59e3e04ba9045712744 Mon Sep 17 00:00:00 2001
From: Michael Li
Date: Sun, 1 Dec 2024 13:08:11 -0800
Subject: [PATCH 4/8] handle errors properly when preparing socket
---
.../internal/transport/uds_server.go | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go
index 2e28b12ba785..7ecf63e10083 100644
--- a/receiver/statsdreceiver/internal/transport/uds_server.go
+++ b/receiver/statsdreceiver/internal/transport/uds_server.go
@@ -22,8 +22,8 @@ func NewUDSServer(transport Transport, socketPath string) (Server, error) {
return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport)
}
- if _, err := os.Stat(socketPath); err == nil {
- os.Remove(socketPath)
+ if err := prepareSocket(socketPath); err != nil {
+ return nil, err
}
conn, err := net.ListenPacket(transport.String(), socketPath)
@@ -44,3 +44,17 @@ func (u *udsServer) Close() error {
os.Remove(u.packetConn.LocalAddr().String())
return u.packetConn.Close()
}
+
+func prepareSocket(socketPath string) error {
+ if _, err := os.Stat(socketPath); err == nil {
+ // File exists, remove it
+ if err := os.Remove(socketPath); err != nil {
+ return fmt.Errorf("failed to remove existing socket file: %w", err)
+ }
+ } else if !os.IsNotExist(err) {
+ // Return any error that's not "file does not exist"
+ return fmt.Errorf("failed to stat socket file: %w", err)
+ }
+
+ return nil
+}
From 046096ec4609219311d4f1a00cf4c491f4508120 Mon Sep 17 00:00:00 2001
From: Michael Li
Date: Thu, 5 Dec 2024 22:58:00 -0800
Subject: [PATCH 5/8] lint
---
receiver/statsdreceiver/internal/transport/uds_server.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go
index 7ecf63e10083..7884b8627917 100644
--- a/receiver/statsdreceiver/internal/transport/uds_server.go
+++ b/receiver/statsdreceiver/internal/transport/uds_server.go
@@ -48,7 +48,7 @@ func (u *udsServer) Close() error {
func prepareSocket(socketPath string) error {
if _, err := os.Stat(socketPath); err == nil {
// File exists, remove it
- if err := os.Remove(socketPath); err != nil {
+ if err = os.Remove(socketPath); err != nil {
return fmt.Errorf("failed to remove existing socket file: %w", err)
}
} else if !os.IsNotExist(err) {
From 388bc467259fc35d2696bff7b1f2f08fee8a826c Mon Sep 17 00:00:00 2001
From: Michael Li
Date: Wed, 11 Dec 2024 19:12:12 -0800
Subject: [PATCH 6/8] add documentation
---
receiver/statsdreceiver/README.md | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md
index 48130dfd35cd..3d682174264c 100644
--- a/receiver/statsdreceiver/README.md
+++ b/receiver/statsdreceiver/README.md
@@ -19,12 +19,13 @@ Use case: it does not support horizontal pool of collectors. Desired work case i
## Configuration
-The following settings are required:
-
-- `endpoint` (default = `localhost:8125`): Address and port to listen on.
+The Following settings are optional:
+- `endpoint`: Address and port to listen on.
+ - For `udp` and `tcp` based `transport`, this config will default to `localhost:8125`
+ - For `unixgram` `transport`, this config will default to `/var/run/statsd-receiver.sock`
-The Following settings are optional:
+- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](/internal/transport/transport.go).
- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)
From 365d90d5a3391f80f637cff23a3cec6c959aa72d Mon Sep 17 00:00:00 2001
From: Michael Li
Date: Thu, 12 Dec 2024 22:40:29 -0800
Subject: [PATCH 7/8] remove socket deletion
---
.../internal/transport/uds_server.go | 18 ------------------
1 file changed, 18 deletions(-)
diff --git a/receiver/statsdreceiver/internal/transport/uds_server.go b/receiver/statsdreceiver/internal/transport/uds_server.go
index 7884b8627917..9e019624e85d 100644
--- a/receiver/statsdreceiver/internal/transport/uds_server.go
+++ b/receiver/statsdreceiver/internal/transport/uds_server.go
@@ -22,10 +22,6 @@ func NewUDSServer(transport Transport, socketPath string) (Server, error) {
return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport)
}
- if err := prepareSocket(socketPath); err != nil {
- return nil, err
- }
-
conn, err := net.ListenPacket(transport.String(), socketPath)
if err != nil {
return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err)
@@ -44,17 +40,3 @@ func (u *udsServer) Close() error {
os.Remove(u.packetConn.LocalAddr().String())
return u.packetConn.Close()
}
-
-func prepareSocket(socketPath string) error {
- if _, err := os.Stat(socketPath); err == nil {
- // File exists, remove it
- if err = os.Remove(socketPath); err != nil {
- return fmt.Errorf("failed to remove existing socket file: %w", err)
- }
- } else if !os.IsNotExist(err) {
- // Return any error that's not "file does not exist"
- return fmt.Errorf("failed to stat socket file: %w", err)
- }
-
- return nil
-}
From 9cb473b3b8b7b22bd9e78e67d4e7c094d24d28ff Mon Sep 17 00:00:00 2001
From: Michael Li
Date: Fri, 13 Dec 2024 08:59:21 -0800
Subject: [PATCH 8/8] Update README.md link
---
receiver/statsdreceiver/README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md
index 3d682174264c..c302cde0eda1 100644
--- a/receiver/statsdreceiver/README.md
+++ b/receiver/statsdreceiver/README.md
@@ -25,7 +25,7 @@ The Following settings are optional:
- For `udp` and `tcp` based `transport`, this config will default to `localhost:8125`
- For `unixgram` `transport`, this config will default to `/var/run/statsd-receiver.sock`
-- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](/internal/transport/transport.go).
+- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/statsdreceiver/internal/transport/transport.go).
- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)