Skip to content

Commit

Permalink
Introduce syntax for marking a partial response with chunking
Browse files Browse the repository at this point in the history
The `partial` tag has been added to the JSON response of a series and
the result so that a client knows when more of the series or result will
be sent in a future JSON chunk.

This helps interactive clients who don't want to wait for all of the
data to know if it is done processing the current series or the current
result. Previously, the client had to guess if the next chunk would
refer to the same result or a new result and it had to match the name
and tags of the two series to know if they were the same series. Now,
the client just needs to check the `partial` field included with the
response to know if it should expect more.

Fixed `max-row-limit` so it counts rows instead of results and it
truncates the response when the `max-row-limit` is reached.
  • Loading branch information
jsternberg committed Sep 30, 2016
1 parent 11cf759 commit 79c5a3b
Show file tree
Hide file tree
Showing 10 changed files with 728 additions and 664 deletions.
120 changes: 60 additions & 60 deletions cmd/influxd/run/server_suite_test.go

Large diffs are not rendered by default.

1,082 changes: 542 additions & 540 deletions cmd/influxd/run/server_test.go

Large diffs are not rendered by default.

53 changes: 26 additions & 27 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influx
return err
}

ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: rows,
Messages: messages,
}
return nil
})
}

func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error {
Expand Down Expand Up @@ -427,7 +426,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}

for {
row, err := em.Emit()
row, partial, err := em.Emit()
if err != nil {
return err
} else if row == nil {
Expand All @@ -452,19 +451,18 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
result := &influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{row},
Partial: partial,
}

// Send results or exit if closing.
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
case ctx.Results <- result:
if err := ctx.Send(result); err != nil {
return err
}

emitted = true
}

// Flush remaing points and emit write count if an INTO statement.
// Flush remaining points and emit write count if an INTO statement.
if stmt.Target != nil {
if err := pointsWriter.Flush(); err != nil {
return err
Expand All @@ -475,23 +473,24 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
messages = append(messages, influxql.ReadOnlyWarning(stmt.String()))
}

ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Messages: messages,
Series: []*models.Row{{
Name: "result",
Columns: []string{"time", "written"},
Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
}},
}
return nil
})
}

// Always emit at least one result.
if !emitted {
ctx.Results <- &influxql.Result{
if err := ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: make([]*models.Row, 0),
}); err != nil {
return err
}
}

Expand Down Expand Up @@ -678,11 +677,10 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea

measurements, err := e.TSDBStore.Measurements(q.Database, q.Condition)
if err != nil || len(measurements) == 0 {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Err: err,
}
return nil
})
}

if q.Offset > 0 {
Expand All @@ -705,21 +703,19 @@ func (e *StatementExecutor) executeShowMeasurementsStatement(q *influxql.ShowMea
}

if len(values) == 0 {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
}
return nil
})
}

ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{{
Name: "measurements",
Columns: []string{"name"},
Values: values,
}},
}
return nil
})
}

func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
Expand Down Expand Up @@ -854,11 +850,10 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem

tagValues, err := e.TSDBStore.TagValues(ctx.Database, q.Condition)
if err != nil {
ctx.Results <- &influxql.Result{
return ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Err: err,
}
return nil
})
}

emitted := false
Expand Down Expand Up @@ -892,17 +887,21 @@ func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatem
row.Values[i] = []interface{}{v.Key, v.Value}
}

ctx.Results <- &influxql.Result{
if ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{row},
}); err != nil {
return err
}
emitted = true
}

