diff --git a/.gitignore b/.gitignore index 791e850..cd01e2d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea vendor/ -.DS_Store \ No newline at end of file +.DS_Store +.envrc diff --git a/internal/auth/csp/api_token_client.go b/internal/auth/csp/api_token_client.go new file mode 100644 index 0000000..316e3a5 --- /dev/null +++ b/internal/auth/csp/api_token_client.go @@ -0,0 +1,51 @@ +package csp + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" +) + +type APITokenClient struct { + BaseURL string + APIToken string +} + +func (c *APITokenClient) GetAccessToken() (*AuthorizeResponse, error) { + var oauthPath = "/csp/gateway/am/api/auth/api-tokens/authorize" + client := &http.Client{} + + requestBody := url.Values{"grant_type": {"api_token"}, "refresh_token": {c.APIToken}}.Encode() + req, err := http.NewRequest("POST", c.BaseURL+oauthPath, strings.NewReader(requestBody)) + + if err != nil { + return nil, err + } + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Accept", "application/json") + + resp, err := client.Do(req) + + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + if resp.StatusCode > 399 { + return nil, fmt.Errorf("authentication failed: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + var cspResponse AuthorizeResponse + err = json.Unmarshal(body, &cspResponse) + + if err != nil { + return nil, err + } + return &cspResponse, nil +} diff --git a/internal/auth/csp/client_credentials.go b/internal/auth/csp/client_credentials.go new file mode 100644 index 0000000..e8613ba --- /dev/null +++ b/internal/auth/csp/client_credentials.go @@ -0,0 +1,57 @@ +package csp + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" +) + +type ClientCredentialsClient struct { + BaseURL string + ClientID string + ClientSecret string +} + +func (c *ClientCredentialsClient) authHeaderValue() string { + return "Basic " + base64.StdEncoding.EncodeToString([]byte(c.ClientID+":"+c.ClientSecret)) +} + +func (c *ClientCredentialsClient) GetAccessToken() (*AuthorizeResponse, error) { + var oauthPath = "/csp/gateway/am/api/auth/authorize" + client := &http.Client{} + + requestBody := url.Values{"grant_type": {"client_credentials"}}.Encode() + req, err := http.NewRequest("POST", c.BaseURL+oauthPath, strings.NewReader(requestBody)) + + if err != nil { + return nil, err + } + + req.Header.Add("Authorization", c.authHeaderValue()) + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + resp, err := client.Do(req) + + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + if resp.StatusCode > 399 { + return nil, fmt.Errorf("authentication failed: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + var cspResponse AuthorizeResponse + err = json.Unmarshal(body, &cspResponse) + + if err != nil { + return nil, err + } + return &cspResponse, nil +} diff --git a/internal/auth/csp/fake_csp_handler.go b/internal/auth/csp/fake_csp_handler.go new file mode 100644 index 0000000..2a08bc7 --- /dev/null +++ b/internal/auth/csp/fake_csp_handler.go @@ -0,0 +1,88 @@ +package csp + +import ( + "encoding/base64" + "encoding/json" + "net/http" + "strings" +) + +func FakeCSPHandler(apiTokens []string) http.Handler { + basicAuthCredentials := "Basic " + base64.StdEncoding.EncodeToString([]byte("a:b")) + firstRun := true + + mux := http.NewServeMux() + mux.HandleFunc("/csp/gateway/am/api/auth/authorize", func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.Header.Get("Authorization"), basicAuthCredentials) { + var sup AuthorizeResponse + + if firstRun { + sup = AuthorizeResponse{ + ExpiresIn: 1, + AccessToken: "abc", + Scope: "aoa:directDataIngestion", + } + firstRun = false + } else { + sup = AuthorizeResponse{ + ExpiresIn: 1, + AccessToken: "def", + Scope: "aoa:directDataIngestion", + } + } + + w.WriteHeader(http.StatusOK) + marshal, _ := json.Marshal(sup) + w.Write(marshal) + return + } + w.WriteHeader(http.StatusUnauthorized) + }) + mux.HandleFunc("/csp/gateway/am/api/auth/api-tokens/authorize", func(w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + w.WriteHeader(http.StatusNotAcceptable) + return + } + if !(r.Form.Has("grant_type") && r.Form.Get("grant_type") == "api_token") { + w.WriteHeader(http.StatusUnauthorized) + return + } + if !(r.Form.Has("refresh_token")) { + w.WriteHeader(http.StatusUnauthorized) + return + } + var tokenMatch = false + for _, token := range apiTokens { + if r.Form.Get("refresh_token") == token { + tokenMatch = true + break + } + } + + if !(tokenMatch) { + w.WriteHeader(http.StatusUnauthorized) + return + } + var sup AuthorizeResponse + if firstRun { + sup = AuthorizeResponse{ + ExpiresIn: 1, + AccessToken: "abc", + Scope: "aoa:directDataIngestion", + } + firstRun = false + } else { + sup = AuthorizeResponse{ + ExpiresIn: 1, + AccessToken: "def", + Scope: "aoa:directDataIngestion", + } + } + + w.WriteHeader(http.StatusOK) + marshal, _ := json.Marshal(sup) + w.Write(marshal) + return + }) + return mux +} diff --git a/internal/auth/csp/scope.go b/internal/auth/csp/scope.go new file mode 100644 index 0000000..abf363f --- /dev/null +++ b/internal/auth/csp/scope.go @@ -0,0 +1,17 @@ +package csp + +import "strings" + +func HasDirectIngestScope(scope string) bool { + if len(scope) == 0 { + return false + } + + for _, s := range strings.Split(scope, " ") { + if strings.Contains(s, "aoa:directDataIngestion") || strings.Contains(s, "aoa/*") || strings.Contains(s, "aoa:*") { + return true + } + } + + return false +} diff --git a/internal/auth/csp/scope_test.go b/internal/auth/csp/scope_test.go new file mode 100644 index 0000000..c99fa64 --- /dev/null +++ b/internal/auth/csp/scope_test.go @@ -0,0 +1,17 @@ +package csp + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestDirectIngestScopes(t *testing.T) { + assert.False(t, HasDirectIngestScope("")) + assert.False(t, HasDirectIngestScope("no direct ingest scopes")) + + var scopeString = "external/51d98d2c-3ae1-11ee-be56-0242ac120002/*/aoa:directDataIngestion external/51d98d2c-3ae1-11ee-be56-0242ac120002/aoa:directDataIngestion csp:org_member" + + assert.True(t, HasDirectIngestScope(scopeString)) + assert.True(t, HasDirectIngestScope("some aoa:*")) + assert.True(t, HasDirectIngestScope("some aoa/*")) +} diff --git a/internal/auth/csp/types.go b/internal/auth/csp/types.go new file mode 100644 index 0000000..3c22a5f --- /dev/null +++ b/internal/auth/csp/types.go @@ -0,0 +1,14 @@ +package csp + +type Client interface { + GetAccessToken() (*AuthorizeResponse, error) +} + +type AuthorizeResponse struct { + IdToken string `json:"id_token"` + TokenType string `json:"token_type"` + ExpiresIn int `json:"expires_in"` + Scope string `json:"scope"` + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` +} diff --git a/internal/auth/csp_service.go b/internal/auth/csp_service.go new file mode 100644 index 0000000..8dd6976 --- /dev/null +++ b/internal/auth/csp_service.go @@ -0,0 +1,129 @@ +package auth + +import ( + "fmt" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth/csp" + "log" + "net/http" + "sync" + "time" +) + +type tokenResult struct { + accessToken string + err error +} + +type CSPService struct { + client csp.Client + mutex sync.Mutex + tokenResult *tokenResult + refreshTicker *time.Ticker + done chan bool + defaultRefreshInterval time.Duration +} + +// NewCSPServerToServerService returns a Service instance that gets access tokens via CSP client credentials +func NewCSPServerToServerService(CSPBaseUrl string, ClientId string, ClientSecret string) Service { + return newService(&csp.ClientCredentialsClient{ + BaseURL: CSPBaseUrl, + ClientID: ClientId, + ClientSecret: ClientSecret, + }) +} + +func NewCSPTokenService(CSPBaseUrl, apiToken string) Service { + return newService(&csp.APITokenClient{ + BaseURL: CSPBaseUrl, + APIToken: apiToken, + }) +} + +func newService(client csp.Client) Service { + return &CSPService{ + client: client, + defaultRefreshInterval: 60 * time.Second, + } +} + +func (s *CSPService) IsDirect() bool { + return true +} + +func (s *CSPService) Authorize(r *http.Request) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.tokenResult == nil { + s.RefreshAccessToken() + } + + if s.tokenResult.err != nil { + return &Err{ + error: s.tokenResult.err, + } + } + + r.Header.Set("Authorization", "Bearer "+s.tokenResult.accessToken) + return nil +} + +func (s *CSPService) RefreshAccessToken() { + cspResponse, err := s.client.GetAccessToken() + + if err != nil { + s.tokenResult = &tokenResult{ + accessToken: "", + err: err, + } + s.scheduleNextTokenRefresh(s.defaultRefreshInterval) + return + } + + if !csp.HasDirectIngestScope(cspResponse.Scope) { + s.tokenResult = &tokenResult{ + accessToken: "", + err: fmt.Errorf("response did not include required scope: 'aoa:directDataIngestion'"), + } + s.scheduleNextTokenRefresh(s.defaultRefreshInterval) + return + } + + s.scheduleNextTokenRefresh(time.Duration(cspResponse.ExpiresIn) * time.Second) + s.tokenResult = &tokenResult{ + accessToken: cspResponse.AccessToken, + err: nil, + } +} + +func (s *CSPService) scheduleNextTokenRefresh(expiresIn time.Duration) { + tickerInterval := calculateNewTickerInterval(expiresIn, s.defaultRefreshInterval) + + if s.refreshTicker == nil { + s.refreshTicker = time.NewTicker(tickerInterval) + s.done = make(chan bool) + go func() { + for { + select { + case <-s.done: + return + case tick := <-s.refreshTicker.C: + s.mutex.Lock() + log.Printf("Re-fetching CSP credentials at: %v \n", tick) + s.RefreshAccessToken() + s.mutex.Unlock() + } + } + }() + } else { + s.refreshTicker.Reset(tickerInterval) + } +} + +func (s *CSPService) Close() { + log.Println("Shutting down the CSPService") + if s.refreshTicker == nil { + return + } + s.done <- true +} diff --git a/internal/auth/csp_service_test.go b/internal/auth/csp_service_test.go new file mode 100644 index 0000000..90cb1f7 --- /dev/null +++ b/internal/auth/csp_service_test.go @@ -0,0 +1,55 @@ +package auth + +import ( + "github.com/stretchr/testify/assert" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth/csp" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestCSPService_MultipleCSPRequests(t *testing.T) { + cspServer := httptest.NewServer(csp.FakeCSPHandler(nil)) + defer cspServer.Close() + tokenService := NewCSPServerToServerService(cspServer.URL, "a", "b") + + cspTokenService := tokenService.(*CSPService) + cspTokenService.defaultRefreshInterval = 1 * time.Second + + assert.NotNil(t, tokenService) + req, _ := http.NewRequest("GET", "https://example.com", nil) + assert.NoError(t, tokenService.Authorize(req)) + token := req.Header.Get("Authorization") + assert.NotNil(t, token) + assert.NotEmpty(t, token) + assert.NotEqual(t, "INVALID_TOKEN", token) + assert.Equal(t, "Bearer abc", token) + + time.Sleep(2 * time.Second) + req, _ = http.NewRequest("GET", "https://example.com", nil) + assert.NoError(t, tokenService.Authorize(req)) + token = req.Header.Get("Authorization") + + assert.NotNil(t, token) + assert.NotEmpty(t, token) + assert.NotEqual(t, "INVALID_TOKEN", token) + assert.Equal(t, "Bearer def", token) + tokenService.Close() +} + +func TestCSPService_WhenAuthenticationFails_AuthorizeReturnsError(t *testing.T) { + cspServer := httptest.NewServer(csp.FakeCSPHandler(nil)) + defer cspServer.Close() + tokenService := NewCSPServerToServerService(cspServer.URL, "nope", "wrong") + defer tokenService.Close() + + cspTokenService := tokenService.(*CSPService) + cspTokenService.defaultRefreshInterval = 1 * time.Second + + assert.NotNil(t, tokenService) + req, _ := http.NewRequest("GET", "https://example.com", nil) + assert.Error(t, tokenService.Authorize(req)) + token := req.Header.Get("Authorization") + assert.Equal(t, "", token) +} diff --git a/internal/auth/err.go b/internal/auth/err.go new file mode 100644 index 0000000..5579388 --- /dev/null +++ b/internal/auth/err.go @@ -0,0 +1,13 @@ +package auth + +type Err struct { + error +} + +func NewAuthError(err error) error { + return &Err{error: err} +} + +func (e *Err) Error() string { + return e.error.Error() +} diff --git a/internal/auth/noop_service.go b/internal/auth/noop_service.go new file mode 100644 index 0000000..14a5193 --- /dev/null +++ b/internal/auth/noop_service.go @@ -0,0 +1,28 @@ +package auth + +import ( + "net/http" +) + +var ( + defaultNoopService Service = &NoOpService{} +) + +type NoOpService struct { +} + +func (t NoOpService) IsDirect() bool { + return false +} + +func (t NoOpService) Authorize(*http.Request) error { + return nil +} + +func (t NoOpService) Close() { +} + +// NewNoopTokenService returns a Service instance where it always returns an empty string for the token (for proxy usage). +func NewNoopTokenService() Service { + return defaultNoopService +} diff --git a/internal/auth/ticker_interval.go b/internal/auth/ticker_interval.go new file mode 100644 index 0000000..9aab791 --- /dev/null +++ b/internal/auth/ticker_interval.go @@ -0,0 +1,14 @@ +package auth + +import "time" + +func calculateNewTickerInterval(expiresIn time.Duration, fallback time.Duration) time.Duration { + if expiresIn < (30 * time.Second) { + return fallback + } + + if expiresIn < (10 * time.Minute) { + return expiresIn - (30 * time.Second) + } + return expiresIn - (3 * time.Minute) +} diff --git a/internal/auth/ticker_interval_test.go b/internal/auth/ticker_interval_test.go new file mode 100644 index 0000000..7c7eb57 --- /dev/null +++ b/internal/auth/ticker_interval_test.go @@ -0,0 +1,25 @@ +package auth + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestCalculateNewTickerInterval(t *testing.T) { + fallback := 1 * time.Second + assert.Equal(t, 999999820*time.Second, calculateNewTickerInterval(1000000000*time.Second, fallback)) + assert.Equal(t, 569*time.Second, calculateNewTickerInterval(599*time.Second, fallback)) + assert.Equal(t, 1*time.Second, calculateNewTickerInterval(3*time.Second, fallback)) + assert.Equal(t, 1*time.Second, calculateNewTickerInterval(1*time.Second, fallback)) + assert.Equal(t, 1*time.Second, calculateNewTickerInterval(0*time.Second, fallback)) + assert.Equal(t, 1*time.Second, calculateNewTickerInterval(-180*time.Second, fallback)) + + fallback = 60 * time.Second + assert.Equal(t, 999999820*time.Second, calculateNewTickerInterval(1000000000*time.Second, fallback)) + assert.Equal(t, 569*time.Second, calculateNewTickerInterval(599*time.Second, fallback)) + assert.Equal(t, 60*time.Second, calculateNewTickerInterval(3*time.Second, fallback)) + assert.Equal(t, 60*time.Second, calculateNewTickerInterval(1*time.Second, fallback)) + assert.Equal(t, 60*time.Second, calculateNewTickerInterval(0*time.Second, fallback)) + assert.Equal(t, 60*time.Second, calculateNewTickerInterval(-180*time.Second, fallback)) +} diff --git a/internal/auth/types.go b/internal/auth/types.go new file mode 100644 index 0000000..7aaacec --- /dev/null +++ b/internal/auth/types.go @@ -0,0 +1,25 @@ +package auth + +import "net/http" + +// Service Interface for getting authentication tokens (Wavefront, CSP) +type Service interface { + Authorize(r *http.Request) error + Close() + IsDirect() bool +} + +type APIToken struct { + Token string +} + +type CSPClientCredentials struct { + ClientID string + ClientSecret string + BaseURL string +} + +type CSPAPIToken struct { + Token string + BaseURL string +} diff --git a/internal/auth/wavefront_service.go b/internal/auth/wavefront_service.go new file mode 100644 index 0000000..1789260 --- /dev/null +++ b/internal/auth/wavefront_service.go @@ -0,0 +1,24 @@ +package auth + +import "net/http" + +type WavefrontTokenService struct { + Token string +} + +func (t WavefrontTokenService) IsDirect() bool { + return true +} + +func (t WavefrontTokenService) Authorize(req *http.Request) error { + req.Header.Set("Authorization", "Bearer "+t.Token) + return nil +} + +func (t WavefrontTokenService) Close() { +} + +// NewWavefrontTokenService returns a Service instance where it always returns a Wavefront API Token +func NewWavefrontTokenService(Token string) Service { + return &WavefrontTokenService{Token: Token} +} diff --git a/internal/interfaces.go b/internal/interfaces.go index 6d72531..f821144 100644 --- a/internal/interfaces.go +++ b/internal/interfaces.go @@ -36,8 +36,6 @@ type LineHandler interface { const ( contentType = "Content-Type" contentEncoding = "Content-Encoding" - authzHeader = "Authorization" - bearer = "Bearer " gzipFormat = "gzip" octetStream = "application/octet-stream" diff --git a/internal/lines.go b/internal/lines.go index c595e74..5f8bbf8 100644 --- a/internal/lines.go +++ b/internal/lines.go @@ -3,6 +3,7 @@ package internal import ( "errors" "fmt" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" "github.com/wavefronthq/wavefront-sdk-go/internal/sdkmetrics" "log" "net/http" @@ -181,7 +182,9 @@ func (lh *RealLineHandler) report(lines []string) error { } if err != nil { - lh.bufferLines(lines) + if shouldRetry(err) { + lh.bufferLines(lines) + } return fmt.Errorf("error reporting %s format data to Wavefront: %q", lh.Format, err) } @@ -196,10 +199,18 @@ func (lh *RealLineHandler) report(lines []string) error { return nil } +func shouldRetry(err error) bool { + switch err.(type) { + case *auth.Err: + return false + } + return true +} + func (lh *RealLineHandler) bufferLines(batch []string) { log.Println("error reporting to Wavefront. buffering lines.") for _, line := range batch { - lh.HandleLine(line) + _ = lh.HandleLine(line) } } diff --git a/internal/lines_test.go b/internal/lines_test.go index 8629a93..be3c7eb 100644 --- a/internal/lines_test.go +++ b/internal/lines_test.go @@ -2,6 +2,7 @@ package internal import ( "fmt" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" "net/http" "testing" @@ -9,13 +10,13 @@ import ( ) type fakeReporter struct { - raiseError bool - errorCode int + errorCode int + error error } -func (reporter *fakeReporter) Report(format string, pointLines string) (*http.Response, error) { - if reporter.raiseError { - return nil, fmt.Errorf("error reporting points") +func (reporter *fakeReporter) Report(string, string) (*http.Response, error) { + if reporter.error != nil { + return nil, reporter.error } if reporter.errorCode != 0 { return &http.Response{StatusCode: reporter.errorCode}, nil @@ -23,7 +24,7 @@ func (reporter *fakeReporter) Report(format string, pointLines string) (*http.Re return &http.Response{StatusCode: 200}, nil } -func (reporter *fakeReporter) ReportEvent(event string) (*http.Response, error) { +func (reporter *fakeReporter) ReportEvent(string) (*http.Response, error) { return &http.Response{StatusCode: 200}, nil } @@ -57,21 +58,38 @@ func TestBufferLines(t *testing.T) { checkLength(lh.buffer, 95, "error buffering lines", t) } +func TestHandleLine_OnAuthError_DoNotBuffer(t *testing.T) { + lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 + lh.Reporter = &fakeReporter{ + error: auth.NewAuthError(fmt.Errorf("fake auth error that shouldn't be buffered")), + } + assert.NoError(t, lh.HandleLine("this is a metric, but CSP is down, or my credentials are wrong")) + assert.Error(t, lh.Flush()) + checkLength(lh.buffer, 0, "", t) + lh.Reporter = &fakeReporter{ + error: fmt.Errorf("error that should be buffered"), + } + assert.NoError(t, lh.HandleLine("this is a metric, but it was a network timeout or something like that")) + assert.Error(t, lh.Flush()) + checkLength(lh.buffer, 1, "", t) +} + func TestFlush(t *testing.T) { lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 addLines(lh, 100, 100, t) - lh.Flush() + assert.NoError(t, lh.Flush()) assert.Equal(t, 90, len(lh.buffer), "error flushing lines") - lh.Reporter = &fakeReporter{raiseError: true} - lh.Flush() + e := fmt.Errorf("error reporting points") + lh.Reporter = &fakeReporter{error: e} + assert.Error(t, lh.Flush()) assert.Equal(t, 90, len(lh.buffer), "error flushing lines") lh.Reporter = &fakeReporter{} lh.buffer = make(chan string, 100) addLines(lh, 5, 5, t) - lh.Flush() + assert.NoError(t, lh.Flush()) assert.Equal(t, 0, len(lh.buffer), "error flushing lines") } diff --git a/internal/reporter.go b/internal/reporter.go index d552c0f..dcf1371 100644 --- a/internal/reporter.go +++ b/internal/reporter.go @@ -4,8 +4,8 @@ import ( "bytes" "compress/gzip" "crypto/tls" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" "io" - "io/ioutil" "net/http" "strings" "time" @@ -13,17 +13,17 @@ import ( // The implementation of a Reporter that reports points directly to a Wavefront server. type reporter struct { - serverURL string - token string - client *http.Client + serverURL string + tokenService auth.Service + client *http.Client } // NewReporter creates a metrics Reporter -func NewReporter(server string, token string, client *http.Client) Reporter { +func NewReporter(server string, tokenService auth.Service, client *http.Client) Reporter { return &reporter{ - serverURL: server, - token: token, - client: client, + serverURL: server, + tokenService: tokenService, + client: client, } } @@ -77,8 +77,10 @@ func (reporter reporter) buildRequest(format string, body []byte) (*http.Request req.Header.Set(contentType, octetStream) req.Header.Set(contentEncoding, gzipFormat) - if len(reporter.token) > 0 { - req.Header.Set(authzHeader, bearer+reporter.token) + + err = reporter.tokenService.Authorize(req) + if err != nil { + return nil, err } q := req.URL.Query() @@ -99,9 +101,14 @@ func (reporter reporter) ReportEvent(event string) (*http.Response, error) { } req.Header.Set(contentType, applicationJSON) - if len(reporter.token) > 0 { + + if reporter.IsDirect() { req.Header.Set(contentEncoding, gzipFormat) - req.Header.Set(authzHeader, bearer+reporter.token) + } + + err = reporter.tokenService.Authorize(req) + if err != nil { + return nil, err } return reporter.execute(req) @@ -112,7 +119,15 @@ func (reporter reporter) execute(req *http.Request) (*http.Response, error) { if err != nil { return resp, err } - io.Copy(ioutil.Discard, resp.Body) + io.Copy(io.Discard, resp.Body) defer resp.Body.Close() return resp, nil } + +func (reporter reporter) Close() { + reporter.tokenService.Close() +} + +func (reporter reporter) IsDirect() bool { + return reporter.tokenService.IsDirect() +} diff --git a/internal/reporter_test.go b/internal/reporter_test.go index 961898e..7d7ec0b 100644 --- a/internal/reporter_test.go +++ b/internal/reporter_test.go @@ -5,25 +5,26 @@ import ( "crypto/x509" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" "net/http" "testing" "time" ) -func TestBuildRequest(t *testing.T) { +func TestReporter_BuildRequest(t *testing.T) { var r *reporter - r = NewReporter("http://localhost:8010/wavefront", "", &http.Client{}).(*reporter) + r = NewReporter("http://localhost:8010/wavefront", auth.NewNoopTokenService(), &http.Client{}).(*reporter) request, err := r.buildRequest("wavefront", nil) require.NoError(t, err) assert.Equal(t, "http://localhost:8010/wavefront/report?f=wavefront", request.URL.String()) } -func TestNewClientWithNilTLSConfig(t *testing.T) { +func TestNewClient_WithNilTLSConfig(t *testing.T) { client := NewClient(10*time.Second, nil) assert.Equal(t, nil, client.Transport) } -func TestNewClientWithCustomTLSConfig(t *testing.T) { +func TestNewClient_WithCustomTLSConfig(t *testing.T) { caCertPool := x509.NewCertPool() fakeCert := []byte("Not a real cert") caCertPool.AppendCertsFromPEM(fakeCert) @@ -40,5 +41,4 @@ func TestNewClientWithCustomTLSConfig(t *testing.T) { client := NewClient(10*time.Second, tlsConfig) assert.Equal(t, transport, client.Transport) assert.NotEqual(t, transportWithEmptyTLSConfig, client.Transport) - } diff --git a/senders/auth.go b/senders/auth.go new file mode 100644 index 0000000..0569fd4 --- /dev/null +++ b/senders/auth.go @@ -0,0 +1,26 @@ +package senders + +import ( + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" + "log" +) + +func tokenServiceForCfg(cfg *configuration) auth.Service { + switch cfg.Authentication.(type) { + case auth.APIToken: + log.Println("The Wavefront SDK will use Direct Ingestion authenticated using an API Token.") + tokenAuth := cfg.Authentication.(auth.APIToken) + return auth.NewWavefrontTokenService(tokenAuth.Token) + case auth.CSPClientCredentials: + log.Println("The Wavefront SDK will use Direct Ingestion authenticated using CSP client credentials.") + cspAuth := cfg.Authentication.(auth.CSPClientCredentials) + return auth.NewCSPServerToServerService(cspAuth.BaseURL, cspAuth.ClientID, cspAuth.ClientSecret) + case auth.CSPAPIToken: + log.Println("The Wavefront SDK will use Direct Ingestion authenticated using CSP API Token.") + cspAuth := cfg.Authentication.(auth.CSPAPIToken) + return auth.NewCSPTokenService(cspAuth.BaseURL, cspAuth.Token) + } + + log.Println("The Wavefront SDK will communicate with a Wavefront Proxy.") + return auth.NewNoopTokenService() +} diff --git a/senders/client_factory.go b/senders/client_factory.go deleted file mode 100644 index 8568e36..0000000 --- a/senders/client_factory.go +++ /dev/null @@ -1,278 +0,0 @@ -package senders - -import ( - "crypto/tls" - "fmt" - "github.com/wavefronthq/wavefront-sdk-go/internal/sdkmetrics" - "net/url" - "os" - "strconv" - "strings" - "time" - - "github.com/wavefronthq/wavefront-sdk-go/internal" - "github.com/wavefronthq/wavefront-sdk-go/version" -) - -const ( - defaultTracesPort = 30001 - defaultMetricsPort = 2878 - defaultBatchSize = 10_000 - defaultBufferSize = 50_000 - defaultFlushInterval = 1 * time.Second - defaultTimeout = 10 * time.Second -) - -// Option Wavefront client configuration options -type Option func(*configuration) - -// Configuration for the direct ingestion sender -type configuration struct { - Server string // Wavefront URL of the form https://.wavefront.com - Token string // Wavefront API token with direct data ingestion permission - - // Optional configuration properties. Default values should suffice for most use cases. - // override the defaults only if you wish to set higher values. - - MetricsPort int - TracesPort int - - // max batch of data sent per flush interval. defaults to 10,000. recommended not to exceed 40,000. - BatchSize int - - // send, or don't send, internal SDK metrics that begin with ~sdk.go.core - SendInternalMetrics bool - - // size of internal buffers beyond which received data is dropped. - // helps with handling brief increases in data and buffering on errors. - // separate buffers are maintained per data type (metrics, spans and distributions) - // buffers are not pre-allocated to max size and vary based on actual usage. - // defaults to 500,000. higher values could use more memory. - MaxBufferSize int - - // interval (in seconds) at which to flush data to Wavefront. defaults to 1 Second. - // together with batch size controls the max theoretical throughput of the sender. - FlushInterval time.Duration - SDKMetricsTags map[string]string - Path string - - Timeout time.Duration - - TLSConfig *tls.Config -} - -func (c *configuration) Direct() bool { - return c.Token != "" -} - -func (c *configuration) MetricPrefix() string { - result := "~sdk.go.core.sender.proxy" - if c.Direct() { - result = "~sdk.go.core.sender.direct" - } - return result -} - -func (c *configuration) setDefaultPort(port int) { - c.MetricsPort = port - c.TracesPort = port -} - -// NewSender creates Wavefront Sender using the provided URL and Options -func NewSender(wfURL string, setters ...Option) (Sender, error) { - cfg, err := createConfig(wfURL, setters...) - if err != nil { - return nil, fmt.Errorf("unable to create sender config: %s", err) - } - - client := internal.NewClient(cfg.Timeout, cfg.TLSConfig) - metricsReporter := internal.NewReporter(cfg.metricsURL(), cfg.Token, client) - tracesReporter := internal.NewReporter(cfg.tracesURL(), cfg.Token, client) - sender := &wavefrontSender{ - defaultSource: internal.GetHostname("wavefront_direct_sender"), - proxy: !cfg.Direct(), - } - if cfg.SendInternalMetrics { - sender.internalRegistry = sender.realInternalRegistry(cfg) - } else { - sender.internalRegistry = sdkmetrics.NewNoOpRegistry() - } - sender.pointHandler = newLineHandler(metricsReporter, cfg, internal.MetricFormat, "points", sender.internalRegistry) - sender.histoHandler = newLineHandler(metricsReporter, cfg, internal.HistogramFormat, "histograms", sender.internalRegistry) - sender.spanHandler = newLineHandler(tracesReporter, cfg, internal.TraceFormat, "spans", sender.internalRegistry) - sender.spanLogHandler = newLineHandler(tracesReporter, cfg, internal.SpanLogsFormat, "span_logs", sender.internalRegistry) - sender.eventHandler = newLineHandler(metricsReporter, cfg, internal.EventFormat, "events", sender.internalRegistry) - - sender.Start() - return sender, nil -} - -func createConfig(wfURL string, setters ...Option) (*configuration, error) { - cfg := &configuration{ - MetricsPort: defaultMetricsPort, - TracesPort: defaultTracesPort, - BatchSize: defaultBatchSize, - MaxBufferSize: defaultBufferSize, - FlushInterval: defaultFlushInterval, - SendInternalMetrics: true, - SDKMetricsTags: map[string]string{}, - Timeout: defaultTimeout, - } - - u, err := url.Parse(wfURL) - if err != nil { - return nil, err - } - - if len(u.User.String()) > 0 { - cfg.Token = u.User.String() - u.User = nil - } - - switch strings.ToLower(u.Scheme) { - case "http": - if cfg.Direct() { - cfg.setDefaultPort(80) - } - case "https": - if cfg.Direct() { - cfg.setDefaultPort(443) - } - default: - return nil, fmt.Errorf("invalid scheme '%s' in '%s', only 'http' is supported", u.Scheme, u) - } - - if u.Path != "" { - cfg.Path = u.Path - u.Path = "" - } - - if u.Port() != "" { - port, err := strconv.Atoi(u.Port()) - if err != nil { - return nil, fmt.Errorf("unable to convert port to integer: %s", err) - } - cfg.setDefaultPort(port) - u.Host = u.Hostname() - } - cfg.Server = u.String() - - for _, set := range setters { - set(cfg) - } - return cfg, nil -} - -func (c *configuration) tracesURL() string { - return fmt.Sprintf("%s:%d%s", c.Server, c.TracesPort, c.Path) -} - -func (c *configuration) metricsURL() string { - return fmt.Sprintf("%s:%d%s", c.Server, c.MetricsPort, c.Path) -} - -func (sender *wavefrontSender) realInternalRegistry(cfg *configuration) sdkmetrics.Registry { - var setters []sdkmetrics.RegistryOption - - setters = append(setters, sdkmetrics.SetPrefix(cfg.MetricPrefix())) - setters = append(setters, sdkmetrics.SetTag("pid", strconv.Itoa(os.Getpid()))) - setters = append(setters, sdkmetrics.SetTag("version", version.Version)) - - for key, value := range cfg.SDKMetricsTags { - setters = append(setters, sdkmetrics.SetTag(key, value)) - } - - return sdkmetrics.NewMetricRegistry( - sender, - setters..., - ) - -} - -// BatchSize set max batch of data sent per flush interval. Defaults to 10,000. recommended not to exceed 40,000. -func BatchSize(n int) Option { - return func(cfg *configuration) { - cfg.BatchSize = n - } -} - -// MaxBufferSize set the size of internal buffers beyond which received data is dropped. Defaults to 50,000. -func MaxBufferSize(n int) Option { - return func(cfg *configuration) { - cfg.MaxBufferSize = n - } -} - -// FlushIntervalSeconds set the interval (in seconds) at which to flush data to Wavefront. Defaults to 1 Second. -func FlushIntervalSeconds(n int) Option { - return func(cfg *configuration) { - cfg.FlushInterval = time.Second * time.Duration(n) - } -} - -// FlushInterval set the interval at which to flush data to Wavefront. Defaults to 1 Second. -func FlushInterval(interval time.Duration) Option { - return func(cfg *configuration) { - cfg.FlushInterval = interval - } -} - -// MetricsPort sets the port on which to report metrics. Default is 2878. -func MetricsPort(port int) Option { - return func(cfg *configuration) { - cfg.MetricsPort = port - } -} - -// TracesPort sets the port on which to report traces. Default is 30001. -func TracesPort(port int) Option { - return func(cfg *configuration) { - cfg.TracesPort = port - } -} - -// Timeout sets the HTTP timeout (in seconds). Defaults to 10 seconds. -func Timeout(timeout time.Duration) Option { - return func(cfg *configuration) { - cfg.Timeout = timeout - } -} - -// TLSConfigOptions sets the tls.Config used by the HTTP Client to send data to Wavefront. -func TLSConfigOptions(tlsCfg *tls.Config) Option { - tlsCfgCopy := tlsCfg.Clone() - return func(cfg *configuration) { - cfg.TLSConfig = tlsCfgCopy - } -} - -// SendInternalMetrics turns sending of internal SDK metrics on/off. -func SendInternalMetrics(enabled bool) Option { - return func(cfg *configuration) { - cfg.SendInternalMetrics = enabled - } -} - -// SDKMetricsTags adds the additional tags provided in tags to all internal -// metrics this library reports. Clients can use multiple SDKMetricsTags -// calls when creating a sender. In that case, the sender sends all the -// tags from each of the SDKMetricsTags calls in addition to the standard -// "pid" and "version" tags to all internal metrics. The "pid" tag is the -// process ID; the "version" tag is the version of this SDK. -func SDKMetricsTags(tags map[string]string) Option { - // prevent caller from accidentally mutating this option. - copiedTags := copyTags(tags) - return func(cfg *configuration) { - for key, value := range copiedTags { - cfg.SDKMetricsTags[key] = value - } - } -} - -func copyTags(orig map[string]string) map[string]string { - result := make(map[string]string, len(orig)) - for key, value := range orig { - result[key] = value - } - return result -} diff --git a/senders/client_noop.go b/senders/client_noop.go deleted file mode 100644 index 60d2cc9..0000000 --- a/senders/client_noop.go +++ /dev/null @@ -1,59 +0,0 @@ -package senders - -import ( - "github.com/wavefronthq/wavefront-sdk-go/event" - "github.com/wavefronthq/wavefront-sdk-go/histogram" -) - -type wavefrontNoOpSender struct { -} - -var ( - defaultNoopClient Sender = &wavefrontNoOpSender{} -) - -// NewWavefrontNoOpClient returns a Wavefront Client instance for which all operations are no-ops. -func NewWavefrontNoOpClient() (Sender, error) { - return defaultNoopClient, nil -} - -func (sender *wavefrontNoOpSender) private() { -} - -func (sender *wavefrontNoOpSender) Start() { - // no-op -} - -func (sender *wavefrontNoOpSender) SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error { - return nil -} - -func (sender *wavefrontNoOpSender) SendDeltaCounter(name string, value float64, source string, tags map[string]string) error { - return nil -} - -func (sender *wavefrontNoOpSender) SendDistribution(name string, centroids []histogram.Centroid, - hgs map[histogram.Granularity]bool, ts int64, source string, tags map[string]string) error { - return nil -} - -func (sender *wavefrontNoOpSender) SendSpan(name string, startMillis, durationMillis int64, source, traceId, spanId string, - parents, followsFrom []string, tags []SpanTag, spanLogs []SpanLog) error { - return nil -} - -func (sender *wavefrontNoOpSender) SendEvent(name string, startMillis, endMillis int64, source string, tags map[string]string, setters ...event.Option) error { - return nil -} - -func (sender *wavefrontNoOpSender) Close() { - // no-op -} - -func (sender *wavefrontNoOpSender) Flush() error { - return nil -} - -func (sender *wavefrontNoOpSender) GetFailureCount() int64 { - return 0 -} diff --git a/senders/client_test.go b/senders/client_test.go deleted file mode 100644 index b455fbe..0000000 --- a/senders/client_test.go +++ /dev/null @@ -1,170 +0,0 @@ -package senders_test - -import ( - "bytes" - "compress/gzip" - "io/ioutil" - "log" - "net/http" - "net/http/httptest" - "os" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/wavefronthq/wavefront-sdk-go/histogram" - "github.com/wavefronthq/wavefront-sdk-go/senders" -) - -const ( - token = "DUMMY_TOKEN" -) - -var requests = []string{} -var wfPort string -var proxyPort string - -func TestMain(m *testing.M) { - - directServer := httptest.NewServer(directHandler()) - proxyServer := httptest.NewServer(proxyHandler()) - wfPort = extractPort(directServer.URL) - proxyPort = extractPort(proxyServer.URL) - - exitVal := m.Run() - - directServer.Close() - proxyServer.Close() - - os.Exit(exitVal) -} - -func extractPort(url string) string { - idx := strings.LastIndex(url, ":") - if idx == -1 { - log.Fatal("No port found.") - } - return url[idx+1:] -} - -func directHandler() http.Handler { - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - readBodyIntoString(r) - if strings.HasSuffix(r.Header.Get("Authorization"), token) { - w.WriteHeader(http.StatusOK) - return - } - w.WriteHeader(http.StatusForbidden) - }) - return mux -} - -func proxyHandler() http.Handler { - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - readBodyIntoString(r) - if len(r.Header.Get("Authorization")) == 0 { - w.WriteHeader(http.StatusOK) - return - } - w.WriteHeader(http.StatusForbidden) - }) - return mux -} - -func readBodyIntoString(r *http.Request) { - b, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Fatalln(err) - } - defer r.Body.Close() - if r.Header.Get("Content-Type") == "application/octet-stream" { - gr, err := gzip.NewReader(bytes.NewBuffer(b)) - defer gr.Close() - data, err := ioutil.ReadAll(gr) - if err != nil { - log.Fatalln(err) - } - requests = append(requests, string(data)) - } else { - requests = append(requests, string(b)) - } -} - -func TestSendDirect(t *testing.T) { - wf, err := senders.NewSender("http://" + token + "@localhost:" + wfPort) - require.NoError(t, err) - doTest(t, wf) -} - -func TestSendDirectWithTags(t *testing.T) { - tags := map[string]string{"foo": "bar"} - wf, err := senders.NewSender("http://"+token+"@localhost:"+wfPort, senders.SDKMetricsTags(tags)) - require.NoError(t, err) - doTest(t, wf) -} - -func TestSendProxy(t *testing.T) { - wf, err := senders.NewSender("http://localhost:" + proxyPort) - require.NoError(t, err) - doTest(t, wf) -} - -func doTest(t *testing.T, wf senders.Sender) { - if err := wf.SendMetric("new-york.power.usage", 42422.0, 0, "go_test", map[string]string{"env": "test"}); err != nil { - t.Error("Failed SendMetric", err) - } - - centroids := []histogram.Centroid{ - {Value: 30.0, Count: 20}, - {Value: 5.1, Count: 10}, - } - - hgs := map[histogram.Granularity]bool{ - histogram.MINUTE: true, - histogram.HOUR: true, - histogram.DAY: true, - } - - if err := wf.SendDistribution("request.latency", centroids, hgs, 0, "appServer1", map[string]string{"region": "us-west"}); err != nil { - t.Error("Failed SendDistribution", err) - } - - if err := wf.SendSpan("getAllUsers", 0, 343500, "localhost", - "7b3bf470-9456-11e8-9eb6-529269fb1459", "0313bafe-9457-11e8-9eb6-529269fb1459", - []string{"2f64e538-9457-11e8-9eb6-529269fb1459"}, nil, - []senders.SpanTag{ - {Key: "application", Value: "Wavefront"}, - {Key: "http.method", Value: "GET"}, - }, - nil); err != nil { - t.Error("Failed SendSpan", err) - } - - wf.Flush() - wf.Close() - assert.Equal(t, int64(0), wf.GetFailureCount(), "GetFailureCount") - - metricsFlag := false - hgFlag := false - spansFlag := false - - for _, request := range requests { - if strings.Contains(request, "new-york.power.usage") { - metricsFlag = true - } - if strings.Contains(request, "request.latency") { - hgFlag = true - } - if strings.Contains(request, "0313bafe-9457-11e8-9eb6-529269fb1459") { - spansFlag = true - } - } - - assert.True(t, metricsFlag) - assert.True(t, hgFlag) - assert.True(t, spansFlag) -} diff --git a/senders/configuration.go b/senders/configuration.go new file mode 100644 index 0000000..efa18c9 --- /dev/null +++ b/senders/configuration.go @@ -0,0 +1,143 @@ +package senders + +import ( + "crypto/tls" + "fmt" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" + "log" + "net/url" + "strconv" + "strings" + "time" +) + +const ( + defaultCSPBaseUrl = "https://console.cloud.vmware.com/" + defaultTracesPort = 30001 + defaultMetricsPort = 2878 + defaultBatchSize = 10_000 + defaultBufferSize = 50_000 + defaultFlushInterval = 1 * time.Second + defaultTimeout = 10 * time.Second +) + +// Configuration for the direct ingestion sender +type configuration struct { + Server string // Wavefront URL of the form https://.wavefront.com + + // Optional configuration properties. Default values should suffice for most use cases. + // override the defaults only if you wish to set higher values. + + MetricsPort int + TracesPort int + + // max batch of data sent per flush interval. defaults to 10,000. recommended not to exceed 40,000. + BatchSize int + + // send, or don't send, internal SDK metrics that begin with ~sdk.go.core + SendInternalMetrics bool + + // size of internal buffers beyond which received data is dropped. + // helps with handling brief increases in data and buffering on errors. + // separate buffers are maintained per data type (metrics, spans and distributions) + // buffers are not pre-allocated to max size and vary based on actual usage. + // defaults to 500,000. higher values could use more memory. + MaxBufferSize int + + // interval (in seconds) at which to flush data to Wavefront. defaults to 1 Second. + // together with batch size controls the max theoretical throughput of the sender. + FlushInterval time.Duration + SDKMetricsTags map[string]string + Path string + + Timeout time.Duration + + TLSConfig *tls.Config + + Authentication interface{} +} + +func (c *configuration) Direct() bool { + return c.Authentication != nil +} + +func createConfig(wfURL string, setters ...Option) (*configuration, error) { + cfg := &configuration{ + MetricsPort: defaultMetricsPort, + TracesPort: defaultTracesPort, + BatchSize: defaultBatchSize, + MaxBufferSize: defaultBufferSize, + FlushInterval: defaultFlushInterval, + SendInternalMetrics: true, + SDKMetricsTags: map[string]string{}, + Timeout: defaultTimeout, + } + + u, err := url.Parse(wfURL) + if err != nil { + return nil, err + } + + if len(u.User.String()) > 0 { + cfg.Authentication = auth.APIToken{ + Token: u.User.String(), + } + u.User = nil + } + + switch strings.ToLower(u.Scheme) { + case "http": + if cfg.Direct() { + log.Println("Detecting wavefront direct ingestion, will attempt to connect port 80.") + cfg.setDefaultPort(80) + } + case "https": + if cfg.Direct() { + log.Println("Detecting wavefront direct ingestion, will attempt to connect port 443.") + cfg.setDefaultPort(443) + } + default: + return nil, fmt.Errorf("invalid scheme '%s' in '%s', only 'http/https' is supported", u.Scheme, u) + } + + if u.Path != "" { + cfg.Path = u.Path + u.Path = "" + } + + if u.Port() != "" { + port, err := strconv.Atoi(u.Port()) + if err != nil { + return nil, fmt.Errorf("unable to convert port to integer: %s", err) + } + cfg.setDefaultPort(port) + u.Host = u.Hostname() + } + cfg.Server = u.String() + + for _, set := range setters { + set(cfg) + } + return cfg, nil +} + +func (c *configuration) setDefaultPort(port int) { + c.MetricsPort = port + c.TracesPort = port +} + +func (c *configuration) tracesURL() string { + return fmt.Sprintf("%s:%d%s", c.Server, c.TracesPort, c.Path) +} + +func (c *configuration) metricsURL() string { + return fmt.Sprintf("%s:%d%s", c.Server, c.MetricsPort, c.Path) +} + +func (c *configuration) MetricPrefix() string { + result := "~sdk.go.core.sender.proxy" + if c.Direct() { + result = "~sdk.go.core.sender.direct" + } + return result +} diff --git a/senders/example_newsender_direct_test.go b/senders/example_newsender_direct_test.go index 6a7f51e..d402070 100644 --- a/senders/example_newsender_direct_test.go +++ b/senders/example_newsender_direct_test.go @@ -5,8 +5,40 @@ import wavefront "github.com/wavefronthq/wavefront-sdk-go/senders" func ExampleNewSender_direct() { // For Direct Ingestion endpoints, by default all data is sent to port 80 // or port 443 for unencrypted or encrypted connections, respectively. + + // Direct Ingestion requires authentication. + + // Wavefront API tokens: + // Set your API token using the APIToken Option // 11111111-2222-3333-4444-555555555555 is your API token with direct ingestion permission. - sender, err := wavefront.NewSender("https://11111111-2222-3333-4444-555555555555@surf.wavefront.com") + sender, err := wavefront.NewSender("https://surf.wavefront.com", + wavefront.APIToken("11111111-2222-3333-4444-555555555555")) + + // CSP API tokens: + // Set your API token using the CSPAPIToken Option + // is your CSP API token with the aoa:directDataIngestion scope. + sender, err = wavefront.NewSender("https://surf.wavefront.com", + wavefront.CSPAPIToken("")) + + // CSP Client Credentials: + // Set your API token using the CSPClientCredentials Option + sender, err = wavefront.NewSender("https://surf.wavefront.com", + wavefront.CSPClientCredentials("", "")) + + // CSP Options also have "WithBaseURL" variants + sender, err = wavefront.NewSender("https://surf.wavefront.com", + wavefront.CSPClientCredentialsWithBaseURL( + "", + "", + "", + )) + + sender, err = wavefront.NewSender("https://surf.wavefront.com", + wavefront.CSPAPITokenWithBaseURL( + "", + "", + )) + if err != nil { // handle error } diff --git a/senders/integration_test.go b/senders/integration_test.go index dc6d834..bf4a81c 100644 --- a/senders/integration_test.go +++ b/senders/integration_test.go @@ -7,7 +7,7 @@ import ( ) func TestEndToEnd(t *testing.T) { - testServer := startTestServer() + testServer := startTestServer(false) defer testServer.Close() sender, err := NewSender(testServer.URL) require.NoError(t, err) @@ -20,7 +20,7 @@ func TestEndToEnd(t *testing.T) { } func TestEndToEndWithPath(t *testing.T) { - testServer := startTestServer() + testServer := startTestServer(false) defer testServer.Close() sender, err := NewSender(testServer.URL + "/test-path") require.NoError(t, err) @@ -33,7 +33,7 @@ func TestEndToEndWithPath(t *testing.T) { } func TestTLSEndToEnd(t *testing.T) { - testServer := startTLSTestServer() + testServer := startTestServer(true) defer testServer.Close() testServer.httpServer.Client() tlsConfig := testServer.TLSConfig() @@ -48,13 +48,13 @@ func TestTLSEndToEnd(t *testing.T) { } func TestEndToEndWithInternalMetrics(t *testing.T) { - testServer := startTestServer() + testServer := startTestServer(false) defer testServer.Close() sender, err := NewSender(testServer.URL, SendInternalMetrics(true)) require.NoError(t, err) require.NoError(t, sender.SendMetric("my metric", 20, 0, "localhost", nil)) - sender.(*wavefrontSender).internalRegistry.Flush() + sender.(*realSender).internalRegistry.Flush() require.NoError(t, sender.Flush()) metricLines := testServer.MetricLines @@ -65,14 +65,13 @@ func TestEndToEndWithInternalMetrics(t *testing.T) { } func TestEndToEndWithoutInternalMetrics(t *testing.T) { - - testServer := startTestServer() + testServer := startTestServer(false) defer testServer.Close() sender, err := NewSender(testServer.URL, SendInternalMetrics(false)) require.NoError(t, err) require.NoError(t, sender.SendMetric("my metric", 20, 0, "localhost", nil)) - sender.(*wavefrontSender).internalRegistry.Flush() + sender.(*realSender).internalRegistry.Flush() require.NoError(t, sender.Flush()) metricLines := testServer.MetricLines diff --git a/senders/live_test.go b/senders/live_test.go new file mode 100644 index 0000000..ad23077 --- /dev/null +++ b/senders/live_test.go @@ -0,0 +1,58 @@ +package senders + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func skipUnlessVarsAreSet(t *testing.T) { + if os.Getenv("LIVE_TEST_HOST") == "" { + t.Skip() + } +} + +func TestCSP_LIVE(t *testing.T) { + skipUnlessVarsAreSet(t) + + sender, err := NewSender(os.Getenv("LIVE_TEST_HOST"), + CSPClientCredentialsWithBaseURL( + os.Getenv("LIVE_TEST_CSP_CLIENT_ID"), + os.Getenv("LIVE_TEST_CSP_CLIENT_SECRET"), + os.Getenv("LIVE_TEST_CSP_BASE_URL"), + )) + assert.NoError(t, err) + assert.NoError(t, sender.SendMetric("test.go-metrics.can-send", 1, 0, "go test", + map[string]string{"scenario": "direct-csp-server-to-server"})) + assert.NoError(t, sender.Flush()) + sender.Close() +} + +func TestCSP_API_TOKEN_LIVE(t *testing.T) { + skipUnlessVarsAreSet(t) + + sender, err := NewSender(os.Getenv("LIVE_TEST_HOST"), + CSPAPITokenWithBaseURL( + os.Getenv("LIVE_TEST_CSP_API_TOKEN"), + os.Getenv("LIVE_TEST_CSP_BASE_URL"), + )) + assert.NoError(t, err) + assert.NoError(t, sender.SendMetric("test.go-metrics.can-send", 1, 0, "go test", + map[string]string{"scenario": "direct-csp-api-token"})) + assert.NoError(t, sender.Flush()) + sender.Close() +} + +func TestWF_API_TOKEN_LIVE(t *testing.T) { + skipUnlessVarsAreSet(t) + + sender, err := NewSender( + os.Getenv("LIVE_TEST_HOST"), + APIToken(os.Getenv("LIVE_TEST_WF_API_TOKEN")), + ) + assert.NoError(t, err) + assert.NoError(t, sender.SendMetric("test.go-metrics.can-send", 1, 0, "go test", + map[string]string{"scenario": "direct-wf-token"})) + assert.NoError(t, sender.Flush()) + sender.Close() +} diff --git a/senders/client_multi.go b/senders/multi_sender.go similarity index 100% rename from senders/client_multi.go rename to senders/multi_sender.go diff --git a/senders/new_sender.go b/senders/new_sender.go new file mode 100644 index 0000000..e7242e8 --- /dev/null +++ b/senders/new_sender.go @@ -0,0 +1,46 @@ +package senders + +import ( + "fmt" + "github.com/wavefronthq/wavefront-sdk-go/internal" + "github.com/wavefronthq/wavefront-sdk-go/internal/sdkmetrics" +) + +// NewSender creates a Sender using the provided URL and Options +func NewSender(wfURL string, setters ...Option) (Sender, error) { + cfg, err := createConfig(wfURL, setters...) + if err != nil { + return nil, fmt.Errorf("unable to create sender config: %s", err) + } + + tokenService := tokenServiceForCfg(cfg) + client := internal.NewClient(cfg.Timeout, cfg.TLSConfig) + metricsReporter := internal.NewReporter(cfg.metricsURL(), tokenService, client) + tracesReporter := internal.NewReporter(cfg.tracesURL(), tokenService, client) + + sender := &realSender{ + defaultSource: internal.GetHostname("wavefront_direct_sender"), + proxy: !cfg.Direct(), + } + if cfg.SendInternalMetrics { + sender.internalRegistry = sender.realInternalRegistry(cfg) + } else { + sender.internalRegistry = sdkmetrics.NewNoOpRegistry() + } + sender.pointHandler = newLineHandler(metricsReporter, cfg, internal.MetricFormat, "points", sender.internalRegistry) + sender.histoHandler = newLineHandler(metricsReporter, cfg, internal.HistogramFormat, "histograms", sender.internalRegistry) + sender.spanHandler = newLineHandler(tracesReporter, cfg, internal.TraceFormat, "spans", sender.internalRegistry) + sender.spanLogHandler = newLineHandler(tracesReporter, cfg, internal.SpanLogsFormat, "span_logs", sender.internalRegistry) + sender.eventHandler = newLineHandler(metricsReporter, cfg, internal.EventFormat, "events", sender.internalRegistry) + + sender.Start() + return sender, nil +} + +func copyTags(orig map[string]string) map[string]string { + result := make(map[string]string, len(orig)) + for key, value := range orig { + result[key] = value + } + return result +} diff --git a/senders/client_factory_test.go b/senders/new_sender_test.go similarity index 92% rename from senders/client_factory_test.go rename to senders/new_sender_test.go index 0faf527..31e82a4 100644 --- a/senders/client_factory_test.go +++ b/senders/new_sender_test.go @@ -3,6 +3,7 @@ package senders import ( "crypto/tls" "crypto/x509" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" "testing" "time" @@ -82,11 +83,19 @@ func TestMetricsURLWithPortAndPath(t *testing.T) { assert.Equal(t, "http://localhost:8071/wavefront", cfg.tracesURL()) } -func TestToken(t *testing.T) { +func TestTokenInUrl(t *testing.T) { cfg, err := createConfig("https://my-api-token@localhost") require.NoError(t, err) - assert.Equal(t, "my-api-token", cfg.Token) + assert.Equal(t, "my-api-token", cfg.Authentication.(auth.APIToken).Token) + assert.Equal(t, "https://localhost", cfg.Server) +} + +func TestTokenOption(t *testing.T) { + cfg, err := createConfig("https://localhost", APIToken("my-api-token")) + require.NoError(t, err) + + assert.Equal(t, "my-api-token", cfg.Authentication.(auth.APIToken).Token) assert.Equal(t, "https://localhost", cfg.Server) } diff --git a/senders/noop.go b/senders/noop.go new file mode 100644 index 0000000..402db98 --- /dev/null +++ b/senders/noop.go @@ -0,0 +1,59 @@ +package senders + +import ( + "github.com/wavefronthq/wavefront-sdk-go/event" + "github.com/wavefronthq/wavefront-sdk-go/histogram" +) + +type noOpSender struct { +} + +var ( + defaultNoopClient Sender = &noOpSender{} +) + +// NewWavefrontNoOpClient returns a Wavefront Client instance for which all operations are no-ops. +func NewWavefrontNoOpClient() (Sender, error) { + return defaultNoopClient, nil +} + +func (sender *noOpSender) private() { +} + +func (sender *noOpSender) Start() { + // no-op +} + +func (sender *noOpSender) SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error { + return nil +} + +func (sender *noOpSender) SendDeltaCounter(name string, value float64, source string, tags map[string]string) error { + return nil +} + +func (sender *noOpSender) SendDistribution(name string, centroids []histogram.Centroid, + hgs map[histogram.Granularity]bool, ts int64, source string, tags map[string]string) error { + return nil +} + +func (sender *noOpSender) SendSpan(name string, startMillis, durationMillis int64, source, traceId, spanId string, + parents, followsFrom []string, tags []SpanTag, spanLogs []SpanLog) error { + return nil +} + +func (sender *noOpSender) SendEvent(name string, startMillis, endMillis int64, source string, tags map[string]string, setters ...event.Option) error { + return nil +} + +func (sender *noOpSender) Close() { + // no-op +} + +func (sender *noOpSender) Flush() error { + return nil +} + +func (sender *noOpSender) GetFailureCount() int64 { + return 0 +} diff --git a/senders/client_noop_test.go b/senders/noop_test.go similarity index 100% rename from senders/client_noop_test.go rename to senders/noop_test.go diff --git a/senders/option.go b/senders/option.go new file mode 100644 index 0000000..1714694 --- /dev/null +++ b/senders/option.go @@ -0,0 +1,125 @@ +package senders + +import ( + "crypto/tls" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" + "time" +) + +// Option Wavefront client configuration options +type Option func(*configuration) + +func APIToken(apiToken string) Option { + return func(c *configuration) { + c.Authentication = auth.APIToken{ + Token: apiToken, + } + } +} + +func CSPAPIToken(cspAPIToken string) Option { + return CSPAPITokenWithBaseURL(cspAPIToken, defaultCSPBaseUrl) +} + +func CSPAPITokenWithBaseURL(cspAPIToken string, baseURL string) Option { + return func(c *configuration) { + c.Authentication = auth.CSPAPIToken{ + Token: cspAPIToken, + BaseURL: baseURL, + } + } +} + +func CSPClientCredentials(clientId string, clientSecret string) Option { + return CSPClientCredentialsWithBaseURL(clientId, clientSecret, defaultCSPBaseUrl) +} + +func CSPClientCredentialsWithBaseURL(clientId string, clientSecret string, baseURL string) Option { + return func(c *configuration) { + c.Authentication = auth.CSPClientCredentials{ + ClientID: clientId, + ClientSecret: clientSecret, + BaseURL: baseURL, + } + } +} + +// BatchSize set max batch of data sent per flush interval. Defaults to 10,000. recommended not to exceed 40,000. +func BatchSize(n int) Option { + return func(cfg *configuration) { + cfg.BatchSize = n + } +} + +// MaxBufferSize set the size of internal buffers beyond which received data is dropped. Defaults to 50,000. +func MaxBufferSize(n int) Option { + return func(cfg *configuration) { + cfg.MaxBufferSize = n + } +} + +// FlushIntervalSeconds set the interval (in seconds) at which to flush data to Wavefront. Defaults to 1 Second. +func FlushIntervalSeconds(n int) Option { + return func(cfg *configuration) { + cfg.FlushInterval = time.Second * time.Duration(n) + } +} + +// FlushInterval set the interval at which to flush data to Wavefront. Defaults to 1 Second. +func FlushInterval(interval time.Duration) Option { + return func(cfg *configuration) { + cfg.FlushInterval = interval + } +} + +// MetricsPort sets the port on which to report metrics. Default is 2878. +func MetricsPort(port int) Option { + return func(cfg *configuration) { + cfg.MetricsPort = port + } +} + +// TracesPort sets the port on which to report traces. Default is 30001. +func TracesPort(port int) Option { + return func(cfg *configuration) { + cfg.TracesPort = port + } +} + +// Timeout sets the HTTP timeout (in seconds). Defaults to 10 seconds. +func Timeout(timeout time.Duration) Option { + return func(cfg *configuration) { + cfg.Timeout = timeout + } +} + +// TLSConfigOptions sets the tls.Config used by the HTTP Client to send data to Wavefront. +func TLSConfigOptions(tlsCfg *tls.Config) Option { + tlsCfgCopy := tlsCfg.Clone() + return func(cfg *configuration) { + cfg.TLSConfig = tlsCfgCopy + } +} + +// SendInternalMetrics turns sending of internal SDK metrics on/off. +func SendInternalMetrics(enabled bool) Option { + return func(cfg *configuration) { + cfg.SendInternalMetrics = enabled + } +} + +// SDKMetricsTags adds the additional tags provided in tags to all internal +// metrics this library reports. Clients can use multiple SDKMetricsTags +// calls when creating a sender. In that case, the sender sends all the +// tags from each of the SDKMetricsTags calls in addition to the standard +// "pid" and "version" tags to all internal metrics. The "pid" tag is the +// process ID; the "version" tag is the version of this SDK. +func SDKMetricsTags(tags map[string]string) Option { + // prevent caller from accidentally mutating this option. + copiedTags := copyTags(tags) + return func(cfg *configuration) { + for key, value := range copiedTags { + cfg.SDKMetricsTags[key] = value + } + } +} diff --git a/senders/client.go b/senders/real_sender.go similarity index 81% rename from senders/client.go rename to senders/real_sender.go index 63bdbf7..51a17c2 100644 --- a/senders/client.go +++ b/senders/real_sender.go @@ -10,6 +10,9 @@ import ( "github.com/wavefronthq/wavefront-sdk-go/internal/metric" "github.com/wavefronthq/wavefront-sdk-go/internal/sdkmetrics" "github.com/wavefronthq/wavefront-sdk-go/internal/span" + "github.com/wavefronthq/wavefront-sdk-go/version" + "os" + "strconv" ) // Sender Interface for sending metrics, distributions and spans to Wavefront @@ -23,7 +26,7 @@ type Sender interface { private() } -type wavefrontSender struct { +type realSender struct { reporter internal.Reporter defaultSource string pointHandler internal.LineHandler @@ -46,7 +49,7 @@ func newLineHandler(reporter internal.Reporter, cfg *configuration, format, pref return internal.NewLineHandler(reporter, format, cfg.FlushInterval, batchSize, cfg.MaxBufferSize, opts...) } -func (sender *wavefrontSender) Start() { +func (sender *realSender) Start() { sender.pointHandler.Start() sender.histoHandler.Start() sender.spanHandler.Start() @@ -55,10 +58,10 @@ func (sender *wavefrontSender) Start() { sender.eventHandler.Start() } -func (sender *wavefrontSender) private() { +func (sender *realSender) private() { } -func (sender *wavefrontSender) SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error { +func (sender *realSender) SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error { line, err := metric.Line(name, value, ts, source, tags, sender.defaultSource) return trySendWith( line, @@ -68,7 +71,7 @@ func (sender *wavefrontSender) SendMetric(name string, value float64, ts int64, ) } -func (sender *wavefrontSender) SendDeltaCounter(name string, value float64, source string, tags map[string]string) error { +func (sender *realSender) SendDeltaCounter(name string, value float64, source string, tags map[string]string) error { if name == "" { sender.internalRegistry.PointsTracker().IncInvalid() return fmt.Errorf("empty metric name") @@ -82,7 +85,7 @@ func (sender *wavefrontSender) SendDeltaCounter(name string, value float64, sour return nil } -func (sender *wavefrontSender) SendDistribution( +func (sender *realSender) SendDistribution( name string, centroids []histogram.Centroid, hgs map[histogram.Granularity]bool, @@ -113,7 +116,7 @@ func trySendWith(line string, err error, handler internal.LineHandler, tracker s return err } -func (sender *wavefrontSender) SendSpan( +func (sender *realSender) SendSpan( name string, startMillis, durationMillis int64, source, traceId, spanId string, @@ -172,7 +175,7 @@ func makeSpanLogs(logs []SpanLog) []span.Log { return spanLogs } -func (sender *wavefrontSender) SendEvent( +func (sender *realSender) SendEvent( name string, startMillis, endMillis int64, source string, @@ -195,7 +198,7 @@ func (sender *wavefrontSender) SendEvent( ) } -func (sender *wavefrontSender) Close() { +func (sender *realSender) Close() { sender.pointHandler.Stop() sender.histoHandler.Stop() sender.spanHandler.Stop() @@ -204,7 +207,7 @@ func (sender *wavefrontSender) Close() { sender.eventHandler.Stop() } -func (sender *wavefrontSender) Flush() error { +func (sender *realSender) Flush() error { errStr := "" err := sender.pointHandler.Flush() if err != nil { @@ -232,10 +235,27 @@ func (sender *wavefrontSender) Flush() error { return nil } -func (sender *wavefrontSender) GetFailureCount() int64 { +func (sender *realSender) GetFailureCount() int64 { return sender.pointHandler.GetFailureCount() + sender.histoHandler.GetFailureCount() + sender.spanHandler.GetFailureCount() + sender.spanLogHandler.GetFailureCount() + sender.eventHandler.GetFailureCount() } + +func (sender *realSender) realInternalRegistry(cfg *configuration) sdkmetrics.Registry { + var setters []sdkmetrics.RegistryOption + + setters = append(setters, sdkmetrics.SetPrefix(cfg.MetricPrefix())) + setters = append(setters, sdkmetrics.SetTag("pid", strconv.Itoa(os.Getpid()))) + setters = append(setters, sdkmetrics.SetTag("version", version.Version)) + + for key, value := range cfg.SDKMetricsTags { + setters = append(setters, sdkmetrics.SetTag(key, value)) + } + + return sdkmetrics.NewMetricRegistry( + sender, + setters..., + ) +} diff --git a/senders/real_sender_test.go b/senders/real_sender_test.go new file mode 100644 index 0000000..2b384fc --- /dev/null +++ b/senders/real_sender_test.go @@ -0,0 +1,151 @@ +package senders + +import ( + "github.com/wavefronthq/wavefront-sdk-go/internal/auth/csp" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/wavefronthq/wavefront-sdk-go/histogram" +) + +func TestSendDirect(t *testing.T) { + token := "direct-send-api-token" + directServer := startTestServer(false) + defer directServer.Close() + updatedUrl, err := url.Parse(directServer.URL) + updatedUrl.User = url.User(token) + wf, err := NewSender(updatedUrl.String()) + + require.NoError(t, err) + testSender(t, wf, directServer) + assert.Equal(t, + []string{ + "Bearer direct-send-api-token", + "Bearer direct-send-api-token", + "Bearer direct-send-api-token", + }, + directServer.AuthHeaders) +} + +func TestSendDirectWithTags(t *testing.T) { + token := "direct-send-api-token" + directServer := startTestServer(false) + defer directServer.Close() + + updatedUrl, err := url.Parse(directServer.URL) + updatedUrl.User = url.User(token) + tags := map[string]string{"foo": "bar"} + wf, err := NewSender(updatedUrl.String(), SDKMetricsTags(tags)) + require.NoError(t, err) + testSender(t, wf, directServer) + + assert.Equal(t, + []string{ + "Bearer direct-send-api-token", + "Bearer direct-send-api-token", + "Bearer direct-send-api-token", + }, + directServer.AuthHeaders) +} + +func TestSendProxy(t *testing.T) { + proxyServer := startTestServer(false) + defer proxyServer.Close() + + wf, err := NewSender(proxyServer.URL) + require.NoError(t, err) + testSender(t, wf, proxyServer) + assert.Equal(t, []string{"", "", ""}, proxyServer.AuthHeaders) +} + +func TestSendCSPClientCredentials(t *testing.T) { + proxyServer := startTestServer(false) + cspServer := httptest.NewServer(csp.FakeCSPHandler(nil)) + defer proxyServer.Close() + defer cspServer.Close() + + wf, err := NewSender(proxyServer.URL, CSPClientCredentialsWithBaseURL( + "a", + "b", + cspServer.URL, + )) + require.NoError(t, err) + testSender(t, wf, proxyServer) + assert.Equal(t, []string{"Bearer abc", "Bearer abc", "Bearer abc"}, proxyServer.AuthHeaders) +} + +func TestSendCSPAPIToken(t *testing.T) { + proxyServer := startTestServer(false) + cspServer := httptest.NewServer(csp.FakeCSPHandler([]string{"12345"})) + defer proxyServer.Close() + defer cspServer.Close() + + wf, err := NewSender(proxyServer.URL, CSPAPITokenWithBaseURL( + "12345", + cspServer.URL, + )) + require.NoError(t, err) + testSender(t, wf, proxyServer) + assert.Equal(t, []string{"Bearer abc", "Bearer abc", "Bearer abc"}, proxyServer.AuthHeaders) +} + +func testSender(t *testing.T, wf Sender, server *testServer) { + if err := wf.SendMetric("new-york.power.usage", 42422.0, 0, "go_test", map[string]string{"env": "test"}); err != nil { + t.Error("Failed SendMetric", err) + } + + centroids := []histogram.Centroid{ + {Value: 30.0, Count: 20}, + {Value: 5.1, Count: 10}, + } + + hgs := map[histogram.Granularity]bool{ + histogram.MINUTE: true, + histogram.HOUR: true, + histogram.DAY: true, + } + + if err := wf.SendDistribution("request.latency", centroids, hgs, 0, "appServer1", map[string]string{"region": "us-west"}); err != nil { + t.Error("Failed SendDistribution", err) + } + + if err := wf.SendSpan("getAllUsers", 0, 343500, "localhost", + "7b3bf470-9456-11e8-9eb6-529269fb1459", "0313bafe-9457-11e8-9eb6-529269fb1459", + []string{"2f64e538-9457-11e8-9eb6-529269fb1459"}, nil, + []SpanTag{ + {Key: "application", Value: "Wavefront"}, + {Key: "http.method", Value: "GET"}, + }, + nil); err != nil { + t.Error("Failed SendSpan", err) + } + + wf.Flush() + wf.Close() + assert.Equal(t, int64(0), wf.GetFailureCount(), "GetFailureCount") + + metricsFlag := false + hgFlag := false + spansFlag := false + + for _, request := range server.MetricLines { + if strings.Contains(request, "new-york.power.usage") { + metricsFlag = true + } + if strings.Contains(request, "request.latency") { + hgFlag = true + } + if strings.Contains(request, "0313bafe-9457-11e8-9eb6-529269fb1459") { + spansFlag = true + } + } + + assert.True(t, metricsFlag) + assert.True(t, hgFlag) + assert.True(t, spansFlag) +} diff --git a/senders/test_server_test.go b/senders/test_server_test.go index ca7668f..732fda0 100644 --- a/senders/test_server_test.go +++ b/senders/test_server_test.go @@ -10,24 +10,20 @@ import ( "strings" ) -func startTestServer() *testServer { +func startTestServer(useTLS bool) *testServer { handler := &testServer{} - server := httptest.NewServer(handler) - handler.httpServer = server - handler.URL = server.URL - return handler -} - -func startTLSTestServer() *testServer { - handler := &testServer{} - server := httptest.NewTLSServer(handler) - handler.httpServer = server - handler.URL = server.URL + if useTLS { + handler.httpServer = httptest.NewTLSServer(handler) + } else { + handler.httpServer = httptest.NewServer(handler) + } + handler.URL = handler.httpServer.URL return handler } type testServer struct { MetricLines []string + AuthHeaders []string httpServer *httptest.Server URL string LastRequestURL string @@ -47,6 +43,7 @@ func (s *testServer) ServeHTTP(writer http.ResponseWriter, request *http.Request writer.WriteHeader(500) } s.MetricLines = append(s.MetricLines, newLines...) + s.AuthHeaders = append(s.AuthHeaders, request.Header.Get("Authorization")) s.LastRequestURL = request.URL.String() writer.WriteHeader(200) } diff --git a/senders/wavefront_sender_test.go b/senders/wavefront_sender_test.go index 477bb61..f266fba 100644 --- a/senders/wavefront_sender_test.go +++ b/senders/wavefront_sender_test.go @@ -15,7 +15,7 @@ func TestWavefrontSender_SendMetric(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := wavefrontSender{ + sender := realSender{ reporter: nil, defaultSource: "test", pointHandler: pointHandler, @@ -50,7 +50,7 @@ func TestWavefrontSender_SendDeltaCounter(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := wavefrontSender{ + sender := realSender{ reporter: nil, defaultSource: "test", pointHandler: pointHandler, @@ -90,7 +90,7 @@ func TestWavefrontSender_SendDistribution(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := wavefrontSender{ + sender := realSender{ reporter: nil, defaultSource: "test", pointHandler: pointHandler, @@ -135,7 +135,7 @@ func TestWavefrontSender_SendSpan(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := wavefrontSender{ + sender := realSender{ reporter: nil, defaultSource: "test", pointHandler: pointHandler, @@ -202,7 +202,7 @@ func TestWavefrontSender_SendSpan_SpanLogs(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := wavefrontSender{ + sender := realSender{ reporter: nil, defaultSource: "test", pointHandler: pointHandler, @@ -268,7 +268,7 @@ func TestWavefrontSender_SendEventWithProxyFalse(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := wavefrontSender{ + sender := realSender{ reporter: nil, defaultSource: "test", pointHandler: pointHandler, @@ -299,7 +299,7 @@ func TestWavefrontSender_SendEventWithProxyTrue(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := wavefrontSender{ + sender := realSender{ reporter: nil, defaultSource: "test", pointHandler: pointHandler,