Skip to content

Commit

Permalink
Implement lifecycle events, results, diagnostics with InfluxDB (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Apr 30, 2020
1 parent daa68fa commit 207954a
Show file tree
Hide file tree
Showing 25 changed files with 1,917 additions and 781 deletions.
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ module github.com/testground/sdk-go
go 1.14

require (
github.com/avast/retry-go v2.6.0+incompatible
github.com/dustin/go-humanize v1.0.0
github.com/go-redis/redis/v7 v7.2.0
github.com/hashicorp/go-multierror v1.1.0
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.5.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/stretchr/testify v1.4.0
go.uber.org/zap v1.14.1
golang.org/x/net v0.0.0-20191112182307-2180aed22343 // indirect
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f // indirect
)
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/avast/retry-go v2.6.0+incompatible h1:FelcMrm7Bxacr1/RM8+/eqkDkmVN7tjlsy51dOzB3LI=
github.com/avast/retry-go v2.6.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -34,8 +36,14 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d h1:/WZQPMZNsjZ7IlCpsLGdQBINg5bxKQ1K1sh6awxLtkA=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand All @@ -62,6 +70,8 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand All @@ -79,6 +89,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down Expand Up @@ -110,6 +122,8 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343 h1:00ohfJ4K98s3m6BGUoBd8nyfp4Yl0GoIKvw5abItTjI=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -135,6 +149,8 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
19 changes: 19 additions & 0 deletions runtime/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package runtime

const (
EnvTestBranch = "TEST_BRANCH"
EnvTestCase = "TEST_CASE"
EnvTestGroupID = "TEST_GROUP_ID"
EnvTestGroupInstanceCount = "TEST_GROUP_INSTANCE_COUNT"
EnvTestInstanceCount = "TEST_INSTANCE_COUNT"
EnvTestInstanceParams = "TEST_INSTANCE_PARAMS"
EnvTestInstanceRole = "TEST_INSTANCE_ROLE"
EnvTestOutputsPath = "TEST_OUTPUTS_PATH"
EnvTestPlan = "TEST_PLAN"
EnvTestRepo = "TEST_REPO"
EnvTestRun = "TEST_RUN"
EnvTestSidecar = "TEST_SIDECAR"
EnvTestStartTime = "TEST_START_TIME"
EnvTestSubnet = "TEST_SUBNET"
EnvTestTag = "TEST_TAG"
)
187 changes: 187 additions & 0 deletions runtime/influxdb_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package runtime

import (
"io"
"time"

"github.com/avast/retry-go"
_ "github.com/influxdata/influxdb1-client"
client "github.com/influxdata/influxdb1-client/v2"
)

type Batcher interface {
io.Closer

WritePoint(p *client.Point)
}

type batcher struct {
re *RunEnv
client client.Client
length int
interval time.Duration
retryOpts []retry.Option

writeCh chan *client.Point
flushCh chan struct{}
doneCh chan struct{}

pending []*client.Point
sending []*client.Point
sendRes chan error
doneErr chan error
}

func newBatcher(re *RunEnv, cli client.Client, length int, interval time.Duration, retry ...retry.Option) *batcher {
b := &batcher{
re: re,
client: cli,
length: length,
interval: interval,
retryOpts: retry,

writeCh: make(chan *client.Point),
flushCh: make(chan struct{}, 1),
sendRes: make(chan error, 1),
doneCh: make(chan struct{}),
doneErr: make(chan error),

pending: nil,
sending: nil,
}

go b.background()

return b
}

func (b *batcher) background() {
tick := time.NewTicker(b.interval)
defer tick.Stop()

attemptFlush := func() {
if b.sending != nil {
// there's already a flush taking place.
return
}
select {
case b.flushCh <- struct{}{}:
default:
// there's a flush queued to be accepted.
}
}

for {
select {
case p := <-b.writeCh:
b.pending = append(b.pending, p)
if len(b.pending) >= b.length {
attemptFlush()
}

case err := <-b.sendRes:
if err == nil {
b.pending = b.pending[len(b.sending):]
b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending))
} else {
b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err)
}
b.sending = nil
if len(b.pending) >= b.length {
attemptFlush()
}

case <-tick.C:
attemptFlush()

case <-b.flushCh:
if b.sending != nil {
continue
}
l := len(b.pending)
if l == 0 {
continue
}
if l > b.length {
l = b.length
}
b.sending = b.pending[:l]
go b.send()

case <-b.doneCh:
if b.sending != nil {
// we are currently sending, wait for the send to finish first.
if err := <-b.sendRes; err == nil {
b.pending = b.pending[len(b.sending):]
b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending))
} else {
b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err)
}
}

var err error
if len(b.pending) > 0 {
// send all remaining data at once.
b.sending = b.pending
go b.send()
err = <-b.sendRes
if err == nil {
b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending))
} else {
b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err)
}
b.sending = nil
}
b.doneErr <- err
return
}
}
}

func (b *batcher) WritePoint(p *client.Point) {
b.writeCh <- p
}

// Close flushes any remaining points and returns any errors from the final flush.
func (b *batcher) Close() error {
select {
case _, ok := <-b.doneCh:
if !ok {
return nil
}
default:
}
close(b.doneCh)
return <-b.doneErr
}

func (b *batcher) send() {
points, err := client.NewBatchPoints(client.BatchPointsConfig{Database: "testground"})
if err != nil {
b.sendRes <- err
return
}

for _, p := range b.sending {
points.AddPoint(p)
}

err = retry.Do(func() error { return b.client.Write(points) }, b.retryOpts...)
b.sendRes <- err
}

type nilBatcher struct {
client.Client
}

func (n *nilBatcher) WritePoint(p *client.Point) {
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{Database: "testground"})
bp.AddPoint(p)
_ = n.Write(bp)
}

func (n *nilBatcher) Close() error {
return nil
}

var _ Batcher = (*nilBatcher)(nil)
Loading

0 comments on commit 207954a

Please sign in to comment.