Skip to content

Commit

Permalink
Support chunking in the middle of a series in the emitter
Browse files Browse the repository at this point in the history
Limit the maximum size of the return value when chunking is not used to
prevent the server from running out of memory.

Fixes #6115.
  • Loading branch information
jsternberg committed Mar 30, 2016
1 parent c190778 commit 364dce3
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [#6149](https://github.com/influxdata/influxdb/pull/6149): Kill running queries when server is shutdown.
- [#5372](https://github.com/influxdata/influxdb/pull/5372): Faster shard loading
- [#6148](https://github.com/influxdata/influxdb/pull/6148): Build script is now compatible with Python 3. Added ability to create detached signatures for packages. Build script now uses Python logging facility for messages.
- [#6115](https://github.com/influxdata/influxdb/issues/6115): Support chunking query results mid-series. Limit non-chunked output.

### Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
}

// Generate a row emitter from the iterator set.
em := influxql.NewEmitter(itrs, stmt.TimeAscending())
em := influxql.NewEmitter(itrs, stmt.TimeAscending(), chunkSize)
em.Columns = stmt.ColumnNames()
em.OmitTime = stmt.OmitTime
defer em.Close()
Expand Down
1 change: 1 addition & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ reporting-disabled = false
pprof-enabled = false
https-enabled = false
https-certificate = "/etc/ssl/influxdb.pem"
max-row-limit = 10000

###
### [[graphite]]
Expand Down
9 changes: 6 additions & 3 deletions influxql/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Emitter struct {
buf []Point
itrs []Iterator
ascending bool
chunkSize int

tags Tags
row *models.Row
Expand All @@ -25,11 +26,12 @@ type Emitter struct {
}

// NewEmitter returns a new instance of Emitter that pulls from itrs.
func NewEmitter(itrs []Iterator, ascending bool) *Emitter {
func NewEmitter(itrs []Iterator, ascending bool, chunkSize int) *Emitter {
return &Emitter{
buf: make([]Point, len(itrs)),
itrs: itrs,
ascending: ascending,
chunkSize: chunkSize,
}
}

Expand Down Expand Up @@ -65,11 +67,12 @@ func (e *Emitter) Emit() *models.Row {
}

// If there's no row yet then create one.
// If the name and tags match the existing row, append to that row.
// If the name and tags match the existing row, append to that row if
// the number of values doesn't exceed the chunk size.
// Otherwise return existing row and add values to next emitted row.
if e.row == nil {
e.createRow(name, tags, values)
} else if e.row.Name == name && e.tags.Equals(&tags) {
} else if e.row.Name == name && e.tags.Equals(&tags) && (e.chunkSize <= 0 || len(e.row.Values) < e.chunkSize) {
e.row.Values = append(e.row.Values, values)
} else {
row := e.row
Expand Down
43 changes: 42 additions & 1 deletion influxql/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestEmitter_Emit(t *testing.T) {
{Name: "cpu", Tags: ParseTags("region=north"), Time: 0, Value: 4},
{Name: "mem", Time: 4, Value: 5},
}},
}, true)
}, true, 0)
e.Columns = []string{"col1", "col2"}

// Verify the cpu region=west is emitted first.
Expand Down Expand Up @@ -67,3 +67,44 @@ func TestEmitter_Emit(t *testing.T) {
t.Fatalf("unexpected eof: %s", spew.Sdump(row))
}
}

// Ensure the emitter will limit the chunked output from a series.
func TestEmitter_ChunkSize(t *testing.T) {
	// Build an emitter that pulls from one iterator with multiple points in the same series.
	// chunkSize is 1, so each emitted row may hold at most one value.
	e := influxql.NewEmitter([]influxql.Iterator{
		&FloatIterator{Points: []influxql.FloatPoint{
			{Name: "cpu", Tags: ParseTags("region=west"), Time: 0, Value: 1},
			{Name: "cpu", Tags: ParseTags("region=west"), Time: 1, Value: 2},
		}},
	}, true, 1)
	e.Columns = []string{"col1"}

	// Verify only the first point of cpu region=west is emitted in the first row,
	// since the chunk size forces the series to be split across rows.
	if row := e.Emit(); !deep.Equal(row, &models.Row{
		Name:    "cpu",
		Tags:    map[string]string{"region": "west"},
		Columns: []string{"col1"},
		Values: [][]interface{}{
			{time.Unix(0, 0).UTC(), float64(1)},
		},
	}) {
		t.Fatalf("unexpected row(0): %s", spew.Sdump(row))
	}

	// Verify the second chunk of cpu region=west (same name and tags) is emitted next.
	if row := e.Emit(); !deep.Equal(row, &models.Row{
		Name:    "cpu",
		Tags:    map[string]string{"region": "west"},
		Columns: []string{"col1"},
		Values: [][]interface{}{
			{time.Unix(0, 1).UTC(), float64(2)},
		},
	}) {
		t.Fatalf("unexpected row(1): %s", spew.Sdump(row))
	}

	// Verify EOF.
	if row := e.Emit(); row != nil {
		t.Fatalf("unexpected eof: %s", spew.Sdump(row))
	}
}
2 changes: 2 additions & 0 deletions services/httpd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
PprofEnabled bool `toml:"pprof-enabled"`
HTTPSEnabled bool `toml:"https-enabled"`
HTTPSCertificate string `toml:"https-certificate"`
MaxRowLimit int `toml:"max-row-limit"`
}

