Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate WavefrontProxyClient [WIP] #50

Merged
merged 13 commits into from
Jul 26, 2020
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ When you use a Sender SDK, you won’t see span-level RED metrics by default unl
// <tracingSpanName> source=<source> [pointTags] <start_millis> <duration_milliseconds>
// Example:
// "getAllUsers source=localhost traceId=7b3bf470-9456-11e8-9eb6-529269fb1459
// spanId=0313bafe-9457-11e8-9eb6-529269fb1459 parent=2f64e538-9457-11e8-9eb6-529269fb1459
// spanID=0313bafe-9457-11e8-9eb6-529269fb1459 parent=2f64e538-9457-11e8-9eb6-529269fb1459
laullon marked this conversation as resolved.
Show resolved Hide resolved
// application=Wavefront http.method=GET 1552949776000 343"

sender.SendSpan("getAllUsers", 1552949776000, 343, "localhost",
Expand Down
94 changes: 94 additions & 0 deletions examples/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"fmt"
"log"
"os"
"strings"
"time"

"github.com/wavefronthq/wavefront-sdk-go/application"
"github.com/wavefronthq/wavefront-sdk-go/event"

"github.com/wavefronthq/wavefront-sdk-go/senders"
)

func main() {
wf := senders.NewMultiSender()

urls := strings.Split(os.Getenv("WF_URL"), "|")
for idx, url := range urls {
sender, err := senders.NewSender(url)
if err != nil {
panic(err)
}
sender.SetSource(fmt.Sprintf("sender_%d", idx))
wf.Add(sender)
}

// OLD PROXY way
proxyCfg := &senders.ProxyConfiguration{
Host: "localhost",
MetricsPort: 2878,
DistributionPort: 2878,
TracingPort: 2878,
EventsPort: 2878,
FlushIntervalSeconds: 10,
}

sender, err := senders.NewProxySender(proxyCfg)
if err != nil {
panic(err)
}
sender.SetSource("client_proxy")
wf.Add(sender)

// OLD DIRECT way
directCfg := &senders.DirectConfiguration{
Server: "https://-----.wavefront.com",
Token: "--------------",
BatchSize: 10000,
MaxBufferSize: 50000,
FlushIntervalSeconds: 1,
}

sender, err = senders.NewDirectSender(directCfg)
if err != nil {
panic(err)
}
sender.SetSource("client_direct")
wf.Add(sender)

log.Print("senders ready")

source := "" //"go_sdk_example"

app := application.New("sample app", "main.go")
application.StartHeartbeatService(wf, app, source)

tags := make(map[string]string)
tags["namespace"] = "default"
tags["Kind"] = "Deployment"

options := []event.Option{event.Details("Details"), event.Type("type"), event.Severity("severity")}

for i := 0; i < 10; i++ {
wf.SendMetric("sample.metric", float64(i), time.Now().UnixNano(), source, map[string]string{"env": "test"})

txt := fmt.Sprintf("test event %d", i)
sendEvent(wf, txt, time.Now().Unix(), 0, source, tags, options...)

time.Sleep(10 * time.Second)
}

wf.Flush()
wf.Close()
}

func sendEvent(sender senders.Sender, name string, startMillis, endMillis int64, source string, tags map[string]string, setters ...event.Option) {
err := sender.SendEvent(name, startMillis, endMillis, source, tags, setters...)
if err != nil {
log.Fatal(err)
os.Exit(-1)
}
}
8 changes: 4 additions & 4 deletions internal/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package internal
import "strings"

const (
deltaPrefix = "\u2206"
altDeltaPrefix = "\u0394"
DeltaPrefix = "\u2206"
AltDeltaPrefix = "\u0394"
)

func HasDeltaPrefix(name string) bool {
return strings.HasPrefix(name, deltaPrefix) || strings.HasPrefix(name, altDeltaPrefix)
return strings.HasPrefix(name, DeltaPrefix) || strings.HasPrefix(name, AltDeltaPrefix)
}

