Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lifecycle events, results, diagnostics with InfluxDB #3

Merged
merged 13 commits into from
Apr 30, 2020
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ module github.com/testground/sdk-go
go 1.14

require (
github.com/avast/retry-go v2.6.0+incompatible
github.com/dustin/go-humanize v1.0.0
github.com/go-redis/redis/v7 v7.2.0
github.com/hashicorp/go-multierror v1.1.0
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.5.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/stretchr/testify v1.4.0
go.uber.org/zap v1.14.1
golang.org/x/net v0.0.0-20191112182307-2180aed22343 // indirect
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f // indirect
)
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/avast/retry-go v2.6.0+incompatible h1:FelcMrm7Bxacr1/RM8+/eqkDkmVN7tjlsy51dOzB3LI=
github.com/avast/retry-go v2.6.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -34,8 +36,14 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d h1:/WZQPMZNsjZ7IlCpsLGdQBINg5bxKQ1K1sh6awxLtkA=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand All @@ -62,6 +70,8 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand All @@ -79,6 +89,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down Expand Up @@ -110,6 +122,8 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343 h1:00ohfJ4K98s3m6BGUoBd8nyfp4Yl0GoIKvw5abItTjI=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -135,6 +149,8 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
19 changes: 19 additions & 0 deletions runtime/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package runtime

const (
EnvTestBranch = "TEST_BRANCH"
EnvTestCase = "TEST_CASE"
EnvTestGroupID = "TEST_GROUP_ID"
EnvTestGroupInstanceCount = "TEST_GROUP_INSTANCE_COUNT"
EnvTestInstanceCount = "TEST_INSTANCE_COUNT"
EnvTestInstanceParams = "TEST_INSTANCE_PARAMS"
EnvTestInstanceRole = "TEST_INSTANCE_ROLE"
EnvTestOutputsPath = "TEST_OUTPUTS_PATH"
EnvTestPlan = "TEST_PLAN"
EnvTestRepo = "TEST_REPO"
EnvTestRun = "TEST_RUN"
EnvTestSidecar = "TEST_SIDECAR"
EnvTestStartTime = "TEST_START_TIME"
EnvTestSubnet = "TEST_SUBNET"
EnvTestTag = "TEST_TAG"
)
187 changes: 187 additions & 0 deletions runtime/influxdb_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package runtime

import (
"io"
"time"

"github.com/avast/retry-go"
_ "github.com/influxdata/influxdb1-client"
client "github.com/influxdata/influxdb1-client/v2"
)

type Batcher interface {
io.Closer

WritePoint(p *client.Point)
}

type batcher struct {
re *RunEnv
client client.Client
length int
interval time.Duration
retryOpts []retry.Option

writeCh chan *client.Point
flushCh chan struct{}
doneCh chan struct{}

pending []*client.Point
sending []*client.Point
sendRes chan error
doneErr chan error
}

func newBatcher(re *RunEnv, cli client.Client, length int, interval time.Duration, retry ...retry.Option) *batcher {
b := &batcher{
re: re,
client: cli,
length: length,
interval: interval,
retryOpts: retry,

writeCh: make(chan *client.Point),
flushCh: make(chan struct{}, 1),
sendRes: make(chan error, 1),
doneCh: make(chan struct{}),
doneErr: make(chan error),

pending: nil,
sending: nil,
}

go b.background()

return b
}

func (b *batcher) background() {
tick := time.NewTicker(b.interval)
defer tick.Stop()

attemptFlush := func() {
if b.sending != nil {
// there's already a flush taking place.
return
}
select {
case b.flushCh <- struct{}{}:
default:
// there's a flush queued to be accepted.
}
}

for {
select {
case p := <-b.writeCh:
b.pending = append(b.pending, p)
if len(b.pending) >= b.length {
attemptFlush()
}

case err := <-b.sendRes:
if err == nil {
b.pending = b.pending[len(b.sending):]
b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending))
} else {
b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err)
}
b.sending = nil
if len(b.pending) >= b.length {
attemptFlush()
}

case <-tick.C:
attemptFlush()

case <-b.flushCh:
if b.sending != nil {
continue
}
l := len(b.pending)
if l == 0 {
continue
}
if l > b.length {
l = b.length
}
b.sending = b.pending[:l]
go b.send()

case <-b.doneCh:
if b.sending != nil {
// we are currently sending, wait for the send to finish first.
if err := <-b.sendRes; err == nil {
b.pending = b.pending[len(b.sending):]
b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending))
} else {
b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err)
}
}

var err error
if len(b.pending) > 0 {
// send all remaining data at once.
b.sending = b.pending
go b.send()
err = <-b.sendRes
if err == nil {
b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending))
} else {
b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err)
}
b.sending = nil
}
b.doneErr <- err
return
}
}
}

func (b *batcher) WritePoint(p *client.Point) {
b.writeCh <- p
}

// Close flushes any remaining points and returns any errors from the final flush.
func (b *batcher) Close() error {
select {
case _, ok := <-b.doneCh:
if !ok {
return nil
}
default:
}
close(b.doneCh)
return <-b.doneErr
}

func (b *batcher) send() {
points, err := client.NewBatchPoints(client.BatchPointsConfig{Database: "testground"})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to specify the precision here? I actually don't know.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default precision is ns, that's why I didn't specify it ;-)

if err != nil {
b.sendRes <- err
return
}

for _, p := range b.sending {
points.AddPoint(p)
}

err = retry.Do(func() error { return b.client.Write(points) }, b.retryOpts...)
b.sendRes <- err
}

type nilBatcher struct {
client.Client
}

func (n *nilBatcher) WritePoint(p *client.Point) {
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{Database: "testground"})
bp.AddPoint(p)
_ = n.Write(bp)
}

func (n *nilBatcher) Close() error {
return nil
}

var _ Batcher = (*nilBatcher)(nil)
Loading