Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
26441: distsql: add NewFinalIterator to the rowIterator interface r=asubiotto a=asubiotto

Some implementations of the rowIterator interface would destroy rows as
they were iterated over to free memory eagerly. NewFinalIterator is
introduced in this change to provide non-reusable behavior and
NewIterator is explicitly described as reusable.

A reusable iterator has been added to the memRowContainer to satisfy
these new interface semantics.

Release note: None

26463: storage: Disable campaign-on-wake when receiving raft messages r=bdarnell a=bdarnell

When the incoming message is a MsgVote (which is likely the case for
the first message received by a quiesced follower), immediate
campaigning will cause the election to fail.

This is similar to reverting commit 44e3977, but only disables
campaigning in one location.

Fixes #26391

Release note: None

26469: lint: Fix a file descriptor leak r=bdarnell a=bdarnell

This leak is enough to cause `make lintshort` fail when run under the
default file descriptor limit on macos (256).

Release note: None

26470: build: Pin go.uuid to the version currently in use r=bdarnell a=bdarnell

Updates #26332

Release note: None

Co-authored-by: Alfonso Subiotto Marqués <alfonso@cockroachlabs.com>
Co-authored-by: Ben Darnell <ben@cockroachlabs.com>
  • Loading branch information
3 people committed Jun 6, 2018
5 parents e8911d4 + b0ab0d6 + da2025f + 20e8446 + b09ab16 commit f6f6ee8
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 29 deletions.
8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ ignored = [
name = "github.com/xwb1989/sqlparser"
source = "https://github.com/dt/sqlparser"

# The master version of go.uuid has an incompatible interface and (as
# of 2018-06-06) a serious bug. Don't upgrade without making sure
# that bug is fixed.
# https://github.com/cockroachdb/cockroach/issues/26332
[[constraint]]
name = "github.com/satori/go.uuid"
version = "v1.2.0"

# github.com/docker/docker depends on a few functions not included in the
# latest release: reference.{FamiliarName,ParseNormalizedNamed,TagNameOnly}.
#
Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,10 @@ func (c *cluster) Start(ctx context.Context, opts ...option) {
if encrypt && !argExists(args, "--encrypt") {
args = append(args, "--encrypt")
}
if local {
// This avoids annoying firewall prompts on macos
args = append(args, "--args", "--host=127.0.0.1")
}
if err := execCmd(ctx, c.l, args...); err != nil {
c.t.Fatal(err)
}
Expand Down
73 changes: 73 additions & 0 deletions pkg/cmd/roachtest/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.

package main

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func registerElectionAfterRestart(r *registry) {
r.Add(testSpec{
Name: "election-after-restart",
Nodes: nodes(3),
Stable: false, // Introduced 2018-06-06
Run: func(ctx context.Context, t *test, c *cluster) {
t.Status("starting up")
c.Put(ctx, cockroach, "./cockroach")
c.Start(ctx)

t.Status("creating table and splits")
c.Run(ctx, c.Node(1), `./cockroach sql --insecure -e "
CREATE TABLE kv (k INT PRIMARY KEY, v INT);
ALTER TABLE kv SPLIT AT SELECT generate_series(0, 10000, 100)"`)

start := timeutil.Now()
c.Run(ctx, c.Node(1), `./cockroach sql --insecure -e "
SELECT * FROM kv"`)
duration := timeutil.Since(start)
c.l.printf("pre-restart, query took %s\n", duration)

t.Status("restarting")
c.Stop(ctx)
c.Start(ctx)

// Each of the 100 ranges in this table must elect a leader for
// this query to complete. In naive raft, each of these
// elections would require waiting for a 3-second timeout, one
// at a time. This test verifies that our mechanisms to speed
// this up are working (we trigger elections eagerly, but not so
// eagerly that multiple elections conflict with each other).
start = timeutil.Now()
c.Run(ctx, c.Node(1), `./cockroach sql --insecure -e "
SELECT * FROM kv"`)
duration = timeutil.Since(start)
c.l.printf("post-restart, query took %s\n", duration)
if expected := 15 * time.Second; duration > expected {
// In the happy case, this query runs in around 250ms. Prior
// to the introduction of this test, a bug caused most
// elections to fail and the query would take over 100
// seconds. There are still issues that can cause a few
// elections to fail (the biggest one as I write this is
// #26448), so we must use a generous timeout here. We may be
// able to tighten the bounds as we make more improvements.
t.Fatalf("expected query to succeed in less than %s, took %s", expected, duration)
}
},
})
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func registerTests(r *registry) {
registerDecommission(r)
registerDiskUsage(r)
registerDrop(r)
registerElectionAfterRestart(r)
registerHotSpotSplits(r)
registerImportTPCC(r)
registerImportTPCH(r)
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/distsqlrun/disk_row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ func (d *diskRowContainer) NewIterator(ctx context.Context) rowIterator {
return diskRowIterator{rowContainer: d, SortedDiskMapIterator: d.diskMap.NewIterator()}
}

// NewFinalIterator is equivalent to NewIterator.
func (d *diskRowContainer) NewFinalIterator(ctx context.Context) rowIterator {
return d.NewIterator(ctx)
}

// Row returns the current row. The returned sqlbase.EncDatumRow is only valid
// until the next call to Row().
func (r diskRowIterator) Row() (sqlbase.EncDatumRow, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (h *hashJoiner) buildPhase(
}

// Transfer rows from memory.
i := h.rows[h.storedSide].NewIterator(ctx)
i := h.rows[h.storedSide].NewFinalIterator(ctx)
defer i.Close()
for i.Rewind(); ; i.Next() {
if err := h.cancelChecker.Check(); err != nil {
Expand Down Expand Up @@ -636,7 +636,7 @@ func (h *hashJoiner) probePhase(
src = h.rightSource
}
// First process the rows that were already buffered.
probeIterator := h.rows[side].NewIterator(ctx)
probeIterator := h.rows[side].NewFinalIterator(ctx)
defer probeIterator.Close()
for probeIterator.Rewind(); ; probeIterator.Next() {
if ok, err := probeIterator.Valid(); err != nil {
Expand Down
63 changes: 53 additions & 10 deletions pkg/sql/distsqlrun/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type sortableRowContainer interface {
// NewIterator returns a rowIterator that can be used to iterate over
// the rows.
NewIterator(context.Context) rowIterator
// NewFinalIterator returns a rowIterator that can be used to iterate over the
// rows, possibly freeing resources along the way. Subsequent calls to
// NewIterator or NewFinalIterator are not guaranteed to return any rows.
NewFinalIterator(context.Context) rowIterator

// Close frees up resources held by the sortableRowContainer.
Close(context.Context)
Expand Down Expand Up @@ -194,39 +198,78 @@ func (mc *memRowContainer) InitMaxHeap() {
}

// memRowIterator is a rowIterator that iterates over a memRowContainer. This
// iterator doesn't iterate over a snapshot of memRowContainer and deletes rows
// as soon as they are iterated over to free up memory eagerly.
// iterator doesn't iterate over a snapshot of memRowContainer.
type memRowIterator struct {
*memRowContainer
curIdx int
}

var _ rowIterator = memRowIterator{}
var _ rowIterator = &memRowIterator{}

// NewIterator returns an iterator that can be used to iterate over a
// memRowContainer. Note that this iterator doesn't iterate over a snapshot
// of memRowContainer.
func (mc *memRowContainer) NewIterator(_ context.Context) rowIterator {
return &memRowIterator{memRowContainer: mc}
}

// Rewind implements the rowIterator interface.
func (i *memRowIterator) Rewind() {
i.curIdx = 0
}

// Valid implements the rowIterator interface.
func (i *memRowIterator) Valid() (bool, error) {
return i.curIdx < i.Len(), nil
}

// Next implements the rowIterator interface.
func (i *memRowIterator) Next() {
i.curIdx++
}

// Row implements the rowIterator interface.
func (i *memRowIterator) Row() (sqlbase.EncDatumRow, error) {
return i.EncRow(i.curIdx), nil
}

// Close implements the rowIterator interface.
func (i *memRowIterator) Close() {}

// memRowFinalIterator is a rowIterator that iterates over a memRowContainer.
// This iterator doesn't iterate over a snapshot of memRowContainer and deletes
// rows as soon as they are iterated over to free up memory eagerly.
type memRowFinalIterator struct {
*memRowContainer
}

// NewFinalIterator returns an iterator that can be used to iterate over a
// memRowContainer. Note that this iterator doesn't iterate over a snapshot
// of memRowContainer and that it deletes rows as soon as they are iterated
// over.
func (mc *memRowContainer) NewIterator(_ context.Context) rowIterator {
return memRowIterator{memRowContainer: mc}
func (mc *memRowContainer) NewFinalIterator(_ context.Context) rowIterator {
return memRowFinalIterator{memRowContainer: mc}
}

var _ rowIterator = memRowFinalIterator{}

// Rewind implements the rowIterator interface.
func (i memRowIterator) Rewind() {}
func (i memRowFinalIterator) Rewind() {}

// Valid implements the rowIterator interface.
func (i memRowIterator) Valid() (bool, error) {
func (i memRowFinalIterator) Valid() (bool, error) {
return i.Len() > 0, nil
}

// Next implements the rowIterator interface.
func (i memRowIterator) Next() {
func (i memRowFinalIterator) Next() {
i.PopFirst()
}

// Row implements the rowIterator interface.
func (i memRowIterator) Row() (sqlbase.EncDatumRow, error) {
func (i memRowFinalIterator) Row() (sqlbase.EncDatumRow, error) {
return i.EncRow(0), nil
}

// Close implements the rowIterator interface.
func (i memRowIterator) Close() {}
func (i memRowFinalIterator) Close() {}
89 changes: 89 additions & 0 deletions pkg/sql/distsqlrun/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"math"
"testing"

"fmt"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand All @@ -28,6 +30,40 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

// verifyRows verifies that the rows read with the given rowIterator match up
// with the given rows. evalCtx and ordering are used to compare rows.
func verifyRows(
ctx context.Context,
i rowIterator,
expectedRows sqlbase.EncDatumRows,
evalCtx *tree.EvalContext,
ordering sqlbase.ColumnOrdering,
) error {
for i.Rewind(); ; i.Next() {
if ok, err := i.Valid(); err != nil {
return err
} else if !ok {
break
}
row, err := i.Row()
if err != nil {
return err
}
if cmp, err := compareRows(
oneIntCol, row, expectedRows[0], evalCtx, &sqlbase.DatumAlloc{}, ordering,
); err != nil {
return err
} else if cmp != 0 {
return fmt.Errorf("unexpected row %v, expected %v", row, expectedRows[0])
}
expectedRows = expectedRows[1:]
}
if len(expectedRows) != 0 {
return fmt.Errorf("iterator missed %d row(s)", len(expectedRows))
}
return nil
}

// TestRowContainerReplaceMax verifies that MaybeReplaceMax correctly adjusts
// the memory accounting.
func TestRowContainerReplaceMax(t *testing.T) {
Expand Down Expand Up @@ -87,3 +123,56 @@ func TestRowContainerReplaceMax(t *testing.T) {
mc.PopFirst()
}
}

func TestRowContainerIterators(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.NewTestingEvalContext(st)
defer evalCtx.Stop(ctx)

const numRows = 10
const numCols = 1
rows := makeIntRows(numRows, numCols)
ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}

var mc memRowContainer
mc.init(
ordering,
oneIntCol,
evalCtx,
)
defer mc.Close(ctx)

for _, row := range rows {
if err := mc.AddRow(ctx, row); err != nil {
t.Fatal(err)
}
}

// NewIterator verifies that we read the expected rows from the
// memRowContainer and can recreate an iterator.
t.Run("NewIterator", func(t *testing.T) {
for k := 0; k < 2; k++ {
i := mc.NewIterator(ctx)
if err := verifyRows(ctx, i, rows, evalCtx, ordering); err != nil {
t.Fatalf("rows mismatch on the run number %d: %s", k+1, err)
}
i.Close()
}
})

// NewFinalIterator verifies that we read the expected rows from the
// memRowContainer and as we do so, these rows are deleted from the
// memRowContainer.
t.Run("NewFinalIterator", func(t *testing.T) {
i := mc.NewFinalIterator(ctx)
if err := verifyRows(ctx, i, rows, evalCtx, ordering); err != nil {
t.Fatal(err)
}
if mc.Len() != 0 {
t.Fatal("memRowContainer is not empty")
}
})
}
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (s *sortAllProcessor) fill() (ok bool, _ error) {
s.diskContainer = &diskContainer

// Transfer the rows from memory to disk. This frees up the memory taken up by s.rows.
i := s.rows.NewIterator(ctx)
i := s.rows.NewFinalIterator(ctx)
for i.Rewind(); ; i.Next() {
if ok, err := i.Valid(); err != nil {
return false, err
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *sortAllProcessor) fillWithContainer(
}
r.Sort(ctx)

s.i = r.NewIterator(ctx)
s.i = r.NewFinalIterator(ctx)
s.i.Rewind()

return true, nil, nil
Expand Down
Loading

0 comments on commit f6f6ee8

Please sign in to comment.