diff --git a/CHANGELOG.md b/CHANGELOG.md index 6406dba27c0..4d47da9efa5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ With this release the systemd configuration files for InfluxDB will use the syst - [#6882](https://github.com/influxdata/influxdb/pull/6882): Remove a double lock in the tsm1 index writer. - [#6883](https://github.com/influxdata/influxdb/pull/6883): Rename dumptsmdev to dumptsm in influx_inspect. - [#6864](https://github.com/influxdata/influxdb/pull/6864): Allow a non-admin to call "use" for the influx cli. +- [#6855](https://github.com/influxdata/influxdb/pull/6855): Update `stress/v2` to work with clusters, ssl, and username/password auth. Code cleanup ## v0.13.0 [2016-05-12] diff --git a/cmd/influx_stress/influx_stress.go b/cmd/influx_stress/influx_stress.go index 17a835628b3..032c26db72d 100644 --- a/cmd/influx_stress/influx_stress.go +++ b/cmd/influx_stress/influx_stress.go @@ -36,7 +36,7 @@ func main() { if *config != "" { v2.RunStress(*config) } else { - v2.RunStress("stress/v2/file.iql") + v2.RunStress("stress/v2/iql/file.iql") } } else { diff --git a/stress/v2/.gitignore b/stress/v2/.gitignore deleted file mode 100644 index b86d49a0cfb..00000000000 --- a/stress/v2/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -/iql -stress-tool-refactor -*.txt \ No newline at end of file diff --git a/stress/v2/DESIGN.md b/stress/v2/DESIGN.md index 26eaad36d71..0ca7a610b06 100644 --- a/stress/v2/DESIGN.md +++ b/stress/v2/DESIGN.md @@ -17,9 +17,9 @@ The tool has the following components: - `QUERY` - Runs a given query or generates sample queries given a companion `INSERT` statement - `SET` - Changes the test parameters. Defaults are listed in the `README.md` - `WAIT` - Required after a `GO` statement. Blocks till all proceeding statements finish. -* Clients - The statement, results and InfluxDB clients. This code lives in `v2/ponyExpress` - - `Storefront` - The `Statement` client. Also contains the results client. - - `ponyExpress` - A performant InfluxDB client. Makes `GET /query` and `POST /write` requests. Forwards the results to the results client. +* Clients - The statement, results and InfluxDB clients. This code lives in `v2/stress_client` + - `StressTest` - The `Statement` client. Also contains the results client. + - `stressClient` - A performant InfluxDB client. Makes `GET /query` and `POST /write` requests. Forwards the results to the results client. ![Influx Stress Design](./influx_stress_v2.png) @@ -28,8 +28,8 @@ The tool has the following components: `Statement` is an interface defined in `v2/statement/statement.go`: ```go type Statement interface { - Run(s *ponyExpress.StoreFront) - Report(s *ponyExpress.StoreFront) string + Run(s *stressClient.StressTest) + Report(s *stressClient.StressTest) string SetID(s string) } ``` @@ -37,11 +37,11 @@ type Statement interface { * `Report` retrieves and collates all recorded test data from the reporting InfluxDB instance. * `SetID` gives the statement an ID. Used in the parser. Each `statementID` is an 8 character random string used for reporting. -### `Statement` -> `Storefront` +### `Statement` -> `StressTest` -`Statement`s send `Packages` (queries or writes to the target database) or `Directives` (for changing test state) through the `StoreFront` to the `ponyExpress` where they are processed. +`Statement`s send `Package`s (queries or writes to the target database) or `Directives` (for changing test state) through the `StressTest` to the `stressClient` where they are processed. ```go -// v2/ponyExpress/package.go +// v2/stress_client/package.go // T is Query or Write // StatementID is for reporting @@ -52,9 +52,9 @@ type Package struct { Tracer *Tracer } -// v2/ponyExpress/directive.go +// v2/stress_client/directive.go -// Property is test state to change +// Property is test state variable to change // Value is the new value type Directive struct { Property string @@ -63,10 +63,10 @@ type Directive struct { } ``` -The `Tracer` on both of these packages contains a `sync.WaitGroup` that prevents `Statement`s from returning before all their operations are finished. This `WaitGroup` is incremented in the `Run()` of the statement and decremented in `*StoreFront.resultsListen()` after results are recorded in the database. This is well documented with inline comments. `Tracer`s also carry optional tags for reporting purposes. +The `Tracer` on both of these packages contains a `sync.WaitGroup` that prevents `Statement`s from returning before all their operations are finished. This `WaitGroup` is incremented in the `Run()` of the statement and decremented in `*StressTest.resultsListen()` after results are recorded in the database. This is well documented with inline comments. `Tracer`s also carry optional tags for reporting purposes. ```go -// v2/ponyExpress/tracer.go +// v2/stress_client/tracer.go type Tracer struct { Tags map[string]string @@ -74,12 +74,12 @@ type Tracer struct { } ``` -### `StoreFront` +### `StressTest` -The `StoreFront` is the client for the statements through the `*StoreFront.SendPackage()` and `*StoreFront.SendDirective()` functions. It also contains some test state and the `ResultsClient`. +The `StressTest` is the client for the statements through the `*StressTest.SendPackage()` and `*StressTest.SendDirective()` functions. It also contains some test state and the `ResultsClient`. ```go -type StoreFront struct { +type StressTest struct { TestID string TestName string @@ -101,19 +101,19 @@ type StoreFront struct { ### Reporting Client -The `ResultsClient` turns raw responses from InfluxDB into properly tagged points containing any relevant information for storage in another InfluxDB instance. The code for creating those points lives in `v2/ponyExpress/reporting.go` +The `ResultsClient` turns raw responses from InfluxDB into properly tagged points containing any relevant information for storage in another InfluxDB instance. The code for creating those points lives in `v2/stress_client/reporting.go` ### InfluxDB Instance (reporting) -This is `localhost:8086` by default. The results are currently stored in the `_DefaultTestName` database. This is going to be changed. +This is `localhost:8086` by default. The results are currently stored in the `_stressTest` database. -### `ponyExpress` +### `stressClient` -An InfluxDB client designed for speed. `ponyExpress` also holds most test state. +An InfluxDB client designed for speed. `stressClient` also holds most test state. ```go -// v2/ponyExpress/ponyExpress.go -type ponyExpress struct { +// v2/stress_client/stress_client.go +type stressClient struct { testID string // State for the Stress Test @@ -144,7 +144,7 @@ type ponyExpress struct { rc *ConcurrencyLimiter } ``` -Code for handling the write path is in `v2/ponyExpress/ponyExpress_write.go` while the query path is in `v2/ponyExpress/ponyExpress_query.go`. +Code for handling the write path is in `v2/stress_client/stress_client_write.go` while the query path is in `v2/stress_client/stress_client_query.go`. ### InfluxDB Instance (stress test target) @@ -152,10 +152,10 @@ The InfluxDB which is being put under stress. ### response data -`Response`s carry points from `ponyExpress` to the `ResultsClient`. +`Response`s carry points from `stressClient` to the `ResultsClient`. ```go -// v2/ponyExpress/response.go +// v2/stress_client/response.go type Response struct { Point *influx.Point Tracer *Tracer diff --git a/stress/v2/README.md b/stress/v2/README.md index cbd29f6916c..77a32835f9a 100644 --- a/stress/v2/README.md +++ b/stress/v2/README.md @@ -1,17 +1,5 @@ # Influx Stress tool -Blockers to finishing: -* Finalize reporting - - Decide on how to incorporate TestName (db[difficult], measurement[refactor], tag[easy]) - - Get feedback on reporting syntax - - Pull addition data from queries -* Documentation is sorely lacking. - - Parser behavior and proper `.iql` syntax - - How the templated query generation works - - Collection of tested `.iql` files to simulate different loads - -Commune is potentially blocking writes, look into performance - This stress tool works from list of InfluxQL-esque statements. The language has been extended to allow for some basic templating of fields, tags and measurements in both line protocol and query statements. By default the test outputs a human readable report to `STDOUT` and records test statistics in an active installation of InfluxDB at `localhost:8086`. @@ -19,17 +7,26 @@ By default the test outputs a human readable report to `STDOUT` and records test To set state variables for the test such as the address of the Influx node use the following syntax: ``` +# The values listed below are the default values for each of the parameters + # Pipe delineated list of addresses. For cluster: [192.168.0.10:8086|192.168.0.2:8086|192.168.0.3:8086] -# Queries currently hit only the first node in a list. Writes are round robin. +# Queries and writes are round-robin to the configured addresses. SET Addresses [localhost:8086] -# Influx instance to store results -SET ResultsAddress [localhost:8086] +# False (default) uses http, true uses https +SET SSL [false] + +# Username for targeted influx server or cluster +SET Username [] + +# Password for targeted influx server or cluster +SET Password [] # Database to target for queries and writes. Works like the InfluxCLI USE -SET Database [thing2] +SET Database [stress] # Precision for the data being written +# Only s and ns supported SET Precision [s] # Date the first written point will be timestamped @@ -157,3 +154,19 @@ WAIT -> 624.585319ms [√] "DROP DATABASE thing" -> 991.088464ms [√] "DROP DATABASE thing2" -> 421.362831ms ``` + +### Next Steps: + +##### Reporting +- Only use one database for reporting +- Get feedback on reporting syntax +- Pull addition data from queries + +##### Documentation +- Parser behavior and proper `.iql` syntax +- How the templated query generation works +- Collection of tested `.iql` files to simulate different loads + +##### Performance +- `Commune` is potentially blocking writes, look into performance. +- Templated query generation is currently in a quazi-working state. diff --git a/stress/v2/iql/default.iql b/stress/v2/iql/default.iql new file mode 100644 index 00000000000..8440c9288f8 --- /dev/null +++ b/stress/v2/iql/default.iql @@ -0,0 +1,13 @@ +CREATE DATABASE stress + +GO INSERT cpu +cpu, +host=server-[int inc(0) 100000],location=us-west +value=[int rand(100) 0] +10000000 10s + +GO QUERY cpu +SELECT count(value) FROM cpu WHERE %t +DO 250 + +WAIT diff --git a/stress/v2/file.iql b/stress/v2/iql/file.iql similarity index 100% rename from stress/v2/file.iql rename to stress/v2/iql/file.iql diff --git a/stress/v2/main.go b/stress/v2/main.go index e4ab4f6bfcc..0675809a284 100644 --- a/stress/v2/main.go +++ b/stress/v2/main.go @@ -6,7 +6,7 @@ import ( "time" influx "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" "github.com/influxdata/influxdb/stress/v2/stressql" ) @@ -14,7 +14,7 @@ import ( func RunStress(file string) { // Spin up the Client - s := ponyExpress.NewStoreFront() + s := stressClient.NewStressTest() // Parse the file into Statements stmts, err := stressql.ParseStatements(file) @@ -40,7 +40,7 @@ func RunStress(file string) { } } -func blankResponse() ponyExpress.Response { +func blankResponse() stressClient.Response { // Points must have at least one field fields := map[string]interface{}{"done": true} // Make a 'blank' point @@ -50,10 +50,10 @@ func blankResponse() ponyExpress.Response { log.Fatalf("Error creating blank response point\n error: %v\n", err) } // Add a tracer to prevent program from returning too early - tracer := ponyExpress.NewTracer(make(map[string]string)) + tracer := stressClient.NewTracer(make(map[string]string)) // Add to the WaitGroup tracer.Add(1) // Make a new response with the point and the tracer - resp := ponyExpress.NewResponse(p, tracer) + resp := stressClient.NewResponse(p, tracer) return resp } diff --git a/stress/v2/ponyExpress/storeFront.go b/stress/v2/ponyExpress/storeFront.go deleted file mode 100644 index 7a97144394b..00000000000 --- a/stress/v2/ponyExpress/storeFront.go +++ /dev/null @@ -1,209 +0,0 @@ -package ponyExpress - -import ( - "fmt" - "log" - "sync" - - influx "github.com/influxdata/influxdb/client/v2" -) - -// NewStoreFront creates the backend for the stress test -func NewStoreFront() *StoreFront { - - // Make the Package and Directive chans - packageCh := make(chan Package, 0) - directiveCh := make(chan Directive, 0) - - // Make the Response chan - responseCh := make(chan Response, 0) - - s := &StoreFront{ - TestName: "DefaultTestName", - Precision: "s", - StartDate: "2016-01-02", - BatchSize: 5000, - - packageChan: packageCh, - directiveChan: directiveCh, - - ResultsChan: responseCh, - communes: make(map[string]*commune), - TestID: randStr(10), - } - - // Set the results instance to localhost:8086 by default - s.SetResultsClient(influx.HTTPConfig{ - Addr: fmt.Sprintf("http://%v/", "localhost:8086"), - }) - - // Start the client service - startPonyExpress(packageCh, directiveCh, responseCh, s.TestID) - - // Listen for Results coming in - s.resultsListen() - - return s -} - -// NewTestStoreFront returns a StoreFront to be used for testing Statements -func NewTestStoreFront() (*StoreFront, chan Package, chan Directive) { - - packageCh := make(chan Package, 0) - directiveCh := make(chan Directive, 0) - - s := &StoreFront{ - TestName: "DefaultTestName", - Precision: "s", - StartDate: "2016-01-02", - BatchSize: 5000, - - directiveChan: directiveCh, - packageChan: packageCh, - - communes: make(map[string]*commune), - TestID: randStr(10), - } - - return s, packageCh, directiveCh -} - -// The StoreFront is the Statement facing API that consume Statement output and coordinates the test results -type StoreFront struct { - TestID string - TestName string - - Precision string - StartDate string - BatchSize int - - sync.WaitGroup - sync.Mutex - - packageChan chan<- Package - directiveChan chan<- Directive - - ResultsChan chan Response - communes map[string]*commune - ResultsClient influx.Client -} - -// SendPackage is the public facing API for to send Queries and Points -func (sf *StoreFront) SendPackage(p Package) { - sf.packageChan <- p -} - -// SendDirective is the public facing API to set state variables in the test -func (sf *StoreFront) SendDirective(d Directive) { - sf.directiveChan <- d -} - -// Starts a go routine that listens for Results -func (sf *StoreFront) resultsListen() { - - // Make sure databases for results are created - sf.createDatabase(fmt.Sprintf("_%v", sf.TestName)) - sf.createDatabase(sf.TestName) - - // Listen for Responses - go func() { - - // Prepare a BatchPointsConfig - bpconf := influx.BatchPointsConfig{ - Database: fmt.Sprintf("_%v", sf.TestName), - Precision: "ns", - } - - // Prepare the first batch of points - bp, _ := influx.NewBatchPoints(bpconf) - - // TODO: Panics on resp.Tracer.Done() if there are too many 500s in a row - // Loop over ResultsChan - for resp := range sf.ResultsChan { - switch resp.Point.Name() { - // If the done point comes down the channel write the results - case "done": - sf.ResultsClient.Write(bp) - // Decrement the tracer - resp.Tracer.Done() - // By default fall back to the batcher - default: - // Add the StoreFront tags - pt := resp.AddTags(sf.tags()) - // Add the point to the batch - bp = sf.batcher(pt, bp, bpconf) - // Decrement the tracer - resp.Tracer.Done() - } - } - - }() -} - -// Batches incoming Result.Point and sends them if the batch reaches 5k in sizes -func (sf *StoreFront) batcher(pt *influx.Point, bp influx.BatchPoints, bpconf influx.BatchPointsConfig) influx.BatchPoints { - // If fewer than 5k add point and return - if len(bp.Points()) <= 5000 { - bp.AddPoint(pt) - } else { - // Otherwise send the batch - err := sf.ResultsClient.Write(bp) - - // Check error - if err != nil { - log.Fatalf("Error writing performance stats\n error: %v\n", err) - } - - // Reset the batch of points - bp, _ = influx.NewBatchPoints(bpconf) - } - return bp -} - -// SetResultsClient is the utility for reseting the address of the ResultsClient -func (sf *StoreFront) SetResultsClient(conf influx.HTTPConfig) { - clnt, err := influx.NewHTTPClient(conf) - if err != nil { - log.Fatalf("Error resetting results clien\n error: %v\n", err) - } - sf.ResultsClient = clnt -} - -// Convinence database creation function -func (sf *StoreFront) createDatabase(db string) { - query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v", db) - sf.ResultsClient.Query(influx.Query{Command: query}) -} - -// GetStatementResults is a convinence function for fetching all results given a StatementID -func (sf *StoreFront) GetStatementResults(sID, t string) (res []influx.Result) { - // Make the template string - qryStr := fmt.Sprintf(`SELECT * FROM "%v" WHERE statement_id = '%v'`, t, sID) - // Make the query and return the results - return sf.queryTestResults(qryStr) -} - -// Runs given qry on the test results database and returns the results or nil in case of error -func (sf *StoreFront) queryTestResults(qry string) (res []influx.Result) { - q := influx.Query{ - Command: qry, - Database: fmt.Sprintf("_%v", sf.TestName), - } - - response, err := sf.ResultsClient.Query(q) - - if err == nil { - if response.Error() != nil { - log.Fatalf("Error sending results query\n error: %v\n", response.Error()) - } - } - - res = response.Results - - // If there are no results this indicates some kind of error - if res[0].Series == nil { - return nil - } - - return res -} diff --git a/stress/v2/statement/exec.go b/stress/v2/statement/exec.go index a855be60072..b82f71c0aae 100644 --- a/stress/v2/statement/exec.go +++ b/stress/v2/statement/exec.go @@ -3,7 +3,7 @@ package statement import ( "time" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) // ExecStatement run outside scripts. This functionality is not built out @@ -21,12 +21,12 @@ func (i *ExecStatement) SetID(s string) { } // Run statisfies the Statement Interface -func (i *ExecStatement) Run(s *ponyExpress.StoreFront) { +func (i *ExecStatement) Run(s *stressClient.StressTest) { runtime := time.Now() i.runtime = time.Since(runtime) } // Report statisfies the Statement Interface -func (i *ExecStatement) Report(s *ponyExpress.StoreFront) string { +func (i *ExecStatement) Report(s *stressClient.StressTest) string { return "" } diff --git a/stress/v2/statement/exec_test.go b/stress/v2/statement/exec_test.go index eab8ea379f1..06c433eac24 100644 --- a/stress/v2/statement/exec_test.go +++ b/stress/v2/statement/exec_test.go @@ -3,7 +3,7 @@ package statement import ( "testing" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) func TestExecSetID(t *testing.T) { @@ -17,7 +17,7 @@ func TestExecSetID(t *testing.T) { func TestExecRun(t *testing.T) { e := newTestExec() - s, _, _ := ponyExpress.NewTestStoreFront() + s, _, _ := stressClient.NewTestStressTest() e.Run(s) if e == nil { t.Fail() @@ -26,7 +26,7 @@ func TestExecRun(t *testing.T) { func TestExecReport(t *testing.T) { e := newTestExec() - s, _, _ := ponyExpress.NewTestStoreFront() + s, _, _ := stressClient.NewTestStressTest() rep := e.Report(s) if rep != "" { t.Fail() diff --git a/stress/v2/statement/go.go b/stress/v2/statement/go.go index b24a33c321c..e1d61e7e0f4 100644 --- a/stress/v2/statement/go.go +++ b/stress/v2/statement/go.go @@ -2,8 +2,9 @@ package statement import ( "fmt" + "time" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) // GoStatement is a Statement Implementation to allow other statements to be run concurrently @@ -19,7 +20,13 @@ func (i *GoStatement) SetID(s string) { } // Run statisfies the Statement Interface -func (i *GoStatement) Run(s *ponyExpress.StoreFront) { +func (i *GoStatement) Run(s *stressClient.StressTest) { + // TODO: remove + switch i.Statement.(type) { + case *QueryStatement: + time.Sleep(1 * time.Second) + } + s.Add(1) go func() { i.Statement.Run(s) @@ -28,6 +35,6 @@ func (i *GoStatement) Run(s *ponyExpress.StoreFront) { } // Report statisfies the Statement Interface -func (i *GoStatement) Report(s *ponyExpress.StoreFront) string { +func (i *GoStatement) Report(s *stressClient.StressTest) string { return fmt.Sprintf("Go %v", i.Statement.Report(s)) } diff --git a/stress/v2/statement/go_test.go b/stress/v2/statement/go_test.go index c1141b69c86..c9ebba3969b 100644 --- a/stress/v2/statement/go_test.go +++ b/stress/v2/statement/go_test.go @@ -3,7 +3,7 @@ package statement import ( "testing" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) func TestGoSetID(t *testing.T) { @@ -17,7 +17,7 @@ func TestGoSetID(t *testing.T) { func TestGoRun(t *testing.T) { e := newTestGo() - s, _, _ := ponyExpress.NewTestStoreFront() + s, _, _ := stressClient.NewTestStressTest() e.Run(s) if e == nil { t.Fail() @@ -26,7 +26,7 @@ func TestGoRun(t *testing.T) { func TestGoReport(t *testing.T) { e := newTestGo() - s, _, _ := ponyExpress.NewTestStoreFront() + s, _, _ := stressClient.NewTestStressTest() report := e.Report(s) if report != "Go " { t.Errorf("Expected: %v\nGot: %v\n", "Go ", report) diff --git a/stress/v2/statement/influxql.go b/stress/v2/statement/influxql.go index 83ac1ed124e..2a1eca2c4b2 100644 --- a/stress/v2/statement/influxql.go +++ b/stress/v2/statement/influxql.go @@ -4,14 +4,14 @@ import ( "log" "time" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) // InfluxqlStatement is a Statement Implementation that allows statements that parse in InfluxQL to be passed directly to the target instance type InfluxqlStatement struct { StatementID string Query string - Tracer *ponyExpress.Tracer + Tracer *stressClient.Tracer } func (i *InfluxqlStatement) tags() map[string]string { @@ -25,13 +25,13 @@ func (i *InfluxqlStatement) SetID(s string) { } // Run statisfies the Statement Interface -func (i *InfluxqlStatement) Run(s *ponyExpress.StoreFront) { +func (i *InfluxqlStatement) Run(s *stressClient.StressTest) { // Set the tracer - i.Tracer = ponyExpress.NewTracer(i.tags()) + i.Tracer = stressClient.NewTracer(i.tags()) // Make the Package - p := ponyExpress.NewPackage(ponyExpress.Query, []byte(i.Query), i.StatementID, i.Tracer) + p := stressClient.NewPackage(stressClient.Query, []byte(i.Query), i.StatementID, i.Tracer) // Increment the tracer i.Tracer.Add(1) @@ -45,7 +45,7 @@ func (i *InfluxqlStatement) Run(s *ponyExpress.StoreFront) { // Report statisfies the Statement Interface // No test coverage, fix -func (i *InfluxqlStatement) Report(s *ponyExpress.StoreFront) (out string) { +func (i *InfluxqlStatement) Report(s *stressClient.StressTest) (out string) { allData := s.GetStatementResults(i.StatementID, "query") iqlr := &influxQlReport{ diff --git a/stress/v2/statement/influxql_test.go b/stress/v2/statement/influxql_test.go index 3285772f7b4..74c8b45077e 100644 --- a/stress/v2/statement/influxql_test.go +++ b/stress/v2/statement/influxql_test.go @@ -3,7 +3,7 @@ package statement import ( "testing" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) func TestInfluxQlSetID(t *testing.T) { @@ -17,10 +17,10 @@ func TestInfluxQlSetID(t *testing.T) { func TestInfluxQlRun(t *testing.T) { e := newTestInfluxQl() - s, packageCh, _ := ponyExpress.NewTestStoreFront() + s, packageCh, _ := stressClient.NewTestStressTest() go func() { for pkg := range packageCh { - if pkg.T != ponyExpress.Query { + if pkg.T != stressClient.Query { t.Errorf("Expected package to be Query\nGot: %v", pkg.T) } if string(pkg.Body) != e.Query { @@ -38,7 +38,7 @@ func TestInfluxQlRun(t *testing.T) { func newTestInfluxQl() *InfluxqlStatement { return &InfluxqlStatement{ Query: "CREATE DATABASE foo", - Tracer: ponyExpress.NewTracer(make(map[string]string)), + Tracer: stressClient.NewTracer(make(map[string]string)), StatementID: "fooID", } } diff --git a/stress/v2/statement/insert.go b/stress/v2/statement/insert.go index b4bd77a3c2a..bfa0b242f2c 100644 --- a/stress/v2/statement/insert.go +++ b/stress/v2/statement/insert.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) // InsertStatement is a Statement Implementation that creates points to be written to the target InfluxDB instance @@ -27,7 +27,7 @@ type InsertStatement struct { TagCount int // The Tracer prevents InsertStatement.Run() from returning early - Tracer *ponyExpress.Tracer + Tracer *stressClient.Tracer // Timestamp is #points to write and percision Timestamp *Timestamp @@ -65,7 +65,7 @@ func (i *InsertStatement) SetID(s string) { } // SetVars sets up the environment for InsertStatement to call it's Run function -func (i *InsertStatement) SetVars(s *ponyExpress.StoreFront) chan<- string { +func (i *InsertStatement) SetVars(s *stressClient.StressTest) chan<- string { // Set the #series at 1 to start i.series = 1 @@ -80,19 +80,19 @@ func (i *InsertStatement) SetVars(s *ponyExpress.StoreFront) chan<- string { // Set the time function, keeps track of 'time' of the points being created i.time = i.Timestamp.Time(s.StartDate, i.series, s.Precision) - // Set a commune on the StoreFront + // Set a commune on the StressTest s.Lock() comCh := s.SetCommune(i.Name) s.Unlock() // Set the tracer - i.Tracer = ponyExpress.NewTracer(i.tags()) + i.Tracer = stressClient.NewTracer(i.tags()) return comCh } // Run statisfies the Statement Interface -func (i *InsertStatement) Run(s *ponyExpress.StoreFront) { +func (i *InsertStatement) Run(s *stressClient.StressTest) { // Set variables on the InsertStatement and make the comCh comCh := i.SetVars(s) @@ -126,7 +126,7 @@ func (i *InsertStatement) Run(s *ponyExpress.StoreFront) { b = b[0 : len(b)-1] // Create the package - p := ponyExpress.NewPackage(ponyExpress.Write, b, i.StatementID, i.Tracer) + p := stressClient.NewPackage(stressClient.Write, b, i.StatementID, i.Tracer) // Use Tracer to wait for all operations to finish i.Tracer.Add(1) @@ -159,7 +159,7 @@ func (i *InsertStatement) Run(s *ponyExpress.StoreFront) { b = b[0 : len(b)-1] // Create the package - p := ponyExpress.NewPackage(ponyExpress.Write, b, i.StatementID, i.Tracer) + p := stressClient.NewPackage(stressClient.Write, b, i.StatementID, i.Tracer) // Use Tracer to wait for all operations to finish i.Tracer.Add(1) @@ -176,8 +176,8 @@ func (i *InsertStatement) Run(s *ponyExpress.StoreFront) { } // Report statisfies the Statement Interface -func (i *InsertStatement) Report(s *ponyExpress.StoreFront) string { - // Pull data via StoreFront client +func (i *InsertStatement) Report(s *stressClient.StressTest) string { + // Pull data via StressTest client allData := s.GetStatementResults(i.StatementID, "write") if allData == nil || allData[0].Series == nil { diff --git a/stress/v2/statement/insert_test.go b/stress/v2/statement/insert_test.go index 847416ffc74..4fc04182327 100644 --- a/stress/v2/statement/insert_test.go +++ b/stress/v2/statement/insert_test.go @@ -4,7 +4,7 @@ import ( "strings" "testing" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) func TestInsertSetID(t *testing.T) { @@ -18,7 +18,7 @@ func TestInsertSetID(t *testing.T) { func TestInsertRun(t *testing.T) { i := newTestInsert() - s, packageCh, _ := ponyExpress.NewTestStoreFront() + s, packageCh, _ := stressClient.NewTestStressTest() // Listen to the other side of the directiveCh go func() { for pkg := range packageCh { diff --git a/stress/v2/statement/query.go b/stress/v2/statement/query.go index 75b4be74cba..40c38196a28 100644 --- a/stress/v2/statement/query.go +++ b/stress/v2/statement/query.go @@ -6,7 +6,7 @@ import ( "time" "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) // QueryStatement is a Statement Implementation to run queries on the target InfluxDB instance @@ -22,7 +22,7 @@ type QueryStatement struct { Count int // Tracer for tracking returns - Tracer *ponyExpress.Tracer + Tracer *stressClient.Tracer // track time for all queries runtime time.Duration @@ -40,10 +40,9 @@ func (i *QueryStatement) SetID(s string) { } // Run statisfies the Statement Interface -func (i *QueryStatement) Run(s *ponyExpress.StoreFront) { +func (i *QueryStatement) Run(s *stressClient.StressTest) { - // Set the tracer - i.Tracer = ponyExpress.NewTracer(i.tags()) + i.Tracer = stressClient.NewTracer(i.tags()) vals := make(map[string]interface{}) @@ -58,7 +57,7 @@ func (i *QueryStatement) Run(s *ponyExpress.StoreFront) { b := []byte(i.TemplateString) // Make the package - p := ponyExpress.NewPackage(ponyExpress.Query, b, i.StatementID, i.Tracer) + p := stressClient.NewPackage(stressClient.Query, b, i.StatementID, i.Tracer) // Increment the tracer i.Tracer.Add(1) @@ -71,7 +70,7 @@ func (i *QueryStatement) Run(s *ponyExpress.StoreFront) { // TODO: Currently the program lock up here if s.GetPoint // cannot return a value, which can happen. - // Seee insert.go + // See insert.go s.Lock() point = s.GetPoint(i.Name, s.Precision) s.Unlock() @@ -82,7 +81,7 @@ func (i *QueryStatement) Run(s *ponyExpress.StoreFront) { b := []byte(fmt.Sprintf(i.TemplateString, setArgs(vals, i.Args)...)) // Make the package - p := ponyExpress.NewPackage(ponyExpress.Query, b, i.StatementID, i.Tracer) + p := stressClient.NewPackage(stressClient.Query, b, i.StatementID, i.Tracer) // Increment the tracer i.Tracer.Add(1) @@ -101,8 +100,8 @@ func (i *QueryStatement) Run(s *ponyExpress.StoreFront) { } // Report statisfies the Statement Interface -func (i *QueryStatement) Report(s *ponyExpress.StoreFront) string { - // Pull data via StoreFront client +func (i *QueryStatement) Report(s *stressClient.StressTest) string { + // Pull data via StressTest client allData := s.GetStatementResults(i.StatementID, "query") if len(allData) == 0 || allData[0].Series == nil { diff --git a/stress/v2/statement/query_test.go b/stress/v2/statement/query_test.go index d619a32f2e5..b9b607f8f49 100644 --- a/stress/v2/statement/query_test.go +++ b/stress/v2/statement/query_test.go @@ -3,7 +3,7 @@ package statement import ( "testing" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) func TestQuerySetID(t *testing.T) { @@ -17,7 +17,7 @@ func TestQuerySetID(t *testing.T) { func TestQueryRun(t *testing.T) { i := newTestQuery() - s, packageCh, _ := ponyExpress.NewTestStoreFront() + s, packageCh, _ := stressClient.NewTestStressTest() // Listen to the other side of the directiveCh go func() { for pkg := range packageCh { @@ -37,6 +37,6 @@ func newTestQuery() *QueryStatement { TemplateString: "SELECT count(value) FROM cpu", Args: []string{}, Count: 5, - Tracer: ponyExpress.NewTracer(map[string]string{}), + Tracer: stressClient.NewTracer(map[string]string{}), } } diff --git a/stress/v2/statement/set.go b/stress/v2/statement/set.go index 567e0172925..825c74f3b11 100644 --- a/stress/v2/statement/set.go +++ b/stress/v2/statement/set.go @@ -4,8 +4,7 @@ import ( "fmt" "strings" - influx "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) // SetStatement set state variables for the test @@ -15,7 +14,7 @@ type SetStatement struct { StatementID string - Tracer *ponyExpress.Tracer + Tracer *stressClient.Tracer } // SetID statisfies the Statement Interface @@ -24,57 +23,30 @@ func (i *SetStatement) SetID(s string) { } // Run statisfies the Statement Interface -func (i *SetStatement) Run(s *ponyExpress.StoreFront) { - - // Set the Tracer - i.Tracer = ponyExpress.NewTracer(make(map[string]string)) - - // Create a new Directive - d := ponyExpress.NewDirective(strings.ToLower(i.Var), strings.ToLower(i.Value), i.Tracer) - +func (i *SetStatement) Run(s *stressClient.StressTest) { + i.Tracer = stressClient.NewTracer(make(map[string]string)) + d := stressClient.NewDirective(strings.ToLower(i.Var), strings.ToLower(i.Value), i.Tracer) switch d.Property { - - // Needs to be set on both StoreFront and ponyExpress + // Needs to be set on both StressTest and stressClient // Set the write percison for points generated case "precision": s.Precision = d.Value - - // Increment the tracer i.Tracer.Add(1) s.SendDirective(d) - - // Lives on StoreFront + // Lives on StressTest // Set the date for the first point entered into the database case "startdate": s.Lock() s.StartDate = d.Value s.Unlock() - - // Lives on StoreFront + // Lives on StressTest // Set the BatchSize for writes case "batchsize": s.Lock() s.BatchSize = parseInt(d.Value) s.Unlock() - - // Lives on StoreFront - // Reset the ResultsClient to have a new address - case "resultsaddress": - s.Lock() - s.SetResultsClient(influx.HTTPConfig{Addr: fmt.Sprintf("http://%v/", d.Value)}) - s.Unlock() - - // TODO: Make TestName actually change the reporting DB - // Lives on StoreFront - // Set the TestName that controls reporting DB - case "testname": - s.Lock() - s.TestName = d.Value - s.Unlock() - - // All other variables live on ponyExpress + // All other variables live on stressClient default: - // Increment the tracer i.Tracer.Add(1) s.SendDirective(d) } @@ -82,6 +54,6 @@ func (i *SetStatement) Run(s *ponyExpress.StoreFront) { } // Report statisfies the Statement Interface -func (i *SetStatement) Report(s *ponyExpress.StoreFront) string { +func (i *SetStatement) Report(s *stressClient.StressTest) string { return fmt.Sprintf("SET %v = '%v'", i.Var, i.Value) } diff --git a/stress/v2/statement/set_test.go b/stress/v2/statement/set_test.go index 2d455bec8f7..c6c9febb4f5 100644 --- a/stress/v2/statement/set_test.go +++ b/stress/v2/statement/set_test.go @@ -4,7 +4,7 @@ import ( "fmt" "testing" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) func TestSetSetID(t *testing.T) { @@ -37,15 +37,15 @@ func TestSetRun(t *testing.T) { func testSetRunUtl(t *testing.T, property string, value string) { i := newTestSet(property, value) - s, _, directiveCh := ponyExpress.NewTestStoreFront() + s, _, directiveCh := stressClient.NewTestStressTest() // Listen to the other side of the directiveCh go func() { for d := range directiveCh { if i.Var != d.Property { - t.Errorf("wrong property sent to ponyExpress\n expected: %v\n got: %v\n", i.Var, d.Property) + t.Errorf("wrong property sent to stressClient\n expected: %v\n got: %v\n", i.Var, d.Property) } if i.Value != d.Value { - t.Errorf("wrong value sent to ponyExpress\n expected: %v\n got: %v\n", i.Value, d.Value) + t.Errorf("wrong value sent to stressClient\n expected: %v\n got: %v\n", i.Value, d.Value) } d.Tracer.Done() } @@ -68,17 +68,13 @@ func testSetRunUtl(t *testing.T, property string, value string) { } // TODO: Actually test this case "resultsaddress": - case "testname": - if i.Value != s.TestName { - t.Errorf("Failed to set %v\n", i.Var) - } default: } } func TestSetReport(t *testing.T) { set := newTestSet("this", "that") - s, _, _ := ponyExpress.NewTestStoreFront() + s, _, _ := stressClient.NewTestStressTest() rpt := set.Report(s) expected := fmt.Sprintf("SET %v = '%v'", set.Var, set.Value) if rpt != expected { @@ -90,7 +86,7 @@ func newTestSet(toSet, value string) *SetStatement { return &SetStatement{ Var: toSet, Value: value, - Tracer: ponyExpress.NewTracer(make(map[string]string)), + Tracer: stressClient.NewTracer(make(map[string]string)), StatementID: "fooID", } } diff --git a/stress/v2/statement/statement.go b/stress/v2/statement/statement.go index 143d87c95c0..53cd40060fe 100644 --- a/stress/v2/statement/statement.go +++ b/stress/v2/statement/statement.go @@ -4,14 +4,14 @@ import ( "log" "strconv" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) // Statement is the common interface to shape the testing environment and prepare database requests // The parser turns the 'statements' in the config file into Statements type Statement interface { - Run(s *ponyExpress.StoreFront) - Report(s *ponyExpress.StoreFront) string + Run(s *stressClient.StressTest) + Report(s *stressClient.StressTest) string SetID(s string) } diff --git a/stress/v2/statement/wait.go b/stress/v2/statement/wait.go index 69b11845f5a..e047761d868 100644 --- a/stress/v2/statement/wait.go +++ b/stress/v2/statement/wait.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) // WaitStatement is a Statement Implementation to prevent the test from returning to early when running GoStatements @@ -20,13 +20,13 @@ func (w *WaitStatement) SetID(s string) { } // Run statisfies the Statement Interface -func (w *WaitStatement) Run(s *ponyExpress.StoreFront) { +func (w *WaitStatement) Run(s *stressClient.StressTest) { runtime := time.Now() s.Wait() w.runtime = time.Since(runtime) } // Report statisfies the Statement Interface -func (w *WaitStatement) Report(s *ponyExpress.StoreFront) string { +func (w *WaitStatement) Report(s *stressClient.StressTest) string { return fmt.Sprintf("WAIT -> %v", w.runtime) } diff --git a/stress/v2/statement/wait_test.go b/stress/v2/statement/wait_test.go index ac62697debf..5ad0b32a9cc 100644 --- a/stress/v2/statement/wait_test.go +++ b/stress/v2/statement/wait_test.go @@ -4,7 +4,7 @@ import ( "strings" "testing" - "github.com/influxdata/influxdb/stress/v2/ponyExpress" + "github.com/influxdata/influxdb/stress/v2/stress_client" ) func TestWaitSetID(t *testing.T) { @@ -18,7 +18,7 @@ func TestWaitSetID(t *testing.T) { func TestWaitRun(t *testing.T) { e := newTestWait() - s, _, _ := ponyExpress.NewTestStoreFront() + s, _, _ := stressClient.NewTestStressTest() e.Run(s) if e == nil { t.Fail() @@ -27,7 +27,7 @@ func TestWaitRun(t *testing.T) { func TestWaitReport(t *testing.T) { e := newTestWait() - s, _, _ := ponyExpress.NewTestStoreFront() + s, _, _ := stressClient.NewTestStressTest() rpt := e.Report(s) if !strings.Contains(rpt, "WAIT") { t.Fail() diff --git a/stress/v2/ponyExpress/commune.go b/stress/v2/stress_client/commune.go similarity index 79% rename from stress/v2/ponyExpress/commune.go rename to stress/v2/stress_client/commune.go index cc65ea9617b..abd99298445 100644 --- a/stress/v2/ponyExpress/commune.go +++ b/stress/v2/stress_client/commune.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "log" @@ -37,17 +37,17 @@ func (c *commune) point(precision string) models.Point { return p[0] } -// SetCommune creates a new commune on the StoreFront -func (sf *StoreFront) SetCommune(name string) chan<- string { +// SetCommune creates a new commune on the StressTest +func (st *StressTest) SetCommune(name string) chan<- string { com := newCommune(10) - sf.communes[name] = com + st.communes[name] = com return com.ch } // GetPoint is called by a QueryStatement and retrieves a point sent by the associated InsertStatement -func (sf *StoreFront) GetPoint(name, precision string) models.Point { - p := sf.communes[name].point(precision) +func (st *StressTest) GetPoint(name, precision string) models.Point { + p := st.communes[name].point(precision) // Function needs to return a point. Panic if it doesn't if p == nil { diff --git a/stress/v2/ponyExpress/commune_test.go b/stress/v2/stress_client/commune_test.go similarity index 96% rename from stress/v2/ponyExpress/commune_test.go rename to stress/v2/stress_client/commune_test.go index fafe84366cb..d28f1febd6d 100644 --- a/stress/v2/ponyExpress/commune_test.go +++ b/stress/v2/stress_client/commune_test.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "testing" @@ -33,7 +33,7 @@ func TestCommunePoint(t *testing.T) { } func TestSetCommune(t *testing.T) { - sf, _, _ := NewTestStoreFront() + sf, _, _ := NewTestStressTest() ch := sf.SetCommune("foo_name") ch <- "write,tag=tagVal fooField=5 1460912595" pt := sf.GetPoint("foo_name", "s") diff --git a/stress/v2/ponyExpress/directive.go b/stress/v2/stress_client/directive.go similarity index 88% rename from stress/v2/ponyExpress/directive.go rename to stress/v2/stress_client/directive.go index eb698efd523..e8efc522518 100644 --- a/stress/v2/ponyExpress/directive.go +++ b/stress/v2/stress_client/directive.go @@ -1,6 +1,6 @@ -package ponyExpress +package stressClient -// Directive is a struct to enable communication between SetStatements and the ponyExpress backend +// Directive is a struct to enable communication between SetStatements and the stressClient backend // Directives change state for the stress test type Directive struct { Property string diff --git a/stress/v2/ponyExpress/directive_test.go b/stress/v2/stress_client/directive_test.go similarity index 94% rename from stress/v2/ponyExpress/directive_test.go rename to stress/v2/stress_client/directive_test.go index 8c5cee5a06d..3782a10f838 100644 --- a/stress/v2/ponyExpress/directive_test.go +++ b/stress/v2/stress_client/directive_test.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "testing" diff --git a/stress/v2/ponyExpress/package.go b/stress/v2/stress_client/package.go similarity index 84% rename from stress/v2/ponyExpress/package.go rename to stress/v2/stress_client/package.go index e277c510ffd..29eb869f48d 100644 --- a/stress/v2/ponyExpress/package.go +++ b/stress/v2/stress_client/package.go @@ -1,6 +1,6 @@ -package ponyExpress +package stressClient -// Package is a struct to enable communication between InsertStatements, QueryStatements and InfluxQLStatements and the ponyExpress backend +// Package is a struct to enable communication between InsertStatements, QueryStatements and InfluxQLStatements and the stressClient backend // Packages carry either writes or queries in the []byte that makes up the Body type Package struct { T Type diff --git a/stress/v2/ponyExpress/package_test.go b/stress/v2/stress_client/package_test.go similarity index 93% rename from stress/v2/ponyExpress/package_test.go rename to stress/v2/stress_client/package_test.go index b52a7d4d581..f52c665a943 100644 --- a/stress/v2/ponyExpress/package_test.go +++ b/stress/v2/stress_client/package_test.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "testing" diff --git a/stress/v2/ponyExpress/reporting.go b/stress/v2/stress_client/reporting.go similarity index 60% rename from stress/v2/ponyExpress/reporting.go rename to stress/v2/stress_client/reporting.go index 3a9c0416fd8..35bfed2684e 100644 --- a/stress/v2/ponyExpress/reporting.go +++ b/stress/v2/stress_client/reporting.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "log" @@ -8,37 +8,37 @@ import ( influx "github.com/influxdata/influxdb/client/v2" ) -// reporting.go contains functions to emit tags and points from various parts of ponyExpress +// reporting.go contains functions to emit tags and points from various parts of stressClient // These points are then written to the ("_%v", sf.TestName) database -// These are the tags that ponyExpress adds to any response points -func (pe *ponyExpress) tags(statementID string) map[string]string { +// These are the tags that stressClient adds to any response points +func (sc *stressClient) tags(statementID string) map[string]string { tags := map[string]string{ - "number_targets": fmtInt(len(pe.addresses)), - "precision": pe.precision, - "writers": fmtInt(pe.wconc), - "readers": fmtInt(pe.qconc), - "test_id": pe.testID, + "number_targets": fmtInt(len(sc.addresses)), + "precision": sc.precision, + "writers": fmtInt(sc.wconc), + "readers": fmtInt(sc.qconc), + "test_id": sc.testID, "statement_id": statementID, - "write_interval": pe.wdelay, - "query_interval": pe.qdelay, + "write_interval": sc.wdelay, + "query_interval": sc.qdelay, } return tags } -// These are the tags that the StoreFront adds to any response points -func (sf *StoreFront) tags() map[string]string { +// These are the tags that the StressTest adds to any response points +func (st *StressTest) tags() map[string]string { tags := map[string]string{ - "precision": sf.Precision, - "batch_size": fmtInt(sf.BatchSize), + "precision": st.Precision, + "batch_size": fmtInt(st.BatchSize), } return tags } // This function makes a *client.Point for reporting on writes -func (pe *ponyExpress) writePoint(retries int, statementID string, statusCode int, responseTime time.Duration, addedTags map[string]string, writeBytes int) *influx.Point { +func (sc *stressClient) writePoint(retries int, statementID string, statusCode int, responseTime time.Duration, addedTags map[string]string, writeBytes int) *influx.Point { - tags := sumTags(pe.tags(statementID), addedTags) + tags := sumTags(sc.tags(statementID), addedTags) fields := map[string]interface{}{ "status_code": statusCode, @@ -56,9 +56,9 @@ func (pe *ponyExpress) writePoint(retries int, statementID string, statusCode in } // This function makes a *client.Point for reporting on queries -func (pe *ponyExpress) queryPoint(statementID string, body []byte, statusCode int, responseTime time.Duration, addedTags map[string]string) *influx.Point { +func (sc *stressClient) queryPoint(statementID string, body []byte, statusCode int, responseTime time.Duration, addedTags map[string]string) *influx.Point { - tags := sumTags(pe.tags(statementID), addedTags) + tags := sumTags(sc.tags(statementID), addedTags) fields := map[string]interface{}{ "status_code": statusCode, diff --git a/stress/v2/ponyExpress/reporting_test.go b/stress/v2/stress_client/reporting_test.go similarity index 88% rename from stress/v2/ponyExpress/reporting_test.go rename to stress/v2/stress_client/reporting_test.go index 412f4a9854b..bb287f4c295 100644 --- a/stress/v2/ponyExpress/reporting_test.go +++ b/stress/v2/stress_client/reporting_test.go @@ -1,12 +1,12 @@ -package ponyExpress +package stressClient import ( "testing" "time" ) -func TestNewPonyExpressTags(t *testing.T) { - pe, _, _ := newTestPonyExpress("localhost:8086") +func TestNewStressClientTags(t *testing.T) { + pe, _, _ := newTestStressClient("localhost:8086") tags := pe.tags("foo_id") expected := fmtInt(len(pe.addresses)) got := tags["number_targets"] @@ -30,8 +30,8 @@ func TestNewPonyExpressTags(t *testing.T) { } } -func TestNewStorefrontTags(t *testing.T) { - sf, _, _ := NewTestStoreFront() +func TestNewStressTestTags(t *testing.T) { + sf, _, _ := NewTestStressTest() tags := sf.tags() expected := sf.Precision got := tags["precision"] @@ -46,7 +46,7 @@ func TestNewStorefrontTags(t *testing.T) { } func TestWritePoint(t *testing.T) { - pe, _, _ := newTestPonyExpress("localhost:8086") + pe, _, _ := newTestStressClient("localhost:8086") statementID := "foo_id" responseCode := 200 responseTime := time.Duration(10 * time.Millisecond) @@ -69,7 +69,7 @@ func TestWritePoint(t *testing.T) { } func TestQueryPoint(t *testing.T) { - pe, _, _ := newTestPonyExpress("localhost:8086") + pe, _, _ := newTestStressClient("localhost:8086") statementID := "foo_id" responseCode := 200 body := []byte{12} diff --git a/stress/v2/ponyExpress/response.go b/stress/v2/stress_client/response.go similarity index 98% rename from stress/v2/ponyExpress/response.go rename to stress/v2/stress_client/response.go index 6a0046775cf..ac6a942e10c 100644 --- a/stress/v2/ponyExpress/response.go +++ b/stress/v2/stress_client/response.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "log" diff --git a/stress/v2/ponyExpress/response_test.go b/stress/v2/stress_client/response_test.go similarity index 94% rename from stress/v2/ponyExpress/response_test.go rename to stress/v2/stress_client/response_test.go index 774691ddaee..30e56746176 100644 --- a/stress/v2/ponyExpress/response_test.go +++ b/stress/v2/stress_client/response_test.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "testing" diff --git a/stress/v2/stress_client/stressTest.go b/stress/v2/stress_client/stressTest.go new file mode 100644 index 00000000000..1371e97d88d --- /dev/null +++ b/stress/v2/stress_client/stressTest.go @@ -0,0 +1,172 @@ +package stressClient + +import ( + "fmt" + "log" + "sync" + + influx "github.com/influxdata/influxdb/client/v2" +) + +// NewStressTest creates the backend for the stress test +func NewStressTest() *StressTest { + + packageCh := make(chan Package, 0) + directiveCh := make(chan Directive, 0) + responseCh := make(chan Response, 0) + + clnt, _ := influx.NewHTTPClient(influx.HTTPConfig{ + Addr: fmt.Sprintf("http://%v/", "localhost:8086"), + }) + + s := &StressTest{ + TestDB: "_stressTest", + Precision: "s", + StartDate: "2016-01-02", + BatchSize: 5000, + + packageChan: packageCh, + directiveChan: directiveCh, + + ResultsClient: clnt, + ResultsChan: responseCh, + communes: make(map[string]*commune), + TestID: randStr(10), + } + + // Start the client service + startStressClient(packageCh, directiveCh, responseCh, s.TestID) + + // Listen for Results coming in + s.resultsListen() + + return s +} + +// NewTestStressTest returns a StressTest to be used for testing Statements +func NewTestStressTest() (*StressTest, chan Package, chan Directive) { + + packageCh := make(chan Package, 0) + directiveCh := make(chan Directive, 0) + + s := &StressTest{ + TestDB: "_stressTest", + Precision: "s", + StartDate: "2016-01-02", + BatchSize: 5000, + + directiveChan: directiveCh, + packageChan: packageCh, + + communes: make(map[string]*commune), + TestID: randStr(10), + } + + return s, packageCh, directiveCh +} + +// The StressTest is the Statement facing API that consumes Statement output and coordinates the test results +type StressTest struct { + TestID string + TestDB string + + Precision string + StartDate string + BatchSize int + + sync.WaitGroup + sync.Mutex + + packageChan chan<- Package + directiveChan chan<- Directive + + ResultsChan chan Response + communes map[string]*commune + ResultsClient influx.Client +} + +// SendPackage is the public facing API for to send Queries and Points +func (st *StressTest) SendPackage(p Package) { + st.packageChan <- p +} + +// SendDirective is the public facing API to set state variables in the test +func (st *StressTest) SendDirective(d Directive) { + st.directiveChan <- d +} + +// Starts a go routine that listens for Results +func (st *StressTest) resultsListen() { + st.createDatabase(st.TestDB) + go func() { + bp := st.NewResultsPointBatch() + for resp := range st.ResultsChan { + switch resp.Point.Name() { + case "done": + st.ResultsClient.Write(bp) + resp.Tracer.Done() + default: + // Add the StressTest tags + pt := resp.AddTags(st.tags()) + // Add the point to the batch + bp = st.batcher(pt, bp) + resp.Tracer.Done() + } + } + }() +} + +// NewResultsPointBatch creates a new batch of points for the results +func (st *StressTest) NewResultsPointBatch() influx.BatchPoints { + bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{ + Database: st.TestDB, + Precision: "ns", + }) + return bp +} + +// Batches incoming Result.Point and sends them if the batch reaches 5k in size +func (st *StressTest) batcher(pt *influx.Point, bp influx.BatchPoints) influx.BatchPoints { + if len(bp.Points()) <= 5000 { + bp.AddPoint(pt) + } else { + err := st.ResultsClient.Write(bp) + if err != nil { + log.Fatalf("Error writing performance stats\n error: %v\n", err) + } + bp = st.NewResultsPointBatch() + } + return bp +} + +// Convinence database creation function +func (st *StressTest) createDatabase(db string) { + query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %v", db) + res, err := st.ResultsClient.Query(influx.Query{Command: query}) + if err != nil { + log.Fatalf("error: no running influx server at localhost:8086") + if res.Error() != nil { + log.Fatalf("error: no running influx server at localhost:8086") + } + } +} + +// GetStatementResults is a convinence function for fetching all results given a StatementID +func (st *StressTest) GetStatementResults(sID, t string) (res []influx.Result) { + qryStr := fmt.Sprintf(`SELECT * FROM "%v" WHERE statement_id = '%v'`, t, sID) + return st.queryTestResults(qryStr) +} + +// Runs given qry on the test results database and returns the results or nil in case of error +func (st *StressTest) queryTestResults(qry string) (res []influx.Result) { + response, err := st.ResultsClient.Query(influx.Query{Command: qry, Database: st.TestDB}) + if err == nil { + if response.Error() != nil { + log.Fatalf("Error sending results query\n error: %v\n", response.Error()) + } + } + if response.Results[0].Series == nil { + return nil + } + return response.Results +} diff --git a/stress/v2/ponyExpress/storeFront_test.go b/stress/v2/stress_client/stressTest_test.go similarity index 76% rename from stress/v2/ponyExpress/storeFront_test.go rename to stress/v2/stress_client/stressTest_test.go index 76ad6b65dbe..4efdea3edc9 100644 --- a/stress/v2/ponyExpress/storeFront_test.go +++ b/stress/v2/stress_client/stressTest_test.go @@ -1,7 +1,6 @@ -package ponyExpress +package stressClient import ( - "fmt" "testing" "time" @@ -18,15 +17,15 @@ func NewBlankTestPoint() *influx.Point { return pt } -func TestStoreFrontBatcher(t *testing.T) { - sf, _, _ := NewTestStoreFront() +func TestStressTestBatcher(t *testing.T) { + sf, _, _ := NewTestStressTest() bpconf := influx.BatchPointsConfig{ - Database: fmt.Sprintf("_%v", sf.TestName), + Database: sf.TestDB, Precision: "ns", } bp, _ := influx.NewBatchPoints(bpconf) pt := NewBlankTestPoint() - bp = sf.batcher(pt, bp, bpconf) + bp = sf.batcher(pt, bp) if len(bp.Points()) != 1 { t.Fail() } diff --git a/stress/v2/ponyExpress/ponyExpress.go b/stress/v2/stress_client/stress_client.go similarity index 63% rename from stress/v2/ponyExpress/ponyExpress.go rename to stress/v2/stress_client/stress_client.go index 3b4af7e527f..f5d691bedb0 100644 --- a/stress/v2/ponyExpress/ponyExpress.go +++ b/stress/v2/stress_client/stress_client.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "strings" @@ -14,9 +14,9 @@ const ( Query ) -func startPonyExpress(packageCh <-chan Package, directiveCh <-chan Directive, responseCh chan<- Response, testID string) { +func startStressClient(packageCh <-chan Package, directiveCh <-chan Directive, responseCh chan<- Response, testID string) { - c := &ponyExpress{ + c := &stressClient{ testID: testID, addresses: []string{"localhost:8086"}, @@ -39,12 +39,11 @@ func startPonyExpress(packageCh <-chan Package, directiveCh <-chan Directive, re } // start listening for writes and queries go c.listen() - // start listening for state changes go c.directiveListen() } -type ponyExpress struct { +type stressClient struct { testID string // State for the Stress Test @@ -78,11 +77,11 @@ type ponyExpress struct { rc *ConcurrencyLimiter } -// NewTestPonyExpress returns a blank ponyExpress for testing -func newTestPonyExpress(url string) (*ponyExpress, chan Directive, chan Package) { +// NewTestStressClient returns a blank stressClient for testing +func newTestStressClient(url string) (*stressClient, chan Directive, chan Package) { pkgChan := make(chan Package) dirChan := make(chan Directive) - pe := &ponyExpress{ + pe := &stressClient{ testID: "foo_id", addresses: []string{url}, precision: "s", @@ -103,30 +102,22 @@ func newTestPonyExpress(url string) (*ponyExpress, chan Directive, chan Package) return pe, dirChan, pkgChan } -// client starts listening for Packages on the main channel -func (pe *ponyExpress) listen() { - - defer pe.Wait() - - // Keep track of number of concurrent readers and writers seperately - pe.wc = NewConcurrencyLimiter(pe.wconc) - pe.rc = NewConcurrencyLimiter(pe.qconc) - - // Manage overall number of goroutines and keep at 2 x (wconc + qconc) - l := NewConcurrencyLimiter((pe.wconc + pe.qconc) * 2) - - // Concume incoming packages +// stressClient starts listening for Packages on the main channel +func (sc *stressClient) listen() { + defer sc.Wait() + sc.wc = NewConcurrencyLimiter(sc.wconc) + sc.rc = NewConcurrencyLimiter(sc.qconc) + l := NewConcurrencyLimiter((sc.wconc + sc.qconc) * 2) counter := 0 - for p := range pe.packageChan { - serv := counter % len(pe.addresses) + for p := range sc.packageChan { l.Increment() go func(p Package) { defer l.Decrement() switch p.T { case Write: - pe.spinOffWritePackage(p, serv) + sc.spinOffWritePackage(p, (counter % len(sc.addresses))) case Query: - pe.spinOffQueryPackage(p, serv) + sc.spinOffQueryPackage(p, (counter % len(sc.addresses))) } }(p) counter++ @@ -135,64 +126,50 @@ func (pe *ponyExpress) listen() { } // Set handles all SET requests for test state -func (pe *ponyExpress) directiveListen() { - for d := range pe.directiveChan { - pe.Lock() +func (sc *stressClient) directiveListen() { + for d := range sc.directiveChan { + sc.Lock() switch d.Property { - // addresses is a []string of target InfluxDB instance(s) for the test // comes in as a "|" seperated array of addresses case "addresses": addr := strings.Split(d.Value, "|") - pe.addresses = addr - + sc.addresses = addr // percison is the write precision for InfluxDB case "precision": - pe.precision = d.Value - + sc.precision = d.Value // writeinterval is an optional delay between batches case "writeinterval": - pe.wdelay = d.Value - + sc.wdelay = d.Value // queryinterval is an optional delay between the batches case "queryinterval": - pe.qdelay = d.Value - + sc.qdelay = d.Value // database is the InfluxDB database to target for both writes and queries case "database": - pe.database = d.Value - + sc.database = d.Value // username for the target database case "username": - pe.username = d.Value - + sc.username = d.Value // username for the target database case "password": - pe.password = d.Value - - // use https if the there is a value for ssl + sc.password = d.Value + // use https if sent true case "ssl": if d.Value == "true" { - pe.ssl = true + sc.ssl = true } - - // concurrency is the number concurrent writers to the database + // concurrency is the number concurrent writers to the database case "writeconcurrency": conc := parseInt(d.Value) - pe.wconc = conc - // Reset the ConcurrencyLimiter - pe.wc.NewMax(conc) - - // concurrentqueries is the number of concurrent queries to run against the database + sc.wconc = conc + sc.wc.NewMax(conc) + // concurrentqueries is the number of concurrent queriers database case "queryconcurrency": conc := parseInt(d.Value) - pe.qconc = conc - // Reset the ConcurrencyLimiter - pe.rc.NewMax(conc) + sc.qconc = conc + sc.rc.NewMax(conc) } - - // Decrement the tracker d.Tracer.Done() - pe.Unlock() + sc.Unlock() } } diff --git a/stress/v2/ponyExpress/ponyExpress_query.go b/stress/v2/stress_client/stress_client_query.go similarity index 65% rename from stress/v2/ponyExpress/ponyExpress_query.go rename to stress/v2/stress_client/stress_client_query.go index b956f557619..3cb64c571d8 100644 --- a/stress/v2/ponyExpress/ponyExpress_query.go +++ b/stress/v2/stress_client/stress_client_query.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "fmt" @@ -9,38 +9,38 @@ import ( "time" ) -func (pe *ponyExpress) spinOffQueryPackage(p Package, serv int) { - pe.Add(1) - pe.rc.Increment() +func (sc *stressClient) spinOffQueryPackage(p Package, serv int) { + sc.Add(1) + sc.rc.Increment() go func() { // Send the query - pe.prepareQuerySend(p, serv) - pe.Done() - pe.rc.Decrement() + sc.prepareQuerySend(p, serv) + sc.Done() + sc.rc.Decrement() }() } // Prepares to send the GET request -func (pe *ponyExpress) prepareQuerySend(p Package, serv int) { +func (sc *stressClient) prepareQuerySend(p Package, serv int) { var queryTemplate string - if pe.ssl { + if sc.ssl { queryTemplate = "https://%v/query?db=%v&q=%v&u=%v&p=%v" } else { queryTemplate = "http://%v/query?db=%v&q=%v&u=%v&p=%v" } - queryURL := fmt.Sprintf(queryTemplate, pe.addresses[serv], pe.database, url.QueryEscape(string(p.Body)), pe.username, pe.password) + queryURL := fmt.Sprintf(queryTemplate, sc.addresses[serv], sc.database, url.QueryEscape(string(p.Body)), sc.username, sc.password) // Send the query - pe.makeGet(queryURL, p.StatementID, p.Tracer) + sc.makeGet(queryURL, p.StatementID, p.Tracer) // Query Interval enforcement - qi, _ := time.ParseDuration(pe.qdelay) + qi, _ := time.ParseDuration(sc.qdelay) time.Sleep(qi) } // Sends the GET request, reads it, and handles errors -func (pe *ponyExpress) makeGet(addr, statementID string, tr *Tracer) { +func (sc *stressClient) makeGet(addr, statementID string, tr *Tracer) { // Make GET request t := time.Now() @@ -65,7 +65,7 @@ func (pe *ponyExpress) makeGet(addr, statementID string, tr *Tracer) { } // Send the response - pe.responseChan <- NewResponse(pe.queryPoint(statementID, body, resp.StatusCode, elapsed, tr.Tags), tr) + sc.responseChan <- NewResponse(sc.queryPoint(statementID, body, resp.StatusCode, elapsed, tr.Tags), tr) } func success(r *http.Response) bool { diff --git a/stress/v2/ponyExpress/ponyExpress_write.go b/stress/v2/stress_client/stress_client_write.go similarity index 74% rename from stress/v2/ponyExpress/ponyExpress_write.go rename to stress/v2/stress_client/stress_client_write.go index c29bef6a050..a9880869700 100644 --- a/stress/v2/ponyExpress/ponyExpress_write.go +++ b/stress/v2/stress_client/stress_client_write.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "bytes" @@ -14,18 +14,18 @@ import ( // ############################################### // Packages up Package from channel in goroutine -func (pe *ponyExpress) spinOffWritePackage(p Package, serv int) { - pe.Add(1) - pe.wc.Increment() +func (sc *stressClient) spinOffWritePackage(p Package, serv int) { + sc.Add(1) + sc.wc.Increment() go func() { - pe.retry(p, time.Duration(time.Nanosecond), serv) - pe.Done() - pe.wc.Decrement() + sc.retry(p, time.Duration(time.Nanosecond), serv) + sc.Done() + sc.wc.Decrement() }() } // Implements backoff and retry logic for 500 responses -func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) { +func (sc *stressClient) retry(p Package, backoff time.Duration, serv int) { // Set Backoff Interval to 500ms backoffInterval := time.Duration(500 * time.Millisecond) @@ -34,7 +34,7 @@ func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) { bo := backoff + backoffInterval // Make the write request - resp, elapsed, err := pe.prepareWrite(p.Body, serv) + resp, elapsed, err := sc.prepareWrite(p.Body, serv) // Find number of times request has been retried numBackoffs := int(bo/backoffInterval) - 1 @@ -49,13 +49,13 @@ func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) { } // Make a point for reporting - point := pe.writePoint(numBackoffs, p.StatementID, statusCode, elapsed, p.Tracer.Tags, len(p.Body)) + point := sc.writePoint(numBackoffs, p.StatementID, statusCode, elapsed, p.Tracer.Tags, len(p.Body)) // Send the Response(point, tracer) - pe.responseChan <- NewResponse(point, p.Tracer) + sc.responseChan <- NewResponse(point, p.Tracer) // BatchInterval enforcement - bi, _ := time.ParseDuration(pe.wdelay) + bi, _ := time.ParseDuration(sc.wdelay) time.Sleep(bi) // Retry if the statusCode was not 204 or the err != nil @@ -66,22 +66,22 @@ func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) { fmt.Println(err) // Backoff enforcement time.Sleep(bo) - pe.retry(p, bo, serv) + sc.retry(p, bo, serv) } } // Prepares to send the POST request -func (pe *ponyExpress) prepareWrite(points []byte, serv int) (*http.Response, time.Duration, error) { +func (sc *stressClient) prepareWrite(points []byte, serv int) (*http.Response, time.Duration, error) { // Construct address string var writeTemplate string - if pe.ssl { + if sc.ssl { writeTemplate = "https://%v/write?db=%v&precision=%v&u=%v&p=%v" } else { writeTemplate = "http://%v/write?db=%v&precision=%v&u=%v&p=%v" } - address := fmt.Sprintf(writeTemplate, pe.addresses[serv], pe.database, pe.precision, pe.username, pe.password) + address := fmt.Sprintf(writeTemplate, sc.addresses[serv], sc.database, sc.precision, sc.username, sc.password) // Start timer t := time.Now() diff --git a/stress/v2/ponyExpress/tracer.go b/stress/v2/stress_client/tracer.go similarity index 93% rename from stress/v2/ponyExpress/tracer.go rename to stress/v2/stress_client/tracer.go index e2a72549388..0256ba2edcc 100644 --- a/stress/v2/ponyExpress/tracer.go +++ b/stress/v2/stress_client/tracer.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "sync" diff --git a/stress/v2/ponyExpress/tracer_test.go b/stress/v2/stress_client/tracer_test.go similarity index 93% rename from stress/v2/ponyExpress/tracer_test.go rename to stress/v2/stress_client/tracer_test.go index 0c5892f96d9..f764dfdc7bd 100644 --- a/stress/v2/ponyExpress/tracer_test.go +++ b/stress/v2/stress_client/tracer_test.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "testing" diff --git a/stress/v2/ponyExpress/util.go b/stress/v2/stress_client/util.go similarity index 98% rename from stress/v2/ponyExpress/util.go rename to stress/v2/stress_client/util.go index fcc5722292e..e6ec53ab9d0 100644 --- a/stress/v2/ponyExpress/util.go +++ b/stress/v2/stress_client/util.go @@ -1,4 +1,4 @@ -package ponyExpress +package stressClient import ( "crypto/rand" diff --git a/stress/v2/stressql/parser_test.go b/stress/v2/stressql/parser_test.go index 586be41b2bb..f66d64d0b45 100644 --- a/stress/v2/stressql/parser_test.go +++ b/stress/v2/stressql/parser_test.go @@ -4,7 +4,7 @@ import "testing" // Pulls the default configFile and makes sure it parses func TestParseStatements(t *testing.T) { - stmts, err := ParseStatements("../file.iql") + stmts, err := ParseStatements("../iql/file.iql") if err != nil { t.Error(err) }