Skip to content

Commit

Permalink
Refactor how results are returned to the client
Browse files Browse the repository at this point in the history
This refactors the internal result returning system to match more
closely how we iterative return points between result sets, series, and
rows within the series. It uses the updated terminology rather than
older terminology that no longer matches how we refer to things in the
documentation or within the query engine.

The refactor moves the aggregation and chunking behavior from
`influxql.Emitter` to the HTTP service so that behavior is isolated to
one location rather than sprinkled around in multiple locations.
  • Loading branch information
jsternberg committed Aug 28, 2017
1 parent 37f4a7b commit 373b9da
Show file tree
Hide file tree
Showing 21 changed files with 1,518 additions and 1,329 deletions.
110 changes: 0 additions & 110 deletions coordinator/points_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,116 +436,6 @@ func (f *fakePointsWriter) WritePointsInto(req *coordinator.IntoWriteRequest) er
return f.WritePointsIntoFn(req)
}

func TestBufferedPointsWriter(t *testing.T) {
db := "db0"
rp := "rp0"
capacity := 10000

writePointsIntoCnt := 0
pointsWritten := []models.Point{}

reset := func() {
writePointsIntoCnt = 0
pointsWritten = pointsWritten[:0]
}

fakeWriter := &fakePointsWriter{
WritePointsIntoFn: func(req *coordinator.IntoWriteRequest) error {
writePointsIntoCnt++
pointsWritten = append(pointsWritten, req.Points...)
return nil
},
}

w := coordinator.NewBufferedPointsWriter(fakeWriter, db, rp, capacity)

// Test that capacity and length are correct for new buffered writer.
if w.Cap() != capacity {
t.Fatalf("exp %d, got %d", capacity, w.Cap())
} else if w.Len() != 0 {
t.Fatalf("exp %d, got %d", 0, w.Len())
}

// Test flushing an empty buffer.
if err := w.Flush(); err != nil {
t.Fatal(err)
} else if writePointsIntoCnt > 0 {
t.Fatalf("exp 0, got %d", writePointsIntoCnt)
}

// Test writing zero points.
if err := w.WritePointsInto(&coordinator.IntoWriteRequest{
Database: db,
RetentionPolicy: rp,
Points: []models.Point{},
}); err != nil {
t.Fatal(err)
} else if writePointsIntoCnt > 0 {
t.Fatalf("exp 0, got %d", writePointsIntoCnt)
} else if w.Len() > 0 {
t.Fatalf("exp 0, got %d", w.Len())
}

// Test writing single large bunch of points points.
req := coordinator.WritePointsRequest{
Database: db,
RetentionPolicy: rp,
}

numPoints := int(float64(capacity) * 5.5)
for i := 0; i < numPoints; i++ {
req.AddPoint("cpu", float64(i), time.Now().Add(time.Duration(i)*time.Second), nil)
}

r := coordinator.IntoWriteRequest(req)
if err := w.WritePointsInto(&r); err != nil {
t.Fatal(err)
} else if writePointsIntoCnt != 5 {
t.Fatalf("exp 5, got %d", writePointsIntoCnt)
} else if w.Len() != capacity/2 {
t.Fatalf("exp %d, got %d", capacity/2, w.Len())
} else if len(pointsWritten) != numPoints-capacity/2 {
t.Fatalf("exp %d, got %d", numPoints-capacity/2, len(pointsWritten))
}

if err := w.Flush(); err != nil {
t.Fatal(err)
} else if writePointsIntoCnt != 6 {
t.Fatalf("exp 6, got %d", writePointsIntoCnt)
} else if w.Len() != 0 {
t.Fatalf("exp 0, got %d", w.Len())
} else if len(pointsWritten) != numPoints {
t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten))
} else if !reflect.DeepEqual(r.Points, pointsWritten) {
t.Fatal("points don't match")
}

reset()

// Test writing points one at a time.
for i, _ := range r.Points {
if err := w.WritePointsInto(&coordinator.IntoWriteRequest{
Database: db,
RetentionPolicy: rp,
Points: r.Points[i : i+1],
}); err != nil {
t.Fatal(err)
}
}

if err := w.Flush(); err != nil {
t.Fatal(err)
} else if writePointsIntoCnt != 6 {
t.Fatalf("exp 6, got %d", writePointsIntoCnt)
} else if w.Len() != 0 {
t.Fatalf("exp 0, got %d", w.Len())
} else if len(pointsWritten) != numPoints {
t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten))
} else if !reflect.DeepEqual(r.Points, pointsWritten) {
t.Fatal("points don't match")
}
}

var shardID uint64

type fakeStore struct {
Expand Down
Loading

0 comments on commit 373b9da

Please sign in to comment.