diff --git a/README.md b/README.md index df207d7..1813577 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Latest stable branch is [master](https://github.com/FindHotel/analytics-go/tree/ If you use [go modules](https://blog.golang.org/v2-go-modules): ```shell - go get -u github.com/FindHotel/analytics-go@3.7.0 + go get -u github.com/FindHotel/analytics-go@v4.0.0 ``` ## Documentation @@ -38,7 +38,7 @@ package main import ( "os" - analytics "github.com/FindHotel/analytics-go" + analytics "github.com/FindHotel/analytics-go/v4" ) func main() { diff --git a/analytics.go b/analytics.go index 9358956..b5f4e92 100644 --- a/analytics.go +++ b/analytics.go @@ -5,17 +5,14 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "strconv" "sync" "time" - - metrics "github.com/rcrowley/go-metrics" ) // Version of the client. -const Version = "3.5.0" +const Version = "4.0.0" // Client is the main API exposed by the analytics package. // Values that satsify this interface are returned by the client constructors @@ -60,12 +57,6 @@ type client struct { // This HTTP client is used to send requests to the backend, it uses the // HTTP transport provided in the configuration. http http.Client - - metricsRegistry metrics.Registry - - successCounters countersFunc - failureCounters countersFunc - droppedCounters countersFunc } // NewDiscardClient returns client which discards all messages. @@ -102,7 +93,6 @@ func NewWithConfig(writeKey string, config Config) (Client, error) { return nil, err } go c.loop() - go c.loopMetrics() return c, nil } @@ -113,19 +103,14 @@ func newWithConfig(writeKey string, config Config) (*client, error) { } c := &client{ - Config: makeConfig(config), - key: writeKey, - msgs: make(chan Message, 100), - quit: make(chan struct{}), - shutdown: make(chan struct{}), - http: makeHTTPClient(config.Transport), - metricsRegistry: metrics.NewRegistry(), + Config: makeConfig(config), + key: writeKey, + msgs: make(chan Message, 100), + quit: make(chan struct{}), + shutdown: make(chan struct{}), + http: makeHTTPClient(config.Transport), } - c.successCounters = c.newCounters("submitted.success") - c.failureCounters = c.newCounters("submitted.failure") - c.droppedCounters = c.newCounters("dropped") - return c, nil } @@ -141,7 +126,7 @@ func makeHTTPClient(transport http.RoundTripper) http.Client { func (c *client) Enqueue(msg Message) (err error) { if err = msg.validate(); err != nil { - c.droppedCounters(msg.tags()...).Inc(1) + c.notifyDropped(msg, err, 1) return } @@ -200,8 +185,8 @@ func (c *client) Enqueue(msg Message) (err error) { // and instead report that the client has been closed and shouldn't be // used anymore. if recover() != nil { - c.droppedCounters(msg.tags()...).Inc(1) err = ErrClosed + c.notifyDropped(msg, err, 1) } }() @@ -316,7 +301,7 @@ func (c *client) report(res *http.Response) (err error) { return } - if body, err = ioutil.ReadAll(res.Body); err != nil { + if body, err = io.ReadAll(res.Body); err != nil { c.errorf("response %d %s - %s", res.StatusCode, res.Status, err) return } @@ -416,9 +401,21 @@ func (c *client) maxBatchBytes() int { return maxBatchBytes - len(b) } +func (c *client) reportMetrics(name string, value int64, tags []string) { + statsd := c.Config.DDStatsdClient + if statsd == nil { + return + } + + err := statsd.Count(name, value, tags, 1) + if err != nil { + c.errorf("error submitting metric %s - %s", name, err) + } +} + func (c *client) notifySuccess(msgs []message) { for _, m := range msgs { - c.successCounters(m.Msg().tags()...).Inc(1) + c.reportMetrics("submitted.success", 1, m.Msg().tags()) } if c.Callback != nil { for _, m := range msgs { @@ -429,7 +426,7 @@ func (c *client) notifySuccess(msgs []message) { func (c *client) notifyFailure(msgs []message, err error) { for _, m := range msgs { - c.failureCounters(m.Msg().tags()...).Inc(1) + c.reportMetrics("submitted.failure", 1, m.Msg().tags()) } if c.Callback != nil { for _, m := range msgs { @@ -438,15 +435,19 @@ func (c *client) notifyFailure(msgs []message, err error) { } } +func (c *client) notifyDropped(m Message, err error, count int64) { + c.reportMetrics("dropped", count, m.tags()) +} + func (c *client) notifyFailureMsg(m Message, err error, count int64) { - c.failureCounters(m.tags()...).Inc(count) + c.reportMetrics("submitted.failure", count, m.tags()) if c.Callback != nil { c.Callback.Failure(m, err) } } func (c *client) notifySuccessMsg(m Message, count int64) { - c.successCounters(m.tags()...).Inc(count) + c.reportMetrics("submitted.success", count, m.tags()) if c.Callback != nil { c.Callback.Success(m) } diff --git a/analytics_test.go b/analytics_test.go index 88b4e87..0615118 100644 --- a/analytics_test.go +++ b/analytics_test.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "net/http/httptest" "net/url" @@ -86,7 +85,7 @@ var ( Proto: r.Proto, ProtoMajor: r.ProtoMajor, ProtoMinor: r.ProtoMinor, - Body: ioutil.NopCloser(strings.NewReader("")), + Body: io.NopCloser(strings.NewReader("")), Request: r, }, nil }) @@ -105,7 +104,7 @@ var ( Proto: r.Proto, ProtoMajor: r.ProtoMajor, ProtoMinor: r.ProtoMinor, - Body: ioutil.NopCloser(strings.NewReader("")), + Body: io.NopCloser(strings.NewReader("")), Request: r, }, nil }) @@ -118,7 +117,7 @@ var ( Proto: r.Proto, ProtoMajor: r.ProtoMajor, ProtoMinor: r.ProtoMinor, - Body: ioutil.NopCloser(readFunc(func(b []byte) (int, error) { return 0, testError })), + Body: io.NopCloser(readFunc(func(b []byte) (int, error) { return 0, testError })), Request: r, }, nil }) @@ -135,7 +134,7 @@ func fixture(name string) string { panic(err) } defer f.Close() - b, err := ioutil.ReadAll(f) + b, err := io.ReadAll(f) if err != nil { panic(err) } diff --git a/cmd/cli/main.go b/cmd/cli/main.go index f231e0b..f68e70e 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -5,7 +5,7 @@ import ( "fmt" "os" - "github.com/FindHotel/analytics-go" + "github.com/FindHotel/analytics-go/v4" "github.com/segmentio/conf" ) diff --git a/config.go b/config.go index 2f7730d..20141ea 100644 --- a/config.go +++ b/config.go @@ -4,6 +4,7 @@ import ( "net/http" "time" + ddstatsd "github.com/DataDog/datadog-go/v5/statsd" "github.com/segmentio/backo-go" "github.com/xtgo/uuid" ) @@ -61,9 +62,8 @@ type Config struct { // If not set the client will fallback to use a default retry policy. RetryAfter func(int) time.Duration - // Reporters are used to report metrics to external reporting system such - // as DataDog. Useful implementations are DatadogReporter and LogReporter. - Reporters []Reporter + // DDStatsdClient is the client used to send metrics to Datadog. + DDStatsdClient ddstatsd.ClientInterface // A function called by the client to generate unique message identifiers. // The client uses a UUID generator if none is provided. diff --git a/examples/track.go b/examples/track.go index e2c14d5..b85af64 100644 --- a/examples/track.go +++ b/examples/track.go @@ -3,7 +3,7 @@ package main import ( "fmt" - "github.com/FindHotel/analytics-go" + "github.com/FindHotel/analytics-go/v4" ) import "time" diff --git a/go.mod b/go.mod index 2ece438..38a7e66 100644 --- a/go.mod +++ b/go.mod @@ -1,23 +1,49 @@ -module github.com/FindHotel/analytics-go +module github.com/FindHotel/analytics-go/v4 -go 1.14 +go 1.21 + +toolchain go1.23.3 require ( - github.com/avast/retry-go v2.1.0+incompatible - github.com/aws/aws-sdk-go v1.19.1 - github.com/cenkalti/backoff v2.1.0+incompatible // indirect - github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect + github.com/DataDog/datadog-go/v5 v5.5.0 + github.com/aws/aws-sdk-go-v2 v1.34.0 + github.com/aws/aws-sdk-go-v2/config v1.29.2 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.54 + github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c github.com/segmentio/conf v1.0.0 + github.com/stretchr/testify v1.8.1 + github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c +) + +require ( + github.com/Microsoft/go-winio v0.5.0 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.8 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.55 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.29 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.10 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 // indirect + github.com/aws/smithy-go v1.22.2 // indirect + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/segmentio/go-snakecase v1.0.0 // indirect github.com/segmentio/objconv v1.0.1 // indirect - github.com/stretchr/objx v0.3.0 // indirect - github.com/stretchr/testify v1.6.1 - github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c - github.com/zorkian/go-datadog-api v2.18.0+incompatible + golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect + gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/mold.v2 v2.2.0 // indirect gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 // indirect - gopkg.in/yaml.v2 v2.2.1 // indirect - gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect + gopkg.in/yaml.v2 v2.2.8 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index ea1adab..3dd233d 100644 --- a/go.sum +++ b/go.sum @@ -1,19 +1,64 @@ -github.com/avast/retry-go v2.1.0+incompatible h1:NDQfwOYTuYSbKEwFu+dx5YiU3jANx9n4NW2ZCzYL3AI= -github.com/avast/retry-go v2.1.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= -github.com/aws/aws-sdk-go v1.19.1 h1:8kOP0/XGJwXIFlYoD1DAtA39cAjc15Iv/QiDMKitD9U= -github.com/aws/aws-sdk-go v1.19.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/cenkalti/backoff v2.1.0+incompatible h1:FIRvWBZrzS4YC7NT5cOuZjexzFvIr+Dbi6aD1cZaNBk= -github.com/cenkalti/backoff v2.1.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/DataDog/datadog-go/v5 v5.5.0 h1:G5KHeB8pWBNXT4Jtw0zAkhdxEAWSpWH00geHI6LDrKU= +github.com/DataDog/datadog-go/v5 v5.5.0/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw= +github.com/Microsoft/go-winio v0.5.0 h1:Elr9Wn+sGKPlkaBvwu4mTrxtmOp3F3yV9qhaHbXGjwU= +github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= +github.com/aws/aws-sdk-go-v2 v1.34.0 h1:9iyL+cjifckRGEVpRKZP3eIxVlL06Qk1Tk13vreaVQU= +github.com/aws/aws-sdk-go-v2 v1.34.0/go.mod h1:JgstGg0JjWU1KpVJjD5H0y0yyAIpSdKEq556EI6yOOM= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.8 h1:zAxi9p3wsZMIaVCdoiQp2uZ9k1LsZvmAnoTBeZPXom0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.8/go.mod h1:3XkePX5dSaxveLAYY7nsbsZZrKxCyEuE5pM4ziFxyGg= +github.com/aws/aws-sdk-go-v2/config v1.29.2 h1:JuIxOEPcSKpMB0J+khMjznG9LIhIBdmqNiEcPclnwqc= +github.com/aws/aws-sdk-go-v2/config v1.29.2/go.mod h1:HktTHregOZwNSM/e7WTfVSu9RCX+3eOv+6ij27PtaYs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.55 h1:CDhKnDEaGkLA5ZszV/qw5uwN5M8rbv9Cl0JRN+PRsaM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.55/go.mod h1:kPD/vj+RB5MREDUky376+zdnjZpR+WgdBBvwrmnlmKE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 h1:kU7tmXNaJ07LsyN3BUgGqAmVmQtq0w6duVIHAKfp0/w= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25/go.mod h1:OiC8+OiqrURb1wrwmr/UbOVLFSWEGxjinj5C299VQdo= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.54 h1:6BWOAho3Cgdy4cmNJ4HWY8VZgqODEU7Gw78XXireNZI= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.54/go.mod h1:n+t/oyYErOV3jf/GxNTVlizSM9RMV1yH7jvcIvld3Do= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 h1:Ej0Rf3GMv50Qh4G4852j2djtoDb7AzQ7MuQeFHa3D70= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29/go.mod h1:oeNTC7PwJNoM5AznVr23wxhLnuJv0ZDe5v7w0wqIs9M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 h1:6e8a71X+9GfghragVevC5bZqvATtc3mAMgxpSNbgzF0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29/go.mod h1:c4jkZiQ+BWpNqq7VtrxjwISrLrt/VvPq3XiopkUIolI= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 h1:Pg9URiobXy85kgFev3og2CuOZ8JZUBENF+dcgWBaYNk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.29 h1:g9OUETuxA8i/Www5Cby0R3WSTe7ppFTZXHVLNskNS4w= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.29/go.mod h1:CQk+koLR1QeY1+vm7lqNfFii07DEderKq6T3F1L2pyc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 h1:D4oz8/CzT9bAEYtVhSBmFj2dNOtaHOtMKc2vHBwYizA= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2/go.mod h1:Za3IHqTQ+yNcRHxu1OFucBh0ACZT4j4VQFF0BqpZcLY= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.3 h1:EP1ITDgYVPM2dL1bBBntJ7AW5yTjuWGz9XO+CZwpALU= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.3/go.mod h1:5lWNWeAgWenJ/BZ/CP9k9DjLbC0pjnM045WjXRPPi14= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 h1:hN4yJBGswmFTOVYqmbz1GBs9ZMtQe8SrYxPwrkrlRv8= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10/go.mod h1:TsxON4fEZXyrKY+D+3d2gSTyJkGORexIYab9PTf56DA= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.10 h1:fXoWC2gi7tdJYNTPnnlSGzEVwewUchOi8xVq/dkg8Qs= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.10/go.mod h1:cvzBApD5dVazHU8C2rbBQzzzsKc8m5+wNJ9mCRZLKPc= +github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1 h1:9LawY3cDJ3HE+v2GMd5SOkNLDwgN4K7TsCjyVBYu/L4= +github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1/go.mod h1:hHnELVnIHltd8EOF3YzahVX6F6y2C6dNqpRj1IMkS5I= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 h1:kznaW4f81mNMlREkU9w3jUuJvU5g/KsqDV43ab7Rp6s= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.12/go.mod h1:bZy9r8e0/s0P7BSDHgMLXK2KvdyRRBIQ2blKlvLt0IU= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 h1:mUwIpAvILeKFnRx4h1dEgGEFGuV8KJ3pEScZWVFYuZA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11/go.mod h1:JDJtD+b8HNVv71axz8+S5492KM8wTzHRFpMKQbPlYxw= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 h1:g9d+TOsu3ac7SgmY2dUf1qMgu/uJVTlQ4VCbH6hRxSw= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.10/go.mod h1:WZfNmntu92HO44MVZAubQaz3qCuIdeOdog2sADfU6hU= +github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= +github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= -github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c h1:rsRTAcCR5CeNLkvgBVSjQoDGRRt6kggsE6XYBqCv2KQ= github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M= github.com/segmentio/conf v1.0.0 h1:oRF4BtoJbI/+I7fUngYMnMcKFbjqVUFi8hv4Pp0l88w= @@ -22,25 +67,56 @@ github.com/segmentio/go-snakecase v1.0.0 h1:FSeHpP0sBL3O+MCpxvQZrS5a51WAki6gposZ github.com/segmentio/go-snakecase v1.0.0/go.mod h1:jk1miR5MS7Na32PZUykG89Arm+1BUSYhuGR6b7+hJto= github.com/segmentio/objconv v1.0.1 h1:QjfLzwriJj40JibCV3MGSEiAoXixbp4ybhwfTB8RXOM= github.com/segmentio/objconv v1.0.1/go.mod h1:auayaH5k3137Cl4SoXTgrzQcuQDmvuVtZgS0fb1Ahys= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= -github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c h1:3lbZUMbMiGUW/LMkfsEABsc5zNT9+b1CvsJx47JzJ8g= github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c/go.mod h1:UrdRz5enIKZ63MEE3IF9l2/ebyx59GyGgPi+tICQdmM= -github.com/zorkian/go-datadog-api v2.18.0+incompatible h1:7JZOVDO8qDaXDKPAzTgiJahU3IoDyzxbLDwoT0U9n0w= -github.com/zorkian/go-datadog-api v2.18.0+incompatible/go.mod h1:PkXwHX9CUQa/FpB9ZwAD45N1uhCW4MT/Wj7m36PbKss= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= +gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/mold.v2 v2.2.0 h1:Y4IYB4/HYQfuq43zaKh6vs9cVelLE9qbqe2fkyfCTWQ= gopkg.in/go-playground/mold.v2 v2.2.0/go.mod h1:XMyyRsGtakkDPbxXbrA5VODo6bUXyvoDjLd5l3T0XoA= gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9hpP1zI/KTwmyMFKloW9eOVc= gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= -gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/report.go b/report.go deleted file mode 100644 index c7e3f08..0000000 --- a/report.go +++ /dev/null @@ -1,298 +0,0 @@ -package analytics - -import ( - "fmt" - "io" - "net" - "net/http" - "os" - "strings" - "sync" - "time" - - "github.com/avast/retry-go" - metrics "github.com/rcrowley/go-metrics" - datadog "github.com/zorkian/go-datadog-api" -) - -// Reporter provides a function to reporting metrics. -type Reporter interface { - Report(metricName string, value interface{}, tags []string, ts time.Time) - Flush() error - AddTags(tags ...string) -} - -func splitTags(name string) (string, []string) { - tokens := strings.Split(name, ".") - if len(tokens) <= 1 { - return name, []string{} - } - names := []string{} - tags := []string{} - for _, token := range tokens { - if strings.Contains(token, ":") { - tags = append(tags, token) - } else { - names = append(names, token) - } - } - return strings.Join(names, "."), tags -} - -func (c *client) reportAll(prefix string, reporters []Reporter) { - if len(reporters) == 0 { - return - } - ts := time.Now() - metrics := c.metricsRegistry.GetAll() - go func() { - for key, metric := range metrics { - for measure, value := range metric { - name, tags := splitTags(key) - name = prefix + "." + name - for _, r := range reporters { - r.Report(name+"."+measure, value, tags, ts) - } - } - } - for _, r := range reporters { - if err := r.Flush(); err != nil { - c.Config.Logger.Errorf("flush failed for reporter %s: %s", r, err) - } - } - }() -} - -var hostname = func() string { - h, err := os.Hostname() - if err != nil { - return "localhost" - } - return h -}() - -// DiscardReporter discards all metrics, useful for tests. -type DiscardReporter struct{} - -// Report reports metrics. -func (r DiscardReporter) Report(metricName string, value interface{}, tags []string, ts time.Time) {} - -// AddTags adds tags to be added to each metric reported. -func (r *DiscardReporter) AddTags(tags ...string) {} - -// Flush flushes reported metrics. -func (r *DiscardReporter) Flush() error { return nil } - -// LogReporter report metrics as a log. -type LogReporter struct { - logger Logger - tags []string -} - -// NewLogReporter returns new log repoter ready to use. -func NewLogReporter(l ...Logger) *LogReporter { - if len(l) == 0 { - l = []Logger{newDefaultLogger()} - } - return &LogReporter{ - logger: l[0], - tags: []string{}, - } -} - -// Report reports metrics. -func (r LogReporter) Report(metricName string, value interface{}, tags []string, ts time.Time) { - allTags := append(tags, r.tags...) - r.logger.Logf("%s[%s] = %v", metricName, strings.Join(allTags, ", "), value) -} - -// Flush flushes reported metrics. -func (r *LogReporter) Flush() error { return nil } - -// AddTags adds tags to be added to each metric reported. -func (r *LogReporter) AddTags(tags ...string) { - r.tags = append(r.tags, tags...) -} - -func newHTTPTransport() *http.Transport { - return &http.Transport{ - DisableKeepAlives: true, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 30 * time.Second, - } -} - -// NewDatadogReporter is a factory method to create Datadog reporter -// with sane defaults. -func NewDatadogReporter(apiKey, appKey string, tags ...string) *DatadogReporter { - dr := DatadogReporter{ - Client: datadog.NewClient(apiKey, appKey), - } - dr.Client.HttpClient = &http.Client{ - Timeout: time.Second * 30, - Transport: newHTTPTransport(), - } - dr.logger = newDefaultLogger() - dr.tags = append(tags, "transport:http", "sdkversion:go-"+Version) - return &dr -} - -// WithLogger sets logger to DatadogReporter. -func (dd *DatadogReporter) WithLogger(logger Logger) *DatadogReporter { - dd.logger = logger - return dd -} - -// DatadogReporter reports metrics to DataDog. -type DatadogReporter struct { - Client *datadog.Client - logger Logger - metrics []datadog.Metric - tags []string - sync.Mutex -} - -// AddTags adds tags to be added to each metric reported. -func (dd *DatadogReporter) AddTags(tags ...string) { - dd.Lock() - defer dd.Unlock() - dd.tags = append(dd.tags, tags...) -} - -// Flush flushes reported metrics. -func (dd *DatadogReporter) Flush() error { - dd.Lock() - metrics := dd.metrics - dd.metrics = []datadog.Metric{} - dd.Unlock() - - err := retry.Do( - func() error { - return dd.Client.PostMetrics(metrics) - }, - retry.OnRetry(func(iteration uint, err error) { - dd.logger.Errorf("Reporting metrics failed for the %d time: %s", iteration, err) - }), - retry.RetryIf(func(err error) bool { - if err == io.EOF { - dd.Client.HttpClient.Transport = newHTTPTransport() - return true - } - return false - }), - ) - - if err != nil { - return fmt.Errorf("submission metrics to DataDog failed for the last time, metrics are gone: %s", err) - } - return nil -} - -// Report sends provided metric to Datadog. -func (dd *DatadogReporter) Report(metricName string, value interface{}, tags []string, ts time.Time) { - metricType := "gauge" - metricValue, err := func() (float64, error) { - switch v := value.(type) { - case float64: - return v, nil - case int64: - return float64(v), nil - case int: - return float64(v), nil - } - return 0, fmt.Errorf("can't handle value %+v", value) - }() - if err != nil { - dd.logger.Errorf("Serializing value for metric %s(%+v) failed: %s", metricName, value, err) - return - } - metricTimestamp := float64(ts.Truncate(time.Minute).Unix()) - allTags := append(tags, "hostname:"+hostname) - allTags = append(allTags, dd.tags...) - metric := datadog.Metric{ - Metric: &metricName, - Type: &metricType, - Tags: allTags, - Points: []datadog.DataPoint{{&metricTimestamp, &metricValue}}, - } - dd.Lock() - dd.metrics = append(dd.metrics, metric) - dd.Unlock() -} - -func (c *client) resetMetrics() { - ms := c.metricsRegistry.GetAll() - for name := range ms { - metric := c.metricsRegistry.Get(name) - switch m := metric.(type) { - case metrics.Counter: - m.Clear() - case metrics.Gauge: - m.Update(0) - case metrics.Histogram: - // do nothing as Histogram has it's own internal cleanup - } - } -} - -type countersFunc func(tags ...string) metrics.Counter - -// newCounters returns factory for tagged counters. -func (c *client) newCounters(name string) countersFunc { - counters := make(map[string]metrics.Counter) - mu := &sync.Mutex{} - - return func(tags ...string) metrics.Counter { - fullName := strings.Join(append([]string{name}, tags...), ".") - - mu.Lock() - defer mu.Unlock() - - counter, ok := counters[fullName] - if !ok { - counter = c.metricsRegistry.GetOrRegister( - fullName, - metrics.NewCounter(), - ).(metrics.Counter) - counters[fullName] = counter - } - return counter - } -} - -func (c *client) loopMetrics() { - var reporters = c.Config.Reporters - if len(reporters) == 0 { - c.Logger.Logf("No reporters are configured, metrics won't be reported") - } - - ep := strings.Split(c.Config.Endpoint, "/") - enrichReporter := func(reporter Reporter) { - reporter.AddTags( - "key:"+fmt.Sprintf("%.6s", c.key), - "endpoint:"+fmt.Sprintf("%.9s", ep[len(ep)-1]), - ) - if ctx := c.Config.DefaultContext; ctx != nil { - if app := ctx.App.Name; app != "" { - reporter.AddTags("app:" + app) - } - if version := ctx.App.Version; version != "" { - reporter.AddTags("appversion:" + version) - } - } - } - for _, reporter := range reporters { - enrichReporter(reporter) - } - for { - select { - case <-c.quit: - return - case <-time.Tick(60 * time.Second): - c.reportAll("evas.events", reporters) - c.resetMetrics() - } - } -} diff --git a/s3client.go b/s3client.go index 4fdef2b..919e114 100644 --- a/s3client.go +++ b/s3client.go @@ -4,21 +4,21 @@ import ( "bufio" "bytes" "compress/gzip" + "context" "encoding/json" "io" - "io/ioutil" "os" "path/filepath" "sync" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" ) type uploader interface { - Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) + Upload(ctx context.Context, input *s3.PutObjectInput, options ...func(*manager.Uploader)) (*manager.UploadOutput, error) } type s3Client struct { @@ -26,7 +26,7 @@ type s3Client struct { config S3ClientConfig apiContext *apiContext uploader uploader - //s3client works only with one type of msg + //s3Client works only with one type of msg tagsOnlyMsg tagsOnlyMsg } @@ -39,7 +39,7 @@ type S3 struct { // Examples: tuna, salmon, haring, etc. Each system receives its own stream. Stream string - // MaxBatchBytes size repsresents the size of buffer or file and when events are flushed + // MaxBatchBytes size represents the size of buffer or file and when events are flushed MaxBatchBytes int // BufferFilePath if specified the temp file will be used to store the data @@ -50,9 +50,9 @@ type S3 struct { KeyConstructor func(now func() Time, uid func() string) string - UploaderOptions []func(*s3manager.Uploader) + UploaderOptions []func(*manager.Uploader) - Session *session.Session + Cfg *aws.Config } // S3ClientConfig provides configuration for S3 Client. @@ -76,7 +76,7 @@ func NewS3ClientWithConfig(config S3ClientConfig) (Client, error) { client.msgs = make(chan Message, 1024) // overrite the buffer - uploader := s3manager.NewUploader(cfg.S3.Session, cfg.S3.UploaderOptions...) + uploader := manager.NewUploader(s3.NewFromConfig(*cfg.S3.Cfg), cfg.S3.UploaderOptions...) c := &s3Client{ client: client, @@ -88,14 +88,13 @@ func NewS3ClientWithConfig(config S3ClientConfig) (Client, error) { uploader: uploader, } - go c.loop() // custom implementation - go c.loopMetrics() // reuse client's implementation + go c.loop(context.Background()) // custom implementation return c, nil } // a copy of client.loop() function. -func (c *s3Client) loop() { +func (c *s3Client) loop(ctx context.Context) { defer close(c.shutdown) wg := &sync.WaitGroup{} @@ -119,10 +118,10 @@ func (c *s3Client) loop() { for { select { case msg := <-c.msgs: - c.push(&bw, msg, wg, ex) + c.push(ctx, &bw, msg, wg, ex) case <-tick.C: - c.flush(&bw, wg, ex) + c.flush(ctx, &bw, wg, ex) case <-c.quit: c.debugf("exit requested – draining messages") @@ -131,10 +130,10 @@ func (c *s3Client) loop() { // messages can be pushed and otherwise the loop would never end. close(c.msgs) for msg := range c.msgs { - c.push(&bw, msg, wg, ex) + c.push(ctx, &bw, msg, wg, ex) } - c.flush(&bw, wg, ex) + c.flush(ctx, &bw, wg, ex) bw.buf.Close() c.debugf("exit") return @@ -197,7 +196,7 @@ func (m *tagsOnlyMsg) validate() error { return nil } -func (c *s3Client) push(encoder *bufferedEncoder, m Message, wg *sync.WaitGroup, ex *executor) { +func (c *s3Client) push(ctx context.Context, encoder *bufferedEncoder, m Message, wg *sync.WaitGroup, ex *executor) { c.setTagsIfExsist(m) ready, err := c.encodeMessage(encoder, m) @@ -209,11 +208,11 @@ func (c *s3Client) push(encoder *bufferedEncoder, m Message, wg *sync.WaitGroup, if ready { c.debugf("exceeded messages batch limit with batch of %d messages – flushing", encoder.messages) - c.sendAsync(encoder, wg, ex) + c.sendAsync(ctx, encoder, wg, ex) } } -// we need this functio to send metrics +// we need this function to send metrics func (c *s3Client) setTagsIfExsist(m Message) { if len(c.tagsOnlyMsg.t) == 0 { c.tagsOnlyMsg.t = m.tags() @@ -241,7 +240,7 @@ func (c *s3Client) encodeMessage(bw *bufferedEncoder, m Message) (ready bool, er } // Asynchronously send a batched requests. -func (c *s3Client) sendAsync(bw *bufferedEncoder, wg *sync.WaitGroup, ex *executor) { +func (c *s3Client) sendAsync(ctx context.Context, bw *bufferedEncoder, wg *sync.WaitGroup, ex *executor) { if bw.BytesLen() == 0 { c.errorf("empty buffer, send is not possible") return @@ -266,7 +265,7 @@ func (c *s3Client) sendAsync(bw *bufferedEncoder, wg *sync.WaitGroup, ex *execut c.errorf("panic - %s", err) } }() - c.send(buf, msgs) + c.send(ctx, buf, msgs) }) { wg.Done() c.errorf("sending messages failed - %s", ErrTooManyRequests) @@ -274,16 +273,16 @@ func (c *s3Client) sendAsync(bw *bufferedEncoder, wg *sync.WaitGroup, ex *execut } } -func (c *s3Client) flush(bw *bufferedEncoder, wg *sync.WaitGroup, ex *executor) { +func (c *s3Client) flush(ctx context.Context, bw *bufferedEncoder, wg *sync.WaitGroup, ex *executor) { msgs := bw.TotalMsgs() if msgs > 0 { c.debugf("flushing %d messages", msgs) - c.sendAsync(bw, wg, ex) + c.sendAsync(ctx, bw, wg, ex) } } // Send batch request. -func (c *s3Client) send(buf encodedBuffer, msgs int) { +func (c *s3Client) send(ctx context.Context, buf encodedBuffer, msgs int) { const attempts = 10 defer buf.Close() @@ -293,7 +292,7 @@ func (c *s3Client) send(buf encodedBuffer, msgs int) { c.errorf("can't get reader", err) } - if err = c.upload(reader); err == nil { + if err = c.upload(ctx, reader); err == nil { c.notifySuccessMsg(&c.tagsOnlyMsg, int64(msgs)) return } @@ -313,17 +312,16 @@ func (c *s3Client) send(buf encodedBuffer, msgs int) { } // Upload batch to S3. -func (c *s3Client) upload(r io.Reader) error { +func (c *s3Client) upload(ctx context.Context, r io.Reader) error { key := c.config.S3.KeyConstructor(c.now, uid) c.debugf("uploading to s3://%s/%s", c.config.S3.Bucket, key) - input := &s3manager.UploadInput{ + input := &s3.PutObjectInput{ Body: r, Bucket: aws.String(c.config.S3.Bucket), - ACL: aws.String("public-read"), Key: aws.String(key), } - _, err := c.uploader.Upload(input) + _, err := c.uploader.Upload(ctx, input) return err } @@ -436,7 +434,7 @@ type fileBuffer struct { func newFileBuffer(path string) (*fileBuffer, error) { dir, file := filepath.Split(path) - fd, err := ioutil.TempFile(dir, file) + fd, err := os.CreateTemp(dir, file) if err != nil { return nil, err } diff --git a/s3client_test.go b/s3client_test.go index fcc7ce3..7aff4a2 100644 --- a/s3client_test.go +++ b/s3client_test.go @@ -4,15 +4,17 @@ import ( "bufio" "bytes" "compress/gzip" + "context" "encoding/json" "io" - "io/ioutil" "log" + "os" "path/filepath" "testing" "time" - "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -85,7 +87,7 @@ func readAndUngzip(t *testing.T, r io.Reader) []byte { require.NoError(t, err) defer q.Close() - d, err := ioutil.ReadAll(q) + d, err := io.ReadAll(q) require.NoError(t, err) return d } @@ -131,7 +133,7 @@ func writeAndReadBuffer(t *testing.T, buf encodedBuffer, expected string) { reader, err := buf.Reader() require.NoError(t, err) - result, err := ioutil.ReadAll(reader) + result, err := io.ReadAll(reader) require.NoError(t, err) require.Equal(t, expected, string(result)) @@ -236,7 +238,7 @@ func Test_MemoryLimit(t *testing.T) { func checkNoFilesLeft(t *testing.T, path string) { t.Helper() dir, fn := filepath.Split(path) - files, err := ioutil.ReadDir(dir) + files, err := os.ReadDir(dir) require.NoError(t, err) for _, file := range files { @@ -334,8 +336,8 @@ type uploadMock struct { resultChan chan []byte } -func (u *uploadMock) Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) { - data, err := ioutil.ReadAll(input.Body) +func (u *uploadMock) Upload(ctx context.Context, input *s3.PutObjectInput, options ...func(*manager.Uploader)) (*manager.UploadOutput, error) { + data, err := io.ReadAll(input.Body) if err != nil { return nil, err } diff --git a/s3clientconfig.go b/s3clientconfig.go index 0258db9..f6c9c8d 100644 --- a/s3clientconfig.go +++ b/s3clientconfig.go @@ -1,11 +1,12 @@ package analytics import ( + "context" "fmt" "strings" "time" - "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go-v2/config" ) // MB is the number of bytes in one megabyte. @@ -52,12 +53,12 @@ func makeS3ClientConfig(c S3ClientConfig) (S3ClientConfig, error) { } } - if c.S3.Session == nil { - var err error - c.S3.Session, err = session.NewSession() + if c.S3.Cfg == nil { + awsCfg, err := config.LoadDefaultConfig(context.Background()) if err != nil { - return c, fmt.Errorf("analytics: creating s3 client session failed: %w", err) + return c, fmt.Errorf("analytics: loading default aws config failed: %w", err) } + c.S3.Cfg = &awsCfg } return c, nil