// Gets a delta counter name prefixed with ∆.
func DeltaCounterName(name string) string {
if HasDeltaPrefix(name) {
return name
}
return deltaPrefix + name
return DeltaPrefix + name
}
18 changes: 1 addition & 17 deletions internal/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,6 @@ var (
errReport = errors.New("error: invalid Format or points")
)

const (
contentType = "Content-Type"
contentEncoding = "Content-Encoding"
authzHeader = "Authorization"
bearer = "Bearer "
gzipFormat = "gzip"

octetStream = "application/octet-stream"
applicationJSON = "application/json"

reportEndpoint = "/report"
eventEndpoint = "/api/v2/event"

formatKey = "f"
)

// The implementation of a Reporter that reports points directly to a Wavefront server.
type directReporter struct {
serverURL string
Expand All @@ -45,7 +29,7 @@ func NewDirectReporter(server string, token string) Reporter {

func (reporter directReporter) Report(format string, pointLines string) (*http.Response, error) {
if format == "" || pointLines == "" {
return nil, errReport
return nil, formatError
}

// compress
Expand Down
24 changes: 24 additions & 0 deletions internal/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,27 @@ type ConnectionHandler interface {

Flusher
}

const (
contentType = "Content-Type"
contentEncoding = "Content-Encoding"
authzHeader = "Authorization"
bearer = "Bearer "
gzipFormat = "gzip"

octetStream = "application/octet-stream"
applicationJSON = "application/json"

reportEndpoint = "/report"
eventEndpoint = "/api/v2/event"

formatKey = "f"
)

const formatError stringError = "error: invalid Format or points"

type stringError string

func (e stringError) Error() string {
return string(e)
}
2 changes: 1 addition & 1 deletion senders/pool.go → internal/pool.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package senders
package internal

import (
"bytes"
Expand Down
93 changes: 93 additions & 0 deletions internal/reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package internal

import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"net/http"
"strings"
"time"
)

// The implementation of a Reporter that reports points directly to a Wavefront server.
type reporter struct {
serverURL string
token string
client *http.Client
}

// Newreporter create a metrics Reporter
func NewReporter(server string, token string) Reporter {
return &reporter{
serverURL: server,
token: token,
client: &http.Client{Timeout: time.Second * 10},
}
}

func (reporter reporter) Report(format string, pointLines string) (*http.Response, error) {
if format == "" || pointLines == "" {
return nil, formatError
}

// compress
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
_, err := zw.Write([]byte(pointLines))
if err != nil {
zw.Close()
return nil, err
}
if err = zw.Close(); err != nil {
return nil, err
}

apiURL := reporter.serverURL + reportEndpoint
req, err := http.NewRequest("POST", apiURL, &buf)
if err != nil {
return &http.Response{}, err
}

req.Header.Set(contentType, octetStream)
req.Header.Set(contentEncoding, gzipFormat)
if len(reporter.token) > 0 {
req.Header.Set(authzHeader, bearer+reporter.token)
}

q := req.URL.Query()
q.Add(formatKey, format)
req.URL.RawQuery = q.Encode()

return reporter.execute(req)
}

func (reporter reporter) ReportEvent(event string) (*http.Response, error) {
if event == "" {
return nil, formatError
}

apiURL := reporter.serverURL + eventEndpoint
req, err := http.NewRequest("POST", apiURL, strings.NewReader(event))
if err != nil {
return &http.Response{}, err
}

req.Header.Set(contentType, applicationJSON)
// req.Header.Set(contentEncoding, gzipFormat)
laullon marked this conversation as resolved.
Show resolved Hide resolved
if len(reporter.token) > 0 {
req.Header.Set(authzHeader, bearer+reporter.token)
}

return reporter.execute(req)
}

func (reporter reporter) execute(req *http.Request) (*http.Response, error) {
resp, err := reporter.client.Do(req)
if err != nil {
return resp, err
}
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
return resp, nil
}
Loading