Skip to content

Commit

Permalink
Deprecate WavefrontProxyClient [WIP] (#50)
Browse files Browse the repository at this point in the history
* Deprecate WavefrontProxyClient

* big refactoring

* example app

* multi client

* gzip events bug ?

* MultiClient & events by proxy

* cleaning

* big refactoring	2

* big refactoring	2 (cont.)

* review comments

* bug

* events http gzip when using direct
  • Loading branch information
laullon authored Jul 26, 2020
1 parent 76e72d7 commit b5d9878
Show file tree
Hide file tree
Showing 19 changed files with 823 additions and 139 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ func main() {
// size of internal buffer beyond which received data is dropped.
// helps with handling brief increases in data and buffering on errors.
// separate buffers are maintained per data type (metrics, spans and distributions)
// defaults to 50,000. higher values could use more memory.
MaxBufferSize : 50000,
// defaults to 500,000. higher values could use more memory.
MaxBufferSize : 500000,
// interval (in seconds) at which to flush data to Wavefront. defaults to 1 Second.
// together with batch size controls the max theoretical throughput of the sender.
Expand Down
94 changes: 94 additions & 0 deletions examples/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"fmt"
"log"
"os"
"strings"
"time"

"github.com/wavefronthq/wavefront-sdk-go/application"
"github.com/wavefronthq/wavefront-sdk-go/event"

"github.com/wavefronthq/wavefront-sdk-go/senders"
)

func main() {
var wfSenders []senders.Sender

urls := strings.Split(os.Getenv("WF_URL"), "|")
for _, url := range urls {
sender, err := senders.NewSender(url)
if err != nil {
panic(err)
}
wfSenders = append(wfSenders, sender)
}

// OLD PROXY way
proxyCfg := &senders.ProxyConfiguration{
Host: "localhost",
MetricsPort: 2878,
DistributionPort: 2878,
TracingPort: 2878,
EventsPort: 2878,
FlushIntervalSeconds: 10,
}

sender, err := senders.NewProxySender(proxyCfg)
if err != nil {
panic(err)
}
wfSenders = append(wfSenders, sender)

// OLD DIRECT way
directCfg := &senders.DirectConfiguration{
Server: "https://-----.wavefront.com",
Token: "--------------",
BatchSize: 10000,
MaxBufferSize: 500000,
FlushIntervalSeconds: 1,
}

sender, err = senders.NewDirectSender(directCfg)
if err != nil {
panic(err)
}
wfSenders = append(wfSenders, sender)

wf := senders.NewMultiSender(wfSenders...)
log.Print("senders ready")

source := "go_sdk_example"

app := application.New("sample app", "main.go")
application.StartHeartbeatService(wf, app, source)

tags := make(map[string]string)
tags["namespace"] = "default"
tags["Kind"] = "Deployment"

options := []event.Option{event.Details("Details"), event.Type("type"), event.Severity("severity")}

for i := 0; i < 10; i++ {
err := wf.SendMetric("sample.metric", float64(i), time.Now().UnixNano(), source, map[string]string{"env": "test"})
if err != nil {
println("error:", err.Error())
}

txt := fmt.Sprintf("test event %d", i)
sendEvent(wf, txt, time.Now().Unix(), 0, source, tags, options...)

time.Sleep(10 * time.Second)
}

wf.Flush()
wf.Close()
}

func sendEvent(sender senders.Sender, name string, startMillis, endMillis int64, source string, tags map[string]string, setters ...event.Option) {
err := sender.SendEvent(name, startMillis, endMillis, source, tags, setters...)
if err != nil {
println("error:", err)
}
}
8 changes: 4 additions & 4 deletions internal/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package internal
import "strings"

const (
deltaPrefix = "\u2206"
altDeltaPrefix = "\u0394"
DeltaPrefix = "\u2206"
AltDeltaPrefix = "\u0394"
)

func HasDeltaPrefix(name string) bool {
return strings.HasPrefix(name, deltaPrefix) || strings.HasPrefix(name, altDeltaPrefix)
return strings.HasPrefix(name, DeltaPrefix) || strings.HasPrefix(name, AltDeltaPrefix)
}

// Gets a delta counter name prefixed with ∆.
func DeltaCounterName(name string) string {
if HasDeltaPrefix(name) {
return name
}
return deltaPrefix + name
return DeltaPrefix + name
}
18 changes: 1 addition & 17 deletions internal/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,6 @@ var (
errReport = errors.New("error: invalid Format or points")
)

const (
contentType = "Content-Type"
contentEncoding = "Content-Encoding"
authzHeader = "Authorization"
bearer = "Bearer "
gzipFormat = "gzip"

octetStream = "application/octet-stream"
applicationJSON = "application/json"

reportEndpoint = "/report"
eventEndpoint = "/api/v2/event"

formatKey = "f"
)

// The implementation of a Reporter that reports points directly to a Wavefront server.
type directReporter struct {
serverURL string
Expand All @@ -45,7 +29,7 @@ func NewDirectReporter(server string, token string) Reporter {

func (reporter directReporter) Report(format string, pointLines string) (*http.Response, error) {
if format == "" || pointLines == "" {
return nil, errReport
return nil, formatError
}

// compress
Expand Down
24 changes: 24 additions & 0 deletions internal/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,27 @@ type ConnectionHandler interface {

Flusher
}

const (
contentType = "Content-Type"
contentEncoding = "Content-Encoding"
authzHeader = "Authorization"
bearer = "Bearer "
gzipFormat = "gzip"

octetStream = "application/octet-stream"
applicationJSON = "application/json"

reportEndpoint = "/report"
eventEndpoint = "/api/v2/event"

formatKey = "f"
)

const formatError stringError = "error: invalid Format or points"

type stringError string

func (e stringError) Error() string {
return string(e)
}
2 changes: 1 addition & 1 deletion senders/pool.go → internal/pool.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package senders
package internal

import (
"bytes"
Expand Down
93 changes: 93 additions & 0 deletions internal/reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package internal

import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"net/http"
"strings"
"time"
)

// The implementation of a Reporter that reports points directly to a Wavefront server.
type reporter struct {
serverURL string
token string
client *http.Client
}

// Newreporter create a metrics Reporter
func NewReporter(server string, token string) Reporter {
return &reporter{
serverURL: server,
token: token,
client: &http.Client{Timeout: time.Second * 10},
}
}

func (reporter reporter) Report(format string, pointLines string) (*http.Response, error) {
if format == "" || pointLines == "" {
return nil, formatError
}

// compress
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
_, err := zw.Write([]byte(pointLines))
if err != nil {
zw.Close()
return nil, err
}
if err = zw.Close(); err != nil {
return nil, err
}

apiURL := reporter.serverURL + reportEndpoint
req, err := http.NewRequest("POST", apiURL, &buf)
if err != nil {
return &http.Response{}, err
}

req.Header.Set(contentType, octetStream)
req.Header.Set(contentEncoding, gzipFormat)
if len(reporter.token) > 0 {
req.Header.Set(authzHeader, bearer+reporter.token)
}

q := req.URL.Query()
q.Add(formatKey, format)
req.URL.RawQuery = q.Encode()

return reporter.execute(req)
}

func (reporter reporter) ReportEvent(event string) (*http.Response, error) {
if event == "" {
return nil, formatError
}

apiURL := reporter.serverURL + eventEndpoint
req, err := http.NewRequest("POST", apiURL, strings.NewReader(event))
if err != nil {
return &http.Response{}, err
}

req.Header.Set(contentType, applicationJSON)
if len(reporter.token) > 0 {
req.Header.Set(contentEncoding, gzipFormat)
req.Header.Set(authzHeader, bearer+reporter.token)
}

return reporter.execute(req)
}

func (reporter reporter) execute(req *http.Request) (*http.Response, error) {
resp, err := reporter.client.Do(req)
if err != nil {
return resp, err
}
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
return resp, nil
}
Loading

0 comments on commit b5d9878

Please sign in to comment.