// NewConfig returns a new Config with default settings.
Expand All @@ -23,5 +24,6 @@ func NewConfig() Config {
LogEnabled: true,
HTTPSEnabled: false,
HTTPSCertificate: "/etc/ssl/influxdb.pem",
MaxRowLimit: DefaultChunkSize,
}
}
26 changes: 20 additions & 6 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ import (
)

const (
// DefaultChunkSize specifies the amount of data mappers will read
// up to, before sending results back to the engine. This is the
// default size in the number of values returned in a raw query.
// DefaultChunkSize specifies the maximum number of points that will
// be read before sending results back to the engine.
//
// Could be many more bytes depending on fields returned.
// This has no relation to the number of bytes that are returned.
DefaultChunkSize = 10000
)

Expand Down Expand Up @@ -75,17 +74,19 @@ type Handler struct {
Logger *log.Logger
loggingEnabled bool // Log every HTTP access.
WriteTrace bool // Detailed logging of write path
rowLimit int
statMap *expvar.Map
}

// NewHandler returns a new instance of handler with routes.
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler {
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, rowLimit int, statMap *expvar.Map) *Handler {
h := &Handler{
mux: pat.New(),
requireAuthentication: requireAuthentication,
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
loggingEnabled: loggingEnabled,
WriteTrace: writeTrace,
rowLimit: rowLimit,
statMap: statMap,
}

Expand Down Expand Up @@ -284,7 +285,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
chunked := (q.Get("chunked") == "true")
chunkSize := DefaultChunkSize
if chunked {
if n, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64); err == nil {
if n, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64); err == nil && int(n) > 0 {
chunkSize = int(n)
}
}
Expand Down Expand Up @@ -312,6 +313,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
w.WriteHeader(http.StatusOK)

// pull all results from the channel
rows := 0
for r := range results {
// Ignore nil results.
if r == nil {
Expand All @@ -328,11 +330,23 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
n, _ := w.Write(MarshalJSON(Response{
Results: []*influxql.Result{r},
}, pretty))
if !pretty {
w.Write([]byte("\n"))
}
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
w.(http.Flusher).Flush()
continue
}

// Limit the number of rows that can be returned in a non-chunked response.
// This is to prevent the server from going OOM when returning a large response.
// If you want to return more than the default chunk size, then use chunking
// to process multiple blobs.
rows += len(r.Series)
if h.rowLimit > 0 && rows > h.rowLimit {
break
}

// It's not chunked so buffer results in memory.
// Results for statements need to be combined together.
// We need to check if this new result is for the same statement as
Expand Down
6 changes: 4 additions & 2 deletions services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func TestHandler_Query_Chunked(t *testing.T) {
h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil))
if w.Code != http.StatusOK {
t.Fatalf("unexpected status: %d", w.Code)
} else if w.Body.String() != `{"results":[{"series":[{"name":"series0"}]}]}{"results":[{"series":[{"name":"series1"}]}]}` {
} else if w.Body.String() != `{"results":[{"series":[{"name":"series0"}]}]}
{"results":[{"series":[{"name":"series1"}]}]}
` {
t.Fatalf("unexpected body: %s", w.Body.String())
}
}
Expand Down Expand Up @@ -287,7 +289,7 @@ type Handler struct {
func NewHandler(requireAuthentication bool) *Handler {
statMap := influxdb.NewStatistics("httpd", "httpd", nil)
h := &Handler{
Handler: httpd.NewHandler(requireAuthentication, true, false, statMap),
Handler: httpd.NewHandler(requireAuthentication, true, false, 0, statMap),
}
h.Handler.MetaClient = &h.MetaClient
h.Handler.QueryExecutor = &h.QueryExecutor
Expand Down
1 change: 1 addition & 0 deletions services/httpd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func NewService(c Config) *Service {
c.AuthEnabled,
c.LogEnabled,
c.WriteTracing,
c.MaxRowLimit,
statMap,
),
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
Expand Down

0 comments on commit 364dce3

Please sign in to comment.