diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go
index 438460d4183..2d02485ea87 100644
--- a/coordinator/points_writer_test.go
+++ b/coordinator/points_writer_test.go
@@ -436,116 +436,6 @@ func (f *fakePointsWriter) WritePointsInto(req *coordinator.IntoWriteRequest) er
 	return f.WritePointsIntoFn(req)
 }
 
-func TestBufferedPointsWriter(t *testing.T) {
-	db := "db0"
-	rp := "rp0"
-	capacity := 10000
-
-	writePointsIntoCnt := 0
-	pointsWritten := []models.Point{}
-
-	reset := func() {
-		writePointsIntoCnt = 0
-		pointsWritten = pointsWritten[:0]
-	}
-
-	fakeWriter := &fakePointsWriter{
-		WritePointsIntoFn: func(req *coordinator.IntoWriteRequest) error {
-			writePointsIntoCnt++
-			pointsWritten = append(pointsWritten, req.Points...)
-			return nil
-		},
-	}
-
-	w := coordinator.NewBufferedPointsWriter(fakeWriter, db, rp, capacity)
-
-	// Test that capacity and length are correct for new buffered writer.
-	if w.Cap() != capacity {
-		t.Fatalf("exp %d, got %d", capacity, w.Cap())
-	} else if w.Len() != 0 {
-		t.Fatalf("exp %d, got %d", 0, w.Len())
-	}
-
-	// Test flushing an empty buffer.
-	if err := w.Flush(); err != nil {
-		t.Fatal(err)
-	} else if writePointsIntoCnt > 0 {
-		t.Fatalf("exp 0, got %d", writePointsIntoCnt)
-	}
-
-	// Test writing zero points.
-	if err := w.WritePointsInto(&coordinator.IntoWriteRequest{
-		Database:        db,
-		RetentionPolicy: rp,
-		Points:          []models.Point{},
-	}); err != nil {
-		t.Fatal(err)
-	} else if writePointsIntoCnt > 0 {
-		t.Fatalf("exp 0, got %d", writePointsIntoCnt)
-	} else if w.Len() > 0 {
-		t.Fatalf("exp 0, got %d", w.Len())
-	}
-
-	// Test writing single large bunch of points points.
-	req := coordinator.WritePointsRequest{
-		Database:        db,
-		RetentionPolicy: rp,
-	}
-
-	numPoints := int(float64(capacity) * 5.5)
-	for i := 0; i < numPoints; i++ {
-		req.AddPoint("cpu", float64(i), time.Now().Add(time.Duration(i)*time.Second), nil)
-	}
-
-	r := coordinator.IntoWriteRequest(req)
-	if err := w.WritePointsInto(&r); err != nil {
-		t.Fatal(err)
-	} else if writePointsIntoCnt != 5 {
-		t.Fatalf("exp 5, got %d", writePointsIntoCnt)
-	} else if w.Len() != capacity/2 {
-		t.Fatalf("exp %d, got %d", capacity/2, w.Len())
-	} else if len(pointsWritten) != numPoints-capacity/2 {
-		t.Fatalf("exp %d, got %d", numPoints-capacity/2, len(pointsWritten))
-	}
-
-	if err := w.Flush(); err != nil {
-		t.Fatal(err)
-	} else if writePointsIntoCnt != 6 {
-		t.Fatalf("exp 6, got %d", writePointsIntoCnt)
-	} else if w.Len() != 0 {
-		t.Fatalf("exp 0, got %d", w.Len())
-	} else if len(pointsWritten) != numPoints {
-		t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten))
-	} else if !reflect.DeepEqual(r.Points, pointsWritten) {
-		t.Fatal("points don't match")
-	}
-
-	reset()
-
-	// Test writing points one at a time.
-	for i, _ := range r.Points {
-		if err := w.WritePointsInto(&coordinator.IntoWriteRequest{
-			Database:        db,
-			RetentionPolicy: rp,
-			Points:          r.Points[i : i+1],
-		}); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	if err := w.Flush(); err != nil {
-		t.Fatal(err)
-	} else if writePointsIntoCnt != 6 {
-		t.Fatalf("exp 6, got %d", writePointsIntoCnt)
-	} else if w.Len() != 0 {
-		t.Fatalf("exp 0, got %d", w.Len())
-	} else if len(pointsWritten) != numPoints {
-		t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten))
-	} else if !reflect.DeepEqual(r.Points, pointsWritten) {
-		t.Fatal("points don't match")
-	}
-}
-
 var shardID uint64
 
 type fakeStore struct {
diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go
index 0afe9fc8438..7807e2605fc 100644
--- a/coordinator/statement_executor.go
+++ b/coordinator/statement_executor.go
@@ -58,149 +58,199 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
 		return e.executeSelectStatement(stmt, &ctx)
 	}
 
-	var rows models.Rows
-	var messages []*query.Message
-	var err error
 	switch stmt := stmt.(type) {
 	case *influxql.AlterRetentionPolicyStatement:
+		if err := e.executeAlterRetentionPolicyStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeAlterRetentionPolicyStatement(stmt)
+		return ctx.Ok()
 	case *influxql.CreateContinuousQueryStatement:
+		if err := e.executeCreateContinuousQueryStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeCreateContinuousQueryStatement(stmt)
+		return ctx.Ok()
 	case *influxql.CreateDatabaseStatement:
+		if err := e.executeCreateDatabaseStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeCreateDatabaseStatement(stmt)
+		return ctx.Ok()
 	case *influxql.CreateRetentionPolicyStatement:
+		if err := e.executeCreateRetentionPolicyStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeCreateRetentionPolicyStatement(stmt)
+		return ctx.Ok()
 	case *influxql.CreateSubscriptionStatement:
+		if err := e.executeCreateSubscriptionStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeCreateSubscriptionStatement(stmt)
+		return ctx.Ok()
 	case *influxql.CreateUserStatement:
+		if err := e.executeCreateUserStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeCreateUserStatement(stmt)
+		return ctx.Ok()
 	case *influxql.DeleteSeriesStatement:
-		err = e.executeDeleteSeriesStatement(stmt, ctx.Database)
+		if err := e.executeDeleteSeriesStatement(stmt, ctx.Database); err != nil {
+			return err
+		}
+		if ctx.ReadOnly {
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
+		}
+		return ctx.Ok()
 	case *influxql.DropContinuousQueryStatement:
+		if err := e.executeDropContinuousQueryStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeDropContinuousQueryStatement(stmt)
+		return ctx.Ok()
 	case *influxql.DropDatabaseStatement:
+		if err := e.executeDropDatabaseStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeDropDatabaseStatement(stmt)
+		return ctx.Ok()
 	case *influxql.DropMeasurementStatement:
+		if err := e.executeDropMeasurementStatement(stmt, ctx.Database); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeDropMeasurementStatement(stmt, ctx.Database)
+		return ctx.Ok()
 	case *influxql.DropSeriesStatement:
+		if err := e.executeDropSeriesStatement(stmt, ctx.Database); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeDropSeriesStatement(stmt, ctx.Database)
+		return ctx.Ok()
 	case *influxql.DropRetentionPolicyStatement:
+		if err := e.executeDropRetentionPolicyStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeDropRetentionPolicyStatement(stmt)
+		return ctx.Ok()
 	case *influxql.DropShardStatement:
+		if err := e.executeDropShardStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeDropShardStatement(stmt)
+		return ctx.Ok()
 	case *influxql.DropSubscriptionStatement:
+		if err := e.executeDropSubscriptionStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeDropSubscriptionStatement(stmt)
+		return ctx.Ok()
 	case *influxql.DropUserStatement:
+		if err := e.executeDropUserStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeDropUserStatement(stmt)
+		return ctx.Ok()
 	case *influxql.ExplainStatement:
-		rows, err = e.executeExplainStatement(stmt, &ctx)
+		return e.executeExplainStatement(stmt, &ctx)
 	case *influxql.GrantStatement:
+		if err := e.executeGrantStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeGrantStatement(stmt)
+		return ctx.Ok()
 	case *influxql.GrantAdminStatement:
+		if err := e.executeGrantAdminStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeGrantAdminStatement(stmt)
+		return ctx.Ok()
 	case *influxql.RevokeStatement:
+		if err := e.executeRevokeStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeRevokeStatement(stmt)
+		return ctx.Ok()
 	case *influxql.RevokeAdminStatement:
+		if err := e.executeRevokeAdminStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeRevokeAdminStatement(stmt)
+		return ctx.Ok()
 	case *influxql.ShowContinuousQueriesStatement:
-		rows, err = e.executeShowContinuousQueriesStatement(stmt)
+		return e.executeShowContinuousQueriesStatement(stmt, &ctx)
 	case *influxql.ShowDatabasesStatement:
-		rows, err = e.executeShowDatabasesStatement(stmt, &ctx)
+		return e.executeShowDatabasesStatement(stmt, &ctx)
 	case *influxql.ShowDiagnosticsStatement:
-		rows, err = e.executeShowDiagnosticsStatement(stmt)
+		return e.executeShowDiagnosticsStatement(stmt, &ctx)
 	case *influxql.ShowGrantsForUserStatement:
-		rows, err = e.executeShowGrantsForUserStatement(stmt)
+		return e.executeShowGrantsForUserStatement(stmt, &ctx)
 	case *influxql.ShowMeasurementsStatement:
 		return e.executeShowMeasurementsStatement(stmt, &ctx)
 	case *influxql.ShowRetentionPoliciesStatement:
-		rows, err = e.executeShowRetentionPoliciesStatement(stmt)
+		return e.executeShowRetentionPoliciesStatement(stmt, &ctx)
 	case *influxql.ShowShardsStatement:
-		rows, err = e.executeShowShardsStatement(stmt)
+		return e.executeShowShardsStatement(stmt, &ctx)
 	case *influxql.ShowShardGroupsStatement:
-		rows, err = e.executeShowShardGroupsStatement(stmt)
+		return e.executeShowShardGroupsStatement(stmt, &ctx)
 	case *influxql.ShowStatsStatement:
-		rows, err = e.executeShowStatsStatement(stmt)
+		return e.executeShowStatsStatement(stmt, &ctx)
 	case *influxql.ShowSubscriptionsStatement:
-		rows, err = e.executeShowSubscriptionsStatement(stmt)
+		return e.executeShowSubscriptionsStatement(stmt, &ctx)
 	case *influxql.ShowTagValuesStatement:
 		return e.executeShowTagValues(stmt, &ctx)
 	case *influxql.ShowUsersStatement:
-		rows, err = e.executeShowUsersStatement(stmt)
+		return e.executeShowUsersStatement(stmt, &ctx)
 	case *influxql.SetPasswordUserStatement:
+		if err := e.executeSetPasswordUserStatement(stmt); err != nil {
+			return err
+		}
 		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+			return ctx.Ok(query.ReadOnlyWarning(stmt.String()))
 		}
-		err = e.executeSetPasswordUserStatement(stmt)
+		return ctx.Ok()
 	case *influxql.ShowQueriesStatement, *influxql.KillQueryStatement:
 		// Send query related statements to the task manager.
 		return e.TaskManager.ExecuteStatement(stmt, ctx)
 	default:
 		return query.ErrInvalidQuery
 	}
-
-	if err != nil {
-		return err
-	}
-
-	return ctx.Send(&query.Result{
-		StatementID: ctx.StatementID,
-		Series:      rows,
-		Messages:    messages,
-	})
 }
 
 func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
@@ -400,8 +450,8 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
 	return e.MetaClient.DropUser(q.Name)
 }
 
-func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
-	return nil, errors.New("unimplemented")
+func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) error {
+	return errors.New("unimplemented")
 }
 
 func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
@@ -437,98 +487,110 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw
 }
 
 func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
+	if stmt.Target != nil && stmt.Target.Measurement.Database == "" {
+		return errNoDatabaseInTarget
+	}
+
 	itrs, columns, err := e.createIterators(stmt, ctx)
 	if err != nil {
 		return err
 	}
+	result, err := func() (*query.ResultSet, error) {
+		if stmt.Target != nil && ctx.ReadOnly {
+			return ctx.CreateResult(query.ReadOnlyWarning(stmt.String()))
+		}
+		return ctx.CreateResult()
+	}()
+	if err != nil {
+		return err
+	}
 
-	// Generate a row emitter from the iterator set.
-	em := query.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
-	em.Columns = columns
-	if stmt.Location != nil {
-		em.Location = stmt.Location
+	// If we are writing the points, we need to send this result to the
+	// goroutine that will be writing the points so the goroutine can emit the
+	// number of points that have been written.
+	if stmt.Target != nil {
+		// Replace the result we just passed with our own created result. This
+		// allows us to write to some location that is being read by the
+		// writer.
+		r := &query.ResultSet{
+			ID:      ctx.StatementID,
+			AbortCh: ctx.AbortCh,
+		}
+		r.Init()
+		go e.writeResult(stmt, r, result)
+		result = r
 	}
-	em.OmitTime = stmt.OmitTime
-	em.EmitName = stmt.EmitName
-	defer em.Close()
+	defer result.Close()
 
-	// Emit rows to the results channel.
-	var writeN int64
-	var emitted bool
+	// Set the columns of the result to the statement columns.
+	result = result.WithColumns(columns...)
 
-	var pointsWriter *BufferedPointsWriter
-	if stmt.Target != nil {
-		pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000)
+	// Generate a row emitter from the iterator set.
+	em := query.NewEmitter(itrs, stmt.TimeAscending())
+	defer em.Close()
+
+	// Retrieve the time zone location. Default to using UTC.
+	loc := time.UTC
+	if stmt.Location != nil {
+		loc = stmt.Location
 	}
 
+	var series *query.Series
 	for {
-		row, partial, err := em.Emit()
+		// Fill buffer. Close the series if no more points remain.
+		t, name, tags, err := em.LoadBuf()
 		if err != nil {
-			return err
-		} else if row == nil {
-			// Check if the query was interrupted while emitting.
-			select {
-			case <-ctx.InterruptCh:
-				return query.ErrQueryInterrupted
-			default:
+			// An error occurred while reading the iterators. If we are in the
+			// middle of processing a series, assume the error comes from
+			// reading that series. If it occurred before any series was
+			// created, send the error to the result itself.
+			if series != nil {
+				series.Error(err)
+				series.Close()
+			} else {
+				result.Error(err)
 			}
-			break
-		}
-
-		// Write points back into system for INTO statements.
-		if stmt.Target != nil {
-			if err := e.writeInto(pointsWriter, stmt, row); err != nil {
-				return err
+			return query.ErrQueryCanceled
+		} else if t == query.ZeroTime {
+			if series != nil {
+				series.Close()
 			}
-			writeN += int64(len(row.Values))
-			continue
+			return nil
 		}
 
-		result := &query.Result{
-			StatementID: ctx.StatementID,
-			Series:      []*models.Row{row},
-			Partial:     partial,
+		// Read next set of values from all iterators at a given time/name/tags.
+		values := make([]interface{}, len(columns))
+		if stmt.OmitTime {
+			em.ReadInto(t, name, tags, values)
+		} else {
+			values[0] = time.Unix(0, t).In(loc)
+			em.ReadInto(t, name, tags, values[1:])
 		}
 
-		// Send results or exit if closing.
-		if err := ctx.Send(result); err != nil {
-			return err
+		if stmt.EmitName != "" {
+			name = stmt.EmitName
 		}
-		emitted = true
-	}
-
-	// Flush remaining points and emit write count if an INTO statement.
-	if stmt.Target != nil {
-		if err := pointsWriter.Flush(); err != nil {
-			return err
+		if series == nil {
+			s, ok := result.CreateSeriesWithTags(name, tags)
+			if !ok {
+				return query.ErrQueryAborted
+			}
+			series = s
+		} else if series.Name != name || !series.Tags.Equals(&tags) {
+			series.Close()
+			s, ok := result.CreateSeriesWithTags(name, tags)
+			if !ok {
+				return query.ErrQueryAborted
+			}
+			series = s
 		}
 
-		var messages []*query.Message
-		if ctx.ReadOnly {
-			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
+		if ok := series.Emit(values); !ok {
+			series.Close()
+			return query.ErrQueryAborted
 		}
-
-		return ctx.Send(&query.Result{
-			StatementID: ctx.StatementID,
-			Messages:    messages,
-			Series: []*models.Row{{
-				Name:    "result",
-				Columns: []string{"time", "written"},
-				Values:  [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
-			}},
-		})
 	}
-
-	// Always emit at least one result.
-	if !emitted {
-		return ctx.Send(&query.Result{
-			StatementID: ctx.StatementID,
-			Series:      make([]*models.Row, 0),
-		})
-	}
-
-	return nil
 }
 
 func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) ([]query.Iterator, []string, error) {
@@ -553,38 +615,59 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx
 	return itrs, columns, nil
 }
 
-func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
+func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement, ctx *query.ExecutionContext) error {
 	dis := e.MetaClient.Databases()
 
-	rows := []*models.Row{}
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
+	}
+	defer result.Close()
+
+	result = result.WithColumns("name", "query")
 	for _, di := range dis {
-		row := &models.Row{Columns: []string{"name", "query"}, Name: di.Name}
+		series, ok := result.CreateSeries(di.Name)
+		if !ok {
+			return query.ErrQueryAborted
+		}
 		for _, cqi := range di.ContinuousQueries {
-			row.Values = append(row.Values, []interface{}{cqi.Name, cqi.Query})
+			series.Emit([]interface{}{cqi.Name, cqi.Query})
 		}
-		rows = append(rows, row)
+		series.Close()
 	}
-	return rows, nil
+	return nil
 }
 
-func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) (models.Rows, error) {
+func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, ctx *query.ExecutionContext) error {
 	dis := e.MetaClient.Databases()
 	a := ctx.ExecutionOptions.Authorizer
 
-	row := &models.Row{Name: "databases", Columns: []string{"name"}}
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
+	}
+	defer result.Close()
+
+	result = result.WithColumns("name")
+	series, ok := result.CreateSeries("databases")
+	if !ok {
+		return query.ErrQueryAborted
+	}
+	defer series.Close()
+
 	for _, di := range dis {
 		// Only include databases that the user is authorized to read or write.
 		if a.AuthorizeDatabase(influxql.ReadPrivilege, di.Name) || a.AuthorizeDatabase(influxql.WritePrivilege, di.Name) {
-			row.Values = append(row.Values, []interface{}{di.Name})
+			series.Emit([]interface{}{di.Name})
 		}
 	}
-	return []*models.Row{row}, nil
+	return nil
 }
 
-func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement) (models.Rows, error) {
+func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement, ctx *query.ExecutionContext) error {
 	diags, err := e.Monitor.Diagnostics()
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	// Get a sorted list of diagnostics keys.
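
Every converted meta-statement handler in this file now follows the same streaming shape. As a reading aid, here is a minimal sketch of that shape under stated assumptions: the handler name and its row data are invented for illustration, while ctx.CreateResult, WithColumns, CreateSeries, Emit, Close, and query.ErrQueryAborted are the calls this diff actually introduces.

func (e *StatementExecutor) executeShowExample(ctx *query.ExecutionContext) error {
	// Register a result with the consumer; this fails if the query was
	// interrupted or aborted before any rows could be produced.
	result, err := ctx.CreateResult()
	if err != nil {
		return err
	}
	defer result.Close()

	// Declare the columns once, then stream rows through a series.
	result = result.WithColumns("name", "value")
	series, ok := result.CreateSeries("example")
	if !ok {
		// CreateSeries reports false once the consumer has gone away.
		return query.ErrQueryAborted
	}
	defer series.Close()

	for _, row := range [][]interface{}{{"a", 1}, {"b", 2}} {
		series.Emit(row)
	}
	return nil
}

The design point this illustrates: instead of accumulating models.Rows and returning them for a single send, each handler writes rows incrementally and uses the !ok return to stop early when the client disconnects.
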
@@ -594,32 +677,53 @@ func (e *StatementExecutor) executeShowDiagnosticsStatement(stmt *influxql.ShowD
 	}
 	sort.Strings(sortedKeys)
 
-	rows := make([]*models.Row, 0, len(diags))
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
+	}
+	defer result.Close()
+
 	for _, k := range sortedKeys {
 		if stmt.Module != "" && k != stmt.Module {
 			continue
 		}
 
-		row := &models.Row{Name: k}
+		series, ok := result.WithColumns(diags[k].Columns...).CreateSeries(k)
+		if !ok {
+			return query.ErrQueryAborted
+		}
 
-		row.Columns = diags[k].Columns
-		row.Values = diags[k].Rows
-		rows = append(rows, row)
+		for _, row := range diags[k].Rows {
+			series.Emit(row)
+		}
+		series.Close()
 	}
-	return rows, nil
+	return nil
 }
 
-func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) (models.Rows, error) {
+func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement, ctx *query.ExecutionContext) error {
 	priv, err := e.MetaClient.UserPrivileges(q.Name)
 	if err != nil {
-		return nil, err
+		return err
+	}
+
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
 	}
+	defer result.Close()
+
+	result = result.WithColumns("database", "privilege")
+	series, ok := result.CreateSeries("")
+	if !ok {
+		return query.ErrQueryAborted
+	}
+	defer series.Close()
 
-	row := &models.Row{Columns: []string{"database", "privilege"}}
 	for d, p := range priv {
-		row.Values = append(row.Values, []interface{}{d, p.String()})
+		series.Emit([]interface{}{d, p.String()})
 	}
-	return []*models.Row{row}, nil
+	return nil
 }
 
 func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMeasurementsStatement, ctx *query.ExecutionContext) error {
@@ -628,11 +732,8 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
 	}
 
 	names, err := e.TSDBStore.MeasurementNames(q.Database, q.Condition)
-	if err != nil || len(names) == 0 {
-		return ctx.Send(&query.Result{
-			StatementID: ctx.StatementID,
-			Err:         err,
-		})
+	if err != nil {
+		return err
 	}
 
 	if q.Offset > 0 {
@@ -649,50 +750,74 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
 		}
 	}
 
-	values := make([][]interface{}, len(names))
-	for i, name := range names {
-		values[i] = []interface{}{string(name)}
+	if len(names) == 0 {
+		return ctx.Ok()
+	}
+
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
 	}
+	defer result.Close()
 
-	if len(values) == 0 {
-		return ctx.Send(&query.Result{
-			StatementID: ctx.StatementID,
-		})
+	result = result.WithColumns("name")
+	series, ok := result.CreateSeries("measurements")
+	if !ok {
+		return query.ErrQueryAborted
 	}
+	defer series.Close()
 
-	return ctx.Send(&query.Result{
-		StatementID: ctx.StatementID,
-		Series: []*models.Row{{
-			Name:    "measurements",
-			Columns: []string{"name"},
-			Values:  values,
-		}},
-	})
+	for _, name := range names {
+		series.Emit([]interface{}{string(name)})
+	}
+	return nil
 }
 
-func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
+func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement, ctx *query.ExecutionContext) error {
 	if q.Database == "" {
-		return nil, ErrDatabaseNameRequired
+		return ErrDatabaseNameRequired
 	}
 
 	di := e.MetaClient.Database(q.Database)
 	if di == nil {
-		return nil, influxdb.ErrDatabaseNotFound(q.Database)
+		return influxdb.ErrDatabaseNotFound(q.Database)
 	}
 
-	row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}}
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
+	}
+	defer result.Close()
+
+	result = result.WithColumns("name", "duration", "shardGroupDuration", "replicaN", "default")
+	series, ok := result.CreateSeries("")
+	if !ok {
+		return query.ErrQueryAborted
+	}
+	defer series.Close()
+
 	for _, rpi := range di.RetentionPolicies {
-		row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name})
+		series.Emit([]interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name})
 	}
-	return []*models.Row{row}, nil
+	return nil
 }
 
-func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) (models.Rows, error) {
+func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement, ctx *query.ExecutionContext) error {
 	dis := e.MetaClient.Databases()
 
-	rows := []*models.Row{}
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
+	}
+	defer result.Close()
+
+	result = result.WithColumns("id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners")
 	for _, di := range dis {
-		row := &models.Row{Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name}
+		series, ok := result.CreateSeries(di.Name)
+		if !ok {
+			return query.ErrQueryAborted
+		}
+
 		for _, rpi := range di.RetentionPolicies {
 			for _, sgi := range rpi.ShardGroups {
 				// Shards associated with deleted shard groups are effectively deleted.
@@ -707,7 +832,7 @@ func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShards
 					ownerIDs[i] = owner.NodeID
 				}
 
-				row.Values = append(row.Values, []interface{}{
+				series.Emit([]interface{}{
 					si.ID,
 					di.Name,
 					rpi.Name,
@@ -720,15 +845,27 @@ func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShards
 			}
 		}
-		rows = append(rows, row)
+		series.Close()
 	}
-	return rows, nil
+	return nil
 }
 
-func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) (models.Rows, error) {
+func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement, ctx *query.ExecutionContext) error {
 	dis := e.MetaClient.Databases()
 
-	row := &models.Row{Columns: []string{"id", "database", "retention_policy", "start_time", "end_time", "expiry_time"}, Name: "shard groups"}
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
+	}
+	defer result.Close()
+
+	result = result.WithColumns("id", "database", "retention_policy", "start_time", "end_time", "expiry_time")
+	series, ok := result.CreateSeries("shard groups")
+	if !ok {
+		return query.ErrQueryAborted
+	}
+	defer series.Close()
+
 	for _, di := range dis {
 		for _, rpi := range di.RetentionPolicies {
 			for _, sgi := range rpi.ShardGroups {
@@ -738,7 +875,7 @@ func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowS
 					continue
 				}
 
-				row.Values = append(row.Values, []interface{}{
+				series.Emit([]interface{}{
 					sgi.ID,
 					di.Name,
 					rpi.Name,
@@ -749,50 +886,74 @@ func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowS
 			}
 		}
 	}
-
-	return []*models.Row{row}, nil
+	return nil
 }
 
-func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) (models.Rows, error) {
+func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, ctx *query.ExecutionContext) error {
 	stats, err := e.Monitor.Statistics(nil)
 	if err != nil {
-		return nil, err
+		return err
+	}
+
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
 	}
+	defer result.Close()
 
-	var rows []*models.Row
 	for _, stat := range stats {
 		if stmt.Module != "" && stat.Name != stmt.Module {
 			continue
 		}
-		row := &models.Row{Name: stat.Name, Tags: stat.Tags}
 
-		values := make([]interface{}, 0, len(stat.Values))
-		for _, k := range stat.ValueNames() {
-			row.Columns = append(row.Columns, k)
-			values = append(values, stat.Values[k])
+		result := result.WithColumns(stat.ValueNames()...)
+		series, ok := result.CreateSeriesWithTags(stat.Name, query.NewTags(stat.Tags))
+		if !ok {
+			return query.ErrQueryAborted
 		}
-		row.Values = [][]interface{}{values}
-		rows = append(rows, row)
+
+		row := make([]interface{}, 0, len(series.Columns))
+		for _, k := range series.Columns {
+			row = append(row, stat.Values[k])
+		}
+		series.Emit(row)
+		series.Close()
 	}
-	return rows, nil
+	return nil
 }
 
-func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) (models.Rows, error) {
+func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement, ctx *query.ExecutionContext) error {
 	dis := e.MetaClient.Databases()
 
-	rows := []*models.Row{}
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
+	}
+	defer result.Close()
+
+	result = result.WithColumns("retention_policy", "name", "mode", "destinations")
 	for _, di := range dis {
-		row := &models.Row{Columns: []string{"retention_policy", "name", "mode", "destinations"}, Name: di.Name}
+		var series *query.Series
 		for _, rpi := range di.RetentionPolicies {
 			for _, si := range rpi.Subscriptions {
-				row.Values = append(row.Values, []interface{}{rpi.Name, si.Name, si.Mode, si.Destinations})
+				// Lazily initialize the series so we don't emit a series that
+				// has no subscriptions.
+				if series == nil {
+					s, ok := result.CreateSeries(di.Name)
+					if !ok {
+						return query.ErrQueryAborted
+					}
+					series = s
+				}
+				series.Emit([]interface{}{rpi.Name, si.Name, si.Mode, si.Destinations})
 			}
 		}
-		if len(row.Values) > 0 {
-			rows = append(rows, row)
+
+		if series != nil {
+			series.Close()
 		}
 	}
-	return rows, nil
+	return nil
 }
 
 func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatement, ctx *query.ExecutionContext) error {
@@ -802,13 +963,16 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
 
 	tagValues, err := e.TSDBStore.TagValues(q.Database, q.Condition)
 	if err != nil {
-		return ctx.Send(&query.Result{
-			StatementID: ctx.StatementID,
-			Err:         err,
-		})
+		return err
+	}
+
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
 	}
+	defer result.Close()
 
-	emitted := false
+	result = result.WithColumns("key", "value")
 	for _, m := range tagValues {
 		values := m.Values
 
@@ -830,39 +994,36 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
 			continue
 		}
 
-		row := &models.Row{
-			Name:    m.Measurement,
-			Columns: []string{"key", "value"},
-			Values:  make([][]interface{}, len(values)),
+		series, ok := result.CreateSeries(m.Measurement)
+		if !ok {
+			return query.ErrQueryAborted
 		}
-		for i, v := range values {
-			row.Values[i] = []interface{}{v.Key, v.Value}
+		for _, v := range values {
+			series.Emit([]interface{}{v.Key, v.Value})
 		}
+		series.Close()
+	}
+	return nil
+}
 
-		if err := ctx.Send(&query.Result{
-			StatementID: ctx.StatementID,
-			Series:      []*models.Row{row},
-		}); err != nil {
-			return err
-		}
-		emitted = true
+func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement, ctx *query.ExecutionContext) error {
+	result, err := ctx.CreateResult()
+	if err != nil {
+		return err
 	}
+	defer result.Close()
 
-	// Ensure at least one result is emitted.
-	if !emitted {
-		return ctx.Send(&query.Result{
-			StatementID: ctx.StatementID,
-		})
+	result = result.WithColumns("user", "admin")
+	series, ok := result.CreateSeries("")
+	if !ok {
+		return query.ErrQueryAborted
 	}
-	return nil
-}
+	defer series.Close()
 
-func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) (models.Rows, error) {
-	row := &models.Row{Columns: []string{"user", "admin"}}
 	for _, ui := range e.MetaClient.Users() {
-		row.Values = append(row.Values, []interface{}{ui.Name, ui.Admin})
+		series.Emit([]interface{}{ui.Name, ui.Admin})
 	}
-	return []*models.Row{row}, nil
+	return nil
 }
 
 // BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries
@@ -874,148 +1035,91 @@ type BufferedPointsWriter struct {
 	retentionPolicy string
 }
 
-// NewBufferedPointsWriter returns a new BufferedPointsWriter.
-func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter {
-	return &BufferedPointsWriter{
-		w:               w,
-		buf:             make([]models.Point, 0, capacity),
-		database:        database,
-		retentionPolicy: retentionPolicy,
-	}
-}
+func (e *StatementExecutor) writeResult(stmt *influxql.SelectStatement, in, out *query.ResultSet) {
+	defer out.Close()
+	measurementName := stmt.Target.Measurement.Name
 
-// WritePointsInto implements pointsWriter for BufferedPointsWriter.
-func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error {
-	// Make sure we're buffering points only for the expected destination.
-	if req.Database != w.database || req.RetentionPolicy != w.retentionPolicy {
-		return fmt.Errorf("writer for %s.%s can't write into %s.%s", w.database, w.retentionPolicy, req.Database, req.RetentionPolicy)
-	}
-
-	for i := 0; i < len(req.Points); {
-		// Get the available space in the buffer.
-		avail := cap(w.buf) - len(w.buf)
-
-		// Calculate number of points to copy into the buffer.
-		n := len(req.Points[i:])
-		if n > avail {
-			n = avail
+	var writeN int64
+	points := make([]models.Point, 0, 10000)
+	for series := range in.SeriesCh() {
+		if series.Err != nil {
+			continue
 		}
 
-		// Copy points into buffer.
-		w.buf = append(w.buf, req.Points[i:n+i]...)
-
-		// Advance the index by number of points copied.
-		i += n
+		// Convert the tags from the influxql format to the one expected by models.
+		name := measurementName
+		if name == "" {
+			name = series.Name
+		}
+		tags := models.NewTags(series.Tags.KeyValues())
+		for row := range series.RowCh() {
+			if row.Err != nil {
+				continue
+			}
 
-		// If buffer is full, flush points to underlying writer.
-		if len(w.buf) == cap(w.buf) {
-			if err := w.Flush(); err != nil {
-				return err
+			// Convert the row back to a point.
+			point, err := convertRowToPoint(name, tags, series.Columns, row.Values)
+			if err != nil {
+				out.Error(err)
+				return
+			}
+			points = append(points, point)
+
+			if len(points) == cap(points) {
+				if err := e.PointsWriter.WritePointsInto(&IntoWriteRequest{
+					Database:        stmt.Target.Measurement.Database,
+					RetentionPolicy: stmt.Target.Measurement.RetentionPolicy,
+					Points:          points,
+				}); err != nil {
+					out.Error(err)
+					return
+				}
+				writeN += int64(len(points))
+				points = points[:0]
 			}
 		}
 	}
-	return nil
-}
-
-// Flush writes all buffered points to the underlying writer.
-func (w *BufferedPointsWriter) Flush() error {
-	if len(w.buf) == 0 {
-		return nil
-	}
-
-	if err := w.w.WritePointsInto(&IntoWriteRequest{
-		Database:        w.database,
-		RetentionPolicy: w.retentionPolicy,
-		Points:          w.buf,
-	}); err != nil {
-		return err
-	}
-
-	// Clear the buffer.
-	w.buf = w.buf[:0]
-
-	return nil
-}
-
-// Len returns the number of points buffered.
-func (w *BufferedPointsWriter) Len() int { return len(w.buf) }
-
-// Cap returns the capacity (in points) of the buffer.
-func (w *BufferedPointsWriter) Cap() int { return cap(w.buf) }
-
-func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) error {
-	if stmt.Target.Measurement.Database == "" {
-		return errNoDatabaseInTarget
-	}
-
-	// It might seem a bit weird that this is where we do this, since we will have to
-	// convert rows back to points. The Executors (both aggregate and raw) are complex
-	// enough that changing them to write back to the DB is going to be clumsy
-	//
-	// it might seem weird to have the write be in the QueryExecutor, but the interweaving of
-	// limitedRowWriter and ExecuteAggregate/Raw makes it ridiculously hard to make sure that the
-	// results will be the same as when queried normally.
-	name := stmt.Target.Measurement.Name
-	if name == "" {
-		name = row.Name
+	if len(points) > 0 {
+		if err := e.PointsWriter.WritePointsInto(&IntoWriteRequest{
+			Database:        stmt.Target.Measurement.Database,
+			RetentionPolicy: stmt.Target.Measurement.RetentionPolicy,
+			Points:          points,
+		}); err != nil {
+			out.Error(err)
+			return
+		}
+		writeN += int64(len(points))
 	}
 
-	points, err := convertRowToPoints(name, row)
-	if err != nil {
-		return err
+	series, ok := out.WithColumns("time", "written").CreateSeries("result")
+	if !ok {
+		return
 	}
-
-	if err := w.WritePointsInto(&IntoWriteRequest{
-		Database:        stmt.Target.Measurement.Database,
-		RetentionPolicy: stmt.Target.Measurement.RetentionPolicy,
-		Points:          points,
-	}); err != nil {
-		return err
-	}
-
-	return nil
+	series.Emit([]interface{}{time.Unix(0, 0).UTC(), writeN})
+	series.Close()
 }
 
 var errNoDatabaseInTarget = errors.New("no database in target")
 
-// convertRowToPoints will convert a query result Row into Points that can be written back in.
-func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
-	// figure out which parts of the result are the time and which are the fields
-	timeIndex := -1
-	fieldIndexes := make(map[string]int)
-	for i, c := range row.Columns {
-		if c == "time" {
-			timeIndex = i
-		} else {
-			fieldIndexes[c] = i
+// convertRowToPoint will convert a query result row into a Point that can be written back in.
+func convertRowToPoint(measurementName string, tags models.Tags, columns []string, row []interface{}) (models.Point, error) {
+	// Iterate through the columns and treat the first "time" field as the time.
+	var t time.Time
+	vals := make(map[string]interface{}, len(columns)-1)
+	for i, c := range columns {
+		if c == "time" && t.IsZero() {
+			t = row[i].(time.Time)
+		} else if val := row[i]; val != nil {
+			vals[c] = val
 		}
 	}
 
-	if timeIndex == -1 {
+	// If the time is zero, there was no time for some reason.
+	if t.IsZero() {
 		return nil, errors.New("error finding time index in result")
 	}
-
-	points := make([]models.Point, 0, len(row.Values))
-	for _, v := range row.Values {
-		vals := make(map[string]interface{})
-		for fieldName, fieldIndex := range fieldIndexes {
-			val := v[fieldIndex]
-			if val != nil {
-				vals[fieldName] = v[fieldIndex]
-			}
-		}
-
-		p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
-		if err != nil {
-			// Drop points that can't be stored
-			continue
-		}
-
-		points = append(points, p)
-	}
-
-	return points, nil
+	return models.NewPoint(measurementName, tags, vals, t)
 }
 
 // NormalizeStatement adds a default database and policy to the measurements in statement.
diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go
index 893c190860b..52208a6b19c 100644
--- a/coordinator/statement_executor_test.go
+++ b/coordinator/statement_executor_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"errors"
 	"io"
+	"io/ioutil"
 	"os"
 	"reflect"
 	"regexp"
@@ -16,6 +17,7 @@ import (
 	"github.com/influxdata/influxdb/internal"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/query"
+	"github.com/influxdata/influxdb/services/httpd"
 	"github.com/influxdata/influxdb/services/meta"
 	"github.com/influxdata/influxdb/tsdb"
 	"github.com/uber-go/zap"
@@ -66,7 +68,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
 	}
 
 	// Verify all results from the query.
-	if a := ReadAllResults(e.ExecuteQuery(`SELECT * FROM cpu`, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{
+	if a := ReadAllResults(t, e.ExecuteQuery(`SELECT * FROM cpu`, "db0")); !reflect.DeepEqual(a, []*query.Result{
 		{
 			StatementID: 0,
 			Series: []*models.Row{{
@@ -118,7 +120,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
 	}
 
 	// Verify all results from the query.
-	if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, "db0", 0)); !reflect.DeepEqual(a, []*query.Result{
+	if a := ReadAllResults(t, e.ExecuteQuery(`SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, "db0")); !reflect.DeepEqual(a, []*query.Result{
 		{
 			StatementID: 0,
 			Err:         errors.New("max-select-buckets limit exceeded: (4/3)"),
@@ -239,7 +241,7 @@ func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	results := ReadAllResults(qe.ExecuteQuery(q, opt, make(chan struct{})))
+	results := ReadAllResults(t, qe.ExecuteQuery(q, opt, make(chan struct{})))
 	exp := []*query.Result{
 		{
 			StatementID: 0,
@@ -303,10 +305,9 @@ func DefaultQueryExecutor() *QueryExecutor {
 }
 
 // ExecuteQuery parses query and executes against the database.
-func (e *QueryExecutor) ExecuteQuery(q, database string, chunkSize int) <-chan *query.Result {
+func (e *QueryExecutor) ExecuteQuery(q, database string) <-chan *query.ResultSet {
 	return e.QueryExecutor.ExecuteQuery(MustParseQuery(q), query.ExecutionOptions{
-		Database:  database,
-		ChunkSize: chunkSize,
+		Database: database,
 	}, make(chan struct{}))
 }
 
@@ -433,13 +434,25 @@ func MustParseQuery(s string) *influxql.Query {
 	return q
 }
 
+type mockResponseWriter struct {
+	T       *testing.T
+	Results []*query.Result
+}
+
+func (m *mockResponseWriter) ContentType() string               { return "" }
+func (m *mockResponseWriter) WriteError(w io.Writer, err error) { m.T.Error(err) }
+
+func (m *mockResponseWriter) WriteResponse(w io.Writer, resp httpd.Response) (int, error) {
+	m.Results = append(m.Results, resp.Results...)
+	return 0, nil
+}
+
 // ReadAllResults reads all results from c and returns as a slice.
-func ReadAllResults(c <-chan *query.Result) []*query.Result {
-	var a []*query.Result
-	for result := range c {
-		a = append(a, result)
-	}
-	return a
+func ReadAllResults(t *testing.T, c <-chan *query.ResultSet) []*query.Result {
+	rw := mockResponseWriter{T: t}
+	enc := httpd.NewDefaultEncoder(&rw)
+	enc.Encode(ioutil.Discard, c)
+	return rw.Results
 }
 
 // FloatIterator is a represents an iterator that reads from a slice.
diff --git a/query/emitter.go b/query/emitter.go
index 6909172bca9..c5fb2649934 100644
--- a/query/emitter.go
+++ b/query/emitter.go
@@ -2,9 +2,6 @@ package query
 
 import (
 	"fmt"
-	"time"
-
-	"github.com/influxdata/influxdb/models"
 )
 
 // Emitter groups values together by name, tags, and time.
@@ -12,33 +9,14 @@ type Emitter struct {
 	buf       []Point
 	itrs      []Iterator
 	ascending bool
-	chunkSize int
-
-	tags Tags
-	row  *models.Row
-
-	// The columns to attach to each row.
-	Columns []string
-
-	// Overridden measurement name to emit.
-	EmitName string
-
-	// The time zone location.
-	Location *time.Location
-
-	// Removes the "time" column from output.
-	// Used for meta queries where time does not apply.
-	OmitTime bool
 }
 
 // NewEmitter returns a new instance of Emitter that pulls from itrs.
-func NewEmitter(itrs []Iterator, ascending bool, chunkSize int) *Emitter {
+func NewEmitter(itrs []Iterator, ascending bool) *Emitter {
 	return &Emitter{
 		buf:       make([]Point, len(itrs)),
 		itrs:      itrs,
 		ascending: ascending,
-		chunkSize: chunkSize,
-		Location:  time.UTC,
 	}
 }
 
@@ -47,59 +25,9 @@ func (e *Emitter) Close() error {
 	return Iterators(e.itrs).Close()
 }
 
-// Emit returns the next row from the iterators.
-func (e *Emitter) Emit() (*models.Row, bool, error) {
-	// Immediately end emission if there are no iterators.
-	if len(e.itrs) == 0 {
-		return nil, false, nil
-	}
-
-	// Continually read from iterators until they are exhausted.
-	for {
-		// Fill buffer. Return row if no more points remain.
-		t, name, tags, err := e.loadBuf()
-		if err != nil {
-			return nil, false, err
-		} else if t == ZeroTime {
-			row := e.row
-			e.row = nil
-			return row, false, nil
-		}
-
-		// Read next set of values from all iterators at a given time/name/tags.
-		// If no values are returned then return row.
-		values := e.readAt(t, name, tags)
-		if values == nil {
-			row := e.row
-			e.row = nil
-			return row, false, nil
-		}
-
-		// If there's no row yet then create one.
-		// If the name and tags match the existing row, append to that row if
-		// the number of values doesn't exceed the chunk size.
-		// Otherwise return existing row and add values to next emitted row.
-		if e.row == nil {
-			e.createRow(name, tags, values)
-		} else if e.row.Name == name && e.tags.Equals(&tags) {
-			if e.chunkSize > 0 && len(e.row.Values) >= e.chunkSize {
-				row := e.row
-				row.Partial = true
-				e.createRow(name, tags, values)
-				return row, true, nil
-			}
-			e.row.Values = append(e.row.Values, values)
-		} else {
-			row := e.row
-			e.createRow(name, tags, values)
-			return row, true, nil
-		}
-	}
-}
-
-// loadBuf reads in points into empty buffer slots.
+// LoadBuf reads in points into empty buffer slots.
 // Returns the next time/name/tags to emit for.
-func (e *Emitter) loadBuf() (t int64, name string, tags Tags, err error) {
+func (e *Emitter) LoadBuf() (t int64, name string, tags Tags, err error) {
 	t = ZeroTime
 
 	for i := range e.itrs {
@@ -140,38 +68,8 @@ func (e *Emitter) loadBuf() (t int64, name string, tags Tags, err error) {
 	return
 }
 
-// createRow creates a new row attached to the emitter.
-func (e *Emitter) createRow(name string, tags Tags, values []interface{}) {
-	if e.EmitName != "" {
-		name = e.EmitName
-	}
-
-	e.tags = tags
-	e.row = &models.Row{
-		Name:    name,
-		Tags:    tags.KeyValues(),
-		Columns: e.Columns,
-		Values:  [][]interface{}{values},
-	}
-}
-
-// readAt returns the next slice of values from the iterators at time/name/tags.
-// Returns nil values once the iterators are exhausted.
-func (e *Emitter) readAt(t int64, name string, tags Tags) []interface{} {
-	offset := 1
-	if e.OmitTime {
-		offset = 0
-	}
-
-	values := make([]interface{}, len(e.itrs)+offset)
-	if !e.OmitTime {
-		values[0] = time.Unix(0, t).In(e.Location)
-	}
-	e.readInto(t, name, tags, values[offset:])
-	return values
-}
-
-func (e *Emitter) readInto(t int64, name string, tags Tags, values []interface{}) {
+// ReadInto reads the values at time, name, and tags into values.
+func (e *Emitter) ReadInto(t int64, name string, tags Tags, values []interface{}) {
 	for i, p := range e.buf {
 		// Skip if buffer is empty.
 		if p == nil {
diff --git a/query/emitter_test.go b/query/emitter_test.go
index dae9c1e00fa..79458fae4ed 100644
--- a/query/emitter_test.go
+++ b/query/emitter_test.go
@@ -1,17 +1,14 @@
 package query_test
 
 import (
+	"reflect"
 	"testing"
-	"time"
 
-	"github.com/davecgh/go-spew/spew"
-	"github.com/influxdata/influxdb/models"
-	"github.com/influxdata/influxdb/pkg/deep"
 	"github.com/influxdata/influxdb/query"
 )
 
 // Ensure the emitter can group iterators together into rows.
-func TestEmitter_Emit(t *testing.T) {
+func TestEmitter(t *testing.T) {
 	// Build an emitter that pulls from two iterators.
 	e := query.NewEmitter([]query.Iterator{
 		&FloatIterator{Points: []query.FloatPoint{
@@ -23,103 +20,76 @@ func TestEmitter_Emit(t *testing.T) {
 			{Name: "cpu", Tags: ParseTags("region=north"), Time: 0, Value: 4},
 			{Name: "mem", Time: 4, Value: 5},
 		}},
-	}, true, 0)
-	e.Columns = []string{"col1", "col2"}
+	}, true)
 
 	// Verify the cpu region=west is emitted first.
-	if row, _, err := e.Emit(); err != nil {
+	var values [2]interface{}
+	if ts, name, tags, err := e.LoadBuf(); err != nil {
 		t.Fatalf("unexpected error(0): %s", err)
-	} else if !deep.Equal(row, &models.Row{
-		Name:    "cpu",
-		Tags:    map[string]string{"region": "west"},
-		Columns: []string{"col1", "col2"},
-		Values: [][]interface{}{
-			{time.Unix(0, 0).UTC(), float64(1), nil},
-			{time.Unix(0, 1).UTC(), float64(2), float64(4)},
-		},
-	}) {
-		t.Fatalf("unexpected row(0): %s", spew.Sdump(row))
+	} else if have, want := ts, int64(0); have != want {
+		t.Fatalf("unexpected time(0): have=%v want=%v", have, want)
+	} else if have, want := name, "cpu"; have != want {
+		t.Fatalf("unexpected name(0): have=%v want=%v", have, want)
+	} else if have, want := tags.ID(), "region\x00west"; have != want {
+		t.Fatalf("unexpected tags(0): have=%v want=%v", have, want)
+	} else {
+		e.ReadInto(ts, name, tags, values[:])
+		if have, want := values[:], []interface{}{float64(1), nil}; !reflect.DeepEqual(have, want) {
+			t.Fatalf("unexpected values(0): have=%v want=%v", have, want)
+		}
 	}
 
-	// Verify the cpu region=north is emitted next.
-	if row, _, err := e.Emit(); err != nil {
+	if ts, name, tags, err := e.LoadBuf(); err != nil {
 		t.Fatalf("unexpected error(1): %s", err)
-	} else if !deep.Equal(row, &models.Row{
-		Name:    "cpu",
-		Tags:    map[string]string{"region": "north"},
-		Columns: []string{"col1", "col2"},
-		Values: [][]interface{}{
-			{time.Unix(0, 0).UTC(), nil, float64(4)},
-		},
-	}) {
-		t.Fatalf("unexpected row(1): %s", spew.Sdump(row))
+	} else if have, want := ts, int64(1); have != want {
+		t.Fatalf("unexpected time(1): have=%v want=%v", have, want)
+	} else if have, want := name, "cpu"; have != want {
+		t.Fatalf("unexpected name(1): have=%v want=%v", have, want)
+	} else if have, want := tags.ID(), "region\x00west"; have != want {
+		t.Fatalf("unexpected tags(1): have=%v want=%v", have, want)
+	} else {
+		e.ReadInto(ts, name, tags, values[:])
+		if have, want := values[:], []interface{}{float64(2), float64(4)}; !reflect.DeepEqual(have, want) {
+			t.Fatalf("unexpected values(1): have=%v want=%v", have, want)
+		}
 	}
 
-	// Verify the mem series is emitted last.
-	if row, _, err := e.Emit(); err != nil {
+	// Verify the cpu region=north is emitted next.
+	if ts, name, tags, err := e.LoadBuf(); err != nil {
 		t.Fatalf("unexpected error(2): %s", err)
-	} else if !deep.Equal(row, &models.Row{
-		Name:    "mem",
-		Columns: []string{"col1", "col2"},
-		Values: [][]interface{}{
-			{time.Unix(0, 4).UTC(), nil, float64(5)},
-		},
-	}) {
-		t.Fatalf("unexpected row(2): %s", spew.Sdump(row))
+	} else if have, want := ts, int64(0); have != want {
+		t.Fatalf("unexpected time(2): have=%v want=%v", have, want)
+	} else if have, want := name, "cpu"; have != want {
+		t.Fatalf("unexpected name(2): have=%v want=%v", have, want)
+	} else if have, want := tags.ID(), "region\x00north"; have != want {
+		t.Fatalf("unexpected tags(2): have=%v want=%v", have, want)
+	} else {
+		e.ReadInto(ts, name, tags, values[:])
+		if have, want := values[:], []interface{}{nil, float64(4)}; !reflect.DeepEqual(have, want) {
+			t.Fatalf("unexpected values(2): have=%v want=%v", have, want)
+		}
 	}
 
-	// Verify EOF.
-	if row, _, err := e.Emit(); err != nil {
-		t.Fatalf("unexpected error(eof): %s", err)
-	} else if row != nil {
-		t.Fatalf("unexpected eof: %s", spew.Sdump(row))
-	}
-}
-
-// Ensure the emitter will limit the chunked output from a series.
-func TestEmitter_ChunkSize(t *testing.T) {
-	// Build an emitter that pulls from one iterator with multiple points in the same series.
-	e := query.NewEmitter([]query.Iterator{
-		&FloatIterator{Points: []query.FloatPoint{
-			{Name: "cpu", Tags: ParseTags("region=west"), Time: 0, Value: 1},
-			{Name: "cpu", Tags: ParseTags("region=west"), Time: 1, Value: 2},
-		}},
-	}, true, 1)
-	e.Columns = []string{"col1"}
-
-	// Verify the cpu region=west is emitted first.
-	if row, _, err := e.Emit(); err != nil {
-		t.Fatalf("unexpected error(0): %s", err)
-	} else if !deep.Equal(row, &models.Row{
-		Name:    "cpu",
-		Tags:    map[string]string{"region": "west"},
-		Columns: []string{"col1"},
-		Values: [][]interface{}{
-			{time.Unix(0, 0).UTC(), float64(1)},
-		},
-		Partial: true,
-	}) {
-		t.Fatalf("unexpected row(0): %s", spew.Sdump(row))
-	}
-
-	// Verify the cpu region=north is emitted next.
-	if row, _, err := e.Emit(); err != nil {
-		t.Fatalf("unexpected error(1): %s", err)
-	} else if !deep.Equal(row, &models.Row{
-		Name:    "cpu",
-		Tags:    map[string]string{"region": "west"},
-		Columns: []string{"col1"},
-		Values: [][]interface{}{
-			{time.Unix(0, 1).UTC(), float64(2)},
-		},
-	}) {
-		t.Fatalf("unexpected row(1): %s", spew.Sdump(row))
+	// Verify the mem series is emitted last.
+	if ts, name, tags, err := e.LoadBuf(); err != nil {
+		t.Fatalf("unexpected error(3): %s", err)
+	} else if have, want := ts, int64(4); have != want {
+		t.Fatalf("unexpected time(3): have=%v want=%v", have, want)
+	} else if have, want := name, "mem"; have != want {
+		t.Fatalf("unexpected name(3): have=%v want=%v", have, want)
+	} else if have, want := tags.ID(), ""; have != want {
+		t.Fatalf("unexpected tags(3): have=%v want=%v", have, want)
+	} else {
+		e.ReadInto(ts, name, tags, values[:])
+		if have, want := values[:], []interface{}{nil, float64(5)}; !reflect.DeepEqual(have, want) {
+			t.Fatalf("unexpected values(3): have=%v want=%v", have, want)
+		}
 	}
 
 	// Verify EOF.
-	if row, _, err := e.Emit(); err != nil {
+	if ts, _, _, err := e.LoadBuf(); err != nil {
 		t.Fatalf("unexpected error(eof): %s", err)
-	} else if row != nil {
-		t.Fatalf("unexpected eof: %s", spew.Sdump(row))
+	} else if ts != query.ZeroTime {
+		t.Fatalf("unexpected time(eof): %v", ts)
 	}
 }
diff --git a/query/iterator.gen.go b/query/iterator.gen.go
index e580177322d..80a0d6b6294 100644
--- a/query/iterator.gen.go
+++ b/query/iterator.gen.go
@@ -3221,8 +3221,7 @@ type floatIteratorMapper struct {
 }
 
 func newFloatIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *floatIteratorMapper {
-	e := NewEmitter(itrs, opt.Ascending, 0)
-	e.OmitTime = true
+	e := NewEmitter(itrs, opt.Ascending)
 	return &floatIteratorMapper{
 		e:   e,
 		buf: make([]interface{}, len(itrs)),
@@ -3235,7 +3234,7 @@ func newFloatIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Iterat
 }
 
 func (itr *floatIteratorMapper) Next() (*FloatPoint, error) {
-	t, name, tags, err := itr.e.loadBuf()
+	t, name, tags, err := itr.e.LoadBuf()
 	if err != nil || t == ZeroTime {
 		return nil, err
 	}
@@ -3243,7 +3242,7 @@ func (itr *floatIteratorMapper) Next() (*FloatPoint, error) {
 	itr.point.Name = name
 	itr.point.Tags = tags
 
-	itr.e.readInto(t, name, tags, itr.buf)
+	itr.e.ReadInto(t, name, tags, itr.buf)
 	if itr.driver != nil {
 		if v := itr.driver.Value(tags, itr.buf); v != nil {
 			if v, ok := v.(float64); ok {
@@ -6609,8 +6608,7 @@ type integerIteratorMapper struct {
 }
 
 func newIntegerIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *integerIteratorMapper {
-	e := NewEmitter(itrs, opt.Ascending, 0)
-	e.OmitTime = true
+	e := NewEmitter(itrs, opt.Ascending)
 	return &integerIteratorMapper{
 		e:   e,
 		buf: make([]interface{}, len(itrs)),
@@ -6623,7 +6621,7 @@ func newIntegerIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Iter
 }
 
 func (itr *integerIteratorMapper) Next() (*IntegerPoint, error) {
-	t, name, tags, err := itr.e.loadBuf()
+	t, name, tags, err := itr.e.LoadBuf()
 	if err != nil || t == ZeroTime {
 		return nil, err
 	}
@@ -6631,7 +6629,7 @@ func (itr *integerIteratorMapper) Next() (*IntegerPoint, error) {
 	itr.point.Name = name
 	itr.point.Tags = tags
 
-	itr.e.readInto(t, name, tags, itr.buf)
+	itr.e.ReadInto(t, name, tags, itr.buf)
 	if itr.driver != nil {
 		if v := itr.driver.Value(tags, itr.buf); v != nil {
 			if v, ok := v.(int64); ok {
@@ -9983,8 +9981,7 @@ type unsignedIteratorMapper struct {
 }
 
 func newUnsignedIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *unsignedIteratorMapper {
-	e := NewEmitter(itrs, opt.Ascending, 0)
-	e.OmitTime = true
+	e := NewEmitter(itrs, opt.Ascending)
 	return &unsignedIteratorMapper{
 		e:   e,
 		buf: make([]interface{}, len(itrs)),
@@ -9997,7 +9994,7 @@ func newUnsignedIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Ite
 }
 
 func (itr *unsignedIteratorMapper) Next() (*UnsignedPoint, error) {
-	t, name, tags, err := itr.e.loadBuf()
+	t, name, tags, err := itr.e.LoadBuf()
 	if err != nil || t == ZeroTime {
 		return nil, err
 	}
@@ -10005,7 +10002,7 @@ func (itr *unsignedIteratorMapper) Next() (*UnsignedPoint, error) {
 	itr.point.Name = name
 	itr.point.Tags = tags
 
-	itr.e.readInto(t, name, tags, itr.buf)
+	itr.e.ReadInto(t, name, tags, itr.buf)
 	if itr.driver != nil {
 		if v := itr.driver.Value(tags, itr.buf); v != nil {
 			if v, ok := v.(uint64); ok {
@@ -13357,8 +13354,7 @@ type stringIteratorMapper struct {
 }
 
 func newStringIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *stringIteratorMapper {
-	e := NewEmitter(itrs, opt.Ascending, 0)
-	e.OmitTime = true
+	e := NewEmitter(itrs, opt.Ascending)
 	return &stringIteratorMapper{
 		e:   e,
 		buf: make([]interface{}, len(itrs)),
@@ -13371,7 +13367,7 @@ func newStringIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Itera
 }
 
 func (itr *stringIteratorMapper) Next() (*StringPoint, error) {
-	t, name, tags, err := itr.e.loadBuf()
+	t, name, tags, err := itr.e.LoadBuf()
 	if err != nil || t == ZeroTime {
 		return nil, err
 	}
@@ -13379,7 +13375,7 @@ func (itr *stringIteratorMapper) Next() (*StringPoint, error) {
 	itr.point.Name = name
 	itr.point.Tags = tags
 
-	itr.e.readInto(t, name, tags, itr.buf)
+	itr.e.ReadInto(t, name, tags, itr.buf)
 	if itr.driver != nil {
 		if v := itr.driver.Value(tags, itr.buf); v != nil {
 			if v, ok := v.(string); ok {
@@ -16731,8 +16727,7 @@ type booleanIteratorMapper struct {
 }
 
 func newBooleanIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *booleanIteratorMapper {
-	e := NewEmitter(itrs, opt.Ascending, 0)
-	e.OmitTime = true
+	e := NewEmitter(itrs, opt.Ascending)
 	return &booleanIteratorMapper{
 		e:   e,
 		buf: make([]interface{}, len(itrs)),
@@ -16745,7 +16740,7 @@ func newBooleanIteratorMapper(itrs []Iterator, driver IteratorMap, fields []Iter
 }
 
 func (itr *booleanIteratorMapper) Next() (*BooleanPoint, error) {
-	t, name, tags, err := itr.e.loadBuf()
+	t, name, tags, err := itr.e.LoadBuf()
 	if err != nil || t == ZeroTime {
 		return nil, err
 	}
@@ -16753,7 +16748,7 @@ func (itr *booleanIteratorMapper) Next() (*BooleanPoint, error) {
 	itr.point.Name = name
 	itr.point.Tags = tags
 
-	itr.e.readInto(t, name, tags, itr.buf)
+	itr.e.ReadInto(t, name, tags, itr.buf)
 	if itr.driver != nil {
 		if v := itr.driver.Value(tags, itr.buf); v != nil {
 			if v, ok := v.(bool); ok {
diff --git a/query/iterator.gen.go.tmpl b/query/iterator.gen.go.tmpl
index 701efde865d..432b252f75b 100644
--- a/query/iterator.gen.go.tmpl
+++ b/query/iterator.gen.go.tmpl
@@ -1560,8 +1560,7 @@ type {{$k.name}}IteratorMapper struct {
 }
 
 func new{{$k.Name}}IteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper {
-	e := NewEmitter(itrs, opt.Ascending, 0)
-	e.OmitTime = true
+	e := NewEmitter(itrs, opt.Ascending)
 	return &{{$k.name}}IteratorMapper{
 		e:   e,
 		buf: make([]interface{}, len(itrs)),
@@ -1574,7 +1573,7 @@ func new{{$k.Name}}IteratorMapper(itrs []Iterator, driver IteratorMap, fields []
 }
 
 func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
-	t, name, tags, err := itr.e.loadBuf()
+	t, name, tags, err := itr.e.LoadBuf()
 	if err != nil || t == ZeroTime {
 		return nil, err
 	}
@@ -1582,9 +1581,9 @@ func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
 	itr.point.Name = name
 	itr.point.Tags = tags
 
-	itr.e.readInto(t, name, tags, itr.buf)
+	itr.e.ReadInto(t, name, tags, itr.buf)
 	if itr.driver != nil {
-		if v := itr.driver.Value(tags, itr.buf); v != nil {
+		if v := itr.driver.Value(tags, itr.buf); v != nil {
 			if v, ok := v.({{$k.Type}}); ok {
 				itr.point.Value = v
 				itr.point.Nil = false
diff --git a/query/query_executor.go b/query/query_executor.go
index 8f7aa618911..a6f6fd95911 100644
--- a/query/query_executor.go
+++ b/query/query_executor.go
@@ -29,6 +29,11 @@ var (
 	// ErrQueryAborted is an error returned when the query is aborted.
ErrQueryAborted = errors.New("query aborted")
+ // ErrQueryCanceled is an error that signals the query was canceled during
+ // execution by the query engine itself due to another error that was
+ // already reported. This error should never be emitted to the user.
+ ErrQueryCanceled = errors.New("query canceled")
+
// ErrQueryEngineShutdown is an error sent when the query cannot be
// created because the query engine was shutdown.
ErrQueryEngineShutdown = errors.New("query engine shutdown")
@@ -107,9 +112,6 @@ type ExecutionOptions struct {
// what resources can be returned in SHOW queries, etc.
Authorizer Authorizer
- // The requested maximum number of points to return in each result.
- ChunkSize int
-
// If this query is being executed in a read-only context.
ReadOnly bool
@@ -135,7 +137,7 @@ type ExecutionContext struct {
Query *QueryTask
// Output channel where results and errors should be sent.
- Results chan *Result
+ Results chan *ResultSet
// Hold the query executor's logger.
Log zap.Logger
@@ -147,28 +149,49 @@
ExecutionOptions
}
-// send sends a Result to the Results channel and will exit if the query has
-// been aborted.
-func (ctx *ExecutionContext) send(result *Result) error {
+// CreateResult creates a new ResultSet and sends it to the Results channel.
+// It returns an error if the query is interrupted or aborted before the
+// consumer accepts the result.
+func (ctx *ExecutionContext) CreateResult(messages ...*Message) (*ResultSet, error) {
+ result := &ResultSet{
+ ID: ctx.StatementID,
+ Messages: messages,
+ AbortCh: ctx.AbortCh,
+ }
select {
+ case <-ctx.InterruptCh:
+ return nil, ErrQueryInterrupted
case <-ctx.AbortCh:
- return ErrQueryAborted
- case ctx.Results <- result:
+ return nil, ErrQueryAborted
+ case ctx.Results <- result.Init():
+ return result, nil
}
+}
+
+// Ok returns a result with no content (it is immediately closed).
+func (ctx *ExecutionContext) Ok(messages ...*Message) error {
+ result, err := ctx.CreateResult(messages...)
+ if err != nil {
+ return err
+ }
+ result.Close()
return nil
}
-// Send sends a Result to the Results channel and will exit if the query has
-// been interrupted or aborted.
-func (ctx *ExecutionContext) Send(result *Result) error {
+// Error sends an error-only ResultSet to the Results channel. It reports
+// whether the send succeeded; false means the query was aborted.
+func (ctx *ExecutionContext) Error(err error) bool {
select {
- case <-ctx.InterruptCh:
- return ErrQueryInterrupted
case <-ctx.AbortCh:
- return ErrQueryAborted
- case ctx.Results <- result:
+ return false
+ case ctx.Results <- &ResultSet{ID: ctx.StatementID, Err: err}:
+ return true
}
- return nil
+}
+
+// Errorf sends an error built with fmt.Errorf; see Error.
+func (ctx *ExecutionContext) Errorf(format string, a ...interface{}) bool {
+ return ctx.Error(fmt.Errorf(format, a...))
}
// StatementExecutor executes a statement within the QueryExecutor.
@@ -247,15 +266,15 @@
func (e *QueryExecutor) WithLogger(log zap.Logger) {
}
// ExecuteQuery executes each statement within a query.
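
Taken together, the context helpers above replace direct sends on ctx.Results: a statement executor now asks the context for a ResultSet, attaches columns, and streams series and rows through channels. A minimal producer-side sketch, mirroring the TaskManager usage later in this diff (the receiver, statement name, and columns are placeholders, not part of this change; imports elided):

func (e *StatementExecutor) executeShowThingsStatement(ctx *query.ExecutionContext) error {
	// CreateResult blocks until the consumer accepts the result, or fails
	// if the query is interrupted or aborted first.
	result, err := ctx.CreateResult()
	if err != nil {
		return err
	}
	defer result.Close()

	result = result.WithColumns("id", "name")
	series, ok := result.CreateSeries("things")
	if !ok {
		// The query was aborted while the series was being sent.
		return nil
	}
	defer series.Close()

	// Emit reports false if the query is aborted mid-stream.
	series.Emit([]interface{}{uint64(1), "thing1"})
	return nil
}

Errors discovered after the result has been created are reported through ResultSet.Error or Series.Error rather than returned, since the consumer has already received the result; that is what ErrQueryCanceled signals to the executor loop below.
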
-func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *Result { - results := make(chan *Result) +func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions, closing chan struct{}) <-chan *ResultSet { + results := make(chan *ResultSet) go e.executeQuery(query, opt, closing, results) return results } -func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { +func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *ResultSet) { defer close(results) - defer e.recover(query, results) + defer e.recover(query, results, opt.AbortCh) atomic.AddInt64(&e.stats.ActiveQueries, 1) atomic.AddInt64(&e.stats.ExecutedQueries, 1) @@ -268,7 +287,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing) if err != nil { select { - case results <- &Result{Err: err}: + case results <- &ResultSet{Err: err}: case <-opt.AbortCh: } return @@ -320,9 +339,7 @@ LOOP: case "_tags": command = "SHOW TAG VALUES" } - results <- &Result{ - Err: fmt.Errorf("unable to use system source '%s': use %s instead", s.Name, command), - } + ctx.Errorf("unable to use system source '%s': use %s instead", s.Name, command) break LOOP } } @@ -333,7 +350,7 @@ LOOP: // This can occur on meta read statements which convert to SELECT statements. newStmt, err := influxql.RewriteStatement(stmt) if err != nil { - results <- &Result{Err: err} + ctx.Error(err) break } stmt = newStmt @@ -341,7 +358,7 @@ LOOP: // Normalize each statement if possible. if normalizer, ok := e.StatementExecutor.(StatementNormalizer); ok { if err := normalizer.NormalizeStatement(stmt, defaultDB); err != nil { - if err := ctx.send(&Result{Err: err}); err == ErrQueryAborted { + if ok := ctx.Error(err); !ok { return } break @@ -361,14 +378,17 @@ LOOP: if qerr := task.Error(); qerr != nil { err = qerr } + } else if err == ErrQueryCanceled { + // The query was canceled while it was running so the result has + // already been sent and the error has already been reported + // through a series or a row. Break out of the main loop so we can + // report ErrNotExecuted for the remaining statements. + break } // Send an error for this result if it failed for some reason. if err != nil { - if err := ctx.send(&Result{ - StatementID: i, - Err: err, - }); err == ErrQueryAborted { + if ok := ctx.Error(err); !ok { return } // Stop after the first error. @@ -393,10 +413,8 @@ LOOP: // Send error results for any statements which were not executed. for ; i < len(query.Statements)-1; i++ { - if err := ctx.send(&Result{ - StatementID: i, - Err: ErrNotExecuted, - }); err == ErrQueryAborted { + ctx.StatementID = i + if ok := ctx.Error(ErrNotExecuted); !ok { return } } @@ -413,13 +431,17 @@ func init() { } } -func (e *QueryExecutor) recover(query *influxql.Query, results chan *Result) { +func (e *QueryExecutor) recover(query *influxql.Query, results chan *ResultSet, abortCh <-chan struct{}) { if err := recover(); err != nil { atomic.AddInt64(&e.stats.RecoveredPanics, 1) // Capture the panic in _internal stats. 
e.Logger.Error(fmt.Sprintf("%s [panic:%s] %s", query.String(), err, debug.Stack())) - results <- &Result{ - StatementID: -1, - Err: fmt.Errorf("%s [panic:%s]", query.String(), err), + result := &ResultSet{ + ID: -1, + Err: fmt.Errorf("%s [panic:%s]", query.String(), err), + } + select { + case <-abortCh: + case results <- result: } if willCrash { diff --git a/query/query_executor_test.go b/query/query_executor_test.go index eaf3f311b73..3081f25bc7f 100644 --- a/query/query_executor_test.go +++ b/query/query_executor_test.go @@ -26,6 +26,9 @@ func NewQueryExecutor() *query.QueryExecutor { } func TestQueryExecutor_AttachQuery(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -41,10 +44,13 @@ func TestQueryExecutor_AttachQuery(t *testing.T) { }, } - discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) + discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil)) } func TestQueryExecutor_KillQuery(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -71,12 +77,12 @@ func TestQueryExecutor_KillQuery(t *testing.T) { }, } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid)) if err != nil { t.Fatal(err) } - discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) + discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil)) result := <-results if result.Err != query.ErrQueryInterrupted { @@ -85,6 +91,9 @@ func TestQueryExecutor_KillQuery(t *testing.T) { } func TestQueryExecutor_Interrupt(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -104,7 +113,7 @@ func TestQueryExecutor_Interrupt(t *testing.T) { } closing := make(chan struct{}) - results := e.ExecuteQuery(q, query.ExecutionOptions{}, closing) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, closing) close(closing) result := <-results if result.Err != query.ErrQueryInterrupted { @@ -112,38 +121,10 @@ func TestQueryExecutor_Interrupt(t *testing.T) { } } -func TestQueryExecutor_Abort(t *testing.T) { - q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) - if err != nil { - t.Fatal(err) - } - - ch1 := make(chan struct{}) - ch2 := make(chan struct{}) - - e := NewQueryExecutor() - e.StatementExecutor = &StatementExecutor{ - ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { - <-ch1 - if err := ctx.Send(&query.Result{Err: errUnexpected}); err != query.ErrQueryAborted { - t.Errorf("unexpected error: %v", err) - } - close(ch2) - return nil - }, - } - - done := make(chan struct{}) - close(done) - - results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: done}, nil) - close(ch1) - - <-ch2 - discardOutput(results) -} - func TestQueryExecutor_ShowQueries(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { @@ -162,17 +143,41 @@ func TestQueryExecutor_ShowQueries(t *testing.T) { t.Fatal(err) } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, 
query.ExecutionOptions{AbortCh: abort}, nil)
result := <-results
- if len(result.Series) != 1 {
- t.Errorf("expected %d rows, got %d", 1, len(result.Series))
+
+ // Should be able to retrieve a single series.
+ var series *query.Series
+ timer := time.NewTimer(100 * time.Millisecond)
+ select {
+ case series = <-result.SeriesCh():
+ timer.Stop()
+ case <-timer.C:
+ t.Fatal("unexpected timeout")
}
- if result.Err != nil {
- t.Errorf("unexpected error: %s", result.Err)
+
+ // Should only retrieve one row from the series.
+ rows := make([][]interface{}, 0, 1)
+ for row := range series.RowCh() {
+ if row.Err != nil {
+ t.Errorf("unexpected error: %s", row.Err)
+ }
+ rows = append(rows, row.Values)
+ }
+ if len(rows) != 1 {
+ t.Errorf("expected %d rows, got %d", 1, len(rows))
+ }
+
+ // No more series should be returned.
+ if series := <-result.SeriesCh(); series != nil {
+ t.Fatal("unexpected series")
+ }
}
func TestQueryExecutor_Limit_Timeout(t *testing.T) {
+ abort := make(chan struct{})
+ defer close(abort)
+
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
@@ -192,7 +197,7 @@
}
e.TaskManager.QueryTimeout = time.Nanosecond
- results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
+ results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil)
result := <-results
if result.Err == nil || !strings.Contains(result.Err.Error(), "query-timeout") {
t.Errorf("unexpected error: %s", result.Err)
@@ -200,6 +205,9 @@
}
func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) {
+ abort := make(chan struct{})
+ defer close(abort)
+
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}
@@ -219,17 +227,14 @@
defer e.Close()
// Start first query and wait for it to be executing.
- go discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))
+ go discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil))
<-qid
// Start second query and expect for it to fail.
- results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) select { case result := <-results: - if len(result.Series) != 0 { - t.Errorf("expected %d rows, got %d", 0, len(result.Series)) - } if result.Err == nil || !strings.Contains(result.Err.Error(), "max-concurrent-queries") { t.Errorf("unexpected error: %s", result.Err) } @@ -239,6 +244,9 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { } func TestQueryExecutor_Close(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -256,8 +264,8 @@ func TestQueryExecutor_Close(t *testing.T) { }, } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) - go func(results <-chan *query.Result) { + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) + go func(results <-chan *query.ResultSet) { result := <-results if result.Err != query.ErrQueryEngineShutdown { t.Errorf("unexpected error: %s", result.Err) @@ -278,17 +286,17 @@ func TestQueryExecutor_Close(t *testing.T) { t.Fatal("closing the query manager did not kill the query after 100 milliseconds") } - results = e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results = e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) result := <-results - if len(result.Series) != 0 { - t.Errorf("expected %d rows, got %d", 0, len(result.Series)) - } if result.Err != query.ErrQueryEngineShutdown { t.Errorf("unexpected error: %s", result.Err) } } func TestQueryExecutor_Panic(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`) if err != nil { t.Fatal(err) @@ -301,17 +309,17 @@ func TestQueryExecutor_Panic(t *testing.T) { }, } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) result := <-results - if len(result.Series) != 0 { - t.Errorf("expected %d rows, got %d", 0, len(result.Series)) - } if result.Err == nil || result.Err.Error() != "SELECT count(value) FROM cpu [panic:test error]" { t.Errorf("unexpected error: %s", result.Err) } } func TestQueryExecutor_InvalidSource(t *testing.T) { + abort := make(chan struct{}) + defer close(abort) + e := NewQueryExecutor() e.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { @@ -350,19 +358,22 @@ func TestQueryExecutor_InvalidSource(t *testing.T) { continue } - results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil) + results := e.ExecuteQuery(q, query.ExecutionOptions{AbortCh: abort}, nil) result := <-results - if len(result.Series) != 0 { - t.Errorf("%d. expected %d rows, got %d", 0, i, len(result.Series)) - } if result.Err == nil || result.Err.Error() != tt.err { t.Errorf("%d. unexpected error: %s", i, result.Err) } } } -func discardOutput(results <-chan *query.Result) { - for range results { +func discardOutput(results <-chan *query.ResultSet) { + for r := range results { // Read all results and discard. + for s := range r.SeriesCh() { + // Read all series and discard. + for range s.RowCh() { + // Read all rows and discard. 
+ }
+ }
}
}
diff --git a/query/result.go b/query/result.go
index a73fd950374..be36073d29f 100644
--- a/query/result.go
+++ b/query/result.go
@@ -14,6 +14,130 @@ const (
WarningLevel = "warning"
)
+// Row is a single row of query values.
+type Row struct {
+ Values []interface{}
+ Err error
+}
+
+// Series is a stream of rows sharing a name, tag set, and columns. Rows are
+// produced with Emit and consumed from RowCh.
+type Series struct {
+ Name string
+ Tags Tags
+ Columns []string
+ Err error
+ AbortCh <-chan struct{}
+
+ rowCh chan Row
+}
+
+// Emit sends a row of values to the consumer. It returns false if the query
+// was aborted before the row could be sent.
+func (s *Series) Emit(values []interface{}) (ok bool) {
+ row := Row{Values: values}
+ select {
+ case <-s.AbortCh:
+ return false
+ case s.rowCh <- row:
+ return true
+ }
+}
+
+// Error sends an error row to the consumer. It returns false if the query
+// was aborted before the error could be sent.
+func (s *Series) Error(err error) (ok bool) {
+ row := Row{Err: err}
+ select {
+ case <-s.AbortCh:
+ return false
+ case s.rowCh <- row:
+ return true
+ }
+}
+
+// RowCh returns the channel the consumer reads rows from.
+func (s *Series) RowCh() <-chan Row {
+ return s.rowCh
+}
+
+// Close closes the row channel to signal the end of the series.
+func (s *Series) Close() error {
+ close(s.rowCh)
+ return nil
+}
+
+// ResultSet is the streaming result of a single statement. Series are
+// produced with CreateSeries and consumed from SeriesCh.
+type ResultSet struct {
+ ID int
+ Messages []*Message
+ Err error
+ AbortCh <-chan struct{}
+
+ seriesCh chan *Series
+ columns []string
+}
+
+// Init initializes the series channel and returns the ResultSet.
+func (rs *ResultSet) Init() *ResultSet {
+ rs.seriesCh = make(chan *Series)
+ return rs
+}
+
+// WithColumns returns a copy of the ResultSet whose new series will use the
+// given columns.
+func (rs *ResultSet) WithColumns(columns ...string) *ResultSet {
+ dup := *rs
+ dup.columns = columns
+ return &dup
+}
+
+// CreateSeries creates a series with no tags and sends it to the consumer.
+func (rs *ResultSet) CreateSeries(name string) (*Series, bool) {
+ return rs.CreateSeriesWithTags(name, Tags{})
+}
+
+// CreateSeriesWithTags creates a series with the given tags and sends it to
+// the consumer. It returns false if the query was aborted.
+func (rs *ResultSet) CreateSeriesWithTags(name string, tags Tags) (*Series, bool) {
+ series := &Series{
+ Name: name,
+ Tags: tags,
+ Columns: rs.columns,
+ rowCh: make(chan Row),
+ AbortCh: rs.AbortCh,
+ }
+ select {
+ case <-rs.AbortCh:
+ return nil, false
+ case rs.seriesCh <- series:
+ return series, true
+ }
+}
+
+// Error sends an error-only series to the consumer.
+func (rs *ResultSet) Error(err error) (ok bool) {
+ series := &Series{Err: err}
+ select {
+ case <-rs.AbortCh:
+ return false
+ case rs.seriesCh <- series:
+ return true
+ }
+}
+
+// SeriesCh returns the channel the consumer reads series from.
+func (rs *ResultSet) SeriesCh() <-chan *Series {
+ return rs.seriesCh
+}
+
+// Close closes the series channel to signal the end of the result.
+func (rs *ResultSet) Close() error {
+ close(rs.seriesCh)
+ return nil
+}
+
// TagSet is a fundamental concept within the query system. It represents a composite series,
// composed of multiple individual series that share a set of tag attributes.
type TagSet struct {
@@ -72,7 +176,7 @@ type Message struct {
// ReadOnlyWarning generates a warning message that tells the user the command
// they are using is being used for writing in a read only context.
//
-// This is a temporary method while to be used while transitioning to read only
+// This is a temporary method to be used while transitioning to read only
// operations for issue #6290.
func ReadOnlyWarning(stmt string) *Message { return &Message{ diff --git a/query/task_manager.go b/query/task_manager.go index f0b789720d7..f80cec410a9 100644 --- a/query/task_manager.go +++ b/query/task_manager.go @@ -6,7 +6,6 @@ import ( "time" "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/models" "github.com/uber-go/zap" ) @@ -53,15 +52,7 @@ func NewTaskManager() *TaskManager { func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionContext) error { switch stmt := stmt.(type) { case *influxql.ShowQueriesStatement: - rows, err := t.executeShowQueriesStatement(stmt) - if err != nil { - return err - } - - ctx.Results <- &Result{ - StatementID: ctx.StatementID, - Series: rows, - } + t.executeShowQueriesStatement(stmt, &ctx) case *influxql.KillQueryStatement: var messages []*Message if ctx.ReadOnly { @@ -71,10 +62,12 @@ func (t *TaskManager) ExecuteStatement(stmt influxql.Statement, ctx ExecutionCon if err := t.executeKillQueryStatement(stmt); err != nil { return err } - ctx.Results <- &Result{ - StatementID: ctx.StatementID, - Messages: messages, + result := &ResultSet{ + ID: ctx.StatementID, + Messages: messages, } + ctx.Results <- result.Init() + result.Close() default: return ErrInvalidQuery } @@ -85,13 +78,26 @@ func (t *TaskManager) executeKillQueryStatement(stmt *influxql.KillQueryStatemen return t.KillQuery(stmt.QueryID) } -func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) { +func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStatement, ctx *ExecutionContext) { + result, err := ctx.CreateResult() + if err != nil { + ctx.Error(err) + return + } + defer result.Close() + + result = result.WithColumns("qid", "query", "database", "duration") + series, ok := result.CreateSeries("") + if !ok { + return + } + defer series.Close() + t.mu.RLock() defer t.mu.RUnlock() now := time.Now() - values := make([][]interface{}, 0, len(t.queries)) for id, qi := range t.queries { d := now.Sub(qi.startTime) @@ -103,14 +109,8 @@ func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStateme case d >= time.Microsecond: d = d - (d % time.Microsecond) } - - values = append(values, []interface{}{id, qi.query, qi.database, d.String()}) + series.Emit([]interface{}{id, qi.query, qi.database, d.String()}) } - - return []*models.Row{{ - Columns: []string{"qid", "query", "database", "duration"}, - Values: values, - }}, nil } func (t *TaskManager) queryError(qid uint64, err error) { diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index abfb8197be4..bf509c6a2e6 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -388,9 +388,10 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti // extract number of points written from SELECT ... 
INTO result var written int64 = -1 - if len(res.Series) == 1 && len(res.Series[0].Values) == 1 { - s := res.Series[0] - written = s.Values[0][1].(int64) + if series := <-res.SeriesCh(); series != nil { + if row, ok := <-series.RowCh(); ok { + written = row.Values[1].(int64) + } } if s.loggingEnabled { @@ -408,7 +409,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } // runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in -func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) *query.Result { +func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) *query.ResultSet { // Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor. q := &influxql.Query{ Statements: influxql.Statements([]influxql.Statement{cq.q}), diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index 55157306137..64eab23ed95 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -55,8 +55,7 @@ func TestContinuousQueryService_Run(t *testing.T) { if callCnt >= expectCallCnt { done <- struct{}{} } - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -132,8 +131,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -214,8 +212,7 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -284,8 +281,7 @@ func TestContinuousQueryService_GroupByOffset(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, expected.min, expected.max) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -317,7 +313,9 @@ func TestContinuousQueryService_NotLeader(t *testing.T) { s.QueryExecutor.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { done <- struct{}{} - ctx.Results <- &query.Result{Err: errUnexpected} + if ok := ctx.Error(errUnexpected); !ok { + return query.ErrQueryAborted + } return nil }, } @@ -446,8 +444,7 @@ func TestExecuteContinuousQuery_TimeRange(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, tt.start, tt.end) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -561,8 +558,7 @@ func TestExecuteContinuousQuery_TimeZone(t *testing.T) { t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", timeRange.Min, timeRange.Max, test.start, test.end) } done <- struct{}{} - ctx.Results <- &query.Result{} - return nil + return ctx.Ok() }, } @@ -613,13 +609,19 @@ func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) { s.QueryExecutor.StatementExecutor = &StatementExecutor{ ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error { - ctx.Results <- &query.Result{ - Series: []*models.Row{{ - Name: "result", - Columns: []string{"time", "written"}, - Values: [][]interface{}{{time.Time{}, writeN}}, - }}, + result, err := ctx.CreateResult() + if 
err != nil {
+ return err
+ }
+ defer result.Close()
+
+ result = result.WithColumns("time", "written")
+ series, ok := result.CreateSeries("result")
+ if !ok {
+ return nil
}
+ defer series.Close()
+ series.Emit([]interface{}{time.Time{}, writeN})
return nil
},
}
@@ -659,8 +661,7 @@
s := NewTestService(t)
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
- ctx.Results <- &query.Result{}
- return nil
+ return ctx.Ok()
},
}
s.Monitor = &monitor{
diff --git a/services/httpd/encoder.go b/services/httpd/encoder.go
new file mode 100644
index 00000000000..b818cc22a56
--- /dev/null
+++ b/services/httpd/encoder.go
@@ -0,0 +1,292 @@
+package httpd
+
+import (
+ "io"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/influxdata/influxdb/models"
+ "github.com/influxdata/influxdb/query"
+)
+
+type Encoder interface {
+ // ContentType returns the content-type used to identify this format in HTTP.
+ ContentType() string
+
+ // Encode encodes the full response from the results channel.
+ Encode(w io.Writer, results <-chan *query.ResultSet)
+
+ // Error encodes a top-level error to the io.Writer.
+ Error(w io.Writer, err error)
+}
+
+// ResponseFormatter serializes a fully materialized Response in a single
+// output format.
+type ResponseFormatter interface {
+ WriteResponse(w io.Writer, resp Response) (int, error)
+ WriteError(w io.Writer, err error)
+ ContentType() string
+}
+
+// NewEncoder creates an Encoder for the request, selected by the Accept
+// header and the chunked/chunk_size form values.
+func NewEncoder(r *http.Request, config *Config) Encoder {
+ epoch := strings.TrimSpace(r.FormValue("epoch"))
+ switch r.Header.Get("Accept") {
+ case "application/csv", "text/csv":
+ formatter := &csvFormatter{statementID: -1}
+ chunked, size := parseChunkedOptions(r)
+ if chunked {
+ return &chunkedEncoder{
+ Formatter: formatter,
+ ChunkSize: size,
+ Epoch: epoch,
+ }
+ }
+ return &defaultEncoder{
+ Formatter: formatter,
+ MaxRowLimit: config.MaxRowLimit,
+ Epoch: epoch,
+ }
+ case "application/json":
+ fallthrough
+ default:
+ pretty := r.URL.Query().Get("pretty") == "true"
+ formatter := &jsonFormatter{Pretty: pretty}
+ chunked, size := parseChunkedOptions(r)
+ if chunked {
+ return &chunkedEncoder{
+ Formatter: formatter,
+ ChunkSize: size,
+ Epoch: epoch,
+ }
+ }
+ return &defaultEncoder{
+ Formatter: formatter,
+ MaxRowLimit: config.MaxRowLimit,
+ Epoch: epoch,
+ }
+ }
+}
+
+// defaultEncoder buffers the entire result in memory, truncating it at
+// MaxRowLimit, and writes a single response.
+type defaultEncoder struct {
+ Formatter ResponseFormatter
+ MaxRowLimit int
+ Epoch string
+}
+
+// NewDefaultEncoder returns a non-chunked Encoder that wraps formatter.
+func NewDefaultEncoder(formatter ResponseFormatter) Encoder {
+ return &defaultEncoder{
+ Formatter: formatter,
+ }
+}
+
+func (e *defaultEncoder) ContentType() string {
+ return e.Formatter.ContentType()
+}
+
+func (e *defaultEncoder) Encode(w io.Writer, results <-chan *query.ResultSet) {
+ var convertToEpoch func(row *query.Row)
+ if e.Epoch != "" {
+ convertToEpoch = epochConverter(e.Epoch)
+ }
+
+ resp := Response{Results: make([]*query.Result, 0)}
+
+ rows := 0
+RESULTS:
+ for result := range results {
+ r := &query.Result{
+ StatementID: result.ID,
+ Messages: result.Messages,
+ Err: result.Err,
+ }
+ resp.Results = append(resp.Results, r)
+ if r.Err != nil {
+ continue
+ }
+
+ for series := range result.SeriesCh() {
+ if series.Err != nil {
+ r.Err = series.Err
+ continue RESULTS
+ }
+
+ s := &models.Row{
+ Name: series.Name,
+ Tags: series.Tags.KeyValues(),
+ Columns: series.Columns,
+ }
+ r.Series = append(r.Series, s)
+
+ for row := range series.RowCh() {
+ if row.Err != nil {
+ r.Err = row.Err
+ r.Series = nil
+ continue RESULTS
+ } else if e.MaxRowLimit > 0 && rows+len(s.Values) >= e.MaxRowLimit {
+ s.Partial = true
+ break RESULTS
+ }
+
+ if convertToEpoch != nil {
+ convertToEpoch(&row)
+ }
+ s.Values = append(s.Values, row.Values)
+ }
+ rows += len(s.Values)
+ }
+ }
+ e.Formatter.WriteResponse(w, resp)
+}
+
+func (e *defaultEncoder) Error(w io.Writer, err error) {
+ e.Formatter.WriteError(w, err)
+}
+
+// chunkedEncoder streams each result as a sequence of responses containing
+// at most ChunkSize rows each.
+type chunkedEncoder struct {
+ Formatter ResponseFormatter
+ ChunkSize int
+ Epoch string
+}
+
+func (e *chunkedEncoder) ContentType() string {
+ return e.Formatter.ContentType()
+}
+
+func (e *chunkedEncoder) Encode(w io.Writer, results <-chan *query.ResultSet) {
+ var convertToEpoch func(row *query.Row)
+ if e.Epoch != "" {
+ convertToEpoch = epochConverter(e.Epoch)
+ }
+
+ for result := range results {
+ messages := result.Messages
+
+ series := <-result.SeriesCh()
+ if series == nil {
+ e.Formatter.WriteResponse(w, Response{Results: []*query.Result{
+ {
+ StatementID: result.ID,
+ Messages: messages,
+ },
+ }})
+ continue
+ } else if series.Err != nil {
+ // An error occurred while processing the result.
+ e.Formatter.WriteResponse(w, Response{Results: []*query.Result{
+ {
+ StatementID: result.ID,
+ Messages: messages,
+ Err: series.Err,
+ },
+ }})
+ continue
+ }
+
+ for series != nil {
+ var values [][]interface{}
+ for row := range series.RowCh() {
+ if row.Err != nil {
+ // An error occurred while processing the result.
+ e.Formatter.WriteResponse(w, Response{Results: []*query.Result{
+ {
+ StatementID: result.ID,
+ Messages: messages,
+ Err: row.Err,
+ },
+ }})
+ continue
+ }
+
+ if convertToEpoch != nil {
+ convertToEpoch(&row)
+ }
+
+ if e.ChunkSize > 0 && len(values) >= e.ChunkSize {
+ r := &query.Result{
+ StatementID: result.ID,
+ Series: []*models.Row{{
+ Name: series.Name,
+ Tags: series.Tags.KeyValues(),
+ Columns: series.Columns,
+ Values: values,
+ Partial: true,
+ }},
+ Messages: messages,
+ Partial: true,
+ }
+ e.Formatter.WriteResponse(w, Response{Results: []*query.Result{r}})
+ messages = nil
+ values = values[:0]
+ }
+ values = append(values, row.Values)
+ }
+
+ r := &query.Result{
+ StatementID: result.ID,
+ Series: []*models.Row{{
+ Name: series.Name,
+ Tags: series.Tags.KeyValues(),
+ Columns: series.Columns,
+ Values: values,
+ }},
+ Messages: messages,
+ }
+
+ series = <-result.SeriesCh()
+ if series != nil {
+ r.Partial = true
+ }
+ e.Formatter.WriteResponse(w, Response{Results: []*query.Result{r}})
+ }
+ }
+}
+
+func (e *chunkedEncoder) Error(w io.Writer, err error) {
+ e.Formatter.WriteError(w, err)
+}
+
+// epochConverter returns a function that converts a row's leading timestamp
+// to an integer with the given precision.
+func epochConverter(epoch string) func(row *query.Row) {
+ divisor := int64(1)
+
+ switch epoch {
+ case "u":
+ divisor = int64(time.Microsecond)
+ case "ms":
+ divisor = int64(time.Millisecond)
+ case "s":
+ divisor = int64(time.Second)
+ case "m":
+ divisor = int64(time.Minute)
+ case "h":
+ divisor = int64(time.Hour)
+ }
+ return func(row *query.Row) {
+ if ts, ok := row.Values[0].(time.Time); ok {
+ row.Values[0] = ts.UnixNano() / divisor
+ }
+ }
+}
+
+// parseChunkedOptions reports whether the request asked for a chunked
+// response and, if so, the requested chunk size.
+func parseChunkedOptions(r *http.Request) (chunked bool, size int) {
+ chunked = r.FormValue("chunked") == "true"
+ if chunked {
+ size = DefaultChunkSize
+ if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 {
+ size = int(n)
+ }
+ }
+ return chunked, size
+}
diff --git a/services/httpd/encoder_test.go b/services/httpd/encoder_test.go
new file mode 100644
index 00000000000..4deb3ce0cac
--- /dev/null
+++ b/services/httpd/encoder_test.go
@@ -0,0 +1,79 @@
+package httpd_test
+
+import (
+
"net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/services/httpd" +) + +func EmitTestResults(results chan *query.ResultSet) { + result := &query.ResultSet{ID: 0} + results <- result.Init() + result = result.WithColumns("time", "value") + + series, _ := result.CreateSeriesWithTags("cpu", + query.NewTags(map[string]string{"host": "server01"})) + series.Emit([]interface{}{time.Unix(0, 0).UTC(), 2.0}) + series.Emit([]interface{}{time.Unix(10, 0).UTC(), 5.0}) + series.Emit([]interface{}{time.Unix(20, 0).UTC(), 7.0}) + series.Close() + + series, _ = result.CreateSeriesWithTags("cpu", + query.NewTags(map[string]string{"host": "server02"})) + series.Emit([]interface{}{time.Unix(0, 0).UTC(), 8.0}) + series.Close() + result.Close() + + result = &query.ResultSet{ID: 1} + results <- result.Init() + result = result.WithColumns("name") + close(results) + + series, _ = result.CreateSeries("databases") + series.Emit([]interface{}{"db0"}) + series.Emit([]interface{}{"db1"}) + series.Close() + result.Close() +} + +func TestEncoder_Default(t *testing.T) { + req, _ := http.NewRequest("GET", "http://127.0.0.1:8086/query", nil) + req.Header.Set("Accept", "application/json") + resp := httptest.NewRecorder() + + results := make(chan *query.ResultSet) + go EmitTestResults(results) + + config := httpd.NewConfig() + enc := httpd.NewEncoder(req, &config) + enc.Encode(resp, results) + + if have, want := strings.TrimSpace(resp.Body.String()), `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["1970-01-01T00:00:00Z",2],["1970-01-01T00:00:10Z",5],["1970-01-01T00:00:20Z",7]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["1970-01-01T00:00:00Z",8]]}]},{"statement_id":1,"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db1"]]}]}]}`; have != want { + t.Errorf("mismatched output:\n\thave=%v\n\twant=%v\n", have, want) + } +} + +func TestEncoder_Chunked(t *testing.T) { + req, _ := http.NewRequest("GET", "http://127.0.0.1:8086/query?chunked=true&chunk_size=2", nil) + req.Header.Set("Accept", "application/json") + resp := httptest.NewRecorder() + + results := make(chan *query.ResultSet) + go EmitTestResults(results) + + config := httpd.NewConfig() + enc := httpd.NewEncoder(req, &config) + enc.Encode(resp, results) + + if have, want := strings.TrimSpace(resp.Body.String()), `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["1970-01-01T00:00:00Z",2],["1970-01-01T00:00:10Z",5]],"partial":true}],"partial":true}]} +{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["1970-01-01T00:00:20Z",7]]}],"partial":true}]} +{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["1970-01-01T00:00:00Z",8]]}]}]} +{"results":[{"statement_id":1,"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db1"]]}]}]}`; have != want { + t.Errorf("mismatched output:\n\thave=%v\n\twant=%v\n", have, want) + } +} diff --git a/services/httpd/formatters.go b/services/httpd/formatters.go new file mode 100644 index 00000000000..be78782608e --- /dev/null +++ b/services/httpd/formatters.go @@ -0,0 +1,161 @@ +package httpd + +import ( + "encoding/csv" + "encoding/json" + "io" + "strconv" + "time" + + "github.com/influxdata/influxdb/models" +) + 
+type jsonFormatter struct { + Pretty bool +} + +func (f *jsonFormatter) ContentType() string { + return "application/json" +} + +func (f *jsonFormatter) WriteResponse(w io.Writer, resp Response) (n int, err error) { + var b []byte + if f.Pretty { + b, err = json.MarshalIndent(resp, "", " ") + } else { + b, err = json.Marshal(resp) + } + + if err != nil { + n, err = io.WriteString(w, err.Error()) + } else { + n, err = w.Write(b) + } + + w.Write([]byte("\n")) + n++ + return n, err +} + +func (f *jsonFormatter) WriteError(w io.Writer, err error) { + m := map[string]string{"error": err.Error()} + + var b []byte + if f.Pretty { + b, err = json.MarshalIndent(m, "", " ") + } else { + b, err = json.Marshal(m) + } + + if err != nil { + io.WriteString(w, err.Error()) + } else { + w.Write(b) + } + + w.Write([]byte("\n")) +} + +type csvFormatter struct { + statementID int + columns []string +} + +func (f *csvFormatter) ContentType() string { + return "text/csv" +} + +func (f *csvFormatter) WriteResponse(w io.Writer, resp Response) (n int, err error) { + csv := csv.NewWriter(w) + if resp.Err != nil { + csv.Write([]string{"error"}) + csv.Write([]string{resp.Err.Error()}) + csv.Flush() + return n, csv.Error() + } + + for _, result := range resp.Results { + if result.StatementID != f.statementID { + // If there are no series in the result, skip past this result. + if len(result.Series) == 0 { + continue + } + + // Set the statement id and print out a newline if this is not the first statement. + if f.statementID >= 0 { + // Flush the csv writer and write a newline. + csv.Flush() + if err := csv.Error(); err != nil { + return n, err + } + + out, err := io.WriteString(w, "\n") + if err != nil { + return n, err + } + n += out + } + f.statementID = result.StatementID + + // Print out the column headers from the first series. 
+ f.columns = make([]string, 2+len(result.Series[0].Columns)) + f.columns[0] = "name" + f.columns[1] = "tags" + copy(f.columns[2:], result.Series[0].Columns) + if err := csv.Write(f.columns); err != nil { + return n, err + } + } + + for _, row := range result.Series { + f.columns[0] = row.Name + if len(row.Tags) > 0 { + f.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:]) + } else { + f.columns[1] = "" + } + for _, values := range row.Values { + for i, value := range values { + if value == nil { + f.columns[i+2] = "" + continue + } + + switch v := value.(type) { + case float64: + f.columns[i+2] = strconv.FormatFloat(v, 'f', -1, 64) + case int64: + f.columns[i+2] = strconv.FormatInt(v, 10) + case string: + f.columns[i+2] = v + case bool: + if v { + f.columns[i+2] = "true" + } else { + f.columns[i+2] = "false" + } + case time.Time: + f.columns[i+2] = strconv.FormatInt(v.UnixNano(), 10) + case *float64, *int64, *string, *bool: + f.columns[i+2] = "" + } + } + csv.Write(f.columns) + } + } + } + csv.Flush() + if err := csv.Error(); err != nil { + return n, err + } + return n, nil +} + +func (f *csvFormatter) WriteError(w io.Writer, err error) { + csv := csv.NewWriter(w) + csv.WriteAll([][]string{ + {"error"}, + {err.Error()}, + }) + csv.Flush() +} diff --git a/services/httpd/response_writer_test.go b/services/httpd/formatters_test.go similarity index 51% rename from services/httpd/response_writer_test.go rename to services/httpd/formatters_test.go index 72f183f3458..54f94740219 100644 --- a/services/httpd/response_writer_test.go +++ b/services/httpd/formatters_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/httpd" ) @@ -21,32 +20,35 @@ func TestResponseWriter_CSV(t *testing.T) { } w := httptest.NewRecorder() - writer := httpd.NewResponseWriter(w, r) - writer.WriteResponse(httpd.Response{ - Results: []*query.Result{ - { - StatementID: 0, - Series: []*models.Row{ - { - Name: "cpu", - Tags: map[string]string{ - "host": "server01", - "region": "uswest", - }, - Columns: []string{"time", "value"}, - Values: [][]interface{}{ - {time.Unix(0, 10), float64(2.5)}, - {time.Unix(0, 20), int64(5)}, - {time.Unix(0, 30), nil}, - {time.Unix(0, 40), "foobar"}, - {time.Unix(0, 50), true}, - {time.Unix(0, 60), false}, - }, - }, - }, - }, - }, - }) + results := make(chan *query.ResultSet) + go func() { + defer close(results) + result := &query.ResultSet{ID: 0} + results <- result.Init() + defer result.Close() + + result = result.WithColumns("time", "value") + series, _ := result.CreateSeriesWithTags("cpu", query.NewTags(map[string]string{ + "host": "server01", + "region": "uswest", + })) + defer series.Close() + + for _, row := range [][]interface{}{ + {time.Unix(0, 10), float64(2.5)}, + {time.Unix(0, 20), int64(5)}, + {time.Unix(0, 30), nil}, + {time.Unix(0, 40), "foobar"}, + {time.Unix(0, 50), true}, + {time.Unix(0, 60), false}, + } { + series.Emit(row) + } + }() + + config := httpd.NewConfig() + enc := httpd.NewEncoder(r, &config) + enc.Encode(w, results) if got, want := w.Body.String(), `name,tags,time,value cpu,"host=server01,region=uswest",10,2.5 diff --git a/services/httpd/handler.go b/services/httpd/handler.go index b835b7c2701..5c8062c1104 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -230,7 +230,6 @@ func (h *Handler) AddRoutes(routes ...Route) { handler = http.HandlerFunc(hf) } - handler = h.responseWriter(handler) if r.Gzipped { handler = 
gzipFilter(handler) } @@ -289,11 +288,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U }(time.Now()) h.requestTracker.Add(r, user) - // Retrieve the underlying ResponseWriter or initialize our own. - rw, ok := w.(ResponseWriter) - if !ok { - rw = NewResponseWriter(w, r) - } + // Initialize an encoder for us to use to encode the response. + enc := NewEncoder(r, h.Config) // Retrieve the node id the query should be executed on. nodeID, _ := strconv.ParseUint(r.FormValue("node_id"), 10, 64) @@ -307,7 +303,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U if fhs := r.MultipartForm.File["q"]; len(fhs) > 0 { f, err := fhs[0].Open() if err != nil { - h.httpError(rw, err.Error(), http.StatusBadRequest) + h.httpError(w, enc, err.Error(), http.StatusBadRequest) return } defer f.Close() @@ -316,12 +312,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U } if qr == nil { - h.httpError(rw, `missing required parameter "q"`, http.StatusBadRequest) + h.httpError(w, enc, `missing required parameter "q"`, http.StatusBadRequest) return } - epoch := strings.TrimSpace(r.FormValue("epoch")) - p := influxql.NewParser(qr) db := r.FormValue("db") @@ -336,7 +330,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U decoder := json.NewDecoder(strings.NewReader(rawParams)) decoder.UseNumber() if err := decoder.Decode(¶ms); err != nil { - h.httpError(rw, "error parsing query parameters: "+err.Error(), http.StatusBadRequest) + h.httpError(w, enc, "error parsing query parameters: "+err.Error(), http.StatusBadRequest) return } @@ -351,7 +345,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U } if err != nil { - h.httpError(rw, "error parsing json value: "+err.Error(), http.StatusBadRequest) + h.httpError(w, enc, "error parsing json value: "+err.Error(), http.StatusBadRequest) return } } @@ -362,7 +356,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U // Parse query from query string. q, err := p.ParseQuery() if err != nil { - h.httpError(rw, "error parsing query: "+err.Error(), http.StatusBadRequest) + h.httpError(w, enc, "error parsing query: "+err.Error(), http.StatusBadRequest) return } @@ -372,28 +366,18 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U if err, ok := err.(meta.ErrAuthorize); ok { h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database)) } - h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden) + h.httpError(w, enc, "error authorizing query: "+err.Error(), http.StatusForbidden) return } } - // Parse chunk size. Use default if not provided or unparsable. - chunked := r.FormValue("chunked") == "true" - chunkSize := DefaultChunkSize - if chunked { - if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 { - chunkSize = int(n) - } - } - // Parse whether this is an async command. 
async := r.FormValue("async") == "true" opts := query.ExecutionOptions{ - Database: db, - ChunkSize: chunkSize, - ReadOnly: r.Method == "GET", - NodeID: nodeID, + Database: db, + ReadOnly: r.Method == "GET", + NodeID: nodeID, } if h.Config.AuthEnabled { @@ -442,131 +426,21 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U return } - // if we're not chunking, this will be the in memory buffer for all results before sending to client - resp := Response{Results: make([]*query.Result, 0)} - // Status header is OK once this point is reached. // Attempt to flush the header immediately so the client gets the header information // and knows the query was accepted. - h.writeHeader(rw, http.StatusOK) + w.Header().Set("Content-Type", enc.ContentType()) + h.writeHeader(w, http.StatusOK) if w, ok := w.(http.Flusher); ok { w.Flush() } - // pull all results from the channel - rows := 0 - for r := range results { - // Ignore nil results. - if r == nil { - continue - } - - // if requested, convert result timestamps to epoch - if epoch != "" { - convertToEpoch(r, epoch) - } - - // Write out result immediately if chunked. - if chunked { - n, _ := rw.WriteResponse(Response{ - Results: []*query.Result{r}, - }) - atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) - w.(http.Flusher).Flush() - continue - } - - // Limit the number of rows that can be returned in a non-chunked - // response. This is to prevent the server from going OOM when - // returning a large response. If you want to return more than the - // default chunk size, then use chunking to process multiple blobs. - // Iterate through the series in this result to count the rows and - // truncate any rows we shouldn't return. - if h.Config.MaxRowLimit > 0 { - for i, series := range r.Series { - n := h.Config.MaxRowLimit - rows - if n < len(series.Values) { - // We have reached the maximum number of values. Truncate - // the values within this row. - series.Values = series.Values[:n] - // Since this was truncated, it will always be a partial return. - // Add this so the client knows we truncated the response. - series.Partial = true - } - rows += len(series.Values) - - if rows >= h.Config.MaxRowLimit { - // Drop any remaining series since we have already reached the row limit. - if i < len(r.Series) { - r.Series = r.Series[:i+1] - } - break - } - } - } - - // It's not chunked so buffer results in memory. - // Results for statements need to be combined together. - // We need to check if this new result is for the same statement as - // the last result, or for the next statement - l := len(resp.Results) - if l == 0 { - resp.Results = append(resp.Results, r) - } else if resp.Results[l-1].StatementID == r.StatementID { - if r.Err != nil { - resp.Results[l-1] = r - continue - } - - cr := resp.Results[l-1] - rowsMerged := 0 - if len(cr.Series) > 0 { - lastSeries := cr.Series[len(cr.Series)-1] - - for _, row := range r.Series { - if !lastSeries.SameSeries(row) { - // Next row is for a different series than last. - break - } - // Values are for the same series, so append them. - lastSeries.Values = append(lastSeries.Values, row.Values...) - rowsMerged++ - } - } - - // Append remaining rows as new rows. - r.Series = r.Series[rowsMerged:] - cr.Series = append(cr.Series, r.Series...) - cr.Messages = append(cr.Messages, r.Messages...) - cr.Partial = r.Partial - } else { - resp.Results = append(resp.Results, r) - } - - // Drop out of this loop and do not process further results when we hit the row limit. 
- if h.Config.MaxRowLimit > 0 && rows >= h.Config.MaxRowLimit { - // If the result is marked as partial, remove that partial marking - // here. While the series is partial and we would normally have - // tried to return the rest in the next chunk, we are not using - // chunking and are truncating the series so we don't want to - // signal to the client that we plan on sending another JSON blob - // with another result. The series, on the other hand, still - // returns partial true if it was truncated or had more data to - // send in a future chunk. - r.Partial = false - break - } - } - - // If it's not chunked we buffered everything in memory, so write it out - if !chunked { - n, _ := rw.WriteResponse(resp) - atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) - } + // Read the results and encode them in the proper structure. + enc.Encode(w, results) } // async drains the results from an async query and logs a message if it fails. -func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) { +func (h *Handler) async(q *influxql.Query, results <-chan *query.ResultSet) { for r := range results { // Drain the results and do nothing with them. // If it fails, log the failure so there is at least a record of it. @@ -578,6 +452,14 @@ func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) { } h.Logger.Info(fmt.Sprintf("error while running async query: %s: %s", q, r.Err)) } + + for series := range r.SeriesCh() { + for row := range series.RowCh() { + if row.Err != nil { + h.Logger.Info(fmt.Sprintf("error while running async query: %s: %s", q, row.Err)) + } + } + } } } @@ -593,23 +475,23 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U database := r.URL.Query().Get("db") if database == "" { - h.httpError(w, "database is required", http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), "database is required", http.StatusBadRequest) return } if di := h.MetaClient.Database(database); di == nil { - h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound) + h.httpError(w, NewEncoder(r, h.Config), fmt.Sprintf("database not found: %q", database), http.StatusNotFound) return } if h.Config.AuthEnabled { if user == nil { - h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) + h.httpError(w, NewEncoder(r, h.Config), fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) return } if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil { - h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden) + h.httpError(w, NewEncoder(r, h.Config), fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden) return } } @@ -623,7 +505,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U if r.Header.Get("Content-Encoding") == "gzip" { b, err := gzip.NewReader(r.Body) if err != nil { - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } defer b.Close() @@ -633,7 +515,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U var bs []byte if r.ContentLength > 0 { if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) { - h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + 
h.httpError(w, NewEncoder(r, h.Config), http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) return } @@ -646,14 +528,14 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U _, err := buf.ReadFrom(body) if err != nil { if err == errTruncated { - h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + h.httpError(w, NewEncoder(r, h.Config), http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) return } if h.Config.WriteTracing { h.Logger.Info("Write handler unable to read bytes from request body") } - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len())) @@ -669,7 +551,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U h.writeHeader(w, http.StatusOK) return } - h.httpError(w, parseError.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), parseError.Error(), http.StatusBadRequest) return } @@ -680,7 +562,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U var err error consistency, err = models.ParseConsistencyLevel(level) if err != nil { - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } } @@ -688,27 +570,27 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U // Write points. if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) { atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } else if influxdb.IsAuthorizationError(err) { atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) - h.httpError(w, err.Error(), http.StatusForbidden) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusForbidden) return } else if werr, ok := err.(tsdb.PartialWriteError); ok { atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped)) atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped)) - h.httpError(w, werr.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), werr.Error(), http.StatusBadRequest) return } else if err != nil { atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } else if parseError != nil { // We wrote some of the points atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points))) // The other points failed to parse which means the client sent invalid line protocol. We return a 400 // response code as well as the lines that failed to parse. - h.httpError(w, tsdb.PartialWriteError{Reason: parseError.Error()}.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), tsdb.PartialWriteError{Reason: parseError.Error()}.Error(), http.StatusBadRequest) return } @@ -765,14 +647,14 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { // Retrieve statistics from the monitor. 
stats, err := h.Monitor.Statistics(nil) if err != nil { - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } // Retrieve diagnostics from the monitor. diags, err := h.Monitor.Diagnostics() if err != nil { - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } @@ -782,13 +664,13 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { if val := diags["system"]; val != nil { jv, err := parseSystemDiagnostics(val) if err != nil { - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } data, err := json.Marshal(jv) if err != nil { - h.httpError(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusInternalServerError) return } @@ -862,12 +744,12 @@ func (h *Handler) serveDebugRequests(w http.ResponseWriter, r *http.Request) { if s := r.URL.Query().Get("seconds"); s == "" { d = DefaultDebugRequestsInterval } else if seconds, err := strconv.ParseInt(s, 10, 64); err != nil { - h.httpError(w, err.Error(), http.StatusBadRequest) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusBadRequest) return } else { d = time.Duration(seconds) * time.Second if d > MaxDebugRequestsInterval { - h.httpError(w, fmt.Sprintf("exceeded maximum interval time: %s > %s", + h.httpError(w, NewEncoder(r, h.Config), fmt.Sprintf("exceeded maximum interval time: %s > %s", influxql.FormatDuration(d), influxql.FormatDuration(MaxDebugRequestsInterval)), http.StatusBadRequest) @@ -957,7 +839,7 @@ func parseSystemDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{}, } // httpError writes an error to the client in a standard format. -func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) { +func (h *Handler) httpError(w http.ResponseWriter, enc Encoder, errmsg string, code int) { if code == http.StatusUnauthorized { // If an unauthorized header will be sent back, add a WWW-Authenticate header // as an authorization challenge. @@ -967,19 +849,8 @@ func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) { w.Header().Set("X-InfluxDB-Error", errmsg[:int(sz)]) } - response := Response{Err: errors.New(errmsg)} - if rw, ok := w.(ResponseWriter); ok { - h.writeHeader(w, code) - rw.WriteResponse(response) - return - } - - // Default implementation if the response writer hasn't been replaced - // with our special response writer type. 
- w.Header().Add("Content-Type", "application/json") h.writeHeader(w, code) - b, _ := json.Marshal(response) - w.Write(b) + enc.Error(w, errors.New(errmsg)) } // Filters and filter helpers @@ -1052,7 +923,7 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h * creds, err := parseCredentials(r) if err != nil { atomic.AddInt64(&h.stats.AuthenticationFailures, 1) - h.httpError(w, err.Error(), http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusUnauthorized) return } @@ -1060,14 +931,14 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h * case UserAuthentication: if creds.Username == "" { atomic.AddInt64(&h.stats.AuthenticationFailures, 1) - h.httpError(w, "username required", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "username required", http.StatusUnauthorized) return } user, err = h.MetaClient.Authenticate(creds.Username, creds.Password) if err != nil { atomic.AddInt64(&h.stats.AuthenticationFailures, 1) - h.httpError(w, "authorization failed", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "authorization failed", http.StatusUnauthorized) return } case BearerAuthentication: @@ -1082,46 +953,46 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h * // Parse and validate the token. token, err := jwt.Parse(creds.Token, keyLookupFn) if err != nil { - h.httpError(w, err.Error(), http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusUnauthorized) return } else if !token.Valid { - h.httpError(w, "invalid token", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "invalid token", http.StatusUnauthorized) return } claims, ok := token.Claims.(jwt.MapClaims) if !ok { - h.httpError(w, "problem authenticating token", http.StatusInternalServerError) + h.httpError(w, NewEncoder(r, h.Config), "problem authenticating token", http.StatusInternalServerError) h.Logger.Info("Could not assert JWT token claims as jwt.MapClaims") return } // Make sure an expiration was set on the token. if exp, ok := claims["exp"].(float64); !ok || exp <= 0.0 { - h.httpError(w, "token expiration required", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "token expiration required", http.StatusUnauthorized) return } // Get the username from the token. username, ok := claims["username"].(string) if !ok { - h.httpError(w, "username in token must be a string", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "username in token must be a string", http.StatusUnauthorized) return } else if username == "" { - h.httpError(w, "token must contain a username", http.StatusUnauthorized) + h.httpError(w, NewEncoder(r, h.Config), "token must contain a username", http.StatusUnauthorized) return } // Lookup user in the metastore. 
 // Filters and filter helpers
@@ -1052,7 +923,7 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h *
 			creds, err := parseCredentials(r)
 			if err != nil {
 				atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
-				h.httpError(w, err.Error(), http.StatusUnauthorized)
+				h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusUnauthorized)
 				return
 			}
@@ -1060,14 +931,14 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h *
 			case UserAuthentication:
 				if creds.Username == "" {
 					atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
-					h.httpError(w, "username required", http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), "username required", http.StatusUnauthorized)
 					return
 				}

 				user, err = h.MetaClient.Authenticate(creds.Username, creds.Password)
 				if err != nil {
 					atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
-					h.httpError(w, "authorization failed", http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), "authorization failed", http.StatusUnauthorized)
 					return
 				}
 			case BearerAuthentication:
@@ -1082,46 +953,46 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h *
 				// Parse and validate the token.
 				token, err := jwt.Parse(creds.Token, keyLookupFn)
 				if err != nil {
-					h.httpError(w, err.Error(), http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusUnauthorized)
 					return
 				} else if !token.Valid {
-					h.httpError(w, "invalid token", http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), "invalid token", http.StatusUnauthorized)
 					return
 				}

 				claims, ok := token.Claims.(jwt.MapClaims)
 				if !ok {
-					h.httpError(w, "problem authenticating token", http.StatusInternalServerError)
+					h.httpError(w, NewEncoder(r, h.Config), "problem authenticating token", http.StatusInternalServerError)
 					h.Logger.Info("Could not assert JWT token claims as jwt.MapClaims")
 					return
 				}

 				// Make sure an expiration was set on the token.
 				if exp, ok := claims["exp"].(float64); !ok || exp <= 0.0 {
-					h.httpError(w, "token expiration required", http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), "token expiration required", http.StatusUnauthorized)
 					return
 				}

 				// Get the username from the token.
 				username, ok := claims["username"].(string)
 				if !ok {
-					h.httpError(w, "username in token must be a string", http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), "username in token must be a string", http.StatusUnauthorized)
 					return
 				} else if username == "" {
-					h.httpError(w, "token must contain a username", http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), "token must contain a username", http.StatusUnauthorized)
 					return
 				}

 				// Lookup user in the metastore.
 				if user, err = h.MetaClient.User(username); err != nil {
-					h.httpError(w, err.Error(), http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), err.Error(), http.StatusUnauthorized)
 					return
 				} else if user == nil {
-					h.httpError(w, meta.ErrUserNotFound.Error(), http.StatusUnauthorized)
+					h.httpError(w, NewEncoder(r, h.Config), meta.ErrUserNotFound.Error(), http.StatusUnauthorized)
 					return
 				}
 			default:
-				h.httpError(w, "unsupported authentication", http.StatusUnauthorized)
+				h.httpError(w, NewEncoder(r, h.Config), "unsupported authentication", http.StatusUnauthorized)
 			}
 		}
@@ -1216,13 +1087,6 @@ func (h *Handler) logging(inner http.Handler, name string) http.Handler {
 	})
 }

-func (h *Handler) responseWriter(inner http.Handler) http.Handler {
-	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w = NewResponseWriter(w, r)
-		inner.ServeHTTP(w, r)
-	})
-}
-
 // if the env var is set, and the value is truthy, then we will *not*
 // recover from a panic.
 var willCrash bool
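With the responseWriter middleware deleted above, the Accept-header content negotiation it performed presumably moves into NewEncoder. A hedged sketch of that selection, mirroring the NewResponseWriter logic deleted later in this diff; NewEncoder's real body is not shown, csvEncoder is a hypothetical counterpart to the old csvFormatter, and the imports "net/http" and "encoding/csv" are assumed alongside the sketch above:

// NewEncoder picks an Encoder from the request's Accept header.
// Sketch only: the real implementation may consult config differently.
func NewEncoder(r *http.Request, config *Config) Encoder {
	switch r.Header.Get("Accept") {
	case "application/csv", "text/csv":
		return &csvEncoder{}
	default:
		// JSON stays the default and still honors ?pretty=true.
		return &jsonEncoder{pretty: r.URL.Query().Get("pretty") == "true"}
	}
}

type csvEncoder struct{}

func (e *csvEncoder) Error(w io.Writer, err error) {
	// Matches the deleted csvFormatter's error path: an "error" header row,
	// then the message itself.
	cw := csv.NewWriter(w)
	cw.Write([]string{"error"})
	cw.Write([]string{err.Error()})
	cw.Flush()
}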
diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go
index 530734d2953..c3909e5740a 100644
--- a/services/httpd/handler_test.go
+++ b/services/httpd/handler_test.go
@@ -14,7 +14,7 @@ import (
 	"testing"
 	"time"

-	"github.com/dgrijalva/jwt-go"
+	jwt "github.com/dgrijalva/jwt-go"
 	"github.com/influxdata/influxdb/influxql"
 	"github.com/influxdata/influxdb/internal"
 	"github.com/influxdata/influxdb/models"
@@ -32,8 +32,17 @@ func TestHandler_Query(t *testing.T) {
 		} else if ctx.Database != `foo` {
 			t.Fatalf("unexpected db: %s", ctx.Database)
 		}
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
-		ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
+		ctx.StatementID = 1
+		result, _ := ctx.CreateResult()
+		series, _ := result.CreateSeries("series0")
+		series.Close()
+		result.Close()
+
+		ctx.StatementID = 2
+		result, _ = ctx.CreateResult()
+		series, _ = result.CreateSeries("series1")
+		series.Close()
+		result.Close()
 		return nil
 	}
@@ -55,8 +64,17 @@ func TestHandler_Query_File(t *testing.T) {
 		} else if ctx.Database != `foo` {
 			t.Fatalf("unexpected db: %s", ctx.Database)
 		}
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
-		ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
+		ctx.StatementID = 1
+		result, _ := ctx.CreateResult()
+		series, _ := result.CreateSeries("series0")
+		series.Close()
+		result.Close()
+
+		ctx.StatementID = 2
+		result, _ = ctx.CreateResult()
+		series, _ = result.CreateSeries("series1")
+		series.Close()
+		result.Close()
 		return nil
 	}
@@ -124,8 +142,17 @@ func TestHandler_Query_Auth(t *testing.T) {
 		} else if ctx.Database != `foo` {
 			t.Fatalf("unexpected db: %s", ctx.Database)
 		}
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
-		ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
+		ctx.StatementID = 1
+		result, _ := ctx.CreateResult()
+		series, _ := result.CreateSeries("series0")
+		series.Close()
+		result.Close()
+
+		ctx.StatementID = 2
+		result, _ = ctx.CreateResult()
+		series, _ = result.CreateSeries("series1")
+		series.Close()
+		result.Close()
 		return nil
 	}
@@ -239,73 +266,13 @@ func TestHandler_QueryRegex(t *testing.T) {
 		} else if ctx.Database != `test` {
 			t.Fatalf("unexpected db: %s", ctx.Database)
 		}
-		ctx.Results <- nil
-		return nil
+		return ctx.Ok()
 	}

 	w := httptest.NewRecorder()
 	h.ServeHTTP(w, MustNewRequest("GET", "/query?db=test&q=SELECT%20%2A%20FROM%20test%20WHERE%20url%20%3D~%20%2Fhttp%5C%3A%5C%2F%5C%2Fwww.akamai%5C.com%2F", nil))
 }

-// Ensure the handler merges results from the same statement.
-func TestHandler_Query_MergeResults(t *testing.T) {
-	h := NewHandler(false)
-	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
-		return nil
-	}
-
-	w := httptest.NewRecorder()
-	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
-	if w.Code != http.StatusOK {
-		t.Fatalf("unexpected status: %d", w.Code)
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"},{"name":"series1"}]}]}` {
-		t.Fatalf("unexpected body: %s", body)
-	}
-}
-
-// Ensure the handler merges results from the same statement.
-func TestHandler_Query_MergeEmptyResults(t *testing.T) {
-	h := NewHandler(false)
-	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows{}}
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
-		return nil
-	}
-
-	w := httptest.NewRecorder()
-	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
-	if w.Code != http.StatusOK {
-		t.Fatalf("unexpected status: %d", w.Code)
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}` {
-		t.Fatalf("unexpected body: %s", body)
-	}
-}
-
-// Ensure the handler can parse chunked and chunk size query parameters.
-func TestHandler_Query_Chunked(t *testing.T) {
-	h := NewHandler(false)
-	h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx query.ExecutionContext) error {
-		if ctx.ChunkSize != 2 {
-			t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize)
-		}
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
-		return nil
-	}
-
-	w := httptest.NewRecorder()
-	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil))
-	if w.Code != http.StatusOK {
-		t.Fatalf("unexpected status: %d", w.Code)
-	} else if w.Body.String() != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]}]}
-{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}
-` {
-		t.Fatalf("unexpected body: %s", w.Body.String())
-	}
-}
-
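The updated tests above now spell out the same CreateResult/CreateSeries/Close sequence once per statement. A hypothetical helper, not part of this change, that would collapse that boilerplate using only the query.ExecutionContext API the tests already exercise; errors are discarded exactly as the tests discard them:

// emitSeries emits a single empty series for the given statement ID via the
// streaming result API shown in the hunks above.
func emitSeries(ctx query.ExecutionContext, statementID int, name string) {
	ctx.StatementID = statementID
	result, _ := ctx.CreateResult() // errors ignored, as in the mocks above
	series, _ := result.CreateSeries(name)
	series.Close()
	result.Close()
}

Each mock executor body above would then reduce to emitSeries(ctx, 1, "series0") followed by emitSeries(ctx, 2, "series1").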
 // Ensure the handler can accept an async query.
 func TestHandler_Query_Async(t *testing.T) {
 	done := make(chan struct{})
@@ -316,8 +283,17 @@ func TestHandler_Query_Async(t *testing.T) {
 		} else if ctx.Database != `foo` {
 			t.Fatalf("unexpected db: %s", ctx.Database)
 		}
-		ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
-		ctx.Results <- &query.Result{StatementID: 2, Series: models.Rows([]*models.Row{{Name: "series1"}})}
+		ctx.StatementID = 1
+		result, _ := ctx.CreateResult()
+		series, _ := result.CreateSeries("series0")
+		series.Close()
+		result.Close()
+
+		ctx.StatementID = 2
+		result, _ = ctx.CreateResult()
+		series, _ = result.CreateSeries("series1")
+		series.Close()
+		result.Close()
 		close(done)
 		return nil
 	}
diff --git a/services/httpd/response_writer.go b/services/httpd/response_writer.go
deleted file mode 100644
index 477d1d901db..00000000000
--- a/services/httpd/response_writer.go
+++ /dev/null
@@ -1,188 +0,0 @@
-package httpd
-
-import (
-	"encoding/csv"
-	"encoding/json"
-	"io"
-	"net/http"
-	"strconv"
-	"time"
-
-	"github.com/influxdata/influxdb/models"
-)
-
-// ResponseWriter is an interface for writing a response.
-type ResponseWriter interface {
-	// WriteResponse writes a response.
-	WriteResponse(resp Response) (int, error)
-
-	http.ResponseWriter
-}
-
-// NewResponseWriter creates a new ResponseWriter based on the Accept header
-// in the request that wraps the ResponseWriter.
-func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter {
-	pretty := r.URL.Query().Get("pretty") == "true"
-	rw := &responseWriter{ResponseWriter: w}
-	switch r.Header.Get("Accept") {
-	case "application/csv", "text/csv":
-		w.Header().Add("Content-Type", "text/csv")
-		rw.formatter = &csvFormatter{statementID: -1, Writer: w}
-	case "application/json":
-		fallthrough
-	default:
-		w.Header().Add("Content-Type", "application/json")
-		rw.formatter = &jsonFormatter{Pretty: pretty, Writer: w}
-	}
-	return rw
-}
-
-// WriteError is a convenience function for writing an error response to the ResponseWriter.
-func WriteError(w ResponseWriter, err error) (int, error) {
-	return w.WriteResponse(Response{Err: err})
-}
-
-// responseWriter is an implementation of ResponseWriter.
-type responseWriter struct {
-	formatter interface {
-		WriteResponse(resp Response) (int, error)
-	}
-	http.ResponseWriter
-}
-
-// WriteResponse writes the response using the formatter.
-func (w *responseWriter) WriteResponse(resp Response) (int, error) {
-	return w.formatter.WriteResponse(resp)
-}
-
-// Flush flushes the ResponseWriter if it has a Flush() method.
-func (w *responseWriter) Flush() {
-	if w, ok := w.ResponseWriter.(http.Flusher); ok {
-		w.Flush()
-	}
-}
-
-// CloseNotify calls CloseNotify on the underlying http.ResponseWriter if it
-// exists. Otherwise, it returns a nil channel that will never notify.
-func (w *responseWriter) CloseNotify() <-chan bool {
-	if notifier, ok := w.ResponseWriter.(http.CloseNotifier); ok {
-		return notifier.CloseNotify()
-	}
-	return nil
-}
-
-type jsonFormatter struct {
-	io.Writer
-	Pretty bool
-}
-
-func (w *jsonFormatter) WriteResponse(resp Response) (n int, err error) {
-	var b []byte
-	if w.Pretty {
-		b, err = json.MarshalIndent(resp, "", "    ")
-	} else {
-		b, err = json.Marshal(resp)
-	}
-
-	if err != nil {
-		n, err = io.WriteString(w, err.Error())
-	} else {
-		n, err = w.Write(b)
-	}
-
-	w.Write([]byte("\n"))
-	n++
-	return n, err
-}
-
-type csvFormatter struct {
-	io.Writer
-	statementID int
-	columns     []string
-}
-
-func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) {
-	csv := csv.NewWriter(w)
-	if resp.Err != nil {
-		csv.Write([]string{"error"})
-		csv.Write([]string{resp.Err.Error()})
-		csv.Flush()
-		return n, csv.Error()
-	}
-
-	for _, result := range resp.Results {
-		if result.StatementID != w.statementID {
-			// If there are no series in the result, skip past this result.
-			if len(result.Series) == 0 {
-				continue
-			}
-
-			// Set the statement id and print out a newline if this is not the first statement.
-			if w.statementID >= 0 {
-				// Flush the csv writer and write a newline.
-				csv.Flush()
-				if err := csv.Error(); err != nil {
-					return n, err
-				}
-
-				out, err := io.WriteString(w, "\n")
-				if err != nil {
-					return n, err
-				}
-				n += out
-			}
-			w.statementID = result.StatementID
-
-			// Print out the column headers from the first series.
-			w.columns = make([]string, 2+len(result.Series[0].Columns))
-			w.columns[0] = "name"
-			w.columns[1] = "tags"
-			copy(w.columns[2:], result.Series[0].Columns)
-			if err := csv.Write(w.columns); err != nil {
-				return n, err
-			}
-		}
-
-		for _, row := range result.Series {
-			w.columns[0] = row.Name
-			if len(row.Tags) > 0 {
-				w.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:])
-			} else {
-				w.columns[1] = ""
-			}
-			for _, values := range row.Values {
-				for i, value := range values {
-					if value == nil {
-						w.columns[i+2] = ""
-						continue
-					}
-
-					switch v := value.(type) {
-					case float64:
-						w.columns[i+2] = strconv.FormatFloat(v, 'f', -1, 64)
-					case int64:
-						w.columns[i+2] = strconv.FormatInt(v, 10)
-					case string:
-						w.columns[i+2] = v
-					case bool:
-						if v {
-							w.columns[i+2] = "true"
-						} else {
-							w.columns[i+2] = "false"
-						}
-					case time.Time:
-						w.columns[i+2] = strconv.FormatInt(v.UnixNano(), 10)
-					case *float64, *int64, *string, *bool:
-						w.columns[i+2] = ""
-					}
-				}
-				csv.Write(w.columns)
-			}
-		}
-	}
-	csv.Flush()
-	if err := csv.Error(); err != nil {
-		return n, err
-	}
-	return n, nil
-}
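For reference, the deleted csvFormatter stringified each cell with the switch above. The same rules restated as a standalone function — a distilled restatement of the removed code, not new behavior; strconv.FormatBool replaces the original if/else but produces identical output, and the pointer cases are folded into the default branch:

// formatCSVCell restates the deleted csvFormatter's per-value formatting.
func formatCSVCell(value interface{}) string {
	switch v := value.(type) {
	case float64:
		return strconv.FormatFloat(v, 'f', -1, 64)
	case int64:
		return strconv.FormatInt(v, 10)
	case string:
		return v
	case bool:
		return strconv.FormatBool(v) // same output as the original if/else
	case time.Time:
		return strconv.FormatInt(v.UnixNano(), 10) // times written as Unix nanoseconds
	default:
		return "" // nil values and typed nil pointers become empty cells
	}
}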
diff --git a/tests/server_test.go b/tests/server_test.go
index 67c954cd0e6..ceb3def2fdd 100644
--- a/tests/server_test.go
+++ b/tests/server_test.go
@@ -6684,15 +6684,17 @@ func TestServer_Query_TimeZone(t *testing.T) {
 func TestServer_Query_Chunk(t *testing.T) {
 	t.Parallel()
-	s := OpenServer(NewConfig())
+	config := NewConfig()
+	config.HTTPD.MaxRowLimit = 100
+	s := OpenServer(config)
 	defer s.Close()
 	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
 		t.Fatal(err)
 	}

-	writes := make([]string, 10001) // 10,000 is the default chunking size, even when no chunking requested.
-	expectedValues := make([]string, len(writes))
+	writes := make([]string, 101) // Write one point more than the MaxRowLimit (100) configured above.
+	expectedValues := make([]string, config.HTTPD.MaxRowLimit)
 	for i := 0; i < len(writes); i++ {
 		writes[i] = fmt.Sprintf(`cpu value=%d %d`, i, time.Unix(0, int64(i)).UnixNano())
 		if i < len(expectedValues) {