Skip to content

Commit

Permalink
just one way of making a wavefront sender
Browse files Browse the repository at this point in the history
  • Loading branch information
LukeWinikates committed Aug 21, 2023
1 parent 1bb70b1 commit d3bb71c
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 170 deletions.
4 changes: 2 additions & 2 deletions internal/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
// The implementation of a Reporter that reports points directly to a Wavefront server.
type reporter struct {
serverURL string
tokenService token.TokenService
tokenService token.Service
client *http.Client
}

// NewReporter creates a metrics Reporter
func NewReporter(server string, tokenService token.TokenService, client *http.Client) Reporter {
func NewReporter(server string, tokenService token.Service, client *http.Client) Reporter {
return &reporter{
serverURL: server,
tokenService: tokenService,
Expand Down
19 changes: 17 additions & 2 deletions internal/token/token_csp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,24 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"
)

type CspServerToServerTokenService struct {
CSPBaseUrl string
CSPClientId string
CSPClientSecret string

mutex sync.Mutex
AccessToken string
tokenReady bool

ticker *time.Ticker
done chan bool
tickerDelay int
}

type CSPAuthorizeResponse struct {
IdToken string `json:"id_token"`
TokenType string `json:"token_type"`
Expand Down Expand Up @@ -149,8 +164,8 @@ func encodeCSPCredentials(CSPClientId string, CSPClientSecret string) string {
return "Basic " + base64.StdEncoding.EncodeToString([]byte(CSPClientId+":"+CSPClientSecret))
}

// NewCSPServerToServerTokenService returns a TokenService instance where it will call CSP with client credentials to return an access token
func NewCSPServerToServerTokenService(CSPBaseUrl string, CSPClientId string, CSPClientSecret string) TokenService {
// NewCSPServerToServerTokenService returns a Service instance where it will call CSP with client credentials to return an access token
func NewCSPServerToServerTokenService(CSPBaseUrl string, CSPClientId string, CSPClientSecret string) Service {
return &CspServerToServerTokenService{CSPBaseUrl: CSPBaseUrl, CSPClientId: CSPClientId, CSPClientSecret: CSPClientSecret, tickerDelay: 60}
}

Expand Down
13 changes: 8 additions & 5 deletions internal/token/token_noop.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package token

var (
defaultNoopService TokenService = &TokenNoOpService{}
defaultNoopService Service = &NoOpService{}
)

func (t TokenNoOpService) GetToken() string {
type NoOpService struct {
}

func (t NoOpService) GetToken() string {
return ""
}

func (t TokenNoOpService) Close() {
func (t NoOpService) Close() {
}

// NewNoopTokenService returns a TokenService instance where it always returns an empty string for the token (for proxy usage).
func NewNoopTokenService() TokenService {
// NewNoopTokenService returns a Service instance where it always returns an empty string for the token (for proxy usage).
func NewNoopTokenService() Service {
return defaultNoopService
}
8 changes: 6 additions & 2 deletions internal/token/token_wavefront.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package token

type WavefrontTokenService struct {
Token string
}

func (t WavefrontTokenService) GetToken() string {
return t.Token
}

func (t WavefrontTokenService) Close() {
}

// NewWavefrontTokenService returns a TokenService instance where it always returns a Wavefront API Token
func NewWavefrontTokenService(Token string) TokenService {
// NewWavefrontTokenService returns a Service instance where it always returns a Wavefront API Token
func NewWavefrontTokenService(Token string) Service {
return &WavefrontTokenService{Token: Token}
}
28 changes: 3 additions & 25 deletions internal/token/types.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,11 @@
package token

import (
"sync"
"time"
)

// TokenService Interface for getting auth tokens (Wavefront, CSP)
type TokenService interface {
// Service Interface for getting authentication tokens (Wavefront, CSP)
type Service interface {
GetToken() string
Close()
}

type TokenNoOpService struct {
}

type WavefrontTokenService struct {
type ApiToken struct {
Token string
}

type CspServerToServerTokenService struct {
CSPBaseUrl string
CSPClientId string
CSPClientSecret string

mutex sync.Mutex
AccessToken string
tokenReady bool

ticker *time.Ticker
done chan bool
tickerDelay int
}
11 changes: 11 additions & 0 deletions senders/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package senders

type APITokenAuth struct {
Token string
}

type CSPAuth struct {
ClientID string
ClientSecret string
BaseURL string
}
143 changes: 18 additions & 125 deletions senders/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ type Option func(*configuration)
// Configuration for the direct ingestion sender
type configuration struct {
Server string // Wavefront URL of the form https://<INSTANCE>.wavefront.com
Token string // Wavefront API token with direct data ingestion permission

// CSP Client Credentials should return an access token with direct data ingestion permissions
CSPBaseUrl string // CSP Base URL
CSPClientId string // CSP Client Id
CSPClientSecret string // CSP Client Secret

// Optional configuration properties. Default values should suffice for most use cases.
// override the defaults only if you wish to set higher values.
Expand Down Expand Up @@ -67,10 +61,12 @@ type configuration struct {
Timeout time.Duration

TLSConfig *tls.Config

Authentication interface{}
}

func (c *configuration) Direct() bool {
return c.Token != "" || c.CSPBaseUrl != "" || c.CSPClientId != "" || c.CSPClientSecret != ""
return c.Authentication != nil
}

func (c *configuration) MetricPrefix() string {
Expand All @@ -93,22 +89,7 @@ func NewSender(wfURL string, setters ...Option) (Sender, error) {
return nil, fmt.Errorf("unable to create sender config: %s", err)
}

var tokenService = token.NewNoopTokenService()
if len(cfg.Token) > 0 {
tokenService = token.NewWavefrontTokenService(cfg.Token)
} else if len(cfg.CSPBaseUrl) > 0 && len(cfg.CSPClientId) > 0 && len(cfg.CSPClientSecret) > 0 {
tokenService = token.NewCSPServerToServerTokenService(cfg.CSPBaseUrl, cfg.CSPClientId, cfg.CSPClientSecret)
}

switch tokenService.(type) {
case *token.CspServerToServerTokenService:
log.Println("The Wavefront SDK will use CSP authentication when communicating with the Wavefront Backend for Direct Ingestion.")
case token.WavefrontTokenService:
log.Println("The Wavefront SDK will use an API TOKEN when communicating with the Wavefront Backend for Direct Ingestion.")
case token.TokenNoOpService:
log.Println("The Wavefront SDK will communicate with a Wavefront Proxy.")
}

tokenService := tokenServiceForCfg(cfg)
client := internal.NewClient(cfg.Timeout, cfg.TLSConfig)
metricsReporter := internal.NewReporter(cfg.metricsURL(), tokenService, client)
tracesReporter := internal.NewReporter(cfg.tracesURL(), tokenService, client)
Expand All @@ -132,50 +113,20 @@ func NewSender(wfURL string, setters ...Option) (Sender, error) {
return sender, nil
}

// NewCSPSender creates Wavefront client that uses CSP auth
func NewCSPSender(wfURL string, cspClientId string, cspClientSecret string, setters ...Option) (Sender, error) {
cfg, err := createCSPConfig(wfURL, defaultCSPBaseUrl, cspClientId, cspClientSecret, setters...)
if err != nil {
return nil, fmt.Errorf("unable to create sender config: %s", err)
}

var tokenService = token.NewNoopTokenService()
if len(cfg.Token) > 0 {
tokenService = token.NewWavefrontTokenService(cfg.Token)
} else if len(cfg.CSPBaseUrl) > 0 && len(cfg.CSPClientId) > 0 && len(cfg.CSPClientSecret) > 0 {
tokenService = token.NewCSPServerToServerTokenService(cfg.CSPBaseUrl, cfg.CSPClientId, cfg.CSPClientSecret)
}

switch tokenService.(type) {
case *token.CspServerToServerTokenService:
log.Println("The Wavefront SDK will use CSP authentication when communicating with the Wavefront Backend for Direct Ingestion.")
case token.WavefrontTokenService:
log.Println("The Wavefront SDK will use an API TOKEN when communicating with the Wavefront Backend for Direct Ingestion.")
case token.TokenNoOpService:
log.Println("The Wavefront SDK will communicate with a Wavefront Proxy.")
}

client := internal.NewClient(cfg.Timeout, cfg.TLSConfig)
metricsReporter := internal.NewReporter(cfg.metricsURL(), tokenService, client)
tracesReporter := internal.NewReporter(cfg.tracesURL(), tokenService, client)

sender := &wavefrontSender{
defaultSource: internal.GetHostname("wavefront_direct_sender"),
proxy: !cfg.Direct(),
}
if cfg.SendInternalMetrics {
sender.internalRegistry = sender.realInternalRegistry(cfg)
} else {
sender.internalRegistry = sdkmetrics.NewNoOpRegistry()
func tokenServiceForCfg(cfg *configuration) token.Service {
switch cfg.Authentication.(type) {
case APITokenAuth:
log.Println("The Wavefront SDK will use Direct Ingestion authenticated using an API Token.")
auth := cfg.Authentication.(APITokenAuth)
return token.NewWavefrontTokenService(auth.Token)
case CSPAuth:
log.Println("The Wavefront SDK will use Direct Ingestion authenticated using CSP.")
cspAuth := cfg.Authentication.(CSPAuth)
return token.NewCSPServerToServerTokenService(cspAuth.BaseURL, cspAuth.ClientID, cspAuth.ClientSecret)
}
sender.pointHandler = newLineHandler(metricsReporter, cfg, internal.MetricFormat, "points", sender.internalRegistry)
sender.histoHandler = newLineHandler(metricsReporter, cfg, internal.HistogramFormat, "histograms", sender.internalRegistry)
sender.spanHandler = newLineHandler(tracesReporter, cfg, internal.TraceFormat, "spans", sender.internalRegistry)
sender.spanLogHandler = newLineHandler(tracesReporter, cfg, internal.SpanLogsFormat, "span_logs", sender.internalRegistry)
sender.eventHandler = newLineHandler(metricsReporter, cfg, internal.EventFormat, "events", sender.internalRegistry)

sender.Start()
return sender, nil
log.Println("The Wavefront SDK will communicate with a Wavefront Proxy.")
return token.NewNoopTokenService()
}

func createConfig(wfURL string, setters ...Option) (*configuration, error) {
Expand All @@ -196,67 +147,9 @@ func createConfig(wfURL string, setters ...Option) (*configuration, error) {
}

if len(u.User.String()) > 0 {
cfg.Token = u.User.String()
u.User = nil
}

switch strings.ToLower(u.Scheme) {
case "http":
if cfg.Direct() {
log.Println("Detecting wavefront direct ingestion, will attempt to connect port 80.")
cfg.setDefaultPort(80)
}
case "https":
if cfg.Direct() {
log.Println("Detecting wavefront direct ingestion, will attempt to connect port 443.")
cfg.setDefaultPort(443)
}
default:
return nil, fmt.Errorf("invalid scheme '%s' in '%s', only 'http/https' is supported", u.Scheme, u)
}

if u.Path != "" {
cfg.Path = u.Path
u.Path = ""
}

if u.Port() != "" {
port, err := strconv.Atoi(u.Port())
if err != nil {
return nil, fmt.Errorf("unable to convert port to integer: %s", err)
cfg.Authentication = APITokenAuth{
Token: u.User.String(),
}
cfg.setDefaultPort(port)
u.Host = u.Hostname()
}
cfg.Server = u.String()

for _, set := range setters {
set(cfg)
}
return cfg, nil
}

func createCSPConfig(wfURL string, cspBaseUrl string, cspClientId string, cspClientSecret string, setters ...Option) (*configuration, error) {
cfg := &configuration{
MetricsPort: defaultMetricsPort,
TracesPort: defaultTracesPort,
BatchSize: defaultBatchSize,
MaxBufferSize: defaultBufferSize,
FlushInterval: defaultFlushInterval,
SDKMetricsTags: map[string]string{},
Timeout: defaultTimeout,
CSPBaseUrl: cspBaseUrl,
CSPClientId: cspClientId,
CSPClientSecret: cspClientSecret,
}

u, err := url.Parse(wfURL)
if err != nil {
return nil, err
}

if len(u.User.String()) > 0 {
cfg.Token = u.User.String()
u.User = nil
}

Expand Down
12 changes: 10 additions & 2 deletions senders/client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,19 @@ func TestMetricsURLWithPortAndPath(t *testing.T) {
assert.Equal(t, "http://localhost:8071/wavefront", cfg.tracesURL())
}

func TestToken(t *testing.T) {
func TestTokenInUrl(t *testing.T) {
cfg, err := createConfig("https://my-api-token@localhost")
require.NoError(t, err)

assert.Equal(t, "my-api-token", cfg.Token)
assert.Equal(t, "my-api-token", cfg.Authentication.(APITokenAuth).Token)
assert.Equal(t, "https://localhost", cfg.Server)
}

func TestTokenOption(t *testing.T) {
cfg, err := createConfig("https://localhost", APIToken("my-api-token"))
require.NoError(t, err)

assert.Equal(t, "my-api-token", cfg.Authentication.(APITokenAuth).Token)
assert.Equal(t, "https://localhost", cfg.Server)
}

Expand Down
6 changes: 2 additions & 4 deletions senders/example_newsender_csp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ func ExampleNewSender_csp() {
// For CSP authorization w/Direct Ingestion endpoints, by default all data is sent to port 80
// or port 443 for unencrypted or encrypted connections, respectively.
// You would need to provide a CSP BASE URL, a Client ID, Client Secret
sender, err := wavefront.NewCSPSender(
sender, err := wavefront.NewSender(
"https://surf.wavefront.com",
"clientId",
"clientSecret",
wavefront.CSPBaseUrl("https://cspbaseurl.vmware.com"),
wavefront.CSPClientCredentialsWithBaseURL("clientId", "clientSecret", "https://cspbaseurl.vmware.com"),
)
if err != nil {
// handle error
Expand Down
27 changes: 24 additions & 3 deletions senders/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,30 @@ import (
"time"
)

func CSPBaseUrl(CSPBaseUrl string) Option {
return func(cfg *configuration) {
cfg.CSPBaseUrl = CSPBaseUrl
func APIToken(apiToken string) Option {
return func(c *configuration) {
c.Authentication = APITokenAuth{
Token: apiToken,
}
}
}

func CSPClientCredentials(clientId string, clientSecret string) Option {
return func(c *configuration) {
c.Authentication = CSPAuth{
ClientID: clientId,
ClientSecret: clientSecret,
BaseURL: defaultCSPBaseUrl,
}
}
}
func CSPClientCredentialsWithBaseURL(clientId string, clientSecret string, baseURL string) Option {
return func(c *configuration) {
c.Authentication = CSPAuth{
ClientID: clientId,
ClientSecret: clientSecret,
BaseURL: baseURL,
}
}
}

Expand Down

0 comments on commit d3bb71c

Please sign in to comment.