diff --git a/go.mod b/go.mod index e33b271..bfaa946 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,17 @@ module github.com/testground/sdk-go go 1.14 require ( + github.com/avast/retry-go v2.6.0+incompatible github.com/dustin/go-humanize v1.0.0 github.com/go-redis/redis/v7 v7.2.0 + github.com/hashicorp/go-multierror v1.1.0 + github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d + github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.5.1 + github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 + github.com/stretchr/testify v1.4.0 go.uber.org/zap v1.14.1 + golang.org/x/net v0.0.0-20191112182307-2180aed22343 // indirect golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a + golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f // indirect ) diff --git a/go.sum b/go.sum index 76619d7..5b9b1b7 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/avast/retry-go v2.6.0+incompatible h1:FelcMrm7Bxacr1/RM8+/eqkDkmVN7tjlsy51dOzB3LI= +github.com/avast/retry-go v2.6.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -34,8 +36,14 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= +github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d h1:/WZQPMZNsjZ7IlCpsLGdQBINg5bxKQ1K1sh6awxLtkA= +github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -62,6 +70,8 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -79,6 +89,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -110,6 +122,8 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191112182307-2180aed22343 h1:00ohfJ4K98s3m6BGUoBd8nyfp4Yl0GoIKvw5abItTjI= +golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -135,6 +149,8 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms= +golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/runtime/env.go b/runtime/env.go new file mode 100644 index 0000000..e15f780 --- /dev/null +++ b/runtime/env.go @@ -0,0 +1,19 @@ +package runtime + +const ( + EnvTestBranch = "TEST_BRANCH" + EnvTestCase = "TEST_CASE" + EnvTestGroupID = "TEST_GROUP_ID" + 
EnvTestGroupInstanceCount = "TEST_GROUP_INSTANCE_COUNT" + EnvTestInstanceCount = "TEST_INSTANCE_COUNT" + EnvTestInstanceParams = "TEST_INSTANCE_PARAMS" + EnvTestInstanceRole = "TEST_INSTANCE_ROLE" + EnvTestOutputsPath = "TEST_OUTPUTS_PATH" + EnvTestPlan = "TEST_PLAN" + EnvTestRepo = "TEST_REPO" + EnvTestRun = "TEST_RUN" + EnvTestSidecar = "TEST_SIDECAR" + EnvTestStartTime = "TEST_START_TIME" + EnvTestSubnet = "TEST_SUBNET" + EnvTestTag = "TEST_TAG" +) diff --git a/runtime/influxdb_batch.go b/runtime/influxdb_batch.go new file mode 100644 index 0000000..707492f --- /dev/null +++ b/runtime/influxdb_batch.go @@ -0,0 +1,187 @@ +package runtime + +import ( + "io" + "time" + + "github.com/avast/retry-go" + _ "github.com/influxdata/influxdb1-client" + client "github.com/influxdata/influxdb1-client/v2" +) + +type Batcher interface { + io.Closer + + WritePoint(p *client.Point) +} + +type batcher struct { + re *RunEnv + client client.Client + length int + interval time.Duration + retryOpts []retry.Option + + writeCh chan *client.Point + flushCh chan struct{} + doneCh chan struct{} + + pending []*client.Point + sending []*client.Point + sendRes chan error + doneErr chan error +} + +func newBatcher(re *RunEnv, cli client.Client, length int, interval time.Duration, retry ...retry.Option) *batcher { + b := &batcher{ + re: re, + client: cli, + length: length, + interval: interval, + retryOpts: retry, + + writeCh: make(chan *client.Point), + flushCh: make(chan struct{}, 1), + sendRes: make(chan error, 1), + doneCh: make(chan struct{}), + doneErr: make(chan error), + + pending: nil, + sending: nil, + } + + go b.background() + + return b +} + +func (b *batcher) background() { + tick := time.NewTicker(b.interval) + defer tick.Stop() + + attemptFlush := func() { + if b.sending != nil { + // there's already a flush taking place. + return + } + select { + case b.flushCh <- struct{}{}: + default: + // there's a flush queued to be accepted. + } + } + + for { + select { + case p := <-b.writeCh: + b.pending = append(b.pending, p) + if len(b.pending) >= b.length { + attemptFlush() + } + + case err := <-b.sendRes: + if err == nil { + b.pending = b.pending[len(b.sending):] + b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending)) + } else { + b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err) + } + b.sending = nil + if len(b.pending) >= b.length { + attemptFlush() + } + + case <-tick.C: + attemptFlush() + + case <-b.flushCh: + if b.sending != nil { + continue + } + l := len(b.pending) + if l == 0 { + continue + } + if l > b.length { + l = b.length + } + b.sending = b.pending[:l] + go b.send() + + case <-b.doneCh: + if b.sending != nil { + // we are currently sending, wait for the send to finish first. + if err := <-b.sendRes; err == nil { + b.pending = b.pending[len(b.sending):] + b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending)) + } else { + b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err) + } + } + + var err error + if len(b.pending) > 0 { + // send all remaining data at once. 
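+ // (note: this final flush is not capped at b.length; the loop blocks on the
+ // send result and reports it via doneErr before exiting)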
+ b.sending = b.pending + go b.send() + err = <-b.sendRes + if err == nil { + b.re.RecordMessage("influxdb: uploaded %d points", len(b.sending)) + } else { + b.re.RecordMessage("influxdb: failed to upload %d points; err: %s", len(b.sending), err) + } + b.sending = nil + } + b.doneErr <- err + return + } + } +} + +func (b *batcher) WritePoint(p *client.Point) { + b.writeCh <- p +} + +// Close flushes any remaining points and returns any errors from the final flush. +func (b *batcher) Close() error { + select { + case _, ok := <-b.doneCh: + if !ok { + return nil + } + default: + } + close(b.doneCh) + return <-b.doneErr +} + +func (b *batcher) send() { + points, err := client.NewBatchPoints(client.BatchPointsConfig{Database: "testground"}) + if err != nil { + b.sendRes <- err + return + } + + for _, p := range b.sending { + points.AddPoint(p) + } + + err = retry.Do(func() error { return b.client.Write(points) }, b.retryOpts...) + b.sendRes <- err +} + +type nilBatcher struct { + client.Client +} + +func (n *nilBatcher) WritePoint(p *client.Point) { + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{Database: "testground"}) + bp.AddPoint(p) + _ = n.Write(bp) +} + +func (n *nilBatcher) Close() error { + return nil +} + +var _ Batcher = (*nilBatcher)(nil) diff --git a/runtime/influxdb_batch_test.go b/runtime/influxdb_batch_test.go new file mode 100644 index 0000000..99acbce --- /dev/null +++ b/runtime/influxdb_batch_test.go @@ -0,0 +1,151 @@ +package runtime + +import ( + "testing" + "time" + + "github.com/avast/retry-go" + client "github.com/influxdata/influxdb1-client/v2" + "github.com/stretchr/testify/require" +) + +func TestLengthBatching(t *testing.T) { + re, cleanup := RandomTestRunEnv(t) + t.Cleanup(cleanup) + + tc := &testClient{} + b := newBatcher(re, tc, 16, 24*time.Hour) + + writePoints(t, b, 0, 36) + + time.Sleep(1 * time.Second) + + require := require.New(t) + + // we should've received two batches. + tc.RLock() + require.Len(tc.batchPoints, 2) + require.Len(tc.batchPoints[0].Points(), 16) + require.Len(tc.batchPoints[1].Points(), 16) + tc.RUnlock() + + require.NoError(b.Close()) + tc.RLock() + require.Len(tc.batchPoints, 3) + require.Len(tc.batchPoints[2].Points(), 4) + tc.RUnlock() +} + +func TestIntervalBatching(t *testing.T) { + re, cleanup := RandomTestRunEnv(t) + t.Cleanup(cleanup) + + tc := &testClient{} + b := newBatcher(re, tc, 1000, 500*time.Millisecond) + + writePoints(t, b, 0, 10) + + time.Sleep(2 * time.Second) + + require := require.New(t) + + // we should've received two batches. + tc.RLock() + require.Len(tc.batchPoints, 1) + require.Len(tc.batchPoints[0].Points(), 10) + tc.RUnlock() + + require.NoError(b.Close()) + tc.RLock() + require.Len(tc.batchPoints, 1) + tc.RUnlock() +} + +func TestBatchFailure(t *testing.T) { + re, cleanup := RandomTestRunEnv(t) + t.Cleanup(cleanup) + + test := func(b *batcher) func(t *testing.T) { + tc := &testClient{} + b.client = tc + + return func(t *testing.T) { + + // Enable failures. + tc.EnableFail(true) + + // Write three batches of 10 points each. + writePoints(t, b, 0, 10) + writePoints(t, b, 10, 10) + writePoints(t, b, 20, 10) + + time.Sleep(2 * time.Second) + + require := require.New(t) + + // we should've received the same batch many times. + tc.RLock() + require.Greater(len(tc.batchPoints), 1) + assertPointsExactly(t, tc.batchPoints[0], 0, 10) + tc.RUnlock() + + // get out of failure mode. + tc.EnableFail(false) + + // wait for the retries to be done. 
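+ // (each failed batch is retried up to 3 times with short (~100ms) delays per the
+ // retry options passed to newBatcher below, so the 2s sleep is ample)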
+ time.Sleep(2 * time.Second) + + // now the last four elements should be: + // batch(0-9) (failed), batch(0-9) (ok), batch(10-19) (ok), batch(20-29) (ok) + tc.RLock() + require.Greater(len(tc.batchPoints), 1) + assertPointsExactly(t, tc.batchPoints[len(tc.batchPoints)-4], 0, 10) + assertPointsExactly(t, tc.batchPoints[len(tc.batchPoints)-3], 0, 10) + assertPointsExactly(t, tc.batchPoints[len(tc.batchPoints)-2], 10, 10) + assertPointsExactly(t, tc.batchPoints[len(tc.batchPoints)-1], 20, 10) + tc.RUnlock() + } + } + + t.Run("batches_by_length", test(newBatcher(re, nil, 10, 24*time.Hour, + retry.Attempts(3), + retry.Delay(100*time.Millisecond), + ))) + + t.Run("batches_by_time", test(newBatcher(re, nil, 10, 100*time.Millisecond, + retry.Attempts(3), + retry.Delay(100*time.Millisecond), + ))) + +} + +func writePoints(t *testing.T, b *batcher, offset, count int) { + t.Helper() + + for i := offset; i < offset+count; i++ { + tags := map[string]string{} + fields := map[string]interface{}{ + "i": i, + } + p, err := client.NewPoint("point", tags, fields) + if err != nil { + t.Fatal(err) + } + b.WritePoint(p) + } +} + +func assertPointsExactly(t *testing.T, bp client.BatchPoints, offset, length int) { + t.Helper() + + if l := len(bp.Points()); l != length { + t.Fatalf("length did not match; expected: %d, got %d", length, l) + } + + for i, p := range bp.Points() { + f, _ := p.Fields() + if actual := f["i"].(int64); int64(i+offset) != actual { + t.Fatalf("comparison failed; expected: %d, got %d", i+offset, actual) + } + } +} diff --git a/runtime/influxdb_client.go b/runtime/influxdb_client.go new file mode 100644 index 0000000..46729c2 --- /dev/null +++ b/runtime/influxdb_client.go @@ -0,0 +1,32 @@ +package runtime + +import ( + "fmt" + "os" + "time" + + _ "github.com/influxdata/influxdb1-client" // this is important because of the bug in go mod + client "github.com/influxdata/influxdb1-client/v2" +) + +const EnvInfluxDBURL = "INFLUXDB_URL" + +var ( + // TestInfluxDBClient sets a client for testing. If this value is set, + // NewInfluxDBClient will always return it. 
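+ // A test can, for example, assign a stub that implements client.Client here
+ // (such as the testClient in influxdb_client_test.go) before constructing the RunEnv.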
+ TestInfluxDBClient client.Client +) + +func NewInfluxDBClient(re *RunEnv) (client.Client, error) { + if TestInfluxDBClient != nil { + return TestInfluxDBClient, nil + } + + addr := os.Getenv(EnvInfluxDBURL) + if addr == "" { + return nil, fmt.Errorf("no InfluxDB URL in $%s env var", EnvInfluxDBURL) + } + + cfg := client.HTTPConfig{Addr: addr, Timeout: 5 * time.Second} + return client.NewHTTPClient(cfg) +} diff --git a/runtime/influxdb_client_test.go b/runtime/influxdb_client_test.go new file mode 100644 index 0000000..76e505b --- /dev/null +++ b/runtime/influxdb_client_test.go @@ -0,0 +1,55 @@ +package runtime + +import ( + "fmt" + "sync" + "time" + + _ "github.com/influxdata/influxdb1-client" + client "github.com/influxdata/influxdb1-client/v2" +) + +type testClient struct { + sync.RWMutex + + fail bool + batchPoints []client.BatchPoints +} + +var _ client.Client = (*testClient)(nil) + +func (t *testClient) EnableFail(fail bool) { + t.Lock() + defer t.Unlock() + + t.fail = fail +} + +func (t *testClient) Ping(_ time.Duration) (time.Duration, string, error) { + return 0, "", nil +} + +func (t *testClient) Write(bp client.BatchPoints) error { + t.Lock() + defer t.Unlock() + + t.batchPoints = append(t.batchPoints, bp) + + var err error + if t.fail { + err = fmt.Errorf("error") + } + return err +} + +func (t *testClient) Query(_ client.Query) (*client.Response, error) { + return nil, nil +} + +func (t *testClient) QueryAsChunk(_ client.Query) (*client.ChunkedResponse, error) { + return nil, nil +} + +func (t *testClient) Close() error { + return nil +} diff --git a/runtime/metrics.go b/runtime/metrics.go index a3a46a6..a071f0c 100644 --- a/runtime/metrics.go +++ b/runtime/metrics.go @@ -1,171 +1,177 @@ package runtime import ( - "context" - "io" - "net/http" + "encoding/json" "os" - "path" - "strconv" + "path/filepath" + "strings" "time" - "github.com/prometheus/client_golang/prometheus" -) - -// Type aliases to hide implementation details in the APIs. -type ( - Counter = prometheus.Counter - Gauge = prometheus.Gauge - Histogram = prometheus.Histogram - Summary = prometheus.Summary - - CounterOpts = prometheus.CounterOpts - GaugeOpts = prometheus.GaugeOpts - HistogramOpts = prometheus.HistogramOpts - SummaryOpts = prometheus.SummaryOpts - - CounterVec = prometheus.CounterVec - GaugeVec = prometheus.GaugeVec - HistogramVec = prometheus.HistogramVec - SummaryVec = prometheus.SummaryVec + "github.com/hashicorp/go-multierror" + _ "github.com/influxdata/influxdb1-client" + client "github.com/influxdata/influxdb1-client/v2" + "github.com/rcrowley/go-metrics" ) type Metrics struct { - runenv *RunEnv + re *RunEnv + diagnostics *MetricsApi + results *MetricsApi + influxdb client.Client + batcher Batcher + tags map[string]string } -func (*Metrics) NewCounter(o CounterOpts) Counter { - m := prometheus.NewCounter(o) - switch err := prometheus.Register(m); err.(type) { - case nil, prometheus.AlreadyRegisteredError: - default: - panic(err) +func newMetrics(re *RunEnv) *Metrics { + m := &Metrics{re: re} + + var dsinks = []MetricSinkFn{m.logSinkJSON("diagnostics.out")} + if client, err := NewInfluxDBClient(re); err == nil { + m.tags = map[string]string{ + "plan": re.TestPlan, + "case": re.TestCase, + "run": re.TestRun, + "group_id": re.TestGroupID, + } + + m.influxdb = client + if InfluxBatching { + m.batcher = newBatcher(re, client, InfluxBatchLength, InfluxBatchInterval, InfluxBatchRetryOpts(re)...) 
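+ // (batched mode buffers points and flushes them every InfluxBatchLength points
+ // or InfluxBatchInterval, whichever comes first, retrying failed uploads)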
+ } else { + m.batcher = &nilBatcher{client} + } + + dsinks = append(dsinks, m.writeToInfluxDBSink("diagnostics")) + } else { + re.RecordMessage("InfluxDB unavailable; no metrics will be dispatched: %s", err) } + + m.diagnostics = newMetricsApi(re, metricsApiOpts{ + freq: 5 * time.Second, + preregister: metrics.RegisterRuntimeMemStats, + callbacks: []func(metrics.Registry){metrics.CaptureRuntimeMemStatsOnce}, + sinks: dsinks, + }) + + m.results = newMetricsApi(re, metricsApiOpts{ + freq: 1 * time.Second, + sinks: []MetricSinkFn{m.logSinkJSON("results.out")}, + }) + return m } -func (*Metrics) NewGauge(o GaugeOpts) Gauge { - m := prometheus.NewGauge(o) - switch err := prometheus.Register(m); err.(type) { - case nil, prometheus.AlreadyRegisteredError: - default: - panic(err) - } - return m +func (m *Metrics) R() *MetricsApi { + return m.results } -func (*Metrics) NewHistogram(o HistogramOpts) Histogram { - m := prometheus.NewHistogram(o) - switch err := prometheus.Register(m); err.(type) { - case nil, prometheus.AlreadyRegisteredError: - default: - panic(err) - } - return m +func (m *Metrics) D() *MetricsApi { + return m.diagnostics } -func (*Metrics) NewSummary(o SummaryOpts) Summary { - m := prometheus.NewSummary(o) - switch err := prometheus.Register(m); err.(type) { - case nil, prometheus.AlreadyRegisteredError: - default: - panic(err) +func (m *Metrics) Close() error { + var err *multierror.Error + + // close diagnostics; this stops the ticker and any further observations on + // runenv.D() will fail/panic. + err = multierror.Append(err, m.diagnostics.Close()) + + // close results; no more results via runenv.R() can be recorded. + err = multierror.Append(err, m.results.Close()) + + if m.influxdb != nil { + // Next, we reopen the results.out file, and write all points to InfluxDB. + results := filepath.Join(m.re.TestOutputsPath, "results.out") + if file, errf := os.OpenFile(results, os.O_RDONLY, 0666); errf == nil { + err = multierror.Append(err, m.batchInsertInfluxDB(file)) + } else { + err = multierror.Append(err, errf) + } } - return m -} -func (*Metrics) NewCounterVec(o CounterOpts, labels ...string) *CounterVec { - m := prometheus.NewCounterVec(o, labels) - switch err := prometheus.Register(m); err.(type) { - case nil, prometheus.AlreadyRegisteredError: - default: - panic(err) + // Flush the immediate InfluxDB writer. + if m.batcher != nil { + err = multierror.Append(err, m.batcher.Close()) } - return m + + // Now we're ready to close InfluxDB. 
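+ // (the batcher was flushed and closed above, so no further writes can race
+ // with closing the client)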
+ if m.influxdb != nil { + err = multierror.Append(err, m.influxdb.Close()) + } + + return err.ErrorOrNil() } -func (*Metrics) NewGaugeVec(o GaugeOpts, labels ...string) *GaugeVec { - m := prometheus.NewGaugeVec(o, labels) - switch err := prometheus.Register(m); err.(type) { - case nil, prometheus.AlreadyRegisteredError: - default: - panic(err) +func (m *Metrics) batchInsertInfluxDB(results *os.File) error { + sink := m.writeToInfluxDBSink("results") + + for dec := json.NewDecoder(results); dec.More(); { + var me Metric + if err := dec.Decode(&me); err != nil { + m.re.RecordMessage("failed to decode Metric from results.out: %s", err) + continue + } + + if err := sink(&me); err != nil { + m.re.RecordMessage("failed to process Metric from results.out: %s", err) + } } - return m + return nil } -func (*Metrics) NewHistogramVec(o HistogramOpts, labels ...string) *HistogramVec { - m := prometheus.NewHistogramVec(o, labels) - switch err := prometheus.Register(m); err.(type) { - case nil, prometheus.AlreadyRegisteredError: - default: +func (m *Metrics) logSinkJSON(filename string) MetricSinkFn { + f, err := m.re.CreateRawAsset(filename) + if err != nil { panic(err) } - return m + + enc := json.NewEncoder(f) + return func(m *Metric) error { + return enc.Encode(m) + } } -func (*Metrics) NewSummaryVec(o SummaryOpts, labels ...string) *SummaryVec { - m := prometheus.NewSummaryVec(o, labels) - switch err := prometheus.Register(m); err.(type) { - case nil, prometheus.AlreadyRegisteredError: - default: - panic(err) +func (m *Metrics) writeToInfluxDBSink(measurement string) MetricSinkFn { + return func(metric *Metric) error { + fields := make(map[string]interface{}, len(metric.Measures)) + for k, v := range metric.Measures { + key := strings.Join([]string{metric.Name, metric.Type.String(), k}, ".") + fields[key] = v + } + + p, err := client.NewPoint(measurement, m.tags, fields, time.Unix(0, metric.Timestamp)) + if err != nil { + return err + } + m.batcher.WritePoint(p) + return nil } - return m } -// HTTPPeriodicSnapshots periodically fetches the snapshots from the given address -// and outputs them to the out directory. Every file will be in the format timestamp.out. -func (re *RunEnv) HTTPPeriodicSnapshots(ctx context.Context, addr string, dur time.Duration, outDir string) error { - err := os.MkdirAll(path.Join(re.TestOutputsPath, outDir), 0777) - if err != nil { - return err +func (m *Metrics) recordEvent(evt *Event) { + if m.influxdb == nil { + return } - nextFile := func() (*os.File, error) { - timestamp := strconv.FormatInt(time.Now().Unix(), 10) - return os.Create(path.Join(re.TestOutputsPath, outDir, timestamp+".out")) + // this map copy is terrible; the influxdb v2 SDK makes points mutable. 
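+ // copying keeps the per-event tags added below (type, outcome) from leaking
+ // into the shared m.tags map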
+ tags := make(map[string]string, len(m.tags)+1) + for k, v := range m.tags { + tags[k] = v } - go func() { - ticker := time.NewTicker(dur) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - func() { - req, err := http.NewRequestWithContext(ctx, "GET", addr, nil) - if err != nil { - re.RecordMessage("error while creating http request: %v", err) - return - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - re.RecordMessage("error while scraping http endpoint: %v", err) - return - } - defer resp.Body.Close() - - file, err := nextFile() - if err != nil { - re.RecordMessage("error while getting metrics output file: %v", err) - return - } - defer file.Close() - - _, err = io.Copy(file, resp.Body) - if err != nil { - re.RecordMessage("error while copying data to file: %v", err) - return - } - }() - } - } - }() + tags["type"] = string(evt.Type) - return nil + if evt.Outcome != "" { + tags["outcome"] = string(evt.Outcome) + } + + f := map[string]interface{}{ + "error": evt.Error, + } + + p, err := client.NewPoint("events", tags, f) + if err != nil { + m.re.RecordMessage("failed to create InfluxDB point: %s", err) + } + m.batcher.WritePoint(p) } diff --git a/runtime/metrics_api.go b/runtime/metrics_api.go new file mode 100644 index 0000000..f01ac09 --- /dev/null +++ b/runtime/metrics_api.go @@ -0,0 +1,181 @@ +package runtime + +import ( + "sync" + "time" + + "github.com/rcrowley/go-metrics" +) + +// Type aliases to hide implementation details in the APIs. +type ( + Counter = metrics.Counter + EWMA = metrics.EWMA + Gauge = metrics.GaugeFloat64 + Histogram = metrics.Histogram + Meter = metrics.Meter + Sample = metrics.Sample + Timer = metrics.Timer + Point float64 +) + +type MetricSinkFn func(m *Metric) error + +type MetricsApi struct { + // re is the RunEnv this MetricsApi object is attached to. + re *RunEnv + + // reg is the go-metrics Registry this MetricsApi object creates metrics under. + reg metrics.Registry + + // sinks to invoke when a new observation has been made. + // 1) data points are sent immediately. + // 2) aggregated metrics are sent periodically, based on freq. + sinks []MetricSinkFn + + // freq is the frequency with which to materialize aggregated metrics. + freq time.Duration + + // callbacks are callbacks functions to call on every tick. + callbacks []func(registry metrics.Registry) + + wg sync.WaitGroup + freqChangeCh chan time.Duration + doneCh chan struct{} +} + +type metricsApiOpts struct { + freq time.Duration + preregister func(registry metrics.Registry) + callbacks []func(registry metrics.Registry) + sinks []MetricSinkFn +} + +func newMetricsApi(re *RunEnv, opts metricsApiOpts) *MetricsApi { + m := &MetricsApi{ + re: re, + reg: metrics.NewRegistry(), + sinks: opts.sinks, + freq: opts.freq, + callbacks: opts.callbacks, + freqChangeCh: make(chan time.Duration), + doneCh: make(chan struct{}), + } + + if opts.preregister != nil { + opts.preregister(m.reg) + } + + m.wg.Add(1) + go m.background() + return m +} + +func (m *MetricsApi) background() { + var ( + tick *time.Ticker + c <-chan time.Time + ) + + defer m.wg.Done() + + // resetTicker resets the ticker to a new frequency. + resetTicker := func(d time.Duration) { + if tick != nil { + tick.Stop() + tick = nil + c = nil + } + if d <= 0 { + return + } + tick = time.NewTicker(d) + c = tick.C + } + + // Will stop and nullify the ticker. + defer resetTicker(0) + + // Set the initial tick frequency. 
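+ // It can be changed at runtime via SetFrequency, which is handled by the
+ // freqChangeCh case below.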
+ resetTicker(m.freq) + + for { + select { + case <-c: + for _, a := range m.callbacks { + a(m.reg) + } + m.reg.Each(m.broadcast) + + case f := <-m.freqChangeCh: + m.freq = f + resetTicker(f) + + case <-m.doneCh: + return + } + } +} + +// broadcast sends an observation to all emitters. +func (m *MetricsApi) broadcast(name string, obj interface{}) { + metric := NewMetric(name, obj) + defer metric.Release() + + for _, sink := range m.sinks { + if err := sink(metric); err != nil { + m.re.RecordMessage("failed to emit metric: %s", err) + } + } +} + +func (m *MetricsApi) Close() error { + close(m.doneCh) + m.wg.Wait() + + return nil +} + +func (m *MetricsApi) SetFrequency(freq time.Duration) { + m.freqChangeCh <- freq +} + +func (m *MetricsApi) RecordPoint(name string, value float64) { + m.broadcast(name, Point(value)) +} + +func (m *MetricsApi) Counter(name string) Counter { + return m.reg.GetOrRegister(name, metrics.NewCounter()).(metrics.Counter) +} + +func (m *MetricsApi) EWMA(name string, alpha float64) EWMA { + return m.reg.GetOrRegister(name, metrics.NewEWMA(alpha)).(metrics.EWMA) +} + +func (m *MetricsApi) Gauge(name string) Gauge { + return m.reg.GetOrRegister(name, metrics.NewGaugeFloat64()).(metrics.GaugeFloat64) +} + +func (m *MetricsApi) GaugeF(name string, f func() float64) Gauge { + return m.reg.GetOrRegister(name, metrics.NewFunctionalGaugeFloat64(f)).(metrics.GaugeFloat64) +} + +func (m *MetricsApi) Histogram(name string, s Sample) Histogram { + return m.reg.GetOrRegister(name, metrics.NewHistogram(s)).(metrics.Histogram) +} + +func (m *MetricsApi) Meter(name string) Meter { + return m.reg.GetOrRegister(name, metrics.NewMeter()).(metrics.Meter) +} + +func (m *MetricsApi) Timer(name string) Timer { + return m.reg.GetOrRegister(name, metrics.NewTimer()).(metrics.Timer) +} + +func (m *MetricsApi) NewExpDecaySample(reservoirSize int, alpha float64) Sample { + return metrics.NewExpDecaySample(reservoirSize, alpha) +} + +func (m *MetricsApi) NewUniformSample(reservoirSize int) Sample { + return metrics.NewUniformSample(reservoirSize) +} diff --git a/runtime/metrics_types.go b/runtime/metrics_types.go new file mode 100644 index 0000000..12e1b21 --- /dev/null +++ b/runtime/metrics_types.go @@ -0,0 +1,165 @@ +package runtime + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/rcrowley/go-metrics" +) + +type MetricType int + +const ( + MetricPoint MetricType = iota + MetricCounter + MetricEWMA + MetricGauge + MetricHistogram + MetricMeter + MetricTimer +) + +var typeMappings = [...]string{"point", "counter", "ewma", "gauge", "histogram", "meter", "timer"} + +func (mt MetricType) String() string { + return typeMappings[mt] +} + +func (mt MetricType) MarshalJSON() ([]byte, error) { + return json.Marshal(mt.String()) +} + +// UnmarshalJSON is only used for testing; it's inefficient but not relevant. 
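+// A linear scan over typeMappings is sufficient for the handful of metric types.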
+func (mt *MetricType) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err != nil { + return nil + } + for i, m := range typeMappings { + if m == s { + *mt = MetricType(i) + return nil + } + } + return fmt.Errorf("invalid metric type") +} + +var pools = func() (p [7]sync.Pool) { + for i := range p { + p[i].New = func() interface{} { + return &Metric{Type: MetricType(i), Measures: make(map[string]interface{}, 1)} + } + } + return p +}() + +type Metric struct { + Timestamp int64 `json:"ts"` + Type MetricType `json:"type"` + Name string `json:"name"` + Measures map[string]interface{} `json:"measures"` +} + +func (m *Metric) Release() { + pools[m.Type].Put(m) +} + +func NewMetric(name string, i interface{}) *Metric { + var ( + m *Metric + t MetricType + ts = time.Now().UnixNano() + ) + + switch v := i.(type) { + case Point: + t = MetricPoint + m = pools[t].Get().(*Metric) + m.Measures["value"] = float64(v) + + case Counter: + t = MetricCounter + m = pools[t].Get().(*Metric) + s := v.Snapshot() + m.Measures["count"] = s.Count() + + case EWMA: + t = MetricEWMA + m = pools[t].Get().(*Metric) + s := v.Snapshot() + m.Measures["rate"] = s.Rate() + + case Gauge: + t = MetricGauge + m = pools[t].Get().(*Metric) + s := v.Snapshot() + m.Measures["value"] = s.Value() + + case metrics.Gauge: + t = MetricGauge + m = pools[t].Get().(*Metric) + s := v.Snapshot() + m.Measures["value"] = float64(s.Value()) + + case Histogram: + t = MetricHistogram + m = pools[t].Get().(*Metric) + s := v.Snapshot() + p := s.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) + m.Measures["count"] = float64(s.Count()) + m.Measures["max"] = float64(s.Max()) + m.Measures["mean"] = s.Mean() + m.Measures["min"] = float64(s.Min()) + m.Measures["stddev"] = s.StdDev() + m.Measures["variance"] = s.Variance() + m.Measures["p50"] = p[0] + m.Measures["p75"] = p[1] + m.Measures["p95"] = p[2] + m.Measures["p99"] = p[3] + m.Measures["p999"] = p[4] + m.Measures["p9999"] = p[5] + + case Meter: + t = MetricMeter + m = pools[t].Get().(*Metric) + s := v.Snapshot() + m.Measures["count"] = float64(s.Count()) + m.Measures["m1"] = s.Rate1() + m.Measures["m5"] = s.Rate5() + m.Measures["m15"] = s.Rate15() + m.Measures["mean"] = s.RateMean() + + case Timer: + t = MetricTimer + m = pools[t].Get().(*Metric) + s := v.Snapshot() + p := s.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) + m.Measures["count"] = float64(s.Count()) + m.Measures["max"] = float64(s.Max()) + m.Measures["mean"] = s.Mean() + m.Measures["min"] = float64(s.Min()) + m.Measures["stddev"] = s.StdDev() + m.Measures["variance"] = s.Variance() + m.Measures["p50"] = p[0] + m.Measures["p75"] = p[1] + m.Measures["p95"] = p[2] + m.Measures["p99"] = p[3] + m.Measures["p999"] = p[4] + m.Measures["p9999"] = p[5] + m.Measures["m1"] = s.Rate1() + m.Measures["m5"] = s.Rate5() + m.Measures["m15"] = s.Rate15() + m.Measures["meanrate"] = s.RateMean() + + default: + panic("unexpected metric type") + + } + + m.Timestamp = ts + m.Type = t + m.Name = name + return m +} diff --git a/runtime/output.go b/runtime/output.go deleted file mode 100644 index 9b3cd47..0000000 --- a/runtime/output.go +++ /dev/null @@ -1,200 +0,0 @@ -package runtime - -import ( - "fmt" - "runtime/debug" - - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type ( - EventType string - EventOutcome string -) - -const ( - EventTypeStart = EventType("start") - EventTypeMessage = EventType("message") - EventTypeMetric = EventType("metric") - EventTypeFinish = EventType("finish") - - 
EventOutcomeOK = EventOutcome("ok") - EventOutcomeFailed = EventOutcome("failed") - EventOutcomeCrashed = EventOutcome("crashed") -) - -type Event struct { - Type EventType `json:"type"` - Outcome EventOutcome `json:"outcome,omitempty"` - Error string `json:"error,omitempty"` - Stacktrace string `json:"stacktrace,omitempty"` - Message string `json:"message,omitempty"` - Metric *MetricValue `json:"metric,omitempty"` - Runenv *RunParams `json:"runenv,omitempty"` -} - -type MetricDefinition struct { - Name string `json:"name"` - Unit string `json:"unit"` - ImprovementDir int `json:"dir"` -} - -type MetricValue struct { - MetricDefinition - Value float64 `json:"value"` -} - -func (e Event) MarshalLogObject(oe zapcore.ObjectEncoder) error { - oe.AddString("type", string(e.Type)) - - if e.Outcome != "" { - oe.AddString("outcome", string(e.Outcome)) - } - if e.Error != "" { - oe.AddString("error", e.Error) - } - if e.Stacktrace != "" { - oe.AddString("stacktrace", e.Stacktrace) - } - if e.Message != "" { - oe.AddString("message", e.Message) - } - if e.Metric != nil { - if err := oe.AddObject("metric", e.Metric); err != nil { - return err - } - } - if e.Runenv != nil { - if err := oe.AddObject("runenv", e.Runenv); err != nil { - return err - } - } - - return nil -} - -func (m MetricValue) MarshalLogObject(oe zapcore.ObjectEncoder) error { - if m.Name == "" { - return nil - } - oe.AddString("name", m.Name) - oe.AddString("unit", m.Unit) - oe.AddInt("dir", m.ImprovementDir) - oe.AddFloat64("value", m.Value) - return nil -} - -func (r *RunParams) MarshalLogObject(oe zapcore.ObjectEncoder) error { - oe.AddString("plan", r.TestPlan) - oe.AddString("case", r.TestCase) - if err := oe.AddReflected("params", r.TestInstanceParams); err != nil { - return err - } - oe.AddInt("instances", r.TestInstanceCount) - oe.AddString("outputs_path", r.TestOutputsPath) - oe.AddString("network", func() string { - if r.TestSubnet == nil { - return "" - } - return r.TestSubnet.Network() - }()) - - oe.AddString("group", r.TestGroupID) - oe.AddInt("group_instances", r.TestGroupInstanceCount) - - if r.TestRepo != "" { - oe.AddString("repo", r.TestRepo) - } - if r.TestCommit != "" { - oe.AddString("commit", r.TestCommit) - } - if r.TestBranch != "" { - oe.AddString("branch", r.TestBranch) - } - if r.TestTag != "" { - oe.AddString("tag", r.TestTag) - } - return nil -} - -// RecordMessage records an informational message. -func (l *logger) RecordMessage(msg string, a ...interface{}) { - if len(a) > 0 { - msg = fmt.Sprintf(msg, a...) - } - evt := Event{ - Type: EventTypeMessage, - Message: msg, - } - l.logger.Info("", zap.Object("event", evt)) -} - -func (l *logger) RecordStart() { - evt := Event{ - Type: EventTypeStart, - Runenv: l.runenv, - } - - l.logger.Info("", zap.Object("event", evt)) -} - -// RecordSuccess records that the calling instance succeeded. -func (l *logger) RecordSuccess() { - evt := Event{ - Type: EventTypeFinish, - Outcome: EventOutcomeOK, - } - l.logger.Info("", zap.Object("event", evt)) -} - -// RecordFailure records that the calling instance failed with the supplied -// error. -func (l *logger) RecordFailure(err error) { - evt := Event{ - Type: EventTypeFinish, - Outcome: EventOutcomeFailed, - Error: err.Error(), - } - l.logger.Info("", zap.Object("event", evt)) -} - -// RecordCrash records that the calling instance crashed/panicked with the -// supplied error. 
-func (l *logger) RecordCrash(err interface{}) { - evt := Event{ - Type: EventTypeFinish, - Outcome: EventOutcomeFailed, - Error: fmt.Sprintf("%s", err), - Stacktrace: string(debug.Stack()), - } - l.logger.Error("", zap.Object("event", evt)) -} - -// RecordMetric records a metric event associated with the provided metric -// definition, giving it value `value`. -func (l *logger) RecordMetric(metric *MetricDefinition, value float64) { - evt := Event{ - Type: EventTypeMetric, - Metric: &MetricValue{ - MetricDefinition: *metric, - Value: value, - }, - } - l.logger.Info("", zap.Object("event", evt)) -} - -// Message prints out an informational message. -// -// Deprecated: use RecordMessage. -func (r *RunEnv) Message(msg string, a ...interface{}) { - r.RecordMessage(msg, a...) -} - -// EmitMetric outputs a metric event associated with the provided metric -// definition, giving it value `value`. -// -// Deprecated: use RecordMetric. -func (r *RunEnv) EmitMetric(metric *MetricDefinition, value float64) { - r.RecordMetric(metric, value) -} diff --git a/runtime/runenv.go b/runtime/runenv.go index 05add22..243f5a7 100644 --- a/runtime/runenv.go +++ b/runtime/runenv.go @@ -1,219 +1,125 @@ package runtime import ( - "encoding/json" - "fmt" - "net" "os" - "strconv" - "strings" + "sync" "time" - "github.com/dustin/go-humanize" + "github.com/avast/retry-go" + "github.com/hashicorp/go-multierror" + _ "github.com/influxdata/influxdb1-client" // this is important because of the bug in go mod "go.uber.org/zap" ) -const ( - EnvTestBranch = "TEST_BRANCH" - EnvTestCase = "TEST_CASE" - EnvTestGroupID = "TEST_GROUP_ID" - EnvTestGroupInstanceCount = "TEST_GROUP_INSTANCE_COUNT" - EnvTestInstanceCount = "TEST_INSTANCE_COUNT" - EnvTestInstanceParams = "TEST_INSTANCE_PARAMS" - EnvTestInstanceRole = "TEST_INSTANCE_ROLE" - EnvTestOutputsPath = "TEST_OUTPUTS_PATH" - EnvTestPlan = "TEST_PLAN" - EnvTestRepo = "TEST_REPO" - EnvTestRun = "TEST_RUN" - EnvTestSidecar = "TEST_SIDECAR" - EnvTestStartTime = "TEST_START_TIME" - EnvTestSubnet = "TEST_SUBNET" - EnvTestTag = "TEST_TAG" +var ( + InfluxBatching = true + InfluxBatchLength = 128 + InfluxBatchInterval = 1 * time.Second + InfluxBatchRetryOpts = func(re *RunEnv) []retry.Option { + return []retry.Option{ + retry.Attempts(5), + retry.Delay(500 * time.Millisecond), + retry.OnRetry(func(n uint, err error) { + re.RecordMessage("failed to send batch to InfluxDB; attempt %d; err: %s", n, err) + }), + } + } ) -type IPNet struct { - net.IPNet -} +// RunEnv encapsulates the context for this test run. +type RunEnv struct { + RunParams -func (i IPNet) MarshalJSON() ([]byte, error) { - if len(i.IPNet.IP) == 0 { - return json.Marshal("") - } - return json.Marshal(i.String()) -} + logger *zap.Logger + metrics *Metrics -func (i *IPNet) UnmarshalJSON(data []byte) error { - var s string - if err := json.Unmarshal(data, &s); err != nil { - return err - } + wg sync.WaitGroup + closeCh chan struct{} + assetsErr error - if s == "" { - return nil + unstructured struct { + files []*os.File + ch chan *os.File } - - _, ipnet, err := net.ParseCIDR(s) - if err != nil { - return err + structured struct { + loggers []*zap.Logger + ch chan *zap.Logger } - - i.IPNet = *ipnet - return nil } -// RunParams encapsulates the runtime parameters for this test. 
-type RunParams struct { - TestPlan string `json:"plan"` - TestCase string `json:"case"` - TestRun string `json:"run"` - - TestRepo string `json:"repo,omitempty"` - TestCommit string `json:"commit,omitempty"` - TestBranch string `json:"branch,omitempty"` - TestTag string `json:"tag,omitempty"` - - TestOutputsPath string `json:"outputs_path,omitempty"` - - TestInstanceCount int `json:"instances"` - TestInstanceRole string `json:"role,omitempty"` - TestInstanceParams map[string]string `json:"params,omitempty"` - - TestGroupID string `json:"group,omitempty"` - TestGroupInstanceCount int `json:"group_instances,omitempty"` - - // true if the test has access to the sidecar. - TestSidecar bool `json:"test_sidecar,omitempty"` - - // The subnet on which this test is running. - // - // The test instance can use this to pick an IP address and/or determine - // the "data" network interface. - // - // This will be 127.1.0.0/16 when using the local exec runner. - TestSubnet *IPNet `json:"network,omitempty"` - TestStartTime time.Time `json:"start_time,omitempty"` -} - -// RunEnv encapsulates the context for this test run. -type RunEnv struct { - RunParams - *logger - - metrics *Metrics - unstructured chan *os.File - structured chan *zap.Logger +func (re *RunEnv) SLogger() *zap.SugaredLogger { + return re.logger.Sugar() } // NewRunEnv constructs a runtime environment from the given runtime parameters. func NewRunEnv(params RunParams) *RunEnv { re := &RunEnv{ RunParams: params, - - structured: make(chan *zap.Logger, 32), - unstructured: make(chan *os.File, 32), + closeCh: make(chan struct{}), } + re.initLogger() - re.metrics = &Metrics{re} - re.logger = newLogger(&re.RunParams) - return re -} - -// M returns an object that groups the metrics facilities. -func (re *RunEnv) M() *Metrics { - return re.metrics -} + re.structured.ch = make(chan *zap.Logger) + re.unstructured.ch = make(chan *os.File) -func (re *RunEnv) Close() error { - close(re.structured) - close(re.unstructured) + re.wg.Add(1) + go re.manageAssets() - if l := re.logger; l != nil { - _ = l.SLogger().Sync() - } + re.metrics = newMetrics(re) - for l := range re.structured { - _ = l.Sync() // ignore errors. - } + return re +} - for f := range re.unstructured { - _ = f.Close() // ignore errors. - } - return nil +// R returns a metrics object for results. +func (re *RunEnv) R() *MetricsApi { + return re.metrics.R() } -func (re *RunParams) ToEnvVars() map[string]string { - packParams := func(in map[string]string) string { - arr := make([]string, 0, len(in)) - for k, v := range in { - arr = append(arr, k+"="+v) - } - return strings.Join(arr, "|") - } +// D returns a metrics object for diagnostics. 
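+// Diagnostics are materialized every 5 seconds to diagnostics.out and, when InfluxDB is
+// configured, to the "diagnostics" measurement. Illustrative usage (not part of this
+// change): re.D().Counter("conn.dials").Inc(1).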
+func (re *RunEnv) D() *MetricsApi { + return re.metrics.D() +} - out := map[string]string{ - EnvTestBranch: re.TestBranch, - EnvTestCase: re.TestCase, - EnvTestGroupID: re.TestGroupID, - EnvTestGroupInstanceCount: strconv.Itoa(re.TestGroupInstanceCount), - EnvTestInstanceCount: strconv.Itoa(re.TestInstanceCount), - EnvTestInstanceParams: packParams(re.TestInstanceParams), - EnvTestInstanceRole: re.TestInstanceRole, - EnvTestOutputsPath: re.TestOutputsPath, - EnvTestPlan: re.TestPlan, - EnvTestRepo: re.TestRepo, - EnvTestRun: re.TestRun, - EnvTestSidecar: strconv.FormatBool(re.TestSidecar), - EnvTestStartTime: re.TestStartTime.Format(time.RFC3339), - EnvTestSubnet: re.TestSubnet.String(), - EnvTestTag: re.TestTag, - } +func (re *RunEnv) manageAssets() { + defer re.wg.Done() - return out -} + var err *multierror.Error + defer func() { re.assetsErr = err.ErrorOrNil() }() -func unpackParams(packed string) map[string]string { - spltparams := strings.Split(packed, "|") - params := make(map[string]string, len(spltparams)) - for _, s := range spltparams { - v := strings.Split(s, "=") - if len(v) != 2 { - continue + for { + select { + case f := <-re.unstructured.ch: + re.unstructured.files = append(re.unstructured.files, f) + case l := <-re.structured.ch: + re.structured.loggers = append(re.structured.loggers, l) + case <-re.closeCh: + for _, f := range re.unstructured.files { + err = multierror.Append(err, f.Close()) + } + for _, l := range re.structured.loggers { + err = multierror.Append(err, l.Sync()) + } + return } - params[v[0]] = v[1] } - return params } -func toInt(s string) int { - v, err := strconv.Atoi(s) - if err != nil { - return -1 - } - return v -} +func (re *RunEnv) Close() error { + var err *multierror.Error -func toBool(s string) bool { - v, _ := strconv.ParseBool(s) - return v -} + // close metrics. + err = multierror.Append(err, re.metrics.Close()) -// toNet might parse any input, so it is possible to get an error and nil return value -func toNet(s string) *IPNet { - _, ipnet, err := net.ParseCIDR(s) - if err != nil { - return nil - } - return &IPNet{IPNet: *ipnet} -} + // This close stops monitoring the wapi errors channel, and closes assets. + close(re.closeCh) + re.wg.Wait() + err = multierror.Append(err, re.assetsErr) -// Try to parse the time. -// Failing to do so, return a zero value time -func toTime(s string) time.Time { - t, err := time.Parse(time.RFC3339, s) - if err != nil { - return time.Time{} + if l := re.logger; l != nil { + _ = l.Sync() } - return t + + return err.ErrorOrNil() } // CurrentRunEnv populates a test context from environment vars. @@ -222,32 +128,6 @@ func CurrentRunEnv() *RunEnv { return re } -// ParseRunParams parses a list of environment variables into a RunParams. 
-func ParseRunParams(env []string) (*RunParams, error) { - m, err := ParseKeyValues(env) - if err != nil { - return nil, err - } - - return &RunParams{ - TestBranch: m[EnvTestBranch], - TestCase: m[EnvTestCase], - TestGroupID: m[EnvTestGroupID], - TestGroupInstanceCount: toInt(m[EnvTestGroupInstanceCount]), - TestInstanceCount: toInt(m[EnvTestInstanceCount]), - TestInstanceParams: unpackParams(m[EnvTestInstanceParams]), - TestInstanceRole: m[EnvTestInstanceRole], - TestOutputsPath: m[EnvTestOutputsPath], - TestPlan: m[EnvTestPlan], - TestRepo: m[EnvTestRepo], - TestRun: m[EnvTestRun], - TestSidecar: toBool(m[EnvTestSidecar]), - TestStartTime: toTime(EnvTestStartTime), - TestSubnet: toNet(m[EnvTestSubnet]), - TestTag: m[EnvTestTag], - }, nil -} - // ParseRunEnv parses a list of environment variables into a RunEnv. func ParseRunEnv(env []string) (*RunEnv, error) { p, err := ParseRunParams(env) @@ -257,116 +137,3 @@ func ParseRunEnv(env []string) (*RunEnv, error) { return NewRunEnv(*p), nil } - -// IsParamSet checks if a certain parameter is set. -func (re *RunParams) IsParamSet(name string) bool { - _, ok := re.TestInstanceParams[name] - return ok -} - -// StringParam returns a string parameter, or "" if the parameter is not set. -func (re *RunParams) StringParam(name string) string { - v, ok := re.TestInstanceParams[name] - if !ok { - panic(fmt.Errorf("%s was not set", name)) - } - return v -} - -func (re *RunParams) SizeParam(name string) uint64 { - v := re.TestInstanceParams[name] - m, err := humanize.ParseBytes(v) - if err != nil { - panic(err) - } - return m -} - -// IntParam returns an int parameter, or -1 if the parameter is not set or -// the conversion failed. It panics on error. -func (re *RunParams) IntParam(name string) int { - v, ok := re.TestInstanceParams[name] - if !ok { - panic(fmt.Errorf("%s was not set", name)) - } - - i, err := strconv.Atoi(v) - if err != nil { - panic(err) - } - return i -} - -// FloatParam returns a float64 parameter, or -1.0 if the parameter is not set or -// the conversion failed. It panics on error. -func (re *RunEnv) FloatParam(name string) float64 { - v, ok := re.TestInstanceParams[name] - if !ok { - return -1.0 - } - - f, err := strconv.ParseFloat(v, 32) - if err != nil { - panic(err) - } - return f -} - -// BooleanParam returns the Boolean value of the parameter, or false if not passed -func (re *RunParams) BooleanParam(name string) bool { - s, ok := re.TestInstanceParams[name] - return ok && strings.ToLower(s) == "true" -} - -// StringArrayParam returns an array of string parameter, or an empty array -// if it does not exist. It panics on error. -func (re *RunParams) StringArrayParam(name string) []string { - a := []string{} - re.JSONParam(name, &a) - return a -} - -// SizeArrayParam returns an array of uint64 elements which represent sizes, -// in bytes. If the response is nil, then there was an error parsing the input. -// It panics on error. -func (re *RunParams) SizeArrayParam(name string) []uint64 { - humanSizes := re.StringArrayParam(name) - sizes := []uint64{} - - for _, size := range humanSizes { - n, err := humanize.ParseBytes(size) - if err != nil { - panic(err) - } - sizes = append(sizes, n) - } - - return sizes -} - -// JSONParam unmarshals a JSON parameter in an arbitrary interface. -// It panics on error. 
-func (re *RunParams) JSONParam(name string, v interface{}) { - s, ok := re.TestInstanceParams[name] - if !ok { - panic(fmt.Errorf("%s was not set", name)) - } - - if err := json.Unmarshal([]byte(s), v); err != nil { - panic(err) - } -} - -// Copied from github.com/ipfs/testground/pkg/conv, because we don't want the -// SDK to depend on that package. -func ParseKeyValues(in []string) (res map[string]string, err error) { - res = make(map[string]string, len(in)) - for _, d := range in { - splt := strings.Split(d, "=") - if len(splt) < 2 { - return nil, fmt.Errorf("invalid key-value: %s", d) - } - res[splt[0]] = strings.Join(splt[1:], "=") - } - return res, nil -} diff --git a/runtime/files.go b/runtime/runenv_assets.go similarity index 89% rename from runtime/files.go rename to runtime/runenv_assets.go index 1af2f86..377302f 100644 --- a/runtime/files.go +++ b/runtime/runenv_assets.go @@ -3,7 +3,6 @@ package runtime import ( "bufio" "crypto/rand" - "fmt" "io" "io/ioutil" "os" @@ -13,6 +12,53 @@ import ( "go.uber.org/zap/zapcore" ) +// CreateRawAsset creates an output asset. +// +// Output assets will be saved when the test terminates and available for +// further investigation. You can also manually create output assets/directories +// under re.TestOutputsPath. +func (re *RunEnv) CreateRawAsset(name string) (*os.File, error) { + file, err := os.Create(filepath.Join(re.TestOutputsPath, name)) + if err != nil { + return nil, err + } + + re.unstructured.ch <- file + + return file, nil +} + +// CreateStructuredAsset creates an output asset and wraps it in zap loggers. +func (re *RunEnv) CreateStructuredAsset(name string, config zap.Config) (*zap.Logger, *zap.SugaredLogger, error) { + path := filepath.Join(re.TestOutputsPath, name) + config.OutputPaths = []string{path} + + logger, err := config.Build() + if err != nil { + return nil, nil, err + } + + re.structured.ch <- logger + + return logger, logger.Sugar(), nil +} + +// StandardJSONConfig returns a zap.Config with JSON encoding, debug verbosity, +// caller and stacktraces disabled, and with timestamps encoded as nanos after +// epoch. +func StandardJSONConfig() zap.Config { + enc := zap.NewProductionEncoderConfig() + enc.EncodeTime = zapcore.EpochNanosTimeEncoder + + return zap.Config{ + Level: zap.NewAtomicLevelAt(zap.DebugLevel), + Encoding: "json", + EncoderConfig: enc, + DisableCaller: true, + DisableStacktrace: true, + } +} + // CreateRandomFile creates a file of the specified size (in bytes) within the // specified directory path and returns its path. func (re *RunEnv) CreateRandomFile(directoryPath string, size int64) (string, error) { @@ -63,58 +109,3 @@ func (re *RunEnv) CreateRandomDirectory(directoryPath string, depth uint) (strin return base, nil } - -// CreateRawAsset creates an output asset. -// -// Output assets will be saved when the test terminates and available for -// further investigation. You can also manually create output assets/directories -// under re.TestOutputsPath. -func (re *RunEnv) CreateRawAsset(name string) (*os.File, error) { - file, err := os.Create(filepath.Join(re.TestOutputsPath, name)) - if err != nil { - return nil, err - } - - select { - case re.unstructured <- file: - default: - return nil, fmt.Errorf("too many unstructured assets; current: %d", len(re.unstructured)) - } - - return file, nil -} - -// CreateStructuredAsset creates an output asset and wraps it in zap loggers. 
-func (re *RunEnv) CreateStructuredAsset(name string, config zap.Config) (*zap.Logger, *zap.SugaredLogger, error) { - path := filepath.Join(re.TestOutputsPath, name) - config.OutputPaths = []string{path} - - logger, err := config.Build() - if err != nil { - return nil, nil, err - } - - select { - case re.structured <- logger: - default: - return nil, nil, fmt.Errorf("too many structured assets; current: %d", len(re.structured)) - } - - return logger, logger.Sugar(), nil -} - -// StandardJSONConfig returns a zap.Config with JSON encoding, debug verbosity, -// caller and stacktraces disabled, and with timestamps encoded as nanos after -// epoch. -func StandardJSONConfig() zap.Config { - enc := zap.NewProductionEncoderConfig() - enc.EncodeTime = zapcore.EpochNanosTimeEncoder - - return zap.Config{ - Level: zap.NewAtomicLevelAt(zap.DebugLevel), - Encoding: "json", - EncoderConfig: enc, - DisableCaller: true, - DisableStacktrace: true, - } -} diff --git a/runtime/runenv_events.go b/runtime/runenv_events.go new file mode 100644 index 0000000..274aec9 --- /dev/null +++ b/runtime/runenv_events.go @@ -0,0 +1,148 @@ +package runtime + +import ( + "fmt" + "runtime/debug" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type ( + EventType string + EventOutcome string +) + +const ( + EventTypeStart = EventType("start") + EventTypeMessage = EventType("message") + EventTypeFinish = EventType("finish") + + EventOutcomeOK = EventOutcome("ok") + EventOutcomeFailed = EventOutcome("failed") + EventOutcomeCrashed = EventOutcome("crashed") +) + +type Event struct { + Type EventType `json:"type"` + Outcome EventOutcome `json:"outcome,omitempty"` + Error string `json:"error,omitempty"` + Stacktrace string `json:"stacktrace,omitempty"` + Message string `json:"message,omitempty"` + Runenv *RunParams `json:"runenv,omitempty"` +} + +func (e Event) MarshalLogObject(oe zapcore.ObjectEncoder) error { + oe.AddString("type", string(e.Type)) + + if e.Outcome != "" { + oe.AddString("outcome", string(e.Outcome)) + } + if e.Error != "" { + oe.AddString("error", e.Error) + } + if e.Stacktrace != "" { + oe.AddString("stacktrace", e.Stacktrace) + } + if e.Message != "" { + oe.AddString("message", e.Message) + } + if e.Runenv != nil { + if err := oe.AddObject("runenv", e.Runenv); err != nil { + return err + } + } + + return nil +} + +func (rp *RunParams) MarshalLogObject(oe zapcore.ObjectEncoder) error { + oe.AddString("plan", rp.TestPlan) + oe.AddString("case", rp.TestCase) + oe.AddString("run", rp.TestRun) + if err := oe.AddReflected("params", rp.TestInstanceParams); err != nil { + return err + } + oe.AddInt("instances", rp.TestInstanceCount) + oe.AddString("outputs_path", rp.TestOutputsPath) + oe.AddString("network", func() string { + if rp.TestSubnet == nil { + return "" + } + return rp.TestSubnet.String() + }()) + + oe.AddString("group", rp.TestGroupID) + oe.AddInt("group_instances", rp.TestGroupInstanceCount) + + if rp.TestRepo != "" { + oe.AddString("repo", rp.TestRepo) + } + if rp.TestCommit != "" { + oe.AddString("commit", rp.TestCommit) + } + if rp.TestBranch != "" { + oe.AddString("branch", rp.TestBranch) + } + if rp.TestTag != "" { + oe.AddString("tag", rp.TestTag) + } + return nil +} + +// RecordMessage records an informational message. +func (re *RunEnv) RecordMessage(msg string, a ...interface{}) { + if len(a) > 0 { + msg = fmt.Sprintf(msg, a...) 
+ } + evt := Event{ + Type: EventTypeMessage, + Message: msg, + } + re.logger.Info("", zap.Object("event", evt)) +} + +func (re *RunEnv) RecordStart() { + evt := Event{ + Type: EventTypeStart, + Runenv: &re.RunParams, + } + + re.logger.Info("", zap.Object("event", evt)) + re.metrics.recordEvent(&evt) +} + +// RecordSuccess records that the calling instance succeeded. +func (re *RunEnv) RecordSuccess() { + evt := Event{ + Type: EventTypeFinish, + Outcome: EventOutcomeOK, + } + re.logger.Info("", zap.Object("event", evt)) + re.metrics.recordEvent(&evt) +} + +// RecordFailure records that the calling instance failed with the supplied +// error. +func (re *RunEnv) RecordFailure(err error) { + evt := Event{ + Type: EventTypeFinish, + Outcome: EventOutcomeFailed, + Error: err.Error(), + } + re.logger.Info("", zap.Object("event", evt)) + re.metrics.recordEvent(&evt) +} + +// RecordCrash records that the calling instance crashed/panicked with the +// supplied error. +func (re *RunEnv) RecordCrash(err interface{}) { + evt := Event{ + Type: EventTypeFinish, + Outcome: EventOutcomeCrashed, + Error: fmt.Sprintf("%s", err), + Stacktrace: string(debug.Stack()), + } + re.logger.Error("", zap.Object("event", evt)) + re.metrics.recordEvent(&evt) +} diff --git a/runtime/runenv_http.go b/runtime/runenv_http.go new file mode 100644 index 0000000..eee2d5a --- /dev/null +++ b/runtime/runenv_http.go @@ -0,0 +1,67 @@ +package runtime + +import ( + "context" + "io" + "net/http" + "os" + "path" + "strconv" + "time" +) + +// HTTPPeriodicSnapshots periodically fetches the snapshots from the given address +// and outputs them to the out directory. Every file will be in the format timestamp.out. +func (re *RunEnv) HTTPPeriodicSnapshots(ctx context.Context, addr string, dur time.Duration, outDir string) error { + err := os.MkdirAll(path.Join(re.TestOutputsPath, outDir), 0777) + if err != nil { + return err + } + + nextFile := func() (*os.File, error) { + timestamp := strconv.FormatInt(time.Now().Unix(), 10) + return os.Create(path.Join(re.TestOutputsPath, outDir, timestamp+".out")) + } + + go func() { + ticker := time.NewTicker(dur) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + func() { + req, err := http.NewRequestWithContext(ctx, "GET", addr, nil) + if err != nil { + re.RecordMessage("error while creating http request: %v", err) + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + re.RecordMessage("error while scraping http endpoint: %v", err) + return + } + defer resp.Body.Close() + + file, err := nextFile() + if err != nil { + re.RecordMessage("error while getting metrics output file: %v", err) + return + } + defer file.Close() + + _, err = io.Copy(file, resp.Body) + if err != nil { + re.RecordMessage("error while copying data to file: %v", err) + return + } + }() + } + } + }() + + return nil +} diff --git a/runtime/logger.go b/runtime/runenv_logger.go similarity index 50% rename from runtime/logger.go rename to runtime/runenv_logger.go index bbe20e2..a060b43 100644 --- a/runtime/logger.go +++ b/runtime/runenv_logger.go @@ -8,29 +8,15 @@ import ( "go.uber.org/zap/zapcore" ) -type logger struct { - runenv *RunParams - - // TODO: we'll want different kinds of loggers. 
- logger *zap.Logger - slogger *zap.SugaredLogger -} - -func newLogger(runenv *RunParams) *logger { - l := &logger{runenv: runenv} - l.init() - return l -} - -func (l *logger) init() { +func (re *RunEnv) initLogger() { level := zap.NewAtomicLevel() if lvl := os.Getenv("LOG_LEVEL"); lvl != "" { if err := level.UnmarshalText([]byte(lvl)); err != nil { defer func() { // once the logger is defined... - if l.slogger != nil { - l.slogger.Errorf("failed to decode log level '%q': %s", l, err) + if re.logger != nil { + re.logger.Sugar().Errorf("failed to decode log level '%q': %s", lvl, err) } }() } @@ -39,8 +25,8 @@ func (l *logger) init() { } paths := []string{"stdout"} - if l.runenv.TestOutputsPath != "" { - paths = append(paths, filepath.Join(l.runenv.TestOutputsPath, "run.out")) + if re.TestOutputsPath != "" { + paths = append(paths, filepath.Join(re.TestOutputsPath, "run.out")) } cfg := zap.Config{ @@ -51,8 +37,8 @@ func (l *logger) init() { OutputPaths: paths, Encoding: "json", InitialFields: map[string]interface{}{ - "run_id": l.runenv.TestRun, - "group_id": l.runenv.TestGroupID, + "run_id": re.TestRun, + "group_id": re.TestGroupID, }, } @@ -62,19 +48,8 @@ func (l *logger) init() { cfg.EncoderConfig = enc var err error - l.logger, err = cfg.Build() + re.logger, err = cfg.Build() if err != nil { panic(err) } - - l.slogger = l.logger.Sugar() -} - -func (l *logger) SLogger() *zap.SugaredLogger { - return l.slogger -} - -// Loggers returns the loggers populated from this runenv. -func (l *logger) Loggers() (*zap.Logger, *zap.SugaredLogger) { - return l.logger, l.slogger } diff --git a/runtime/runenv_test.go b/runtime/runenv_test.go index b7053a2..8bc39a4 100644 --- a/runtime/runenv_test.go +++ b/runtime/runenv_test.go @@ -1,8 +1,16 @@ package runtime import ( + "encoding/json" + "fmt" + "os" + "path/filepath" "reflect" + "strings" "testing" + "time" + + "github.com/stretchr/testify/require" ) func TestParseKeyValues(t *testing.T) { @@ -67,3 +75,228 @@ func TestParseKeyValues(t *testing.T) { }) } } + +func TestAllEvents(t *testing.T) { + re, cleanup := RandomTestRunEnv(t) + t.Cleanup(cleanup) + + re.RecordStart() + re.RecordFailure(fmt.Errorf("bang")) + re.RecordCrash(fmt.Errorf("terrible bang")) + re.RecordMessage("i have something to %s", "say") + re.RecordSuccess() + + if err := re.Close(); err != nil { + t.Fatal(err) + } + + file, err := os.OpenFile(re.TestOutputsPath+"/run.out", os.O_RDONLY, 0644) + if err != nil { + t.Fatal(err) + } + defer file.Close() + + require := require.New(t) + + var i int + for dec := json.NewDecoder(file); dec.More(); { + var m = struct { + Event Event `json:"event"` + }{} + if err := dec.Decode(&m); err != nil { + t.Fatal(err) + } + + switch evt := m.Event; i { + case 0: + require.Equal(EventTypeMessage, evt.Type) + require.Condition(func() bool { return strings.HasPrefix(evt.Message, "InfluxDB unavailable") }) + case 1: + require.Equal(EventTypeStart, evt.Type) + require.Equal(evt.Runenv.TestPlan, re.TestPlan) + require.Equal(evt.Runenv.TestCase, re.TestCase) + require.Equal(evt.Runenv.TestRun, re.TestRun) + require.Equal(evt.Runenv.TestGroupID, re.TestGroupID) + case 2: + require.Equal(EventTypeFinish, evt.Type) + require.Equal(EventOutcomeFailed, evt.Outcome) + require.Equal("bang", evt.Error) + case 3: + require.Equal(EventTypeFinish, evt.Type) + require.Equal(EventOutcomeCrashed, evt.Outcome) + require.Equal("terrible bang", evt.Error) + require.NotEmpty(evt.Stacktrace) + case 4: + require.Equal(EventTypeMessage, evt.Type) + case 5: + require.Equal(evt.Type, 
EventTypeFinish) + require.Equal(evt.Outcome, EventOutcomeOK) + } + i++ + } +} + +func TestMetricsRecordedInFile(t *testing.T) { + test := func(f func(*RunEnv) *MetricsApi, file string) func(t *testing.T) { + return func(t *testing.T) { + re, cleanup := RandomTestRunEnv(t) + t.Cleanup(cleanup) + + api := f(re) + + names := []string{"point1", "point2", "counter1", "meter1", "timer1"} + types := []string{"point", "counter", "meter", "timer"} + api.SetFrequency(200 * time.Millisecond) + api.RecordPoint("point1", 123) + api.RecordPoint("point2", 123) + api.Counter("counter1").Inc(50) + api.Meter("meter1").Mark(50) + api.Timer("timer1").Update(5 * time.Second) + + time.Sleep(1 * time.Second) + + _ = re.Close() + + file, err := os.OpenFile(filepath.Join(re.TestOutputsPath, file), os.O_RDONLY, 0644) + if err != nil { + t.Fatal(err) + } + defer file.Close() + + var metrics []*Metric + for dec := json.NewDecoder(file); dec.More(); { + var m *Metric + if err := dec.Decode(&m); err != nil { + t.Fatal(err) + } + metrics = append(metrics, m) + } + + require := require.New(t) + + na := make(map[string]struct{}) + ty := make(map[string]struct{}) + for _, m := range metrics { + require.Greater(m.Timestamp, int64(0)) + na[m.Name] = struct{}{} + ty[m.Type.String()] = struct{}{} + require.NotZero(len(m.Measures)) + } + + namesActual := make([]string, 0, len(na)) + for k := range na { + namesActual = append(namesActual, k) + } + + typesActual := make([]string, 0, len(ty)) + for k := range ty { + typesActual = append(typesActual, k) + } + + require.ElementsMatch(names, namesActual) + require.ElementsMatch(types, typesActual) + } + } + + t.Run("diagnostics", test((*RunEnv).D, "diagnostics.out")) + t.Run("results", test((*RunEnv).R, "results.out")) +} + +func TestDiagnosticsDispatchedToInfluxDB(t *testing.T) { + InfluxBatching = false + tc := &testClient{} + TestInfluxDBClient = tc + + re, cleanup := RandomTestRunEnv(t) + t.Cleanup(cleanup) + + re.D().RecordPoint("foo", 1234) + re.D().RecordPoint("foo", 1234) + re.D().RecordPoint("foo", 1234) + re.D().RecordPoint("foo", 1234) + + require := require.New(t) + + tc.RLock() + require.Len(tc.batchPoints, 4) + tc.RUnlock() + + re.D().SetFrequency(500 * time.Millisecond) + re.D().Counter("counter").Inc(100) + re.D().Histogram("histogram1", re.D().NewUniformSample(100)).Update(123) + + time.Sleep(1500 * time.Millisecond) + + tc.RLock() + if l := len(tc.batchPoints); l != 6 && l != 8 && l != 10 { + t.Fatalf("expected length to be 6, 8, or 10; was: %d", l) + } + tc.RUnlock() + + _ = re.Close() +} + +func TestResultsDispatchedOnClose(t *testing.T) { + InfluxBatching = false + tc := &testClient{} + TestInfluxDBClient = tc + + re, cleanup := RandomTestRunEnv(t) + t.Cleanup(cleanup) + + re.R().RecordPoint("foo", 1234) + re.R().RecordPoint("foo", 1234) + re.R().RecordPoint("foo", 1234) + re.R().RecordPoint("foo", 1234) + + require := require.New(t) + + tc.RLock() + require.Empty(tc.batchPoints) + tc.RUnlock() + + re.R().SetFrequency(500 * time.Millisecond) + re.R().Counter("counter").Inc(100) + re.R().Histogram("histogram1", re.D().NewUniformSample(100)).Update(123) + + time.Sleep(1500 * time.Millisecond) + + tc.RLock() + require.Empty(tc.batchPoints) + tc.RUnlock() + + _ = re.Close() + + tc.RLock() + require.NotEmpty(tc.batchPoints) + tc.RUnlock() +} + +func TestFrequencyChange(t *testing.T) { + InfluxBatching = false + tc := &testClient{} + TestInfluxDBClient = tc + + re, cleanup := RandomTestRunEnv(t) + t.Cleanup(cleanup) + + // set an abnormally high frequency to verify 
that no points are produced. + re.D().SetFrequency(24 * time.Hour) + counter := re.D().Counter("foo") + counter.Inc(100) + + require := require.New(t) + + time.Sleep(1500 * time.Millisecond) + + tc.RLock() + require.Empty(tc.batchPoints) + tc.RUnlock() + + re.D().SetFrequency(100 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) + + tc.RLock() + require.Greater(len(tc.batchPoints), 5) + tc.RUnlock() +} diff --git a/runtime/runner.go b/runtime/runner.go index b8b7420..c1ca50a 100644 --- a/runtime/runner.go +++ b/runtime/runner.go @@ -4,13 +4,12 @@ import ( "fmt" "io" "net" + "net/http" + _ "net/http/pprof" "os" "runtime/debug" "strings" - "net/http" - _ "net/http/pprof" - "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -88,7 +87,7 @@ func Invoke(tc TestCaseFn) { runenv.RecordCrash(err) // Developers expect panics to be recorded in run.err too. - fmt.Fprintln(os.Stderr, err) + _, _ = fmt.Fprintln(os.Stderr, err) debug.PrintStack() } }() diff --git a/runtime/runparams.go b/runtime/runparams.go new file mode 100644 index 0000000..4f3ebae --- /dev/null +++ b/runtime/runparams.go @@ -0,0 +1,289 @@ +package runtime + +import ( + "encoding/json" + "fmt" + "net" + "strconv" + "strings" + "time" + + "github.com/dustin/go-humanize" +) + +type IPNet struct { + net.IPNet +} + +func (i IPNet) MarshalJSON() ([]byte, error) { + if len(i.IPNet.IP) == 0 { + return json.Marshal("") + } + return json.Marshal(i.String()) +} + +func (i *IPNet) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + if s == "" { + return nil + } + + _, ipnet, err := net.ParseCIDR(s) + if err != nil { + return err + } + + i.IPNet = *ipnet + return nil +} + +// RunParams encapsulates the runtime parameters for this test. +type RunParams struct { + TestPlan string `json:"plan"` + TestCase string `json:"case"` + TestRun string `json:"run"` + + TestRepo string `json:"repo,omitempty"` + TestCommit string `json:"commit,omitempty"` + TestBranch string `json:"branch,omitempty"` + TestTag string `json:"tag,omitempty"` + + TestOutputsPath string `json:"outputs_path,omitempty"` + + TestInstanceCount int `json:"instances"` + TestInstanceRole string `json:"role,omitempty"` + TestInstanceParams map[string]string `json:"params,omitempty"` + + TestGroupID string `json:"group,omitempty"` + TestGroupInstanceCount int `json:"group_instances,omitempty"` + + // true if the test has access to the sidecar. + TestSidecar bool `json:"test_sidecar,omitempty"` + + // The subnet on which this test is running. + // + // The test instance can use this to pick an IP address and/or determine + // the "data" network interface. + // + // This will be 127.1.0.0/16 when using the local exec runner. + TestSubnet *IPNet `json:"network,omitempty"` + TestStartTime time.Time `json:"start_time,omitempty"` +} + +// ParseRunParams parses a list of environment variables into a RunParams. 
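+// Malformed numeric, boolean, subnet or start-time values do not produce an
+// error; they fall back to -1, false, nil and the zero time respectively.
+//
+// Illustrative call (editor's sketch, not part of this patch):
+//
+//	rp, err := ParseRunParams(os.Environ())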
+func ParseRunParams(env []string) (*RunParams, error) {
+	m, err := ParseKeyValues(env)
+	if err != nil {
+		return nil, err
+	}
+
+	return &RunParams{
+		TestBranch:             m[EnvTestBranch],
+		TestCase:               m[EnvTestCase],
+		TestGroupID:            m[EnvTestGroupID],
+		TestGroupInstanceCount: toInt(m[EnvTestGroupInstanceCount]),
+		TestInstanceCount:      toInt(m[EnvTestInstanceCount]),
+		TestInstanceParams:     unpackParams(m[EnvTestInstanceParams]),
+		TestInstanceRole:       m[EnvTestInstanceRole],
+		TestOutputsPath:        m[EnvTestOutputsPath],
+		TestPlan:               m[EnvTestPlan],
+		TestRepo:               m[EnvTestRepo],
+		TestRun:                m[EnvTestRun],
+		TestSidecar:            toBool(m[EnvTestSidecar]),
+		TestStartTime:          toTime(m[EnvTestStartTime]),
+		TestSubnet:             toNet(m[EnvTestSubnet]),
+		TestTag:                m[EnvTestTag],
+	}, nil
+}
+
+func (rp *RunParams) ToEnvVars() map[string]string {
+	packParams := func(in map[string]string) string {
+		arr := make([]string, 0, len(in))
+		for k, v := range in {
+			arr = append(arr, k+"="+v)
+		}
+		return strings.Join(arr, "|")
+	}
+
+	out := map[string]string{
+		EnvTestBranch:             rp.TestBranch,
+		EnvTestCase:               rp.TestCase,
+		EnvTestGroupID:            rp.TestGroupID,
+		EnvTestGroupInstanceCount: strconv.Itoa(rp.TestGroupInstanceCount),
+		EnvTestInstanceCount:      strconv.Itoa(rp.TestInstanceCount),
+		EnvTestInstanceParams:     packParams(rp.TestInstanceParams),
+		EnvTestInstanceRole:       rp.TestInstanceRole,
+		EnvTestOutputsPath:        rp.TestOutputsPath,
+		EnvTestPlan:               rp.TestPlan,
+		EnvTestRepo:               rp.TestRepo,
+		EnvTestRun:                rp.TestRun,
+		EnvTestSidecar:            strconv.FormatBool(rp.TestSidecar),
+		EnvTestStartTime:          rp.TestStartTime.Format(time.RFC3339),
+		EnvTestSubnet:             rp.TestSubnet.String(),
+		EnvTestTag:                rp.TestTag,
+	}
+
+	return out
+}
+
+// IsParamSet checks if a certain parameter is set.
+func (rp *RunParams) IsParamSet(name string) bool {
+	_, ok := rp.TestInstanceParams[name]
+	return ok
+}
+
+// StringParam returns a string parameter. It panics if the parameter is not
+// set; use IsParamSet to check for optional parameters.
+func (rp *RunParams) StringParam(name string) string {
+	v, ok := rp.TestInstanceParams[name]
+	if !ok {
+		panic(fmt.Errorf("%s was not set", name))
+	}
+	return v
+}
+
+// SizeParam returns a human-readable size parameter (e.g. "100KB") parsed
+// into its byte count. It panics if the parameter is not set or cannot be
+// parsed.
+func (rp *RunParams) SizeParam(name string) uint64 {
+	v := rp.TestInstanceParams[name]
+	m, err := humanize.ParseBytes(v)
+	if err != nil {
+		panic(err)
+	}
+	return m
+}
+
+// IntParam returns an int parameter. It panics if the parameter is not set or
+// cannot be parsed as an integer.
+func (rp *RunParams) IntParam(name string) int {
+	v, ok := rp.TestInstanceParams[name]
+	if !ok {
+		panic(fmt.Errorf("%s was not set", name))
+	}
+
+	i, err := strconv.Atoi(v)
+	if err != nil {
+		panic(err)
+	}
+	return i
+}
+
+// FloatParam returns a float64 parameter, or -1.0 if the parameter is not
+// set. It panics if the value cannot be parsed.
+func (rp *RunParams) FloatParam(name string) float64 {
+	v, ok := rp.TestInstanceParams[name]
+	if !ok {
+		return -1.0
+	}
+
+	f, err := strconv.ParseFloat(v, 32)
+	if err != nil {
+		panic(err)
+	}
+	return f
+}
+
+// BooleanParam returns the boolean value of the parameter, or false if it is
+// not set.
+func (rp *RunParams) BooleanParam(name string) bool {
+	s, ok := rp.TestInstanceParams[name]
+	return ok && strings.ToLower(s) == "true"
+}
+
+// StringArrayParam returns a string array parameter, decoded from JSON. It
+// panics if the parameter is not set or cannot be decoded.
+func (rp *RunParams) StringArrayParam(name string) []string {
+	var a []string
+	rp.JSONParam(name, &a)
+	return a
+}
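+
+// Typical accessor usage from a test plan (editor's sketch; the parameter
+// names below are hypothetical and purely illustrative):
+//
+//	iterations := rp.IntParam("iterations")   // panics if "iterations" is not set
+//	blockSize := rp.SizeParam("block_size")   // e.g. "64KiB", returned in bytes
+//	verbose := rp.BooleanParam("verbose")     // false when not set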
+
+// SizeArrayParam returns an array of uint64 elements representing sizes in
+// bytes, decoded from a JSON array of human-readable sizes. It panics if the
+// parameter is not set or any element fails to parse.
+func (rp *RunParams) SizeArrayParam(name string) []uint64 {
+	humanSizes := rp.StringArrayParam(name)
+	var sizes []uint64
+
+	for _, size := range humanSizes {
+		n, err := humanize.ParseBytes(size)
+		if err != nil {
+			panic(err)
+		}
+		sizes = append(sizes, n)
+	}
+
+	return sizes
+}
+
+// JSONParam unmarshals a JSON parameter into an arbitrary value. It panics if
+// the parameter is not set or cannot be unmarshalled.
+func (rp *RunParams) JSONParam(name string, v interface{}) {
+	s, ok := rp.TestInstanceParams[name]
+	if !ok {
+		panic(fmt.Errorf("%s was not set", name))
+	}
+
+	if err := json.Unmarshal([]byte(s), v); err != nil {
+		panic(err)
+	}
+}
+
+// ParseKeyValues parses a list of KEY=VALUE strings into a map.
+//
+// Copied from github.com/ipfs/testground/pkg/conv, because we don't want the
+// SDK to depend on that package.
+func ParseKeyValues(in []string) (res map[string]string, err error) {
+	res = make(map[string]string, len(in))
+	for _, d := range in {
+		splt := strings.Split(d, "=")
+		if len(splt) < 2 {
+			return nil, fmt.Errorf("invalid key-value: %s", d)
+		}
+		res[splt[0]] = strings.Join(splt[1:], "=")
+	}
+	return res, nil
+}
+
+// unpackParams splits a |-separated list of KEY=VALUE pairs into a map;
+// entries that do not split into exactly two parts are skipped.
+func unpackParams(packed string) map[string]string {
+	spltparams := strings.Split(packed, "|")
+	params := make(map[string]string, len(spltparams))
+	for _, s := range spltparams {
+		v := strings.Split(s, "=")
+		if len(v) != 2 {
+			continue
+		}
+		params[v[0]] = v[1]
+	}
+	return params
+}
+
+// toInt converts s to an int, returning -1 if the conversion fails.
+func toInt(s string) int {
+	v, err := strconv.Atoi(s)
+	if err != nil {
+		return -1
+	}
+	return v
+}
+
+func toBool(s string) bool {
+	v, _ := strconv.ParseBool(s)
+	return v
+}
+
+// toNet parses a CIDR string into an IPNet; it returns nil if the input is
+// empty or fails to parse.
+func toNet(s string) *IPNet {
+	_, ipnet, err := net.ParseCIDR(s)
+	if err != nil {
+		return nil
+	}
+	return &IPNet{IPNet: *ipnet}
+}
+
+// toTime parses an RFC3339 timestamp; it returns the zero time if parsing
+// fails.
+func toTime(s string) time.Time {
+	t, err := time.Parse(time.RFC3339, s)
+	if err != nil {
+		return time.Time{}
+	}
+	return t
+}
diff --git a/runtime/test_utils.go b/runtime/test_utils.go
new file mode 100644
index 0000000..f136d21
--- /dev/null
+++ b/runtime/test_utils.go
@@ -0,0 +1,45 @@
+package runtime
+
+import (
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"net"
+	"os"
+	"testing"
+	"time"
+)
+
+// RandomTestRunEnv generates a random RunEnv for testing purposes.
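+//
+// The returned cleanup function removes the temporary outputs directory, so
+// callers typically register it with the test, e.g.:
+//
+//	re, cleanup := RandomTestRunEnv(t)
+//	t.Cleanup(cleanup)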
+func RandomTestRunEnv(t *testing.T) (re *RunEnv, cleanup func()) { + t.Helper() + + b := make([]byte, 32) + _, _ = rand.Read(b) + + _, subnet, _ := net.ParseCIDR("127.1.0.1/16") + + odir, err := ioutil.TempDir("", "testground-tests-*") + if err != nil { + t.Fatalf("failed to create temp output dir: %s", err) + } + + rp := RunParams{ + TestPlan: fmt.Sprintf("testplan-%d", rand.Uint32()), + TestSidecar: false, + TestCase: fmt.Sprintf("testcase-%d", rand.Uint32()), + TestRun: fmt.Sprintf("testrun-%d", rand.Uint32()), + TestSubnet: &IPNet{IPNet: *subnet}, + TestInstanceCount: int(1 + (rand.Uint32() % 999)), + TestInstanceRole: "", + TestInstanceParams: make(map[string]string), + TestGroupID: fmt.Sprintf("group-%d", rand.Uint32()), + TestStartTime: time.Now(), + TestGroupInstanceCount: int(1 + (rand.Uint32() % 999)), + TestOutputsPath: odir, + } + + return NewRunEnv(rp), func() { + _ = os.RemoveAll(odir) + } +} diff --git a/sync/barrier_test.go b/sync/barrier_test.go index a0a7a07..20ac4e4 100644 --- a/sync/barrier_test.go +++ b/sync/barrier_test.go @@ -6,13 +6,16 @@ import ( "time" "golang.org/x/sync/errgroup" + + "github.com/testground/sdk-go/runtime" ) func TestBarrier(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) client, err := NewBoundClient(ctx, runenv) if err != nil { @@ -44,7 +47,8 @@ func TestBarrierBeyondTarget(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) client, err := NewBoundClient(ctx, runenv) if err != nil { @@ -71,7 +75,8 @@ func TestBarrierZero(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) client, err := NewBoundClient(ctx, runenv) if err != nil { @@ -97,7 +102,8 @@ func TestBarrierCancel(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) client, err := NewBoundClient(ctx, runenv) if err != nil { @@ -124,7 +130,8 @@ func TestBarrierDeadline(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) client, err := NewBoundClient(ctx, runenv) if err != nil { @@ -153,7 +160,8 @@ func TestSignalAndWait(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) client, err := NewBoundClient(ctx, runenv) if err != nil { @@ -175,7 +183,8 @@ func TestSignalAndWait(t *testing.T) { } func TestSignalAndWaitTimeout(t *testing.T) { - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) client, err := NewBoundClient(context.Background(), runenv) if err != nil { diff --git a/sync/common_test.go b/sync/common_test.go index ff37274..f18d968 100644 --- a/sync/common_test.go +++ b/sync/common_test.go @@ -2,18 +2,14 @@ package sync import ( "context" - "crypto/sha1" "fmt" "math/rand" - "net" "os" "os/exec" "testing" "time" "go.uber.org/zap" - - 
"github.com/testground/sdk-go/runtime" ) func TestMain(m *testing.M) { @@ -72,24 +68,3 @@ func ensureRedis() (func() error, error) { return nil }, nil } - -// randomRunEnv generates a random RunEnv for testing purposes. -func randomRunEnv() *runtime.RunEnv { - b := make([]byte, 32) - _, _ = rand.Read(b) - - _, subnet, _ := net.ParseCIDR("127.1.0.1/16") - - return runtime.NewRunEnv(runtime.RunParams{ - TestPlan: fmt.Sprintf("testplan-%d", rand.Uint32()), - TestSidecar: false, - TestCase: fmt.Sprintf("testcase-%d", rand.Uint32()), - TestRun: fmt.Sprintf("testrun-%d", rand.Uint32()), - TestRepo: "github.com/ipfs/go-ipfs", - TestSubnet: &runtime.IPNet{IPNet: *subnet}, - TestCommit: fmt.Sprintf("%x", sha1.Sum(b)), - TestInstanceCount: int(1 + (rand.Uint32() % 999)), - TestInstanceRole: "", - TestInstanceParams: make(map[string]string), - }) -} diff --git a/sync/gc_test.go b/sync/gc_test.go index cb10e5c..b9b1c36 100644 --- a/sync/gc_test.go +++ b/sync/gc_test.go @@ -7,6 +7,8 @@ import ( "fmt" "testing" "time" + + "github.com/testground/sdk-go/runtime" ) func TestGC(t *testing.T) { @@ -15,7 +17,8 @@ func TestGC(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) client, err := NewBoundClient(ctx, runenv) if err != nil { diff --git a/sync/generic_client_test.go b/sync/generic_client_test.go index 5a2eff1..232df6e 100644 --- a/sync/generic_client_test.go +++ b/sync/generic_client_test.go @@ -7,6 +7,8 @@ import ( "time" "go.uber.org/zap" + + "github.com/testground/sdk-go/runtime" ) // TestGenericClientRunEnv checks that states and payloads published by a bound @@ -26,7 +28,8 @@ func TestGenericClientRunEnv(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - runenv := randomRunEnv() + runenv, cleanup := runtime.RandomTestRunEnv(t) + t.Cleanup(cleanup) bclient, err := NewBoundClient(ctx, runenv) if err != nil { diff --git a/sync/topic_test.go b/sync/topic_test.go index f34c9ad..1bbe26f 100644 --- a/sync/topic_test.go +++ b/sync/topic_test.go @@ -7,6 +7,8 @@ import ( "testing" "golang.org/x/sync/errgroup" + + "github.com/testground/sdk-go/runtime" ) type TestPayload struct { @@ -19,10 +21,12 @@ type TestPayload struct { func TestSubscribeAfterAllPublished(t *testing.T) { var ( - iterations = 1000 - runenv = randomRunEnv() + iterations = 1000 + runenv, cleanup = runtime.RandomTestRunEnv(t) ) + t.Cleanup(cleanup) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -64,10 +68,12 @@ func TestSubscribeAfterAllPublished(t *testing.T) { func TestSubscribeFirstConcurrentWrites(t *testing.T) { var ( - iterations = 1000 - runenv = randomRunEnv() + iterations = 1000 + runenv, cleanup = runtime.RandomTestRunEnv(t) ) + t.Cleanup(cleanup) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -118,11 +124,13 @@ func TestSubscribeFirstConcurrentWrites(t *testing.T) { func TestSubscriptionConcurrentPublishersSubscribers(t *testing.T) { var ( - topics = 100 - iterations = 100 - runenv = randomRunEnv() + topics = 100 + iterations = 100 + runenv, cleanup = runtime.RandomTestRunEnv(t) ) + t.Cleanup(cleanup) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -169,7 +177,9 @@ func TestSubscriptionConcurrentPublishersSubscribers(t *testing.T) { } func TestSubscriptionValidation(t *testing.T) { - runenv := randomRunEnv() + runenv, cleanup := 
runtime.RandomTestRunEnv(t) + + t.Cleanup(cleanup) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -205,11 +215,13 @@ func TestSubscriptionValidation(t *testing.T) { func TestSequenceOnWrite(t *testing.T) { var ( - iterations = 1000 - runenv = randomRunEnv() - topic = &Topic{name: "pandemic", typ: reflect.TypeOf("")} + iterations = 1000 + topic = &Topic{name: "pandemic", typ: reflect.TypeOf("")} + runenv, cleanup = runtime.RandomTestRunEnv(t) ) + t.Cleanup(cleanup) + ctx, cancel := context.WithCancel(context.Background()) defer cancel()