From 74e4b848417dcf1f88af02ecaefecb36ec9e3e4a Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 8 Jun 2017 10:42:10 -0500 Subject: [PATCH] Refactor how results are returned to the client This refactors the internal result returning system to match more closely how we iterative return points between result sets, series, and rows within the series. It uses the updated terminology rather than older terminology that no longer matches how we refer to things in the documentation or within the query engine. The refactor moves the aggregation and chunking behavior from `influxql.Emitter` to the HTTP service so that behavior is isolated to one location rather than sprinkled around in multiple locations. --- coordinator/points_writer_test.go | 110 --- coordinator/statement_executor.go | 826 ++++++++++-------- coordinator/statement_executor_test.go | 37 +- query/emitter.go | 112 +-- query/emitter_test.go | 146 ++-- query/iterator.gen.go | 35 +- query/iterator.gen.go.tmpl | 9 +- query/query_executor.go | 100 ++- query/query_executor_test.go | 135 +-- query/result.go | 106 ++- query/task_manager.go | 44 +- services/continuous_querier/service.go | 9 +- services/continuous_querier/service_test.go | 43 +- services/httpd/encoder.go | 281 ++++++ services/httpd/encoder_test.go | 79 ++ services/httpd/formatters.go | 161 ++++ ...onse_writer_test.go => formatters_test.go} | 56 +- services/httpd/handler.go | 254 ++---- services/httpd/handler_test.go | 116 +-- services/httpd/response_writer.go | 188 ---- tests/server_test.go | 8 +- 21 files changed, 1520 insertions(+), 1335 deletions(-) create mode 100644 services/httpd/encoder.go create mode 100644 services/httpd/encoder_test.go create mode 100644 services/httpd/formatters.go rename services/httpd/{response_writer_test.go => formatters_test.go} (51%) delete mode 100644 services/httpd/response_writer.go diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 438460d4183..2d02485ea87 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -436,116 +436,6 @@ func (f *fakePointsWriter) WritePointsInto(req *coordinator.IntoWriteRequest) er return f.WritePointsIntoFn(req) } -func TestBufferedPointsWriter(t *testing.T) { - db := "db0" - rp := "rp0" - capacity := 10000 - - writePointsIntoCnt := 0 - pointsWritten := []models.Point{} - - reset := func() { - writePointsIntoCnt = 0 - pointsWritten = pointsWritten[:0] - } - - fakeWriter := &fakePointsWriter{ - WritePointsIntoFn: func(req *coordinator.IntoWriteRequest) error { - writePointsIntoCnt++ - pointsWritten = append(pointsWritten, req.Points...) - return nil - }, - } - - w := coordinator.NewBufferedPointsWriter(fakeWriter, db, rp, capacity) - - // Test that capacity and length are correct for new buffered writer. - if w.Cap() != capacity { - t.Fatalf("exp %d, got %d", capacity, w.Cap()) - } else if w.Len() != 0 { - t.Fatalf("exp %d, got %d", 0, w.Len()) - } - - // Test flushing an empty buffer. - if err := w.Flush(); err != nil { - t.Fatal(err) - } else if writePointsIntoCnt > 0 { - t.Fatalf("exp 0, got %d", writePointsIntoCnt) - } - - // Test writing zero points. - if err := w.WritePointsInto(&coordinator.IntoWriteRequest{ - Database: db, - RetentionPolicy: rp, - Points: []models.Point{}, - }); err != nil { - t.Fatal(err) - } else if writePointsIntoCnt > 0 { - t.Fatalf("exp 0, got %d", writePointsIntoCnt) - } else if w.Len() > 0 { - t.Fatalf("exp 0, got %d", w.Len()) - } - - // Test writing single large bunch of points points. - req := coordinator.WritePointsRequest{ - Database: db, - RetentionPolicy: rp, - } - - numPoints := int(float64(capacity) * 5.5) - for i := 0; i < numPoints; i++ { - req.AddPoint("cpu", float64(i), time.Now().Add(time.Duration(i)*time.Second), nil) - } - - r := coordinator.IntoWriteRequest(req) - if err := w.WritePointsInto(&r); err != nil { - t.Fatal(err) - } else if writePointsIntoCnt != 5 { - t.Fatalf("exp 5, got %d", writePointsIntoCnt) - } else if w.Len() != capacity/2 { - t.Fatalf("exp %d, got %d", capacity/2, w.Len()) - } else if len(pointsWritten) != numPoints-capacity/2 { - t.Fatalf("exp %d, got %d", numPoints-capacity/2, len(pointsWritten)) - } - - if err := w.Flush(); err != nil { - t.Fatal(err) - } else if writePointsIntoCnt != 6 { - t.Fatalf("exp 6, got %d", writePointsIntoCnt) - } else if w.Len() != 0 { - t.Fatalf("exp 0, got %d", w.Len()) - } else if len(pointsWritten) != numPoints { - t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten)) - } else if !reflect.DeepEqual(r.Points, pointsWritten) { - t.Fatal("points don't match") - } - - reset() - - // Test writing points one at a time. - for i, _ := range r.Points { - if err := w.WritePointsInto(&coordinator.IntoWriteRequest{ - Database: db, - RetentionPolicy: rp, - Points: r.Points[i : i+1], - }); err != nil { - t.Fatal(err) - } - } - - if err := w.Flush(); err != nil { - t.Fatal(err) - } else if writePointsIntoCnt != 6 { - t.Fatalf("exp 6, got %d", writePointsIntoCnt) - } else if w.Len() != 0 { - t.Fatalf("exp 0, got %d", w.Len()) - } else if len(pointsWritten) != numPoints { - t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten)) - } else if !reflect.DeepEqual(r.Points, pointsWritten) { - t.Fatal("points don't match") - } -} - var shardID uint64 type fakeStore struct { diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 0afe9fc8438..7807e2605fc 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -58,149 +58,199 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query. return e.executeSelectStatement(stmt, &ctx) } - var rows models.Rows - var messages []*query.Message - var err error switch stmt := stmt.(type) { case *influxql.AlterRetentionPolicyStatement: + if err := e.executeAlterRetentionPolicyStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeAlterRetentionPolicyStatement(stmt) + return ctx.Ok() case *influxql.CreateContinuousQueryStatement: + if err := e.executeCreateContinuousQueryStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeCreateContinuousQueryStatement(stmt) + return ctx.Ok() case *influxql.CreateDatabaseStatement: + if err := e.executeCreateDatabaseStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeCreateDatabaseStatement(stmt) + return ctx.Ok() case *influxql.CreateRetentionPolicyStatement: + if err := e.executeCreateRetentionPolicyStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeCreateRetentionPolicyStatement(stmt) + return ctx.Ok() case *influxql.CreateSubscriptionStatement: + if err := e.executeCreateSubscriptionStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeCreateSubscriptionStatement(stmt) + return ctx.Ok() case *influxql.CreateUserStatement: + if err := e.executeCreateUserStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeCreateUserStatement(stmt) + return ctx.Ok() case *influxql.DeleteSeriesStatement: - err = e.executeDeleteSeriesStatement(stmt, ctx.Database) + if err := e.executeDeleteSeriesStatement(stmt, ctx.Database); err != nil { + return err + } + if ctx.ReadOnly { + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) + } + return ctx.Ok() case *influxql.DropContinuousQueryStatement: + if err := e.executeDropContinuousQueryStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeDropContinuousQueryStatement(stmt) + return ctx.Ok() case *influxql.DropDatabaseStatement: + if err := e.executeDropDatabaseStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeDropDatabaseStatement(stmt) + return ctx.Ok() case *influxql.DropMeasurementStatement: + if err := e.executeDropMeasurementStatement(stmt, ctx.Database); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeDropMeasurementStatement(stmt, ctx.Database) + return ctx.Ok() case *influxql.DropSeriesStatement: + if err := e.executeDropSeriesStatement(stmt, ctx.Database); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeDropSeriesStatement(stmt, ctx.Database) + return ctx.Ok() case *influxql.DropRetentionPolicyStatement: + if err := e.executeDropRetentionPolicyStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeDropRetentionPolicyStatement(stmt) + return ctx.Ok() case *influxql.DropShardStatement: + if err := e.executeDropShardStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeDropShardStatement(stmt) + return ctx.Ok() case *influxql.DropSubscriptionStatement: + if err := e.executeDropSubscriptionStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeDropSubscriptionStatement(stmt) + return ctx.Ok() case *influxql.DropUserStatement: + if err := e.executeDropUserStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeDropUserStatement(stmt) + return ctx.Ok() case *influxql.ExplainStatement: - rows, err = e.executeExplainStatement(stmt, &ctx) + return e.executeExplainStatement(stmt, &ctx) case *influxql.GrantStatement: + if err := e.executeGrantStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeGrantStatement(stmt) + return ctx.Ok() case *influxql.GrantAdminStatement: + if err := e.executeGrantAdminStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeGrantAdminStatement(stmt) + return ctx.Ok() case *influxql.RevokeStatement: + if err := e.executeRevokeStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeRevokeStatement(stmt) + return ctx.Ok() case *influxql.RevokeAdminStatement: + if err := e.executeRevokeAdminStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeRevokeAdminStatement(stmt) + return ctx.Ok() case *influxql.ShowContinuousQueriesStatement: - rows, err = e.executeShowContinuousQueriesStatement(stmt) + return e.executeShowContinuousQueriesStatement(stmt, &ctx) case *influxql.ShowDatabasesStatement: - rows, err = e.executeShowDatabasesStatement(stmt, &ctx) + return e.executeShowDatabasesStatement(stmt, &ctx) case *influxql.ShowDiagnosticsStatement: - rows, err = e.executeShowDiagnosticsStatement(stmt) + return e.executeShowDiagnosticsStatement(stmt, &ctx) case *influxql.ShowGrantsForUserStatement: - rows, err = e.executeShowGrantsForUserStatement(stmt) + return e.executeShowGrantsForUserStatement(stmt, &ctx) case *influxql.ShowMeasurementsStatement: return e.executeShowMeasurementsStatement(stmt, &ctx) case *influxql.ShowRetentionPoliciesStatement: - rows, err = e.executeShowRetentionPoliciesStatement(stmt) + return e.executeShowRetentionPoliciesStatement(stmt, &ctx) case *influxql.ShowShardsStatement: - rows, err = e.executeShowShardsStatement(stmt) + return e.executeShowShardsStatement(stmt, &ctx) case *influxql.ShowShardGroupsStatement: - rows, err = e.executeShowShardGroupsStatement(stmt) + return e.executeShowShardGroupsStatement(stmt, &ctx) case *influxql.ShowStatsStatement: - rows, err = e.executeShowStatsStatement(stmt) + return e.executeShowStatsStatement(stmt, &ctx) case *influxql.ShowSubscriptionsStatement: - rows, err = e.executeShowSubscriptionsStatement(stmt) + return e.executeShowSubscriptionsStatement(stmt, &ctx) case *influxql.ShowTagValuesStatement: return e.executeShowTagValues(stmt, &ctx) case *influxql.ShowUsersStatement: - rows, err = e.executeShowUsersStatement(stmt) + return e.executeShowUsersStatement(stmt, &ctx) case *influxql.SetPasswordUserStatement: + if err := e.executeSetPasswordUserStatement(stmt); err != nil { + return err + } if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + return ctx.Ok(query.ReadOnlyWarning(stmt.String())) } - err = e.executeSetPasswordUserStatement(stmt) + return ctx.Ok() case *influxql.ShowQueriesStatement, *influxql.KillQueryStatement: // Send query related statements to the task manager. return e.TaskManager.ExecuteStatement(stmt, ctx) default: return query.ErrInvalidQuery } - - if err != nil { - return err - } - - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: rows, - Messages: messages, - }) } func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error { @@ -400,8 +450,8 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme return e.MetaClient.DropUser(q.Name) } -func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) { - return nil, errors.New("unimplemented") +func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) error { + return errors.New("unimplemented") } func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error { @@ -437,98 +487,110 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw } func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error { + if stmt.Target != nil && stmt.Target.Measurement.Database == "" { + return errNoDatabaseInTarget + } + itrs, columns, err := e.createIterators(stmt, ctx) if err != nil { return err } + result, err := func() (*query.ResultSet, error) { + if stmt.Target != nil && ctx.ReadOnly { + return ctx.CreateResult(query.ReadOnlyWarning(stmt.String())) + } + return ctx.CreateResult() + }() + if err != nil { + return err + } - // Generate a row emitter from the iterator set. - em := query.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize) - em.Columns = columns - if stmt.Location != nil { - em.Location = stmt.Location + // If we are writing the points, we need to send this result to the + // goroutine that will be writing the points so the goroutine can emit the + // number of points that have been written. + if stmt.Target != nil { + // Replace the result we just passed with our own created result. This + // allows us to write to some location that is being read by the + // writer. + r := &query.ResultSet{ + ID: ctx.StatementID, + AbortCh: ctx.AbortCh, + } + r.Init() + go e.writeResult(stmt, r, result) + result = r } - em.OmitTime = stmt.OmitTime - em.EmitName = stmt.EmitName - defer em.Close() + defer result.Close() - // Emit rows to the results channel. - var writeN int64 - var emitted bool + // Set the columns of the result to the statement columns. + result = result.WithColumns(columns...) - var pointsWriter *BufferedPointsWriter - if stmt.Target != nil { - pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000) + // Generate a row emitter from the iterator set. + em := query.NewEmitter(itrs, stmt.TimeAscending()) + defer em.Close() + + // Retrieve the time zone location. Default to using UTC. + loc := time.UTC + if stmt.Location != nil { + loc = stmt.Location } + var series *query.Series for { - row, partial, err := em.Emit() + // Fill buffer. Close the series if no more points remain. + t, name, tags, err := em.LoadBuf() if err != nil { - return err - } else if row == nil { - // Check if the query was interrupted while emitting. - select { - case <-ctx.InterruptCh: - return query.ErrQueryInterrupted - default: + // An error occurred while reading the iterators. If we are in the + // middle of processing a series, assume the error comes from + // reading the series. If it has come before we have created any + // series, send the error to the result itself. + if series != nil { + series.Error(err) + series.Close() + } else { + result.Error(err) } - break - } - - // Write points back into system for INTO statements. - if stmt.Target != nil { - if err := e.writeInto(pointsWriter, stmt, row); err != nil { - return err + return query.ErrQueryCanceled + } else if t == query.ZeroTime { + if series != nil { + series.Close() } - writeN += int64(len(row.Values)) - continue + return nil } - result := &query.Result{ - StatementID: ctx.StatementID, - Series: []*models.Row{row}, - Partial: partial, + // Read next set of values from all iterators at a given time/name/tags. + values := make([]interface{}, len(columns)) + if stmt.OmitTime { + em.ReadInto(t, name, tags, values) + } else { + values[0] = time.Unix(0, t).In(loc) + em.ReadInto(t, name, tags, values[1:]) } - // Send results or exit if closing. - if err := ctx.Send(result); err != nil { - return err + if stmt.EmitName != "" { + name = stmt.EmitName } - emitted = true - } - - // Flush remaining points and emit write count if an INTO statement. - if stmt.Target != nil { - if err := pointsWriter.Flush(); err != nil { - return err + if series == nil { + s, ok := result.CreateSeriesWithTags(name, tags) + if !ok { + return query.ErrQueryAborted + } + series = s + } else if series.Name != name || !series.Tags.Equals(&tags) { + series.Close() + s, ok := result.CreateSeriesWithTags(name, tags) + if !ok { + return query.ErrQueryAborted + } + series = s } - var messages []*query.Message - if ctx.ReadOnly { - messages = append(messages, query.ReadOnlyWarning(stmt.String())) + if ok := series.Emit(values); !ok { + series.Close() + return query.ErrQueryAborted } - - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Messages: messages, - Series: []*models.Row{{ - Name: "result", - Columns: []string{"time", "written"}, - Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}}, - }}, - }) } - - // Always emit at least one result. - if !emitted { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: make([]*models.Row, 0), - }) - } - - return nil } func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) ([]query.Iterator, []string, error) { @@ -553,38 +615,59 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx return itrs, columns, nil } -func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement, ctx *query.ExecutionContext) error { dis := e.MetaClient.Databases() - rows := []*models.Row{} + result, err := ctx.CreateResult() + if err != nil { + return err + } + defer result.Close() + + result = result.WithColumns("name", "query") for _, di := range dis { - row := &models.Row{Columns: []string{"name", "query"}, Name: di.Name} + series, ok := result.CreateSeries(di.Name) + if !ok { + return query.ErrQueryAborted + } for _, cqi := range di.ContinuousQueries { - row.Values = append(row.Values, []interface{}{cqi.Name, cqi.Query}) + series.Emit([]interface{}{cqi.Name, cqi.Query}) } - rows = append(rows, row) + series.Close() } - return rows, nil + return nil } -func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) { +func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) error { dis := e.MetaClient.Databases() a := ctx.ExecutionOptions.Authorizer - row := &models.Row{Name: "databases", Columns: []string{"name"}} + result, err := ctx.CreateResult() + if err != nil { + return err + } + defer result.Close() + + result = result.WithColumns("name") + series, ok := result.CreateSeries("databases") + if !ok { + return query.ErrQueryAborted + } + defer series.Close() + for _, di := range dis { // Only include databases that the user is authorized to read or write. if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) { - row.Values = append(row.Values, []interface{}{di.Name}) + series.Emit([]interface{}{di.Name}) } } - return []*models.Row{row}, nil + return nil } -func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement, ctx *query.ExecutionContext) error { diags, err := e.Monitor.Diagnostics() if err != nil { - return nil, err + return err } // Get a sorted list of diagnostics keys. @@ -594,32 +677,53 @@ func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowD } sort.Strings(sortedKeys) - rows := make([]*models.Row, 0, len(diags)) + result, err := ctx.CreateResult() + if err != nil { + return err + } + defer result.Close() + for _, k := range sortedKeys { if stmt.Module != "" && k != stmt.Module { continue } - row := &models.Row{Name: k} + series, ok := result.WithColumns(diags[k].Columns...).CreateSeries(k) + if !ok { + return query.ErrQueryAborted + } - row.Columns = diags[k].Columns - row.Values = diags[k].Rows - rows = append(rows, row) + for _, row := range diags[k].Rows { + series.Emit(row) + } + series.Close() } - return rows, nil + return nil } -func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement, ctx *query.ExecutionContext) error { priv, err := e.MetaClient.UserPrivileges(q.Name) if err != nil { - return nil, err + return err + } + + result, err := ctx.CreateResult() + if err != nil { + return err } + defer result.Close() + + result = result.WithColumns("database", "privilege") + series, ok := result.CreateSeries("") + if !ok { + return query.ErrQueryAborted + } + defer series.Close() - row := &models.Row{Columns: []string{"database", "privilege"}} for d, p := range priv { - row.Values = append(row.Values, []interface{}{d, p.String()}) + series.Emit([]interface{}{d, p.String()}) } - return []*models.Row{row}, nil + return nil } func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMeasurementsStatement, ctx *query.ExecutionContext) error { @@ -628,11 +732,8 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea } names, err := e.TSDBStore.MeasurementNames(q.Database, q.Condition) - if err != nil || len(names) == 0 { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Err: err, - }) + if err != nil { + return err } if q.Offset > 0 { @@ -649,50 +750,74 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea } } - values := make([][]interface{}, len(names)) - for i, name := range names { - values[i] = []interface{}{string(name)} + if len(names) == 0 { + return ctx.Ok() + } + + result, err := ctx.CreateResult() + if err != nil { + return err } + defer result.Close() - if len(values) == 0 { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - }) + result = result.WithColumns("name") + series, ok := result.CreateSeries("measurements") + if !ok { + return query.ErrQueryAborted } + defer series.Close() - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: []*models.Row{{ - Name: "measurements", - Columns: []string{"name"}, - Values: values, - }}, - }) + for _, name := range names { + series.Emit([]interface{}{string(name)}) + } + return nil } -func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement, ctx *query.ExecutionContext) error { if q.Database == "" { - return nil, ErrDatabaseNameRequired + return ErrDatabaseNameRequired } di := e.MetaClient.Database(q.Database) if di == nil { - return nil, influxdb.ErrDatabaseNotFound(q.Database) + return influxdb.ErrDatabaseNotFound(q.Database) } - row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}} + result, err := ctx.CreateResult() + if err != nil { + return err + } + defer result.Close() + + result = result.WithColumns("name", "duration", "shardGroupDuration", "replicaN", "default") + series, ok := result.CreateSeries("") + if !ok { + return query.ErrQueryAborted + } + defer series.Close() + for _, rpi := range di.RetentionPolicies { - row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name}) + series.Emit([]interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name}) } - return []*models.Row{row}, nil + return nil } -func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement, ctx *query.ExecutionContext) error { dis := e.MetaClient.Databases() - rows := []*models.Row{} + result, err := ctx.CreateResult() + if err != nil { + return err + } + defer result.Close() + + result = result.WithColumns("id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners") for _, di := range dis { - row := &models.Row{Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name} + series, ok := result.CreateSeries(di.Name) + if !ok { + return query.ErrQueryAborted + } + for _, rpi := range di.RetentionPolicies { for _, sgi := range rpi.ShardGroups { // Shards associated with deleted shard groups are effectively deleted. @@ -707,7 +832,7 @@ func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShards ownerIDs[i] = owner.NodeID } - row.Values = append(row.Values, []interface{}{ + series.Emit([]interface{}{ si.ID, di.Name, rpi.Name, @@ -720,15 +845,27 @@ func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShards } } } - rows = append(rows, row) + series.Close() } - return rows, nil + return nil } -func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement, ctx *query.ExecutionContext) error { dis := e.MetaClient.Databases() - row := &models.Row{Columns: []string{"id", "database", "retention_policy", "start_time", "end_time", "expiry_time"}, Name: "shard groups"} + result, err := ctx.CreateResult() + if err != nil { + return err + } + defer result.Close() + + result = result.WithColumns("id", "database", "retention_policy", "start_time", "end_time", "expiry_time") + series, ok := result.CreateSeries("shard groups") + if !ok { + return query.ErrQueryAborted + } + defer series.Close() + for _, di := range dis { for _, rpi := range di.RetentionPolicies { for _, sgi := range rpi.ShardGroups { @@ -738,7 +875,7 @@ func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowS continue } - row.Values = append(row.Values, []interface{}{ + series.Emit([]interface{}{ sgi.ID, di.Name, rpi.Name, @@ -749,50 +886,74 @@ func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowS } } } - - return []*models.Row{row}, nil + return nil } -func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, ctx *query.ExecutionContext) error { stats, err := e.Monitor.Statistics(nil) if err != nil { - return nil, err + return err + } + + result, err := ctx.CreateResult() + if err != nil { + return err } + defer result.Close() - var rows []*models.Row for _, stat := range stats { if stmt.Module != "" && stat.Name != stmt.Module { continue } - row := &models.Row{Name: stat.Name, Tags: stat.Tags} - values := make([]interface{}, 0, len(stat.Values)) - for _, k := range stat.ValueNames() { - row.Columns = append(row.Columns, k) - values = append(values, stat.Values[k]) + result := result.WithColumns(stat.ValueNames()...) + series, ok := result.CreateSeriesWithTags(stat.Name, query.NewTags(stat.Tags)) + if !ok { + return query.ErrQueryAborted } - row.Values = [][]interface{}{values} - rows = append(rows, row) + + row := make([]interface{}, 0, len(series.Columns)) + for _, k := range series.Columns { + row = append(row, stat.Values[k]) + } + series.Emit(row) + series.Close() } - return rows, nil + return nil } -func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) (models.Rows, error) { +func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement, ctx *query.ExecutionContext) error { dis := e.MetaClient.Databases() - rows := []*models.Row{} + result, err := ctx.CreateResult() + if err != nil { + return err + } + defer result.Close() + + result = result.WithColumns("retention_policy", "name", "mode", "destinations") for _, di := range dis { - row := &models.Row{Columns: []string{"retention_policy", "name", "mode", "destinations"}, Name: di.Name} + var series *query.Series for _, rpi := range di.RetentionPolicies { for _, si := range rpi.Subscriptions { - row.Values = append(row.Values, []interface{}{rpi.Name, si.Name, si.Mode, si.Destinations}) + // Lazily initialize the series so we don't emit a series that + // has no subscriptions. + if series == nil { + s, ok := result.CreateSeries(di.Name) + if !ok { + return query.ErrQueryAborted + } + series = s + } + series.Emit([]interface{}{rpi.Name, si.Name, si.Mode, si.Destinations}) } } - if len(row.Values) > 0 { - rows = append(rows, row) + + if series != nil { + series.Close() } } - return rows, nil + return nil } func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatement, ctx *query.ExecutionContext) error { @@ -802,13 +963,16 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition) if err != nil { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Err: err, - }) + return err + } + + result, err := ctx.CreateResult() + if err != nil { + return err } + defer result.Close() - emitted := false + result = result.WithColumns("key", "value") for _, m := range tagValues { values := m.Values @@ -830,39 +994,36 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem continue } - row := &models.Row{ - Name: m.Measurement, - Columns: []string{"key", "value"}, - Values: make([][]interface{}, len(values)), + series, ok := result.CreateSeries(m.Measurement) + if !ok { + return query.ErrQueryAborted } - for i, v := range values { - row.Values[i] = []interface{}{v.Key, v.Value} + for _, v := range values { + series.Emit([]interface{}{v.Key, v.Value}) } + series.Close() + } + return nil +} - if err := ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - Series: []*models.Row{row}, - }); err != nil { - return err - } - emitted = true +func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement, ctx *query.ExecutionContext) error { + result, err := ctx.CreateResult() + if err != nil { + return err } + defer result.Close() - // Ensure at least one result is emitted. - if !emitted { - return ctx.Send(&query.Result{ - StatementID: ctx.StatementID, - }) + result = result.WithColumns("user", "admin") + series, ok := result.CreateSeries("") + if !ok { + return query.ErrQueryAborted } - return nil -} + defer series.Close() -func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) (models.Rows, error) { - row := &models.Row{Columns: []string{"user", "admin"}} for _, ui := range e.MetaClient.Users() { - row.Values = append(row.Values, []interface{}{ui.Name, ui.Admin}) + series.Emit([]interface{}{ui.Name, ui.Admin}) } - return []*models.Row{row}, nil + return nil } // BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries @@ -874,148 +1035,91 @@ type BufferedPointsWriter struct { retentionPolicy string } -// NewBufferedPointsWriter returns a new BufferedPointsWriter. -func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter { - return &BufferedPointsWriter{ - w: w, - buf: make([]models.Point, 0, capacity), - database: database, - retentionPolicy: retentionPolicy, - } -} +func (e *StatementExecutor) writeResult(stmt *influxql.SelectStatement, in, out *query.ResultSet) { + defer out.Close() + measurementName := stmt.Target.Measurement.Name -// WritePointsInto implements pointsWriter for BufferedPointsWriter. -func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error { - // Make sure we're buffering points only for the expected destination. - if req.Database != w.database || req.RetentionPolicy != w.retentionPolicy { - return fmt.Errorf("writer for %s.%s can't write into %s.%s", w.database, w.retentionPolicy, req.Database, req.RetentionPolicy) - } - - for i := 0; i < len(req.Points); { - // Get the available space in the buffer. - avail := cap(w.buf) - len(w.buf) - - // Calculate number of points to copy into the buffer. - n := len(req.Points[i:]) - if n > avail { - n = avail + var writeN int64 + points := make([]models.Point, 0, 10000) + for series := range in.SeriesCh() { + if series.Err != nil { + continue } - // Copy points into buffer. - w.buf = append(w.buf, req.Points[i:n+i]...) - - // Advance the index by number of points copied. - i += n + // Convert the tags from the influxql format to the one expected by models. + name := measurementName + if name == "" { + name = series.Name + } + tags := models.NewTags(series.Tags.KeyValues()) + for row := range series.RowCh() { + if row.Err != nil { + continue + } - // If buffer is full, flush points to underlying writer. - if len(w.buf) == cap(w.buf) { - if err := w.Flush(); err != nil { - return err + // Convert the row back to a point. + point, err := convertRowToPoint(name, tags, series.Columns, row.Values) + if err != nil { + out.Error(err) + return + } + points = append(points, point) + + if len(points) == cap(points) { + if err := e.PointsWriter.WritePointsInto(&IntoWriteRequest{ + Database: stmt.Target.Measurement.Database, + RetentionPolicy: stmt.Target.Measurement.RetentionPolicy, + Points: points, + }); err != nil { + out.Error(err) + return + } + writeN += int64(len(points)) + points = points[:0] } } } - return nil -} - -// Flush writes all buffered points to the underlying writer. -func (w *BufferedPointsWriter) Flush() error { - if len(w.buf) == 0 { - return nil - } - - if err := w.w.WritePointsInto(&IntoWriteRequest{ - Database: w.database, - RetentionPolicy: w.retentionPolicy, - Points: w.buf, - }); err != nil { - return err - } - - // Clear the buffer. - w.buf = w.buf[:0] - - return nil -} - -// Len returns the number of points buffered. -func (w *BufferedPointsWriter) Len() int { return len(w.buf) } - -// Cap returns the capacity (in points) of the buffer. -func (w *BufferedPointsWriter) Cap() int { return cap(w.buf) } - -func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) error { - if stmt.Target.Measurement.Database == "" { - return errNoDatabaseInTarget - } - - // It might seem a bit weird that this is where we do this, since we will have to - // convert rows back to points. The Executors (both aggregate and raw) are complex - // enough that changing them to write back to the DB is going to be clumsy - // - // it might seem weird to have the write be in the QueryExecutor, but the interweaving of - // limitedRowWriter and ExecuteAggregate/Raw makes it ridiculously hard to make sure that the - // results will be the same as when queried normally. - name := stmt.Target.Measurement.Name - if name == "" { - name = row.Name + if len(points) > 0 { + if err := e.PointsWriter.WritePointsInto(&IntoWriteRequest{ + Database: stmt.Target.Measurement.Database, + RetentionPolicy: stmt.Target.Measurement.RetentionPolicy, + Points: points, + }); err != nil { + out.Error(err) + return + } + writeN += int64(len(points)) } - points, err := convertRowToPoints(name, row) - if err != nil { - return err + series, ok := out.WithColumns("time", "written").CreateSeries("result") + if !ok { + return } - - if err := w.WritePointsInto(&IntoWriteRequest{ - Database: stmt.Target.Measurement.Database, - RetentionPolicy: stmt.Target.Measurement.RetentionPolicy, - Points: points, - }); err != nil { - return err - } - - return nil + series.Emit([]interface{}{time.Unix(0, 0).UTC(), writeN}) + series.Close() } var errNoDatabaseInTarget = errors.New("no database in target") // convertRowToPoints will convert a query result Row into Points that can be written back in. -func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) { - // figure out which parts of the result are the time and which are the fields - timeIndex := -1 - fieldIndexes := make(map[string]int) - for i, c := range row.Columns { - if c == "time" { - timeIndex = i - } else { - fieldIndexes[c] = i +func convertRowToPoint(measurementName string, tags models.Tags, columns []string, row []interface{}) (models.Point, error) { + // Iterate through the columns and treat the first "time" field as the time. + var t time.Time + vals := make(map[string]interface{}, len(columns)-1) + for i, c := range columns { + if c == "time" && t.IsZero() { + t = row[i].(time.Time) + } else if val := row[i]; val != nil { + vals[c] = val } } - if timeIndex == -1 { + // If the time is zero, there was no time for some reason. + if t.IsZero() { return nil, errors.New("error finding time index in result") } - - points := make([]models.Point, 0, len(row.Values)) - for _, v := range row.Values { - vals := make(map[string]interface{}) - for fieldName, fieldIndex := range fieldIndexes { - val := v[fieldIndex] - if val != nil { - vals[fieldName] = v[fieldIndex] - } - } - - p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time)) - if err != nil { - // Drop points that can't be stored - continue - } - - points = append(points, p) - } - - return points, nil + return models.NewPoint(measurementName, tags, vals, t) } // NormalizeStatement adds a default database and policy to the measurements in statement. diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index 893c190860b..52208a6b19c 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "io" + "io/ioutil" "os" "reflect" "regexp" @@ -16,6 +17,7 @@ import ( "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/services/httpd" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" "github.com/uber-go/zap" @@ -66,7 +68,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) { } // Verify all results from the query. - if a := ReadAllResults(e.ExecuteQuery(`SELECT * FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{ + if a := ReadAllResults(t, e.ExecuteQuery(`SELECT * FROM cpu`, "db0")); !reflect.DeepEqual(a, []*query.Result{ { StatementID: 0, Series: []*models.Row{{ @@ -118,7 +120,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) { } // Verify all results from the query. - if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{ + if a := ReadAllResults(t, e.ExecuteQuery(`SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, "db0")); !reflect.DeepEqual(a, []*query.Result{ { StatementID: 0, Err: errors.New("max-select-buckets limit exceeded: (4/3)"), @@ -239,7 +241,7 @@ func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) { t.Fatal(err) } - results := ReadAllResults(qe.ExecuteQuery(q, opt, make(chan struct{}))) + results := ReadAllResults(t, qe.ExecuteQuery(q, opt, make(chan struct{}))) exp := []*query.Result{ { StatementID: 0, @@ -303,10 +305,9 @@ func DefaultQueryExecutor() *QueryExecutor { } // ExecuteQuery parses query and executes against the database. -func (e *QueryExecutor) ExecuteQuery(q, database string, chunkSize int) <-chan *query.Result { +func (e *QueryExecutor) ExecuteQuery(q, database string) <-chan *query.ResultSet { return e.QueryExecutor.ExecuteQuery(MustParseQuery(q), query.ExecutionOptions{ - Database: database, - ChunkSize: chunkSize, + Database: database, }, make(chan struct{})) } @@ -433,13 +434,25 @@ func MustParseQuery(s string) *influxql.Query { return q } +type mockResponseWriter struct { + T *testing.T + Results []*query.Result +} + +func (m *mockResponseWriter) ContentType() string { return "" } +func (m *mockResponseWriter) WriteError(w io.Writer, err error) { m.T.Error(err) } + +func (m *mockResponseWriter) WriteResponse(w io.Writer, resp httpd.Response) (int, error) { + m.Results = append(m.Results, resp.Results...) + return 0, nil +} + // ReadAllResults reads all results from c and returns as a slice. -func ReadAllResults(c <-chan *query.Result) []*query.Result { - var a []*query.Result - for result := range c { - a = append(a, result) - } - return a +func ReadAllResults(t *testing.T, c <-chan *query.ResultSet) []*query.Result { + rw := mockResponseWriter{T: t} + enc := httpd.NewDefaultEncoder(&rw) + enc.Encode(ioutil.Discard, c) + return rw.Results } // FloatIterator is a represents an iterator that reads from a slice. diff --git a/query/emitter.go b/query/emitter.go index 6909172bca9..c5fb2649934 100644 --- a/query/emitter.go +++ b/query/emitter.go @@ -2,9 +2,6 @@ package query import ( "fmt" - "time" - - "github.com/influxdata/influxdb/models" ) // Emitter groups values together by name, tags, and time. @@ -12,33 +9,14 @@ type Emitter struct { buf []Point itrs []Iterator ascending bool - chunkSize int - - tags Tags - row *models.Row - - // The columns to attach to each row. - Columns []string - - // Overridden measurement name to emit. - EmitName string - - // The time zone location. - Location *time.Location - - // Removes the "time" column from output. - // Used for meta queries where time does not apply. - OmitTime bool } // NewEmitter returns a new instance of Emitter that pulls from itrs. -func NewEmitter(itrs []Iterator, ascending bool, chunkSize int) *Emitter { +func NewEmitter(itrs []Iterator, ascending bool) *Emitter { return &Emitter{ buf: make([]Point, len(itrs)), itrs: itrs, ascending: ascending, - chunkSize: chunkSize, - Location: time.UTC, } } @@ -47,59 +25,9 @@ func (e *Emitter) Close() error { return Iterators(e.itrs).Close() } -// Emit returns the next row from the iterators. -func (e *Emitter) Emit() (*models.Row, bool, error) { - // Immediately end emission if there are no iterators. - if len(e.itrs) == 0 { - return nil, false, nil - } - - // Continually read from iterators until they are exhausted. - for { - // Fill buffer. Return row if no more points remain. - t, name, tags, err := e.loadBuf() - if err != nil { - return nil, false, err - } else if t == ZeroTime { - row := e.row - e.row = nil - return row, false, nil - } - - // Read next set of values from all iterators at a given time/name/tags. - // If no values are returned then return row. - values := e.readAt(t, name, tags) - if values == nil { - row := e.row - e.row = nil - return row, false, nil - } - - // If there's no row yet then create one. - // If the name and tags match the existing row, append to that row if - // the number of values doesn't exceed the chunk size. - // Otherwise return existing row and add values to next emitted row. - if e.row == nil { - e.createRow(name, tags, values) - } else if e.row.Name == name && e.tags.Equals(&tags) { - if e.chunkSize > 0 && len(e.row.Values) >= e.chunkSize { - row := e.row - row.Partial = true - e.createRow(name, tags, values) - return row, true, nil - } - e.row.Values = append(e.row.Values, values) - } else { - row := e.row - e.createRow(name, tags, values) - return row, true, nil - } - } -} - -// loadBuf reads in points into empty buffer slots. +// LoadBuf reads in points into empty buffer slots. // Returns the next time/name/tags to emit for. -func (e *Emitter) loadBuf() (t int64, name string, tags Tags, err error) { +func (e *Emitter) LoadBuf() (t int64, name string, tags Tags, err error) { t = ZeroTime for i := range e.itrs { @@ -140,38 +68,8 @@ func (e *Emitter) loadBuf() (t int64, name string, tags Tags, err error) { return } -// createRow creates a new row attached to the emitter. -func (e *Emitter) createRow(name string, tags Tags, values []interface{}) { - if e.EmitName != "" { - name = e.EmitName - } - - e.tags = tags - e.row = &models.Row{ - Name: name, - Tags: tags.KeyValues(), - Columns: e.Columns, - Values: [][]interface{}{values}, - } -} - -// readAt returns the next slice of values from the iterators at time/name/tags. -// Returns nil values once the iterators are exhausted. -func (e *Emitter) readAt(t int64, name string, tags Tags) []interface{} { - offset := 1 - if e.OmitTime { - offset = 0 - } - - values := make([]interface{}, len(e.itrs)+offset) - if !e.OmitTime { - values[0] = time.Unix(0, t).In(e.Location) - } - e.readInto(t, name, tags, values[offset:]) - return values -} - -func (e *Emitter) readInto(t int64, name string, tags Tags, values []interface{}) { +// ReadInto reads the values at time, name, and tags into values. +func (e *Emitter) ReadInto(t int64, name string, tags Tags, values []interface{}) { for i, p := range e.buf { // Skip if buffer is empty. if p == nil { diff --git a/query/emitter_test.go b/query/emitter_test.go index dae9c1e00fa..79458fae4ed 100644 --- a/query/emitter_test.go +++ b/query/emitter_test.go @@ -1,17 +1,14 @@ package query_test import ( + "reflect" "testing" - "time" - "github.com/davecgh/go-spew/spew" - "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/pkg/deep" "github.com/influxdata/influxdb/query" ) // Ensure the emitter can group iterators together into rows. -func TestEmitter_Emit(t *testing.T) { +func TestEmitter(t *testing.T) { // Build an emitter that pulls from two iterators. e := query.NewEmitter([]query.Iterator{ &FloatIterator{Points: []query.FloatPoint{ @@ -23,103 +20,76 @@ func TestEmitter_Emit(t *testing.T) { {Name: "cpu", Tags: ParseTags("region=north"), Time: 0, Value: 4}, {Name: "mem", Time: 4, Value: 5}, }}, - }, true, 0) - e.Columns = []string{"col1", "col2"} + }, true) // Verify the cpu region=west is emitted first. - if row, _, err := e.Emit(); err != nil { + var values [2]interface{} + if ts, name, tags, err := e.LoadBuf(); err != nil { t.Fatalf("unexpected error(0): %s", err) - } else if !deep.Equal(row, &models.Row{ - Name: "cpu", - Tags: map[string]string{"region": "west"}, - Columns: []string{"col1", "col2"}, - Values: [][]interface{}{ - {time.Unix(0, 0).UTC(), float64(1), nil}, - {time.Unix(0, 1).UTC(), float64(2), float64(4)}, - }, - }) { - t.Fatalf("unexpected row(0): %s", spew.Sdump(row)) + } else if have, want := ts, int64(0); have != want { + t.Fatalf("unexpected time(0): have=%v want=%v", have, want) + } else if have, want := name, "cpu"; have != want { + t.Fatalf("unexpected name(0): have=%v want=%v", have, want) + } else if have, want := tags.ID(), "region\x00west"; have != want { + t.Fatalf("unexpected tags(0): have=%v want=%v", have, want) + } else { + e.ReadInto(ts, name, tags, values[:]) + if have, want := values[:], []interface{}{float64(1), nil}; !reflect.DeepEqual(have, want) { + t.Fatalf("unexpected values(0): have=%v want=%v", have, want) + } } - // Verify the cpu region=north is emitted next. - if row, _, err := e.Emit(); err != nil { + if ts, name, tags, err := e.LoadBuf(); err != nil { t.Fatalf("unexpected error(1): %s", err) - } else if !deep.Equal(row, &models.Row{ - Name: "cpu", - Tags: map[string]string{"region": "north"}, - Columns: []string{"col1", "col2"}, - Values: [][]interface{}{ - {time.Unix(0, 0).UTC(), nil, float64(4)}, - }, - }) { - t.Fatalf("unexpected row(1): %s", spew.Sdump(row)) + } else if have, want := ts, int64(1); have != want { + t.Fatalf("unexpected time(1): have=%v want=%v", have, want) + } else if have, want := name, "cpu"; have != want { + t.Fatalf("unexpected name(1): have=%v want=%v", have, want) + } else if have, want := tags.ID(), "region\x00west"; have != want { + t.Fatalf("unexpected tags(1): have=%v want=%v", have, want) + } else { + e.ReadInto(ts, name, tags, values[:]) + if have, want := values[:], []interface{}{float64(2), float64(4)}; !reflect.DeepEqual(have, want) { + t.Fatalf("unexpected values(1): have=%v want=%v", have, want) + } } - // Verify the mem series is emitted last. - if row, _, err := e.Emit(); err != nil { + // Verify the cpu region=north is emitted next. + if ts, name, tags, err := e.LoadBuf(); err != nil { t.Fatalf("unexpected error(2): %s", err) - } else if !deep.Equal(row, &models.Row{ - Name: "mem", - Columns: []string{"col1", "col2"}, - Values: [][]interface{}{ - {time.Unix(0, 4).UTC(), nil, float64(5)}, - }, - }) { - t.Fatalf("unexpected row(2): %s", spew.Sdump(row)) + } else if have, want := ts, int64(0); have != want { + t.Fatalf("unexpected time(2): have=%v want=%v", have, want) + } else if have, want := name, "cpu"; have != want { + t.Fatalf("unexpected name(2): have=%v want=%v", have, want) + } else if have, want := tags.ID(), "region\x00north"; have != want { + t.Fatalf("unexpected tags(2): have=%v want=%v", have, want) + } else { + e.ReadInto(ts, name, tags, values[:]) + if have, want := values[:], []interface{}{nil, float64(4)}; !reflect.DeepEqual(have, want) { + t.Fatalf("unexpected values(2): have=%v want=%v", have, want) + } } - // Verify EOF. - if row, _, err := e.Emit(); err != nil { - t.Fatalf("unexpected error(eof): %s", err) - } else if row != nil { - t.Fatalf("unexpected eof: %s", spew.Sdump(row)) - } -} - -// Ensure the emitter will limit the chunked output from a series. -func TestEmitter_ChunkSize(t *testing.T) { - // Build an emitter that pulls from one iterator with multiple points in the same series. - e := query.NewEmitter([]query.Iterator{ - &FloatIterator{Points: []query.FloatPoint{ - {Name: "cpu", Tags: ParseTags("region=west"), Time: 0, Value: 1}, - {Name: "cpu", Tags: ParseTags("region=west"), Time: 1, Value: 2}, - }}, - }, true, 1) - e.Columns = []string{"col1"} - - // Verify the cpu region=west is emitted first. - if row, _, err := e.Emit(); err != nil { - t.Fatalf("unexpected error(0): %s", err) - } else if !deep.Equal(row, &models.Row{ - Name: "cpu", - Tags: map[string]string{"region": "west"}, - Columns: []string{"col1"}, - Values: [][]interface{}{ - {time.Unix(0, 0).UTC(), float64(1)}, - }, - Partial: true, - }) { - t.Fatalf("unexpected row(0): %s", spew.Sdump(row)) - } - - // Verify the cpu region=north is emitted next. - if row, _, err := e.Emit(); err != nil { - t.Fatalf("unexpected error(1): %s", err) - } else if !deep.Equal(row, &models.Row{ - Name: "cpu", - Tags: map[string]string{"region": "west"}, - Columns: []string{"col1"}, - Values: [][]interface{}{ - {time.Unix(0, 1).UTC(), float64(2)}, - }, - }) { - t.Fatalf("unexpected row(1): %s", spew.Sdump(row)) + // Verify the mem series is emitted last. + if ts, name, tags, err := e.LoadBuf(); err != nil { + t.Fatalf("unexpected error(3): %s", err) + } else if have, want := ts, int64(4); have != want { + t.Fatalf("unexpected time(3): have=%v want=%v", have, want) + } else if have, want := name, "mem"; have != want { + t.Fatalf("unexpected name(3): have=%v want=%v", have, want) + } else if have, want := tags.ID(), ""; have != want { + t.Fatalf("unexpected tags(3): have=%v want=%v", have, want) + } else { + e.ReadInto(ts, name, tags, values[:]) + if have, want := values[:], []interface{}{nil, float64(5)}; !reflect.DeepEqual(have, want) { + t.Fatalf("unexpected values(2): have=%v want=%v", have, want) + } } // Verify EOF. - if row, _, err := e.Emit(); err != nil { + if ts, _, _, err := e.LoadBuf(); err != nil { t.Fatalf("unexpected error(eof): %s", err) - } else if row != nil { - t.Fatalf("unexpected eof: %s", spew.Sdump(row)) + } else if ts != query.ZeroTime { + t.Fatalf("unexpected time(eof): %v", ts) } } diff --git a/query/iterator.gen.go b/query/iterator.gen.go index e580177322d..80a0d6b6294 100644 --- a/query/iterator.gen.go +++ b/query/iterator.gen.go @@ -3221,8 +3221,7 @@ type floatIteratorMapper struct { } func newFloatIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *floatIteratorMapper { - e := NewEmitter(itrs, opt.Ascending, 0) - e.OmitTime = true + e := NewEmitter(itrs, opt.Ascending) return &floatIteratorMapper{ e: e, buf: make([]interface{}, len(itrs)), @@ -3235,7 +3234,7 @@ func newFloatIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Iterat } func (itr *floatIteratorMapper) Next() (*FloatPoint, error) { - t, name, tags, err := itr.e.loadBuf() + t, name, tags, err := itr.e.LoadBuf() if err != nil || t == ZeroTime { return nil, err } @@ -3243,7 +3242,7 @@ func (itr *floatIteratorMapper) Next() (*FloatPoint, error) { itr.point.Name = name itr.point.Tags = tags - itr.e.readInto(t, name, tags, itr.buf) + itr.e.ReadInto(t, name, tags, itr.buf) if itr.driver != nil { if v := itr.driver.Value(tags, itr.buf); v != nil { if v, ok := v.(float64); ok { @@ -6609,8 +6608,7 @@ type integerIteratorMapper struct { } func newIntegerIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *integerIteratorMapper { - e := NewEmitter(itrs, opt.Ascending, 0) - e.OmitTime = true + e := NewEmitter(itrs, opt.Ascending) return &integerIteratorMapper{ e: e, buf: make([]interface{}, len(itrs)), @@ -6623,7 +6621,7 @@ func newIntegerIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Iter } func (itr *integerIteratorMapper) Next() (*IntegerPoint, error) { - t, name, tags, err := itr.e.loadBuf() + t, name, tags, err := itr.e.LoadBuf() if err != nil || t == ZeroTime { return nil, err } @@ -6631,7 +6629,7 @@ func (itr *integerIteratorMapper) Next() (*IntegerPoint, error) { itr.point.Name = name itr.point.Tags = tags - itr.e.readInto(t, name, tags, itr.buf) + itr.e.ReadInto(t, name, tags, itr.buf) if itr.driver != nil { if v := itr.driver.Value(tags, itr.buf); v != nil { if v, ok := v.(int64); ok { @@ -9983,8 +9981,7 @@ type unsignedIteratorMapper struct { } func newUnsignedIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *unsignedIteratorMapper { - e := NewEmitter(itrs, opt.Ascending, 0) - e.OmitTime = true + e := NewEmitter(itrs, opt.Ascending) return &unsignedIteratorMapper{ e: e, buf: make([]interface{}, len(itrs)), @@ -9997,7 +9994,7 @@ func newUnsignedIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Ite } func (itr *unsignedIteratorMapper) Next() (*UnsignedPoint, error) { - t, name, tags, err := itr.e.loadBuf() + t, name, tags, err := itr.e.LoadBuf() if err != nil || t == ZeroTime { return nil, err } @@ -10005,7 +10002,7 @@ func (itr *unsignedIteratorMapper) Next() (*UnsignedPoint, error) { itr.point.Name = name itr.point.Tags = tags - itr.e.readInto(t, name, tags, itr.buf) + itr.e.ReadInto(t, name, tags, itr.buf) if itr.driver != nil { if v := itr.driver.Value(tags, itr.buf); v != nil { if v, ok := v.(uint64); ok { @@ -13357,8 +13354,7 @@ type stringIteratorMapper struct { } func newStringIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *stringIteratorMapper { - e := NewEmitter(itrs, opt.Ascending, 0) - e.OmitTime = true + e := NewEmitter(itrs, opt.Ascending) return &stringIteratorMapper{ e: e, buf: make([]interface{}, len(itrs)), @@ -13371,7 +13367,7 @@ func newStringIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Itera } func (itr *stringIteratorMapper) Next() (*StringPoint, error) { - t, name, tags, err := itr.e.loadBuf() + t, name, tags, err := itr.e.LoadBuf() if err != nil || t == ZeroTime { return nil, err } @@ -13379,7 +13375,7 @@ func (itr *stringIteratorMapper) Next() (*StringPoint, error) { itr.point.Name = name itr.point.Tags = tags - itr.e.readInto(t, name, tags, itr.buf) + itr.e.ReadInto(t, name, tags, itr.buf) if itr.driver != nil { if v := itr.driver.Value(tags, itr.buf); v != nil { if v, ok := v.(string); ok { @@ -16731,8 +16727,7 @@ type booleanIteratorMapper struct { } func newBooleanIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *booleanIteratorMapper { - e := NewEmitter(itrs, opt.Ascending, 0) - e.OmitTime = true + e := NewEmitter(itrs, opt.Ascending) return &booleanIteratorMapper{ e: e, buf: make([]interface{}, len(itrs)), @@ -16745,7 +16740,7 @@ func newBooleanIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Iter } func (itr *booleanIteratorMapper) Next() (*BooleanPoint, error) { - t, name, tags, err := itr.e.loadBuf() + t, name, tags, err := itr.e.LoadBuf() if err != nil || t == ZeroTime { return nil, err } @@ -16753,7 +16748,7 @@ func (itr *booleanIteratorMapper) Next() (*BooleanPoint, error) { itr.point.Name = name itr.point.Tags = tags - itr.e.readInto(t, name, tags, itr.buf) + itr.e.ReadInto(t, name, tags, itr.buf) if itr.driver != nil { if v := itr.driver.Value(tags, itr.buf); v != nil { if v, ok := v.(bool); ok { diff --git a/query/iterator.gen.go.tmpl b/query/iterator.gen.go.tmpl index 701efde865d..432b252f75b 100644 --- a/query/iterator.gen.go.tmpl +++ b/query/iterator.gen.go.tmpl @@ -1560,8 +1560,7 @@ type {{$k.name}}IteratorMapper struct { } func new{{$k.Name}}IteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper { - e := NewEmitter(itrs, opt.Ascending, 0) - e.OmitTime = true + e := NewEmitter(itrs, opt.Ascending) return &{{$k.name}}IteratorMapper{ e: e, buf: make([]interface{}, len(itrs)), @@ -1574,7 +1573,7 @@ func new{{$k.Name}}IteratorMapper(itrs []Iterator, driver IteratorMap, fields [] } func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) { - t, name, tags, err := itr.e.loadBuf() + t, name, tags, err := itr.e.LoadBuf() if err != nil || t == ZeroTime { return nil, err } @@ -1582,9 +1581,9 @@ func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) { itr.point.Name = name itr.point.Tags = tags - itr.e.readInto(t, name, tags, itr.buf) + itr.e.ReadInto(t, name, tags, itr.buf) if itr.driver != nil { - if v := itr.driver.Value(tags, itr.buf); v != nil { + if v := itr.driver.Value(tags, itr.buf); v != nil { if v, ok := v.({{$k.Type}}); ok { itr.point.Value = v itr.point.Nil = false diff --git a/query/query_executor.go b/query/query_executor.go index 8f7aa618911..a6f6fd95911 100644 --- a/query/query_executor.go +++ b/query/query_executor.go @@ -29,6 +29,11 @@ var ( // ErrQueryAborted is an error returned when the query is aborted. ErrQueryAborted = errors.New("query aborted") + // ErrQueryCanceled is an error that signals the query was canceled during + // execution by the query engine itself due to another error that was + // already reported. This error should never be emitted to the user. + ErrQueryCanceled = errors.New("query canceled") + // ErrQueryEngineShutdown is an error sent when the query cannot be // created because the query engine was shutdown. ErrQueryEngineShutdown = errors.New("query engine shutdown") @@ -107,9 +112,6 @@ type ExecutionOptions struct { // what resources can be returned in SHOW queries, etc. Authorizer Authorizer - // The requested maximum number of points to return in each result. - ChunkSize int - // If this query is being executed in a read-only context. ReadOnly bool @@ -135,7 +137,7 @@ type ExecutionContext struct { Query *QueryTask // Output channel where results and errors should be sent. - Results chan *Result + Results chan *ResultSet // Hold the query executor's logger. Log zap.Logger @@ -147,28 +149,45 @@ type ExecutionContext struct { ExecutionOptions } -// send sends a Result to the Results channel and will exit if the query has -// been aborted. -func (ctx *ExecutionContext) send(result *Result) error { +// Send sends a Result to the Results channel and will exit if the query has +// been interrupted or aborted. +func (ctx *ExecutionContext) CreateResult(messages ...*Message) (*ResultSet, error) { + result := &ResultSet{ + ID: ctx.StatementID, + Messages: messages, + AbortCh: ctx.AbortCh, + } select { + case <-ctx.InterruptCh: + return nil, ErrQueryInterrupted case <-ctx.AbortCh: - return ErrQueryAborted - case ctx.Results <- result: + return nil, ErrQueryAborted + case ctx.Results <- result.Init(): + return result, nil } +} + +// Ok returns a result with no content (it is immediately closed). +func (ctx *ExecutionContext) Ok(messages ...*Message) error { + result, err := ctx.CreateResult(messages...) + if err != nil { + return err + } + result.Close() return nil } -// Send sends a Result to the Results channel and will exit if the query has -// been interrupted or aborted. -func (ctx *ExecutionContext) Send(result *Result) error { +func (ctx *ExecutionContext) Error(err error) bool { select { - case <-ctx.InterruptCh: - return ErrQueryInterrupted case <-ctx.AbortCh: - return ErrQueryAborted - case ctx.Results <- result: + return false + case ctx.Results <- &ResultSet{ID: ctx.StatementID, Err: err}: + return true } - return nil +} + +func (ctx *ExecutionContext) Errorf(format string, a ...interface{}) bool { + return ctx.Error(fmt.Errorf(format, a...)) } // StatementExecutor executes a statement within the QueryExecutor. @@ -247,15 +266,15 @@ func (e *QueryExecutor) WithLogger(log zap.Logger) { } // ExecuteQuery executes each statement within a query. -func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result { - results := make(chan *Result) +func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *ResultSet { + results := make(chan *ResultSet) go e.executeQuery(query, opt, closing, results) return results } -func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { +func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *ResultSet) { defer close(results) - defer e.recover(query, results) + defer e.recover(query, results, opt.AbortCh) atomic.AddInt64(&e.stats.ActiveQueries, 1) atomic.AddInt64(&e.stats.ExecutedQueries, 1) @@ -268,7 +287,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing) if err != nil { select { - case results <- &Result{Err: err}: + case results <- &ResultSet{Err: err}: case <-opt.AbortCh: } return @@ -320,9 +339,7 @@ LOOP: case "_tags": command = "SHOW TAG VALUES" } - results <- &Result{ - Err: fmt.Errorf("unable to use system source '%s': use %s instead", s.Name, command), - } + ctx.Errorf("unable to use system source '%s': use %s instead", s.Name, command) break LOOP } } @@ -333,7 +350,7 @@ LOOP: // This can occur on meta read statements which convert to SELECT statements. newStmt, err := influxql.RewriteStatement(stmt) if err != nil { - results <- &Result{Err: err} + ctx.Error(err) break } stmt = newStmt @@ -341,7 +358,7 @@ LOOP: // Normalize each statement if possible. if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok { if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil { - if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted { + if ok := ctx.Error(err); !ok { return } break @@ -361,14 +378,17 @@ LOOP: if qerr := task.Error(); qerr != nil { err = qerr } + } else if err == ErrQueryCanceled { + // The query was canceled while it was running so the result has + // already been sent and the error has already been reported + // through a series or a row. Break out of the main loop so we can + // report ErrNotExecuted for the remaining statements. + break } // Send an error for this result if it failed for some reason. if err != nil { - if err := ctx.send(&Result{ - StatementID: i, - Err: err, - }); err == ErrQueryAborted { + if ok := ctx.Error(err); !ok { return } // Stop after the first error. @@ -393,10 +413,8 @@ LOOP: // Send error results for any statements which were not executed. for ; i < len(query.Statements)-1; i++ { - if err := ctx.send(&Result{ - StatementID: i, - Err: ErrNotExecuted, - }); err == ErrQueryAborted { + ctx.StatementID = i + if ok := ctx.Error(ErrNotExecuted); !ok { return } } @@ -413,13 +431,17 @@ func init() { } } -func (e *QueryExecutor) recover(query *influxql.Query, results chan *Result) { +func (e *QueryExecutor) recover(query *influxql.Query, results chan *ResultSet, abortCh <-chan struct{}) { if err := recover(); err != nil { atomic.AddInt64(&e.stats.RecoveredPanics, 1) // Capture the panic in _internal stats. e.Logger.Error(fmt.Sprintf("%s [panic:%s] %s", query.String(), err, debug.Stack())) - results <- &Result{ - StatementID: -1, - Err: fmt.Errorf("%s [panic:%s]", query.String(), err), + result := &ResultSet{ + ID: -1, + Err: fmt.Errorf("%s [panic:%s]", query.String(), err), + } + select { + case <-abortCh: + case results <- result: } if willCrash { diff --git a/query/query_executor_test.go b/query/query_executor_test.go index eaf3f311b73..3081f25bc7f 100644 --- a/query/query_executor_test.go +++ b/query/query_executor_test.go @@ -26,6 +26,9 @@ func NewQueryExecutor() *query.QueryExecutor { } func TestQueryExecutor_AttachQuery(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -41,10 +44,13 @@ func TestQueryExecutor_AttachQuery(t *testing.T) { }, } - discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) + discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil)) } func TestQueryExecutor_KillQuery(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -71,12 +77,12 @@ func TestQueryExecutor_KillQuery(t *testing.T) { }, } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid)) if err != nil { t.Fatal(err) } - discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) + discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil)) result := <-results if result.Err != query.ErrQueryInterrupted { @@ -85,6 +91,9 @@ func TestQueryExecutor_KillQuery(t *testing.T) { } func TestQueryExecutor_Interrupt(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -104,7 +113,7 @@ func TestQueryExecutor_Interrupt(t *testing.T) { } closing := make(chan struct{}) - results := e.ExecuteQuery(q, query.ExecutionOptions{}, closing) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, closing) close(closing) result := <-results if result.Err != query.ErrQueryInterrupted { @@ -112,38 +121,10 @@ func TestQueryExecutor_Interrupt(t *testing.T) { } } -func TestQueryExecutor_Abort(t *testing.T) { - q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) - if err != nil { - t.Fatal(err) - } - - ch1 := make(chan struct{}) - ch2 := make(chan struct{}) - - e := NewQueryExecutor() - e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { - <-ch1 - if err := ctx.Send(&query.Result{Err: errUnexpected}); err != query.ErrQueryAborted { - t.Errorf("unexpected error: %v", err) - } - close(ch2) - return nil - }, - } - - done := make(chan struct{}) - close(done) - - results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: done}, nil) - close(ch1) - - <-ch2 - discardOutput(results) -} - func TestQueryExecutor_ShowQueries(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { @@ -162,17 +143,41 @@ func TestQueryExecutor_ShowQueries(t *testing.T) { t.Fatal(err) } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) result := <-results - if len(result.Series) != 1 { - t.Errorf("expected %d rows, got %d", 1, len(result.Series)) + + // Should be able to retrieve a single series. + var series *query.Series + timer := time.NewTimer(100 * time.Millisecond) + select { + case series = <-result.SeriesCh(): + timer.Stop() + case <-timer.C: + t.Fatal("unexpected timeout") } - if result.Err != nil { - t.Errorf("unexpected error: %s", result.Err) + + // Should only retrieve one row from the series. + rows := make([][]interface{}, 0, 1) + for row := range series.RowCh() { + if row.Err != nil { + t.Errorf("unexpected error: %s", result.Err) + } + rows = append(rows, row.Values) + } + if len(rows) != 1 { + t.Errorf("expected %d rows, got %d", 1, len(rows)) + } + + // No more series should be returned. + if series := <-result.SeriesCh(); series != nil { + t.Fatal("unexpected series") } } func TestQueryExecutor_Limit_Timeout(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -192,7 +197,7 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) { } e.TaskManager.QueryTimeout = time.Nanosecond - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) result := <-results if result.Err == nil || !strings.Contains(result.Err.Error(), "query-timeout") { t.Errorf("unexpected error: %s", result.Err) @@ -200,6 +205,9 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) { } func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -219,17 +227,14 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { defer e.Close() // Start first query and wait for it to be executing. - go discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) + go discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil)) <-qid // Start second query and expect for it to fail. - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) select { case result := <-results: - if len(result.Series) != 0 { - t.Errorf("expected %d rows, got %d", 0, len(result.Series)) - } if result.Err == nil || !strings.Contains(result.Err.Error(), "max-concurrent-queries") { t.Errorf("unexpected error: %s", result.Err) } @@ -239,6 +244,9 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { } func TestQueryExecutor_Close(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -256,8 +264,8 @@ func TestQueryExecutor_Close(t *testing.T) { }, } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) - go func(results <-chan *query.Result) { + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) + go func(results <-chan *query.ResultSet) { result := <-results if result.Err != query.ErrQueryEngineShutdown { t.Errorf("unexpected error: %s", result.Err) @@ -278,17 +286,17 @@ func TestQueryExecutor_Close(t *testing.T) { t.Fatal("closing the query manager did not kill the query after 100 milliseconds") } - results = e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results = e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) result := <-results - if len(result.Series) != 0 { - t.Errorf("expected %d rows, got %d", 0, len(result.Series)) - } if result.Err != query.ErrQueryEngineShutdown { t.Errorf("unexpected error: %s", result.Err) } } func TestQueryExecutor_Panic(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -301,17 +309,17 @@ func TestQueryExecutor_Panic(t *testing.T) { }, } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) result := <-results - if len(result.Series) != 0 { - t.Errorf("expected %d rows, got %d", 0, len(result.Series)) - } if result.Err == nil || result.Err.Error() != "SELECT count(value) FROM cpu [panic:test error]" { t.Errorf("unexpected error: %s", result.Err) } } func TestQueryExecutor_InvalidSource(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { @@ -350,19 +358,22 @@ func TestQueryExecutor_InvalidSource(t *testing.T) { continue } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) result := <-results - if len(result.Series) != 0 { - t.Errorf("%d. expected %d rows, got %d", 0, i, len(result.Series)) - } if result.Err == nil || result.Err.Error() != tt.err { t.Errorf("%d. unexpected error: %s", i, result.Err) } } } -func discardOutput(results <-chan *query.Result) { - for range results { +func discardOutput(results <-chan *query.ResultSet) { + for r := range results { // Read all results and discard. + for s := range r.SeriesCh() { + // Read all series and discard. + for range s.RowCh() { + // Read all rows and discard. + } + } } } diff --git a/query/result.go b/query/result.go index a73fd950374..be36073d29f 100644 --- a/query/result.go +++ b/query/result.go @@ -14,6 +14,110 @@ const ( WarningLevel = "warning" ) +type Row struct { + Values []interface{} + Err error +} + +type Series struct { + Name string + Tags Tags + Columns []string + Err error + AbortCh <-chan struct{} + + rowCh chan Row +} + +func (s *Series) Emit(values []interface{}) (ok bool) { + row := Row{Values: values} + select { + case <-s.AbortCh: + return false + case s.rowCh <- row: + return true + } +} + +func (s *Series) Error(err error) (ok bool) { + row := Row{Err: err} + select { + case <-s.AbortCh: + return false + case s.rowCh <- row: + return true + } +} + +func (s *Series) RowCh() <-chan Row { + return s.rowCh +} + +func (s *Series) Close() error { + close(s.rowCh) + return nil +} + +type ResultSet struct { + ID int + Messages []*Message + Err error + AbortCh <-chan struct{} + + seriesCh chan *Series + columns []string +} + +func (rs *ResultSet) Init() *ResultSet { + rs.seriesCh = make(chan *Series) + return rs +} + +func (rs *ResultSet) WithColumns(columns ...string) *ResultSet { + dup := *rs + dup.columns = columns + return &dup +} + +func (rs *ResultSet) CreateSeries(name string) (*Series, bool) { + return rs.CreateSeriesWithTags(name, Tags{}) +} + +func (rs *ResultSet) CreateSeriesWithTags(name string, tags Tags) (*Series, bool) { + series := &Series{ + Name: name, + Tags: tags, + Columns: rs.columns, + rowCh: make(chan Row), + AbortCh: rs.AbortCh, + } + select { + case <-rs.AbortCh: + return nil, false + case rs.seriesCh <- series: + return series, true + } +} + +func (rs *ResultSet) Error(err error) (ok bool) { + series := &Series{Err: err} + select { + case <-rs.AbortCh: + return false + case rs.seriesCh <- series: + return true + } +} + +func (rs *ResultSet) SeriesCh() <-chan *Series { + return rs.seriesCh +} + +func (rs *ResultSet) Close() error { + close(rs.seriesCh) + return nil +} + // TagSet is a fundamental concept within the query system. It represents a composite series, // composed of multiple individual series that share a set of tag attributes. type TagSet struct { @@ -72,7 +176,7 @@ type Message struct { // ReadOnlyWarning generates a warning message that tells the user the command // they are using is being used for writing in a read only context. // -// This is a temporary method while to be used while transitioning to read only +// This is a temporary method to be used while transitioning to read only // operations for issue #6290. func ReadOnlyWarning(stmt string) *Message { return &Message{ diff --git a/query/task_manager.go b/query/task_manager.go index f0b789720d7..f80cec410a9 100644 --- a/query/task_manager.go +++ b/query/task_manager.go @@ -6,7 +6,6 @@ import ( "time" "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/models" "github.com/uber-go/zap" ) @@ -53,15 +52,7 @@ func NewTaskManager() *TaskManager { func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error { switch stmt := stmt.(type) { case *influxql.ShowQueriesStatement: - rows, err := t.executeShowQueriesStatement(stmt) - if err != nil { - return err - } - - ctx.Results <- &Result{ - StatementID: ctx.StatementID, - Series: rows, - } + t.executeShowQueriesStatement(stmt, &ctx) case *influxql.KillQueryStatement: var messages []*Message if ctx.ReadOnly { @@ -71,10 +62,12 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon if err := t.executeKillQueryStatement(stmt); err != nil { return err } - ctx.Results <- &Result{ - StatementID: ctx.StatementID, - Messages: messages, + result := &ResultSet{ + ID: ctx.StatementID, + Messages: messages, } + ctx.Results <- result.Init() + result.Close() default: return ErrInvalidQuery } @@ -85,13 +78,26 @@ func (t *TaskManager) executeKillQueryStatement(stmt *influxql.KillQueryStatemen return t.KillQuery(stmt.QueryID) } -func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) { +func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStatement, ctx *ExecutionContext) { + result, err := ctx.CreateResult() + if err != nil { + ctx.Error(err) + return + } + defer result.Close() + + result = result.WithColumns("qid", "query", "database", "duration") + series, ok := result.CreateSeries("") + if !ok { + return + } + defer series.Close() + t.mu.RLock() defer t.mu.RUnlock() now := time.Now() - values := make([][]interface{}, 0, len(t.queries)) for id, qi := range t.queries { d := now.Sub(qi.startTime) @@ -103,14 +109,8 @@ func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStateme case d >= time.Microsecond: d = d - (d % time.Microsecond) } - - values = append(values, []interface{}{id, qi.query, qi.database, d.String()}) + series.Emit([]interface{}{id, qi.query, qi.database, d.String()}) } - - return []*models.Row{{ - Columns: []string{"qid", "query", "database", "duration"}, - Values: values, - }}, nil } func (t *TaskManager) queryError(qid uint64, err error) { diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index abfb8197be4..bf509c6a2e6 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -388,9 +388,10 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti // extract number of points written from SELECT ... INTO result var written int64 = -1 - if len(res.Series) == 1 && len(res.Series[0].Values) == 1 { - s := res.Series[0] - written = s.Values[0][1].(int64) + if series := <-res.SeriesCh(); series != nil { + if row, ok := <-series.RowCh(); ok { + written = row.Values[1].(int64) + } } if s.loggingEnabled { @@ -408,7 +409,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } // runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in -func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) *query.Result { +func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) *query.ResultSet { // Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor. q := &influxql.Query{ Statements: influxql.Statements([]influxql.Statement{cq.q}), diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index 55157306137..64eab23ed95 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -55,8 +55,7 @@ func TestContinuousQueryService_Run(t *testing.T) { if callCnt >= expectCallCnt { done <- struct{}{} } - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -132,8 +131,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -214,8 +212,7 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -284,8 +281,7 @@ func TestContinuousQueryService_GroupByOffset(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -317,7 +313,9 @@ func TestContinuousQueryService_NotLeader(t *testing.T) { s.QueryExecutor.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { done <- struct{}{} - ctx.Results <- &query.Result{Err: errUnexpected} + if ok := ctx.Error(errUnexpected); !ok { + return query.ErrQueryAborted + } return nil }, } @@ -446,8 +444,7 @@ func TestExecuteContinuousQuery_TimeRange(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, tt.start, tt.end) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -561,8 +558,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, test.start, test.end) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -613,13 +609,19 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) { s.QueryExecutor.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { - ctx.Results <- &query.Result{ - Series: []*models.Row{{ - Name: "result", - Columns: []string{"time", "written"}, - Values: [][]interface{}{{time.Time{}, writeN}}, - }}, + result, err := ctx.CreateResult() + if err != nil { + return err + } + defer result.Close() + + result = result.WithColumns("time", "written") + series, ok := result.CreateSeries("result") + if !ok { + return nil } + defer series.Close() + series.Emit([]interface{}{time.Time{}, writeN}) return nil }, } @@ -659,8 +661,7 @@ func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testin s := NewTestService(t) s.QueryExecutor.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } s.Monitor = &monitor{ diff --git a/services/httpd/encoder.go b/services/httpd/encoder.go new file mode 100644 index 00000000000..b818cc22a56 --- /dev/null +++ b/services/httpd/encoder.go @@ -0,0 +1,281 @@ +package httpd + +import ( + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/query" +) + +type Encoder interface { + // ContentType returns the content-type used to identify this format in HTTP. + ContentType() string + + // Encode encodes the full response from the results channel. + Encode(w io.Writer, results <-chan *query.ResultSet) + + // Error encodes a top-level error to the io.Writer. + Error(w io.Writer, err error) +} + +type ResponseFormatter interface { + WriteResponse(w io.Writer, resp Response) (int, error) + WriteError(w io.Writer, err error) + ContentType() string +} + +func NewEncoder(r *http.Request, config *Config) Encoder { + epoch := strings.TrimSpace(r.FormValue("epoch")) + switch r.Header.Get("Accept") { + case "application/csv", "text/csv": + formatter := &csvFormatter{statementID: -1} + chunked, size := parseChunkedOptions(r) + if chunked { + return &chunkedEncoder{ + Formatter: formatter, + ChunkSize: size, + Epoch: epoch, + } + } + return &defaultEncoder{ + Formatter: formatter, + MaxRowLimit: config.MaxRowLimit, + Epoch: epoch, + } + case "application/json": + fallthrough + default: + pretty := r.URL.Query().Get("pretty") == "true" + formatter := &jsonFormatter{Pretty: pretty} + chunked, size := parseChunkedOptions(r) + if chunked { + return &chunkedEncoder{ + Formatter: formatter, + ChunkSize: size, + Epoch: epoch, + } + } + return &defaultEncoder{ + Formatter: formatter, + MaxRowLimit: config.MaxRowLimit, + Epoch: epoch, + } + } +} + +type defaultEncoder struct { + Formatter ResponseFormatter + MaxRowLimit int + Epoch string +} + +func NewDefaultEncoder(formatter ResponseFormatter) Encoder { + return &defaultEncoder{ + Formatter: formatter, + } +} + +func (e *defaultEncoder) ContentType() string { + return e.Formatter.ContentType() +} + +func (e *defaultEncoder) Encode(w io.Writer, results <-chan *query.ResultSet) { + var convertToEpoch func(row *query.Row) + if e.Epoch != "" { + convertToEpoch = epochConverter(e.Epoch) + } + + resp := Response{Results: make([]*query.Result, 0)} + + rows := 0 +RESULTS: + for result := range results { + r := &query.Result{ + StatementID: result.ID, + Messages: result.Messages, + Err: result.Err, + } + resp.Results = append(resp.Results, r) + if r.Err != nil { + continue + } + + for series := range result.SeriesCh() { + if series.Err != nil { + r.Err = series.Err + continue RESULTS + } + + s := &models.Row{ + Name: series.Name, + Tags: series.Tags.KeyValues(), + Columns: series.Columns, + } + r.Series = append(r.Series, s) + + for row := range series.RowCh() { + if row.Err != nil { + r.Err = row.Err + r.Series = nil + continue RESULTS + } else if e.MaxRowLimit > 0 && rows+len(s.Values) >= e.MaxRowLimit { + s.Partial = true + break RESULTS + } + + if convertToEpoch != nil { + convertToEpoch(&row) + } + s.Values = append(s.Values, row.Values) + } + rows += len(s.Values) + } + } + e.Formatter.WriteResponse(w, resp) +} + +func (e *defaultEncoder) Error(w io.Writer, err error) { + e.Formatter.WriteError(w, err) +} + +type chunkedEncoder struct { + Formatter ResponseFormatter + ChunkSize int + Epoch string +} + +func (e *chunkedEncoder) ContentType() string { + return e.Formatter.ContentType() +} + +func (e *chunkedEncoder) Encode(w io.Writer, results <-chan *query.ResultSet) { + var convertToEpoch func(row *query.Row) + if e.Epoch != "" { + convertToEpoch = epochConverter(e.Epoch) + } + + for result := range results { + messages := result.Messages + + series := <-result.SeriesCh() + if series == nil { + e.Formatter.WriteResponse(w, Response{Results: []*query.Result{ + { + StatementID: result.ID, + Messages: messages, + }, + }}) + continue + } else if series.Err != nil { + // An error occurred while processing the result. + e.Formatter.WriteResponse(w, Response{Results: []*query.Result{ + { + StatementID: result.ID, + Messages: messages, + Err: series.Err, + }, + }}) + continue + } + + for series != nil { + var values [][]interface{} + for row := range series.RowCh() { + if row.Err != nil { + // An error occurred while processing the result. + e.Formatter.WriteResponse(w, Response{Results: []*query.Result{ + { + StatementID: result.ID, + Messages: messages, + Err: series.Err, + }, + }}) + continue + } + + if convertToEpoch != nil { + convertToEpoch(&row) + } + + if e.ChunkSize > 0 && len(values) >= e.ChunkSize { + r := &query.Result{ + StatementID: result.ID, + Series: []*models.Row{{ + Name: series.Name, + Tags: series.Tags.KeyValues(), + Columns: series.Columns, + Values: values, + Partial: true, + }}, + Messages: messages, + Partial: true, + } + e.Formatter.WriteResponse(w, Response{Results: []*query.Result{r}}) + messages = nil + values = values[:0] + } + values = append(values, row.Values) + } + + r := &query.Result{ + StatementID: result.ID, + Series: []*models.Row{{ + Name: series.Name, + Tags: series.Tags.KeyValues(), + Columns: series.Columns, + Values: values, + }}, + Messages: messages, + } + + series = <-result.SeriesCh() + if series != nil { + r.Partial = true + } + e.Formatter.WriteResponse(w, Response{Results: []*query.Result{r}}) + } + } +} + +func (e *chunkedEncoder) Error(w io.Writer, err error) { + e.Formatter.WriteError(w, err) +} + +func epochConverter(epoch string) func(row *query.Row) { + divisor := int64(1) + + switch epoch { + case "u": + divisor = int64(time.Microsecond) + case "ms": + divisor = int64(time.Millisecond) + case "s": + divisor = int64(time.Second) + case "m": + divisor = int64(time.Minute) + case "h": + divisor = int64(time.Hour) + } + return func(row *query.Row) { + if ts, ok := row.Values[0].(time.Time); ok { + row.Values[0] = ts.UnixNano() / divisor + } + } +} + +func parseChunkedOptions(r *http.Request) (chunked bool, size int) { + chunked = r.FormValue("chunked") == "true" + if chunked { + size = DefaultChunkSize + if chunked { + if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 { + size = int(n) + } + } + } + return chunked, size +} diff --git a/services/httpd/encoder_test.go b/services/httpd/encoder_test.go new file mode 100644 index 00000000000..4deb3ce0cac --- /dev/null +++ b/services/httpd/encoder_test.go @@ -0,0 +1,79 @@ +package httpd_test + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/services/httpd" +) + +func EmitTestResults(results chan *query.ResultSet) { + result := &query.ResultSet{ID: 0} + results <- result.Init() + result = result.WithColumns("time", "value") + + series, _ := result.CreateSeriesWithTags("cpu", + query.NewTags(map[string]string{"host": "server01"})) + series.Emit([]interface{}{time.Unix(0, 0).UTC(), 2.0}) + series.Emit([]interface{}{time.Unix(10, 0).UTC(), 5.0}) + series.Emit([]interface{}{time.Unix(20, 0).UTC(), 7.0}) + series.Close() + + series, _ = result.CreateSeriesWithTags("cpu", + query.NewTags(map[string]string{"host": "server02"})) + series.Emit([]interface{}{time.Unix(0, 0).UTC(), 8.0}) + series.Close() + result.Close() + + result = &query.ResultSet{ID: 1} + results <- result.Init() + result = result.WithColumns("name") + close(results) + + series, _ = result.CreateSeries("databases") + series.Emit([]interface{}{"db0"}) + series.Emit([]interface{}{"db1"}) + series.Close() + result.Close() +} + +func TestEncoder_Default(t *testing.T) { + req, _ := http.NewRequest("GET", "http://127.0.0.1:8086/query", nil) + req.Header.Set("Accept", "application/json") + resp := httptest.NewRecorder() + + results := make(chan *query.ResultSet) + go EmitTestResults(results) + + config := httpd.NewConfig() + enc := httpd.NewEncoder(req, &config) + enc.Encode(resp, results) + + if have, want := strings.TrimSpace(resp.Body.String()), `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["1970-01-01T00:00:00Z",2],["1970-01-01T00:00:10Z",5],["1970-01-01T00:00:20Z",7]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["1970-01-01T00:00:00Z",8]]}]},{"statement_id":1,"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db1"]]}]}]}`; have != want { + t.Errorf("mismatched output:\n\thave=%v\n\twant=%v\n", have, want) + } +} + +func TestEncoder_Chunked(t *testing.T) { + req, _ := http.NewRequest("GET", "http://127.0.0.1:8086/query?chunked=true&chunk_size=2", nil) + req.Header.Set("Accept", "application/json") + resp := httptest.NewRecorder() + + results := make(chan *query.ResultSet) + go EmitTestResults(results) + + config := httpd.NewConfig() + enc := httpd.NewEncoder(req, &config) + enc.Encode(resp, results) + + if have, want := strings.TrimSpace(resp.Body.String()), `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["1970-01-01T00:00:00Z",2],["1970-01-01T00:00:10Z",5]],"partial":true}],"partial":true}]} +{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["1970-01-01T00:00:20Z",7]]}],"partial":true}]} +{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["1970-01-01T00:00:00Z",8]]}]}]} +{"results":[{"statement_id":1,"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db1"]]}]}]}`; have != want { + t.Errorf("mismatched output:\n\thave=%v\n\twant=%v\n", have, want) + } +} diff --git a/services/httpd/formatters.go b/services/httpd/formatters.go new file mode 100644 index 00000000000..be78782608e --- /dev/null +++ b/services/httpd/formatters.go @@ -0,0 +1,161 @@ +package httpd + +import ( + "encoding/csv" + "encoding/json" + "io" + "strconv" + "time" + + "github.com/influxdata/influxdb/models" +) + +type jsonFormatter struct { + Pretty bool +} + +func (f *jsonFormatter) ContentType() string { + return "application/json" +} + +func (f *jsonFormatter) WriteResponse(w io.Writer, resp Response) (n int, err error) { + var b []byte + if f.Pretty { + b, err = json.MarshalIndent(resp, "", " ") + } else { + b, err = json.Marshal(resp) + } + + if err != nil { + n, err = io.WriteString(w, err.Error()) + } else { + n, err = w.Write(b) + } + + w.Write([]byte("\n")) + n++ + return n, err +} + +func (f *jsonFormatter) WriteError(w io.Writer, err error) { + m := map[string]string{"error": err.Error()} + + var b []byte + if f.Pretty { + b, err = json.MarshalIndent(m, "", " ") + } else { + b, err = json.Marshal(m) + } + + if err != nil { + io.WriteString(w, err.Error()) + } else { + w.Write(b) + } + + w.Write([]byte("\n")) +} + +type csvFormatter struct { + statementID int + columns []string +} + +func (f *csvFormatter) ContentType() string { + return "text/csv" +} + +func (f *csvFormatter) WriteResponse(w io.Writer, resp Response) (n int, err error) { + csv := csv.NewWriter(w) + if resp.Err != nil { + csv.Write([]string{"error"}) + csv.Write([]string{resp.Err.Error()}) + csv.Flush() + return n, csv.Error() + } + + for _, result := range resp.Results { + if result.StatementID != f.statementID { + // If there are no series in the result, skip past this result. + if len(result.Series) == 0 { + continue + } + + // Set the statement id and print out a newline if this is not the first statement. + if f.statementID >= 0 { + // Flush the csv writer and write a newline. + csv.Flush() + if err := csv.Error(); err != nil { + return n, err + } + + out, err := io.WriteString(w, "\n") + if err != nil { + return n, err + } + n += out + } + f.statementID = result.StatementID + + // Print out the column headers from the first series. + f.columns = make([]string, 2+len(result.Series[0].Columns)) + f.columns[0] = "name" + f.columns[1] = "tags" + copy(f.columns[2:], result.Series[0].Columns) + if err := csv.Write(f.columns); err != nil { + return n, err + } + } + + for _, row := range result.Series { + f.columns[0] = row.Name + if len(row.Tags) > 0 { + f.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:]) + } else { + f.columns[1] = "" + } + for _, values := range row.Values { + for i, value := range values { + if value == nil { + f.columns[i+2] = "" + continue + } + + switch v := value.(type) { + case float64: + f.columns[i+2] = strconv.FormatFloat(v, 'f', -1, 64) + case int64: + f.columns[i+2] = strconv.FormatInt(v, 10) + case string: + f.columns[i+2] = v + case bool: + if v { + f.columns[i+2] = "true" + } else { + f.columns[i+2] = "false" + } + case time.Time: + f.columns[i+2] = strconv.FormatInt(v.UnixNano(), 10) + case *float64, *int64, *string, *bool: + f.columns[i+2] = "" + } + } + csv.Write(f.columns) + } + } + } + csv.Flush() + if err := csv.Error(); err != nil { + return n, err + } + return n, nil +} + +func (f *csvFormatter) WriteError(w io.Writer, err error) { + csv := csv.NewWriter(w) + csv.WriteAll([][]string{ + {"error"}, + {err.Error()}, + }) + csv.Flush() +} diff --git a/services/httpd/response_writer_test.go b/services/httpd/formatters_test.go similarity index 51% rename from services/httpd/response_writer_test.go rename to services/httpd/formatters_test.go index 72f183f3458..54f94740219 100644 --- a/services/httpd/response_writer_test.go +++ b/services/httpd/formatters_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/httpd" ) @@ -21,32 +20,35 @@ func TestResponseWriter_CSV(t *testing.T) { } w := httptest.NewRecorder() - writer := httpd.NewResponseWriter(w, r) - writer.WriteResponse(httpd.Response{ - Results: []*query.Result{ - { - StatementID: 0, - Series: []*models.Row{ - { - Name: "cpu", - Tags: map[string]string{ - "host": "server01", - "region": "uswest", - }, - Columns: []string{"time", "value"}, - Values: [][]interface{}{ - {time.Unix(0, 10), float64(2.5)}, - {time.Unix(0, 20), int64(5)}, - {time.Unix(0, 30), nil}, - {time.Unix(0, 40), "foobar"}, - {time.Unix(0, 50), true}, - {time.Unix(0, 60), false}, - }, - }, - }, - }, - }, - }) + results := make(chan *query.ResultSet) + go func() { + defer close(results) + result := &query.ResultSet{ID: 0} + results <- result.Init() + defer result.Close() + + result = result.WithColumns("time", "value") + series, _ := result.CreateSeriesWithTags("cpu", query.NewTags(map[string]string{ + "host": "server01", + "region": "uswest", + })) + defer series.Close() + + for _, row := range [][]interface{}{ + {time.Unix(0, 10), float64(2.5)}, + {time.Unix(0, 20), int64(5)}, + {time.Unix(0, 30), nil}, + {time.Unix(0, 40), "foobar"}, + {time.Unix(0, 50), true}, + {time.Unix(0, 60), false}, + } { + series.Emit(row) + } + }() + + config := httpd.NewConfig() + enc := httpd.NewEncoder(r, &config) + enc.Encode(w, results) if got, want := w.Body.String(), `name,tags,time,value cpu,"host=server01,region=uswest",10,2.5 diff --git a/services/httpd/handler.go b/services/httpd/handler.go index b835b7c2701..5c8062c1104 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -230,7 +230,6 @@ func (h *Handler) AddRoutes(routes ...Route) { handler = http.HandlerFunc(hf) } - handler = h.responseWriter(handler) if r.Gzipped { handler = gzipFilter(handler) } @@ -289,11 +288,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U }(time.Now()) h.requestTracker.Add(r, user) - // Retrieve the underlying ResponseWriter or initialize our own. - rw, ok := w.(ResponseWriter) - if !ok { - rw = NewResponseWriter(w, r) - } + // Initialize an encoder for us to use to encode the response. + enc := NewEncoder(r, h.Config) // Retrieve the node id the query should be executed on. nodeID, _ := strconv.ParseUint(r.FormValue("node_id"), 10, 64) @@ -307,7 +303,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U if fhs := r.MultipartForm.File["q"]; len(fhs) > 0 { f, err := fhs[0].Open() if err != nil { - h.httpError(rw, err.Error(), http.StatusBadRequest) + h.httpError(w, enc, err.Error(), http.StatusBadRequest) return } defer f.Close() @@ -316,12 +312,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U } if qr == nil { - h.httpError(rw, `missing required parameter "q"`, http.StatusBadRequest) + h.httpError(w, enc, `missing required parameter "q"`, http.StatusBadRequest) return } - epoch := strings.TrimSpace(r.FormValue("epoch")) - p := influxql.NewParser(qr) db := r.FormValue("db") @@ -336,7 +330,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U decoder := json.NewDecoder(strings.NewReader(rawParams)) decoder.UseNumber() if err := decoder.Decode(¶ms); err != nil { - h.httpError(rw, "error parsing query parameters: "+err.Error(), http.StatusBadRequest) + h.httpError(w, enc, "error parsing query parameters: "+err.Error(), http.StatusBadRequest) return } @@ -351,7 +345,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U } if err != nil { - h.httpError(rw, "error parsing json value: "+err.Error(), http.StatusBadRequest) + h.httpError(w, enc, "error parsing json value: "+err.Error(), http.StatusBadRequest) return } } @@ -362,7 +356,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U // Parse query from query string. q, err := p.ParseQuery() if err != nil { - h.httpError(rw, "error parsing query: "+err.Error(), http.StatusBadRequest) + h.httpError(w, enc, "error parsing query: "+err.Error(), http.StatusBadRequest) return } @@ -372,28 +366,18 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U if err, ok := err.(meta.ErrAuthorize); ok { h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database)) } - h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden) + h.httpError(w, enc, "error authorizing query: "+err.Error(), http.StatusForbidden) return } } - // Parse chunk size. Use default if not provided or unparsable. - chunked := r.FormValue("chunked") == "true" - chunkSize := DefaultChunkSize - if chunked { - if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 { - chunkSize = int(n) - } - } - // Parse whether this is an async command. async := r.FormValue("async") == "true" opts := query.ExecutionOptions{ - Database: db, - ChunkSize: chunkSize, - ReadOnly: r.Method == "GET", - NodeID: nodeID, + Database: db, + ReadOnly: r.Method == "GET", + NodeID: nodeID, } if h.Config.AuthEnabled { @@ -442,131 +426,21 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U return } - // if we're not chunking, this will be the in memory buffer for all results before sending to client - resp := Response{Results: make([]*query.Result, 0)} - // Status header is OK once this point is reached. // Attempt to flush the header immediately so the client gets the header information // and knows the query was accepted. - h.writeHeader(rw, http.StatusOK) + w.Header().Set("Content-Type", enc.ContentType()) + h.writeHeader(w, http.StatusOK) if w, ok := w.(http.Flusher); ok { w.Flush() } - // pull all results from the channel - rows := 0 - for r := range results { - // Ignore nil results. - if r == nil { - continue - } - - // if requested, convert result timestamps to epoch - if epoch != "" { - convertToEpoch(r, epoch) - } - - // Write out result immediately if chunked. - if chunked { - n, _ := rw.WriteResponse(Response{ - Results: []*query.Result{r}, - }) - atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) - w.(http.Flusher).Flush() - continue - } - - // Limit the number of rows that can be returned in a non-chunked - // response. This is to prevent the server from going OOM when - // returning a large response. If you want to return more than the - // default chunk size, then use chunking to process multiple blobs. - // Iterate through the series in this result to count the rows and - // truncate any rows we shouldn't return. - if h.Config.MaxRowLimit > 0 { - for i, series := range r.Series { - n := h.Config.MaxRowLimit - rows - if n < len(series.Values) { - // We have reached the maximum number of values. Truncate - // the values within this row. - series.Values = series.Values[:n] - // Since this was truncated, it will always be a partial return. - // Add this so the client knows we truncated the response. - series.Partial = true - } - rows += len(series.Values) - - if rows >= h.Config.MaxRowLimit { - // Drop any remaining series since we have already reached the row limit. - if i < len(r.Series) { - r.Series = r.Series[:i+1] - } - break - } - } - } - - // It's not chunked so buffer results in memory. - // Results for statements need to be combined together. - // We need to check if this new result is for the same statement as - // the last result, or for the next statement - l := len(resp.Results) - if l == 0 { - resp.Results = append(resp.Results, r) - } else if resp.Results[l-1].StatementID == r.StatementID { - if r.Err != nil { - resp.Results[l-1] = r - continue - } - - cr := resp.Results[l-1] - rowsMerged := 0 - if len(cr.Series) > 0 { - lastSeries := cr.Series[len(cr.Series)-1] - - for _, row := range r.Series { - if !lastSeries.SameSeries(row) { - // Next row is for a different series than last. - break - } - // Values are for the same series, so append them. - lastSeries.Values = append(lastSeries.Values, row.Values...) - rowsMerged++ - } - } - - // Append remaining rows as new rows. - r.Series = r.Series[rowsMerged:] - cr.Series = append(cr.Series, r.Series...) - cr.Messages = append(cr.Messages, r.Messages...) - cr.Partial = r.Partial - } else { - resp.Results = append(resp.Results, r) - } - - // Drop out of this loop and do not process further results when we hit the row limit. - if h.Config.MaxRowLimit > 0 && rows >= h.Config.MaxRowLimit { - // If the result is marked as partial, remove that partial marking - // here. While the series is partial and we would normally have - // tried to return the rest in the next chunk, we are not using - // chunking and are truncating the series so we don't want to - // signal to the client that we plan on sending another JSON blob - // with another result. The series, on the other hand, still - // returns partial true if it was truncated or had more data to - // send in a future chunk. - r.Partial = false - break - } - } - - // If it's not chunked we buffered everything in memory, so write it out - if !chunked { - n, _ := rw.WriteResponse(resp) - atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) - } + // Read the results and encode them in the proper structure. + enc.Encode(w, results) } // async drains the results from an async query and logs a message if it fails. -func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) { +func (h *Handler) async(q *influxql.Query, results <-chan *query.ResultSet) { for r := range results { // Drain the results and do nothing with them. // If it fails, log the failure so there is at least a record of it. @@ -578,6 +452,14 @@ func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) { } h.Logger.Info(fmt.Sprintf("error while running async query: %s: %s", q, r.Err)) } + + for series := range r.SeriesCh() { + for row := range series.RowCh() { + if row.Err != nil { + h.Logger.Info(fmt.Sprintf("error while running async query: %s: %s", q, row.Err)) + } + } + } } } @@ -593,23 +475,23 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U database := r.URL.Query().Get("db") if database == "" { - h.httpError(w, "database is required", http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), "database is required", http.StatusBadRequest) return } if di := h.MetaClient.Database(database); di == nil { - h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound) + h.httpError(w, NewEncoder(r, h.Config), fmt.Sprintf("database not found: %q", database), http.StatusNotFound) return } if h.Config.AuthEnabled { if user == nil { - h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) + h.httpError(w, NewEncoder(r, h.Config), fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) return } if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil { - h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden) + h.httpError(w, NewEncoder(r, h.Config), fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden) return } } @@ -623,7 +505,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U if r.Header.Get("Content-Encoding") == "gzip" { b, err := gzip.NewReader(r.Body) if err != nil { - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } defer b.Close() @@ -633,7 +515,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U var bs []byte if r.ContentLength > 0 { if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) { - h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + h.httpError(w, NewEncoder(r, h.Config), http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) return } @@ -646,14 +528,14 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U _, err := buf.ReadFrom(body) if err != nil { if err == errTruncated { - h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + h.httpError(w, NewEncoder(r, h.Config), http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) return } if h.Config.WriteTracing { h.Logger.Info("Write handler unable to read bytes from request body") } - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len())) @@ -669,7 +551,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U h.writeHeader(w, http.StatusOK) return } - h.httpError(w, parseError.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), parseError.Error(), http.StatusBadRequest) return } @@ -680,7 +562,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U var err error consistency, err = models.ParseConsistencyLevel(level) if err != nil { - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } } @@ -688,27 +570,27 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U // Write points. if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) { atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } else if influxdb.IsAuthorizationError(err) { atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) - h.httpError(w, err.Error(), http.StatusForbidden) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusForbidden) return } else if werr, ok := err.(tsdb.PartialWriteError); ok { atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped)) atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped)) - h.httpError(w, werr.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), werr.Error(), http.StatusBadRequest) return } else if err != nil { atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } else if parseError != nil { // We wrote some of the points atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points))) // The other points failed to parse which means the client sent invalid line protocol. We return a 400 // response code as well as the lines that failed to parse. - h.httpError(w, tsdb.PartialWriteError{Reason: parseError.Error()}.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), tsdb.PartialWriteError{Reason: parseError.Error()}.Error(), http.StatusBadRequest) return } @@ -765,14 +647,14 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { // Retrieve statistics from the monitor. stats, err := h.Monitor.Statistics(nil) if err != nil { - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } // Retrieve diagnostics from the monitor. diags, err := h.Monitor.Diagnostics() if err != nil { - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } @@ -782,13 +664,13 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { if val := diags["system"]; val != nil { jv, err := parseSystemDiagnostics(val) if err != nil { - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } data, err := json.Marshal(jv) if err != nil { - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } @@ -862,12 +744,12 @@ func (h *Handler) serveDebugRequests(w http.ResponseWriter, r *http.Request) { if s := r.URL.Query().Get("seconds"); s == "" { d = DefaultDebugRequestsInterval } else if seconds, err := strconv.ParseInt(s, 10, 64); err != nil { - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } else { d = time.Duration(seconds) * time.Second if d > MaxDebugRequestsInterval { - h.httpError(w, fmt.Sprintf("exceeded maximum interval time: %s > %s", + h.httpError(w, NewEncoder(r, h.Config), fmt.Sprintf("exceeded maximum interval time: %s > %s", influxql.FormatDuration(d), influxql.FormatDuration(MaxDebugRequestsInterval)), http.StatusBadRequest) @@ -957,7 +839,7 @@ func parseSystemDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{}, } // httpError writes an error to the client in a standard format. -func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) { +func (h *Handler) httpError(w http.ResponseWriter, enc Encoder, errmsg string, code int) { if code == http.StatusUnauthorized { // If an unauthorized header will be sent back, add a WWW-Authenticate header // as an authorization challenge. @@ -967,19 +849,8 @@ func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) { w.Header().Set("X-InfluxDB-Error", errmsg[:int(sz)]) } - response := Response{Err: errors.New(errmsg)} - if rw, ok := w.(ResponseWriter); ok { - h.writeHeader(w, code) - rw.WriteResponse(response) - return - } - - // Default implementation if the response writer hasn't been replaced - // with our special response writer type. - w.Header().Add("Content-Type", "application/json") h.writeHeader(w, code) - b, _ := json.Marshal(response) - w.Write(b) + enc.Error(w, errors.New(errmsg)) } // Filters and filter helpers @@ -1052,7 +923,7 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h * creds, err := parseCredentials(r) if err != nil { atomic.AddInt64(&h.stats.AuthenticationFailures, 1) - h.httpError(w, err.Error(), http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusUnauthorized) return } @@ -1060,14 +931,14 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h * case UserAuthentication: if creds.Username == "" { atomic.AddInt64(&h.stats.AuthenticationFailures, 1) - h.httpError(w, "username required", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "username required", http.StatusUnauthorized) return } user, err = h.MetaClient.Authenticate(creds.Username, creds.Password) if err != nil { atomic.AddInt64(&h.stats.AuthenticationFailures, 1) - h.httpError(w, "authorization failed", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "authorization failed", http.StatusUnauthorized) return } case BearerAuthentication: @@ -1082,46 +953,46 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h * // Parse and validate the token. token, err := jwt.Parse(creds.Token, keyLookupFn) if err != nil { - h.httpError(w, err.Error(), http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusUnauthorized) return } else if !token.Valid { - h.httpError(w, "invalid token", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "invalid token", http.StatusUnauthorized) return } claims, ok := token.Claims.(jwt.MapClaims) if !ok { - h.httpError(w, "problem authenticating token", http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), "problem authenticating token", http.StatusInternalServerError) h.Logger.Info("Could not assert JWT token claims as jwt.MapClaims") return } // Make sure an expiration was set on the token. if exp, ok := claims["exp"].(float64); !ok || exp <= 0.0 { - h.httpError(w, "token expiration required", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "token expiration required", http.StatusUnauthorized) return } // Get the username from the token. username, ok := claims["username"].(string) if !ok { - h.httpError(w, "username in token must be a string", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "username in token must be a string", http.StatusUnauthorized) return } else if username == "" { - h.httpError(w, "token must contain a username", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "token must contain a username", http.StatusUnauthorized) return } // Lookup user in the metastore. if user, err = h.MetaClient.User(username); err != nil { - h.httpError(w, err.Error(), http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusUnauthorized) return } else if user == nil { - h.httpError(w, meta.ErrUserNotFound.Error(), http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), meta.ErrUserNotFound.Error(), http.StatusUnauthorized) return } default: - h.httpError(w, "unsupported authentication", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "unsupported authentication", http.StatusUnauthorized) } } @@ -1216,13 +1087,6 @@ func (h *Handler) logging(inner http.Handler, name string) http.Handler { }) } -func (h *Handler) responseWriter(inner http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w = NewResponseWriter(w, r) - inner.ServeHTTP(w, r) - }) -} - // if the env var is set, and the value is truthy, then we will *not* // recover from a panic. var willCrash bool diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 530734d2953..c3909e5740a 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -14,7 +14,7 @@ import ( "testing" "time" - "github.com/dgrijalva/jwt-go" + jwt "github.com/dgrijalva/jwt-go" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" @@ -32,8 +32,17 @@ func TestHandler_Query(t *testing.T) { } else if ctx.Database != `foo` { t.Fatalf("unexpected db: %s", ctx.Database) } - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} - ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} + ctx.StatementID = 1 + result, _ := ctx.CreateResult() + series, _ := result.CreateSeries("series0") + series.Close() + result.Close() + + ctx.StatementID = 2 + result, _ = ctx.CreateResult() + series, _ = result.CreateSeries("series1") + series.Close() + result.Close() return nil } @@ -55,8 +64,17 @@ func TestHandler_Query_File(t *testing.T) { } else if ctx.Database != `foo` { t.Fatalf("unexpected db: %s", ctx.Database) } - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} - ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} + ctx.StatementID = 1 + result, _ := ctx.CreateResult() + series, _ := result.CreateSeries("series0") + series.Close() + result.Close() + + ctx.StatementID = 2 + result, _ = ctx.CreateResult() + series, _ = result.CreateSeries("series1") + series.Close() + result.Close() return nil } @@ -124,8 +142,17 @@ func TestHandler_Query_Auth(t *testing.T) { } else if ctx.Database != `foo` { t.Fatalf("unexpected db: %s", ctx.Database) } - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} - ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} + ctx.StatementID = 1 + result, _ := ctx.CreateResult() + series, _ := result.CreateSeries("series0") + series.Close() + result.Close() + + ctx.StatementID = 2 + result, _ = ctx.CreateResult() + series, _ = result.CreateSeries("series1") + series.Close() + result.Close() return nil } @@ -239,73 +266,13 @@ func TestHandler_QueryRegex(t *testing.T) { } else if ctx.Database != `test` { t.Fatalf("unexpected db: %s", ctx.Database) } - ctx.Results <- nil - return nil + return ctx.Ok() } w := httptest.NewRecorder() h.ServeHTTP(w, MustNewRequest("GET", "/query?db=test&q=SELECT%20%2A%20FROM%20test%20WHERE%20url%20%3D~%20%2Fhttp%5C%3A%5C%2F%5C%2Fwww.akamai%5C.com%2F", nil)) } -// Ensure the handler merges results from the same statement. -func TestHandler_Query_MergeResults(t *testing.T) { - h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} - return nil - } - - w := httptest.NewRecorder() - h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) - if w.Code != http.StatusOK { - t.Fatalf("unexpected status: %d", w.Code) - } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"},{"name":"series1"}]}]}` { - t.Fatalf("unexpected body: %s", body) - } -} - -// Ensure the handler merges results from the same statement. -func TestHandler_Query_MergeEmptyResults(t *testing.T) { - h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}} - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} - return nil - } - - w := httptest.NewRecorder() - h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) - if w.Code != http.StatusOK { - t.Fatalf("unexpected status: %d", w.Code) - } else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}` { - t.Fatalf("unexpected body: %s", body) - } -} - -// Ensure the handler can parse chunked and chunk size query parameters. -func TestHandler_Query_Chunked(t *testing.T) { - h := NewHandler(false) - h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error { - if ctx.ChunkSize != 2 { - t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize) - } - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} - return nil - } - - w := httptest.NewRecorder() - h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil)) - if w.Code != http.StatusOK { - t.Fatalf("unexpected status: %d", w.Code) - } else if w.Body.String() != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]}]} -{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]} -` { - t.Fatalf("unexpected body: %s", w.Body.String()) - } -} - // Ensure the handler can accept an async query. func TestHandler_Query_Async(t *testing.T) { done := make(chan struct{}) @@ -316,8 +283,17 @@ func TestHandler_Query_Async(t *testing.T) { } else if ctx.Database != `foo` { t.Fatalf("unexpected db: %s", ctx.Database) } - ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} - ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})} + ctx.StatementID = 1 + result, _ := ctx.CreateResult() + series, _ := result.CreateSeries("series0") + series.Close() + result.Close() + + ctx.StatementID = 2 + result, _ = ctx.CreateResult() + series, _ = result.CreateSeries("series1") + series.Close() + result.Close() close(done) return nil } diff --git a/services/httpd/response_writer.go b/services/httpd/response_writer.go deleted file mode 100644 index 477d1d901db..00000000000 --- a/services/httpd/response_writer.go +++ /dev/null @@ -1,188 +0,0 @@ -package httpd - -import ( - "encoding/csv" - "encoding/json" - "io" - "net/http" - "strconv" - "time" - - "github.com/influxdata/influxdb/models" -) - -// ResponseWriter is an interface for writing a response. -type ResponseWriter interface { - // WriteResponse writes a response. - WriteResponse(resp Response) (int, error) - - http.ResponseWriter -} - -// NewResponseWriter creates a new ResponseWriter based on the Accept header -// in the request that wraps the ResponseWriter. -func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter { - pretty := r.URL.Query().Get("pretty") == "true" - rw := &responseWriter{ResponseWriter: w} - switch r.Header.Get("Accept") { - case "application/csv", "text/csv": - w.Header().Add("Content-Type", "text/csv") - rw.formatter = &csvFormatter{statementID: -1, Writer: w} - case "application/json": - fallthrough - default: - w.Header().Add("Content-Type", "application/json") - rw.formatter = &jsonFormatter{Pretty: pretty, Writer: w} - } - return rw -} - -// WriteError is a convenience function for writing an error response to the ResponseWriter. -func WriteError(w ResponseWriter, err error) (int, error) { - return w.WriteResponse(Response{Err: err}) -} - -// responseWriter is an implementation of ResponseWriter. -type responseWriter struct { - formatter interface { - WriteResponse(resp Response) (int, error) - } - http.ResponseWriter -} - -// WriteResponse writes the response using the formatter. -func (w *responseWriter) WriteResponse(resp Response) (int, error) { - return w.formatter.WriteResponse(resp) -} - -// Flush flushes the ResponseWriter if it has a Flush() method. -func (w *responseWriter) Flush() { - if w, ok := w.ResponseWriter.(http.Flusher); ok { - w.Flush() - } -} - -// CloseNotify calls CloseNotify on the underlying http.ResponseWriter if it -// exists. Otherwise, it returns a nil channel that will never notify. -func (w *responseWriter) CloseNotify() <-chan bool { - if notifier, ok := w.ResponseWriter.(http.CloseNotifier); ok { - return notifier.CloseNotify() - } - return nil -} - -type jsonFormatter struct { - io.Writer - Pretty bool -} - -func (w *jsonFormatter) WriteResponse(resp Response) (n int, err error) { - var b []byte - if w.Pretty { - b, err = json.MarshalIndent(resp, "", " ") - } else { - b, err = json.Marshal(resp) - } - - if err != nil { - n, err = io.WriteString(w, err.Error()) - } else { - n, err = w.Write(b) - } - - w.Write([]byte("\n")) - n++ - return n, err -} - -type csvFormatter struct { - io.Writer - statementID int - columns []string -} - -func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) { - csv := csv.NewWriter(w) - if resp.Err != nil { - csv.Write([]string{"error"}) - csv.Write([]string{resp.Err.Error()}) - csv.Flush() - return n, csv.Error() - } - - for _, result := range resp.Results { - if result.StatementID != w.statementID { - // If there are no series in the result, skip past this result. - if len(result.Series) == 0 { - continue - } - - // Set the statement id and print out a newline if this is not the first statement. - if w.statementID >= 0 { - // Flush the csv writer and write a newline. - csv.Flush() - if err := csv.Error(); err != nil { - return n, err - } - - out, err := io.WriteString(w, "\n") - if err != nil { - return n, err - } - n += out - } - w.statementID = result.StatementID - - // Print out the column headers from the first series. - w.columns = make([]string, 2+len(result.Series[0].Columns)) - w.columns[0] = "name" - w.columns[1] = "tags" - copy(w.columns[2:], result.Series[0].Columns) - if err := csv.Write(w.columns); err != nil { - return n, err - } - } - - for _, row := range result.Series { - w.columns[0] = row.Name - if len(row.Tags) > 0 { - w.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:]) - } else { - w.columns[1] = "" - } - for _, values := range row.Values { - for i, value := range values { - if value == nil { - w.columns[i+2] = "" - continue - } - - switch v := value.(type) { - case float64: - w.columns[i+2] = strconv.FormatFloat(v, 'f', -1, 64) - case int64: - w.columns[i+2] = strconv.FormatInt(v, 10) - case string: - w.columns[i+2] = v - case bool: - if v { - w.columns[i+2] = "true" - } else { - w.columns[i+2] = "false" - } - case time.Time: - w.columns[i+2] = strconv.FormatInt(v.UnixNano(), 10) - case *float64, *int64, *string, *bool: - w.columns[i+2] = "" - } - } - csv.Write(w.columns) - } - } - } - csv.Flush() - if err := csv.Error(); err != nil { - return n, err - } - return n, nil -} diff --git a/tests/server_test.go b/tests/server_test.go index 67c954cd0e6..ceb3def2fdd 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -6684,15 +6684,17 @@ func TestServer_Query_TimeZone(t *testing.T) { func TestServer_Query_Chunk(t *testing.T) { t.Parallel() - s := OpenServer(NewConfig()) + config := NewConfig() + config.HTTPD.MaxRowLimit = 100 + s := OpenServer(config) defer s.Close() if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil { t.Fatal(err) } - writes := make([]string, 10001) // 10,000 is the default chunking size, even when no chunking requested. - expectedValues := make([]string, len(writes)) + writes := make([]string, 101) // 10,000 is the default chunking size, even when no chunking requested. + expectedValues := make([]string, config.HTTPD.MaxRowLimit) for i := 0; i < len(writes); i++ { writes[i] = fmt.Sprintf(`cpu value=%d %d`, i, time.Unix(0, int64(i)).UnixNano()) if i < len(expectedValues) {