Skip to content

Commit

Permalink
distsql: add NewFinalIterator to the rowIterator interface
Browse files Browse the repository at this point in the history
Some implementations of the rowIterator interface would destroy rows as
they were iterated over to free memory eagerly. NewFinalIterator is
introduced in this change to provide non-reusable behavior and
NewIterator is explicitly described as reusable.

A reusable iterator has been added to the memRowContainer to satisfy
these new interface semantics.

Release note: None
  • Loading branch information
asubiotto committed Jun 6, 2018
1 parent 099fc95 commit b0ab0d6
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 14 deletions.
5 changes: 5 additions & 0 deletions pkg/sql/distsqlrun/disk_row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ func (d *diskRowContainer) NewIterator(ctx context.Context) rowIterator {
return diskRowIterator{rowContainer: d, SortedDiskMapIterator: d.diskMap.NewIterator()}
}

// NewFinalIterator returns a rowIterator over the rows of the container. It
// delegates directly to NewIterator, so both methods hand back the same kind
// of iterator for a diskRowContainer.
func (d *diskRowContainer) NewFinalIterator(ctx context.Context) rowIterator {
	iter := d.NewIterator(ctx)
	return iter
}

// Row returns the current row. The returned sqlbase.EncDatumRow is only valid
// until the next call to Row().
func (r diskRowIterator) Row() (sqlbase.EncDatumRow, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (h *hashJoiner) buildPhase(
}

// Transfer rows from memory.
i := h.rows[h.storedSide].NewIterator(ctx)
i := h.rows[h.storedSide].NewFinalIterator(ctx)
defer i.Close()
for i.Rewind(); ; i.Next() {
if err := h.cancelChecker.Check(); err != nil {
Expand Down Expand Up @@ -636,7 +636,7 @@ func (h *hashJoiner) probePhase(
src = h.rightSource
}
// First process the rows that were already buffered.
probeIterator := h.rows[side].NewIterator(ctx)
probeIterator := h.rows[side].NewFinalIterator(ctx)
defer probeIterator.Close()
for probeIterator.Rewind(); ; probeIterator.Next() {
if ok, err := probeIterator.Valid(); err != nil {
Expand Down
63 changes: 53 additions & 10 deletions pkg/sql/distsqlrun/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type sortableRowContainer interface {
// NewIterator returns a rowIterator that can be used to iterate over
// the rows.
NewIterator(context.Context) rowIterator
// NewFinalIterator returns a rowIterator that can be used to iterate over the
// rows, possibly freeing resources along the way. Subsequent calls to
// NewIterator or NewFinalIterator are not guaranteed to return any rows.
NewFinalIterator(context.Context) rowIterator

// Close frees up resources held by the sortableRowContainer.
Close(context.Context)
Expand Down Expand Up @@ -194,39 +198,78 @@ func (mc *memRowContainer) InitMaxHeap() {
}

// memRowIterator is a rowIterator that iterates over a memRowContainer by
// index. This iterator doesn't iterate over a snapshot of memRowContainer: it
// reads rows from the container directly and leaves them in place, so a fresh
// iterator can walk the same rows again.
type memRowIterator struct {
*memRowContainer
// curIdx is the index of the row the iterator is currently positioned at.
curIdx int
}

var _ rowIterator = memRowIterator{}
var _ rowIterator = &memRowIterator{}

// NewIterator returns a reusable rowIterator over the memRowContainer. The
// iterator starts at index 0 and reads rows straight from the container
// rather than from a snapshot of it.
func (mc *memRowContainer) NewIterator(_ context.Context) rowIterator {
	it := new(memRowIterator)
	it.memRowContainer = mc
	return it
}

// Rewind implements the rowIterator interface. It repositions the iterator at
// the first row (index 0) of the underlying memRowContainer.
func (i *memRowIterator) Rewind() {
i.curIdx = 0
}

// Valid implements the rowIterator interface. It reports whether the iterator
// is positioned on a row, i.e. whether the current index is still below the
// container's length. The returned error is always nil.
func (i *memRowIterator) Valid() (bool, error) {
	inBounds := i.Len() > i.curIdx
	return inBounds, nil
}

// Next implements the rowIterator interface. It advances the iterator by one
// position; the underlying memRowContainer is not modified.
func (i *memRowIterator) Next() {
i.curIdx++
}

// Row implements the rowIterator interface. It returns the row stored at the
// iterator's current index. No validity check is performed here, so callers
// should consult Valid first. The returned error is always nil.
func (i *memRowIterator) Row() (sqlbase.EncDatumRow, error) {
	current := i.EncRow(i.curIdx)
	return current, nil
}

// Close implements the rowIterator interface. It is a no-op for
// memRowIterator.
func (i *memRowIterator) Close() {}

// memRowFinalIterator is a rowIterator that iterates over a memRowContainer.
// This iterator doesn't iterate over a snapshot of memRowContainer and deletes
// rows as soon as they are iterated over to free up memory eagerly. Because
// rows are removed as the iterator advances, the container's contents cannot
// be read again once they have been iterated over.
type memRowFinalIterator struct {
// The container being drained; its methods are promoted onto the iterator.
*memRowContainer
}

// NewFinalIterator returns an iterator that can be used to iterate over a
// memRowContainer. Note that this iterator doesn't iterate over a snapshot
// of memRowContainer and that it deletes rows as soon as they are iterated
// over.
func (mc *memRowContainer) NewIterator(_ context.Context) rowIterator {
return memRowIterator{memRowContainer: mc}
func (mc *memRowContainer) NewFinalIterator(_ context.Context) rowIterator {
return memRowFinalIterator{memRowContainer: mc}
}

// Compile-time assertion that memRowFinalIterator implements rowIterator.
var _ rowIterator = memRowFinalIterator{}

// Rewind implements the rowIterator interface.
func (i memRowIterator) Rewind() {}
func (i memRowFinalIterator) Rewind() {}

// Valid implements the rowIterator interface.
func (i memRowIterator) Valid() (bool, error) {
func (i memRowFinalIterator) Valid() (bool, error) {
return i.Len() > 0, nil
}

// Next implements the rowIterator interface.
func (i memRowIterator) Next() {
func (i memRowFinalIterator) Next() {
i.PopFirst()
}

// Row implements the rowIterator interface.
func (i memRowIterator) Row() (sqlbase.EncDatumRow, error) {
func (i memRowFinalIterator) Row() (sqlbase.EncDatumRow, error) {
return i.EncRow(0), nil
}

// Close implements the rowIterator interface.
func (i memRowIterator) Close() {}
func (i memRowFinalIterator) Close() {}
89 changes: 89 additions & 0 deletions pkg/sql/distsqlrun/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"math"
"testing"

"fmt"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand All @@ -28,6 +30,40 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

// verifyRows verifies that the rows read with the given rowIterator match up,
// in order, with the given rows. evalCtx and ordering are used to compare rows.
//
// An error is returned if the iterator itself reports an error, if a row
// differs from the expected one, if the iterator yields more rows than
// expected, or if it stops before all expected rows have been seen. ctx is
// accepted for signature stability but is not used.
func verifyRows(
	ctx context.Context,
	i rowIterator,
	expectedRows sqlbase.EncDatumRows,
	evalCtx *tree.EvalContext,
	ordering sqlbase.ColumnOrdering,
) error {
	// One DatumAlloc is shared by every comparison instead of allocating a new
	// one per row.
	var alloc sqlbase.DatumAlloc
	for i.Rewind(); ; i.Next() {
		ok, err := i.Valid()
		if err != nil {
			return err
		}
		if !ok {
			break
		}
		row, err := i.Row()
		if err != nil {
			return err
		}
		// Guard the expectedRows[0] accesses below: an iterator that returns too
		// many rows must produce an error, not an index-out-of-range panic.
		if len(expectedRows) == 0 {
			return fmt.Errorf("iterator returned unexpected extra row %v", row)
		}
		cmp, err := compareRows(
			oneIntCol, row, expectedRows[0], evalCtx, &alloc, ordering,
		)
		if err != nil {
			return err
		}
		if cmp != 0 {
			return fmt.Errorf("unexpected row %v, expected %v", row, expectedRows[0])
		}
		expectedRows = expectedRows[1:]
	}
	if len(expectedRows) != 0 {
		return fmt.Errorf("iterator missed %d row(s)", len(expectedRows))
	}
	return nil
}

// TestRowContainerReplaceMax verifies that MaybeReplaceMax correctly adjusts
// the memory accounting.
func TestRowContainerReplaceMax(t *testing.T) {
Expand Down Expand Up @@ -87,3 +123,56 @@ func TestRowContainerReplaceMax(t *testing.T) {
mc.PopFirst()
}
}

// TestRowContainerIterators verifies the two iterator flavors of a
// memRowContainer: NewIterator can be created repeatedly and returns every row
// each time, while NewFinalIterator returns every row once and leaves the
// container empty.
func TestRowContainerIterators(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	evalCtx := tree.NewTestingEvalContext(st)
	defer evalCtx.Stop(ctx)

	const numRows = 10
	const numCols = 1
	rows := makeIntRows(numRows, numCols)
	ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}

	var mc memRowContainer
	mc.init(
		ordering,
		oneIntCol,
		evalCtx,
	)
	defer mc.Close(ctx)

	for _, row := range rows {
		if err := mc.AddRow(ctx, row); err != nil {
			t.Fatal(err)
		}
	}

	// NewIterator verifies that we read the expected rows from the
	// memRowContainer and can recreate an iterator.
	t.Run("NewIterator", func(t *testing.T) {
		for k := 0; k < 2; k++ {
			i := mc.NewIterator(ctx)
			err := verifyRows(ctx, i, rows, evalCtx, ordering)
			// Close before reporting a failure so the iterator is released on
			// every path (a defer inside the loop would pile up until return).
			i.Close()
			if err != nil {
				t.Fatalf("rows mismatch on the run number %d: %s", k+1, err)
			}
		}
	})

	// NewFinalIterator verifies that we read the expected rows from the
	// memRowContainer and as we do so, these rows are deleted from the
	// memRowContainer. This subtest must run last because it drains mc.
	t.Run("NewFinalIterator", func(t *testing.T) {
		i := mc.NewFinalIterator(ctx)
		// Deferred calls still run when t.Fatal aborts the subtest.
		defer i.Close()
		if err := verifyRows(ctx, i, rows, evalCtx, ordering); err != nil {
			t.Fatal(err)
		}
		if mc.Len() != 0 {
			t.Fatal("memRowContainer is not empty")
		}
	})
}
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (s *sortAllProcessor) fill() (ok bool, _ error) {
s.diskContainer = &diskContainer

// Transfer the rows from memory to disk. This frees up the memory taken up by s.rows.
i := s.rows.NewIterator(ctx)
i := s.rows.NewFinalIterator(ctx)
for i.Rewind(); ; i.Next() {
if ok, err := i.Valid(); err != nil {
return false, err
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *sortAllProcessor) fillWithContainer(
}
r.Sort(ctx)

s.i = r.NewIterator(ctx)
s.i = r.NewFinalIterator(ctx)
s.i.Rewind()

return true, nil, nil
Expand Down

0 comments on commit b0ab0d6

Please sign in to comment.