// Ensure at least one result is emitted.
if !emitted {
ctx.Results <- &influxql.Result{
if err := ctx.Send(&influxql.Result{
StatementID: ctx.StatementID,
}); err != nil {
return err
}
}
return nil
Expand Down
20 changes: 13 additions & 7 deletions influxql/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,22 @@ func (e *Emitter) Close() error {
}

// Emit returns the next row from the iterators.
func (e *Emitter) Emit() (*models.Row, error) {
func (e *Emitter) Emit() (*models.Row, bool, error) {
// Immediately end emission if there are no iterators.
if len(e.itrs) == 0 {
return nil, nil
return nil, false, nil
}

// Continually read from iterators until they are exhausted.
for {
// Fill buffer. Return row if no more points remain.
t, name, tags, err := e.loadBuf()
if err != nil {
return nil, err
return nil, false, err
} else if t == ZeroTime {
row := e.row
e.row = nil
return row, nil
return row, false, nil
}

// Read next set of values from all iterators at a given time/name/tags.
Expand All @@ -65,7 +65,7 @@ func (e *Emitter) Emit() (*models.Row, error) {
if values == nil {
row := e.row
e.row = nil
return row, nil
return row, false, nil
}

// If there's no row yet then create one.
Expand All @@ -74,12 +74,18 @@ func (e *Emitter) Emit() (*models.Row, error) {
// Otherwise return existing row and add values to next emitted row.
if e.row == nil {
e.createRow(name, tags, values)
} else if e.row.Name == name && e.tags.Equals(&tags) && (e.chunkSize <= 0 || len(e.row.Values) < e.chunkSize) {
} else if e.row.Name == name && e.tags.Equals(&tags) {
if e.chunkSize > 0 && len(e.row.Values) >= e.chunkSize {
row := e.row
row.Partial = true
e.createRow(name, tags, values)
return row, true, nil
}
e.row.Values = append(e.row.Values, values)
} else {
row := e.row
e.createRow(name, tags, values)
return row, nil
return row, true, nil
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions influxql/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestEmitter_Emit(t *testing.T) {
e.Columns = []string{"col1", "col2"}

// Verify the cpu region=west is emitted first.
if row, err := e.Emit(); err != nil {
if row, _, err := e.Emit(); err != nil {
t.Fatalf("unexpected error(0): %s", err)
} else if !deep.Equal(row, &models.Row{
Name: "cpu",
Expand All @@ -42,7 +42,7 @@ func TestEmitter_Emit(t *testing.T) {
}

// Verify the cpu region=north is emitted next.
if row, err := e.Emit(); err != nil {
if row, _, err := e.Emit(); err != nil {
t.Fatalf("unexpected error(1): %s", err)
} else if !deep.Equal(row, &models.Row{
Name: "cpu",
Expand All @@ -56,7 +56,7 @@ func TestEmitter_Emit(t *testing.T) {
}

// Verify the mem series is emitted last.
if row, err := e.Emit(); err != nil {
if row, _, err := e.Emit(); err != nil {
t.Fatalf("unexpected error(2): %s", err)
} else if !deep.Equal(row, &models.Row{
Name: "mem",
Expand All @@ -69,7 +69,7 @@ func TestEmitter_Emit(t *testing.T) {
}

// Verify EOF.
if row, err := e.Emit(); err != nil {
if row, _, err := e.Emit(); err != nil {
t.Fatalf("unexpected error(eof): %s", err)
} else if row != nil {
t.Fatalf("unexpected eof: %s", spew.Sdump(row))
Expand All @@ -88,7 +88,7 @@ func TestEmitter_ChunkSize(t *testing.T) {
e.Columns = []string{"col1"}

// Verify the cpu region=west is emitted first.
if row, err := e.Emit(); err != nil {
if row, _, err := e.Emit(); err != nil {
t.Fatalf("unexpected error(0): %s", err)
} else if !deep.Equal(row, &models.Row{
Name: "cpu",
Expand All @@ -97,12 +97,13 @@ func TestEmitter_ChunkSize(t *testing.T) {
Values: [][]interface{}{
{time.Unix(0, 0).UTC(), float64(1)},
},
Partial: true,
}) {
t.Fatalf("unexpected row(0): %s", spew.Sdump(row))
}

// Verify the cpu region=north is emitted next.
if row, err := e.Emit(); err != nil {
if row, _, err := e.Emit(); err != nil {
t.Fatalf("unexpected error(1): %s", err)
} else if !deep.Equal(row, &models.Row{
Name: "cpu",
Expand All @@ -116,7 +117,7 @@ func TestEmitter_ChunkSize(t *testing.T) {
}

// Verify EOF.
if row, err := e.Emit(); err != nil {
if row, _, err := e.Emit(); err != nil {
t.Fatalf("unexpected error(eof): %s", err)
} else if row != nil {
t.Fatalf("unexpected eof: %s", spew.Sdump(row))
Expand Down
11 changes: 11 additions & 0 deletions influxql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ type ExecutionContext struct {
ExecutionOptions
}

// Send sends a Result to the Results channel and will exit if the query has
// been interrupted.
func (ctx *ExecutionContext) Send(result *Result) error {
select {
case <-ctx.InterruptCh:
return ErrQueryInterrupted
case ctx.Results <- result:
}
return nil
}

// StatementExecutor executes a statement within the QueryExecutor.
type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the
Expand Down
23 changes: 16 additions & 7 deletions influxql/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,29 @@ func ReadOnlyWarning(stmt string) *Message {
type Result struct {
// StatementID is just the statement's position in the query. It's used
// to combine statement results if they're being buffered in memory.
StatementID int `json:"-"`
StatementID int
Series models.Rows
Messages []*Message
Partial bool
Err error
}

// MarshalJSON encodes the result into JSON.
func (r *Result) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
Series []*models.Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Err string `json:"error,omitempty"`
StatementID int `json:"statement_id"`
Series []*models.Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Partial bool `json:"partial,omitempty"`
Err string `json:"error,omitempty"`
}

// Copy fields to output struct.
o.StatementID = r.StatementID
o.Series = r.Series
o.Messages = r.Messages
o.Partial = r.Partial
if r.Err != nil {
o.Err = r.Err.Error()
}
Expand All @@ -86,17 +91,21 @@ func (r *Result) MarshalJSON() ([]byte, error) {
// UnmarshalJSON decodes the data into the Result struct
func (r *Result) UnmarshalJSON(b []byte) error {
var o struct {
Series []*models.Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Err string `json:"error,omitempty"`
StatementID int `json:"statement_id"`
Series []*models.Row `json:"series,omitempty"`
Messages []*Message `json:"messages,omitempty"`
Partial bool `json:"partial,omitempty"`
Err string `json:"error,omitempty"`
}

err := json.Unmarshal(b, &o)
if err != nil {
return err
}
r.StatementID = o.StatementID
r.Series = o.Series
r.Messages = o.Messages
r.Partial = o.Partial
if o.Err != "" {
r.Err = errors.New(o.Err)
}
Expand Down
1 change: 1 addition & 0 deletions models/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Row struct {
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
Partial bool `json:"partial,omitempty"`
}

// SameSeries returns true if r contains values for the same series as o.
Expand Down
Loading

0 comments on commit 79c5a3b

Please sign in to comment.