Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for tagging internal SDK metrics #76

Merged
merged 16 commits into from
Feb 10, 2022
Merged
4 changes: 2 additions & 2 deletions internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func SetSource(source string) RegistryOption {
}
}

func SetInterval(interval int) RegistryOption {
func SetInterval(interval time.Duration) RegistryOption {
return func(registry *MetricRegistry) {
registry.reportTicker = time.NewTicker(time.Second * time.Duration(interval))
registry.reportTicker = time.NewTicker(interval)
}
}

Expand Down
34 changes: 28 additions & 6 deletions senders/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package senders

import (
"fmt"
"github.com/wavefronthq/wavefront-sdk-go/internal"
"net/url"
"os"
"strconv"
"strings"

"github.com/wavefronthq/wavefront-sdk-go/internal"
)

const (
Expand Down Expand Up @@ -42,6 +41,7 @@ type configuration struct {
// interval (in seconds) at which to flush data to Wavefront. defaults to 1 Second.
// together with batch size controls the max theoretical throughput of the sender.
FlushIntervalSeconds int
SDKMetricsTags map[string]string
}

// NewSender creates Wavefront client
Expand Down Expand Up @@ -102,7 +102,7 @@ func newWavefrontClient(cfg *configuration) (Sender, error) {
defaultSource: internal.GetHostname("wavefront_direct_sender"),
proxy: len(cfg.Token) == 0,
}
sender.initializeInternalMetrics()
sender.initializeInternalMetrics(cfg)
sender.pointHandler = newLineHandler(metricsReporter, cfg, internal.MetricFormat, "points", sender.internalRegistry)
sender.histoHandler = newLineHandler(metricsReporter, cfg, internal.HistogramFormat, "histograms", sender.internalRegistry)
sender.spanHandler = newLineHandler(tracesReporter, cfg, internal.TraceFormat, "spans", sender.internalRegistry)
Expand All @@ -113,11 +113,19 @@ func newWavefrontClient(cfg *configuration) (Sender, error) {
return sender, nil
}

func (sender *wavefrontSender) initializeInternalMetrics() {
func (sender *wavefrontSender) initializeInternalMetrics(cfg *configuration) {

var setters []internal.RegistryOption
setters = append(setters, internal.SetPrefix("~sdk.go.core.sender.direct"))
setters = append(setters, internal.SetTag("pid", strconv.Itoa(os.Getpid())))

for key, value := range cfg.SDKMetricsTags {
setters = append(setters, internal.SetTag(key, value))
}

sender.internalRegistry = internal.NewMetricRegistry(
sender,
internal.SetPrefix("~sdk.go.core.sender.direct"),
internal.SetTag("pid", strconv.Itoa(os.Getpid())),
setters...,
)
sender.pointsValid = sender.internalRegistry.NewDeltaCounter("points.valid")
sender.pointsInvalid = sender.internalRegistry.NewDeltaCounter("points.invalid")
Expand Down Expand Up @@ -174,3 +182,17 @@ func TracesPort(port int) Option {
cfg.TracesPort = port
}
}

// SDKMetricsTags sets internal SDK metrics.
func SDKMetricsTags(tags map[string]string) Option {
return func(cfg *configuration) {
if cfg.SDKMetricsTags != nil {
for key, value := range tags {
cfg.SDKMetricsTags[key] = value
}
} else {
cfg.SDKMetricsTags = tags
}

}
}
8 changes: 8 additions & 0 deletions senders/client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ func TestTracesPort(t *testing.T) {

assert.Equal(t, 123, cfg.TracesPort)
}

func TestSDKMetricsTags(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.SDKMetricsTags(map[string]string{"foo": "bar"}), senders.SDKMetricsTags(map[string]string{"foo1": "bar1"}))
require.NoError(t, err)

assert.Equal(t, "bar", cfg.SDKMetricsTags["foo"])
assert.Equal(t, "bar1", cfg.SDKMetricsTags["foo1"])
}
53 changes: 53 additions & 0 deletions senders/client_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package senders_test

import (
"bytes"
"compress/gzip"
"context"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
Expand All @@ -20,11 +24,14 @@ const (
token = "DUMMY_TOKEN"
)

var requests = map[string][]string{}

func TestMain(m *testing.M) {
wf := http.Server{Addr: "localhost:" + wfPort}
proxy := http.Server{Addr: "localhost:" + proxyPort}

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
readBodyIntoString(r)
if strings.HasSuffix(r.Host, wfPort) {
if strings.HasSuffix(r.Header.Get("Authorization"), token) {
w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -52,12 +59,38 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}

func readBodyIntoString(r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Fatalln(err)
}
defer r.Body.Close()
if r.Header.Get("Content-Type") == "application/octet-stream" {
gr, err := gzip.NewReader(bytes.NewBuffer(b))
defer gr.Close()
data, err := ioutil.ReadAll(gr)
if err != nil {
log.Fatalln(err)
}
requests[wfPort] = append(requests[wfPort], string(data))
} else {
requests[wfPort] = append(requests[wfPort], string(b))
}
}

func TestSendDirect(t *testing.T) {
wf, err := senders.NewSender("http://" + token + "@localhost:" + wfPort)
require.NoError(t, err)
doTest(t, wf)
}

func TestSendDirectWithTags(t *testing.T) {
tags := map[string]string{"foo": "bar"}
wf, err := senders.NewSender("http://"+token+"@localhost:"+wfPort, senders.SDKMetricsTags(tags))
deosu marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
doTest(t, wf)
}

func TestSendProxy(t *testing.T) {
wf, err := senders.NewSender("http://localhost:" + proxyPort)
require.NoError(t, err)
Expand Down Expand Up @@ -98,4 +131,24 @@ func doTest(t *testing.T, wf senders.Sender) {
wf.Flush()
wf.Close()
assert.Equal(t, int64(0), wf.GetFailureCount(), "GetFailureCount")

metricsFlag := false
hgFlag := false
spansFlag := false

for _, request := range requests["8080"] {
if strings.Contains(request, "new-york.power.usage") {
metricsFlag = true
}
if strings.Contains(request, "request.latency") {
hgFlag = true
}
if strings.Contains(request, "0313bafe-9457-11e8-9eb6-529269fb1459") {
spansFlag = true
}
}

assert.True(t, metricsFlag)
assert.True(t, hgFlag)
assert.True(t, spansFlag)
}
1 change: 1 addition & 0 deletions senders/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ type ProxyConfiguration struct {
EventsPort int // events port on which the proxy is listening on.

FlushIntervalSeconds int // defaults to 1 second
SDKMetricsTags map[string]string
}
11 changes: 9 additions & 2 deletions senders/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,17 @@ func NewProxySender(cfg *ProxyConfiguration) (Sender, error) {
handlers: make([]internal.ConnectionHandler, handlersCount),
}

var setters []internal.RegistryOption
setters = append(setters, internal.SetPrefix("~sdk.go.core.sender.proxy"))
setters = append(setters, internal.SetTag("pid", strconv.Itoa(os.Getpid())))

for key, value := range cfg.SDKMetricsTags {
setters = append(setters, internal.SetTag(key, value))
}

sender.internalRegistry = internal.NewMetricRegistry(
sender,
internal.SetPrefix("~sdk.go.core.sender.proxy"),
internal.SetTag("pid", strconv.Itoa(os.Getpid())),
setters...,
)

if sdkVersion, e := internal.GetSemVer(version.Version); e == nil {
Expand Down
116 changes: 92 additions & 24 deletions senders/proxy_test.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,121 @@
package senders_test

import (
"github.com/wavefronthq/wavefront-sdk-go/histogram"
"github.com/wavefronthq/wavefront-sdk-go/senders"
"io"
"net"
"os"
"testing"

"github.com/wavefronthq/wavefront-sdk-go/histogram"
"github.com/wavefronthq/wavefront-sdk-go/senders"
"time"
)

var proxy senders.Sender
func netcat(addr string, keepopen bool, portCh chan int) {
laddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
panic("netcat resolve " + addr + " failed with: " + err.Error())
}

lis, err := net.ListenTCP("tcp", laddr)
if err != nil {
panic("netcat listen " + addr + " failed with: " + err.Error())
}
portCh <- lis.Addr().(*net.TCPAddr).Port

func netcat(addr string, keepopen bool) {
laddr, _ := net.ResolveTCPAddr("tcp", addr)
lis, _ := net.ListenTCP("tcp", laddr)
for loop := true; loop; loop = keepopen {
conn, _ := lis.Accept()
io.Copy(os.Stdout, conn)
conn, err := lis.Accept()
if err != nil {
panic("netcat accept " + addr + " failed with: " + err.Error())
}
_, err = io.Copy(os.Stdout, conn)
if err != nil {
panic("netcat copy " + addr + " failed with: " + err.Error())
}
}
err = lis.Close()
if err != nil {
panic("netcat close " + addr + " failed with: " + err.Error())
}
lis.Close()
}

func init() {
go netcat("localhost:30000", false)
go netcat("localhost:40000", false)
go netcat("localhost:50000", false)
}

func TestProxySends(t *testing.T) {

ports := getConnection(t)

proxyCfg := &senders.ProxyConfiguration{
Host: "localhost",
MetricsPort: ports[0],
DistributionPort: ports[1],
TracingPort: ports[2],
FlushIntervalSeconds: 10,
}

var err error
var proxy senders.Sender
if proxy, err = senders.NewProxySender(proxyCfg); err != nil {
t.Error("Failed Creating Sender", err)
}

verifyResults(t, err, proxy)

proxy.Flush()
proxy.Close()
if proxy.GetFailureCount() > 0 {
t.Error("FailureCount =", proxy.GetFailureCount())
}
}

func getConnection(t *testing.T) [3]int {
ch := make(chan int, 3)
var ports [3]int

go netcat("localhost:0", false, ch)
go netcat("localhost:0", false, ch)
go netcat("localhost:0", false, ch)

for i := 0; i < 3; {
select {
case ports[i] = <-ch:
i++
case <-time.After(time.Second):
t.Fail()
t.Logf("Could not get netcats")
}
}

return ports
}

func TestProxySendsWithTags(t *testing.T) {

ports := getConnection(t)

proxyCfg := &senders.ProxyConfiguration{
Host: "localhost",
MetricsPort: 30000,
DistributionPort: 40000,
TracingPort: 50000,
MetricsPort: ports[0],
DistributionPort: ports[1],
TracingPort: ports[2],
FlushIntervalSeconds: 10,
SDKMetricsTags: map[string]string{"foo": "bar"},
}

var err error
var proxy senders.Sender
if proxy, err = senders.NewProxySender(proxyCfg); err != nil {
t.Error("Failed Creating Sender", err)
}

verifyResults(t, err, proxy)

proxy.Flush()
proxy.Close()
if proxy.GetFailureCount() > 0 {
t.Error("FailureCount =", proxy.GetFailureCount())
}
}

func verifyResults(t *testing.T, err error, proxy senders.Sender) {
if err = proxy.SendMetric("new-york.power.usage", 42422.0, 0, "go_test", map[string]string{"env": "test"}); err != nil {
t.Error("Failed SendMetric", err)
}
Expand Down Expand Up @@ -74,10 +148,4 @@ func TestProxySends(t *testing.T) {
nil); err != nil {
t.Error("Failed SendSpan", err)
}

proxy.Flush()
proxy.Close()
if proxy.GetFailureCount() > 0 {
t.Error("FailureCount =", proxy.GetFailureCount())
}
}