Commit f0e9166
40208: distsql: add disk spilling to lookup joiner r=solongordon a=solongordon

In lookup joins on partial index keys, there is no limit on how many
rows might be returned by any particular lookup, so the joinreader may
buffer an unbounded number of rows in memory. I changed
joinreader to use a disk-backed row container rather than just storing
the rows in memory with no accounting.

Fixes #39044

Release note (bug fix): Lookup joins now spill to disk if the index
lookups return more rows than can be stored in memory.
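
For illustration, here is the spilling pattern as a minimal, self-contained sketch. The `spillingContainer` type and its byte-budget logic are hypothetical simplifications; the real change wires up `rowcontainer.DiskBackedIndexedRowContainer` with memory and disk monitors, as shown in the diff below.

```go
// Hypothetical sketch of disk spilling, not the rowcontainer API: buffer
// rows in memory under a byte budget and overflow to a temporary file
// once the budget is exhausted.
package main

import (
	"bufio"
	"fmt"
	"os"
)

type spillingContainer struct {
	budget, used int64    // memory budget and current usage, in bytes
	inMem        [][]byte // rows held in memory
	spill        *os.File // created lazily once the budget is exceeded
	w            *bufio.Writer
}

func (c *spillingContainer) AddRow(row []byte) error {
	if c.spill == nil && c.used+int64(len(row)) <= c.budget {
		c.inMem = append(c.inMem, row)
		c.used += int64(len(row))
		return nil
	}
	if c.spill == nil { // budget exhausted: start spilling to disk
		f, err := os.CreateTemp("", "rows")
		if err != nil {
			return err
		}
		c.spill, c.w = f, bufio.NewWriter(f)
	}
	_, err := fmt.Fprintf(c.w, "%s\n", row)
	return err
}

func main() {
	c := &spillingContainer{budget: 1 << 20} // 1 MiB in-memory budget
	if err := c.AddRow([]byte("some row")); err != nil {
		fmt.Println("add failed:", err)
	}
}
```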

40284: storage: issue swaps on AllocatorConsiderRebalance r=nvanbenschoten a=tbg

Change the rebalancing code so that it not only looks up a new replica to
add, but also picks one to remove. Both actions are then given to a
ChangeReplicas invocation, which will carry them out atomically as long as
that feature is enabled.

Release note (bug fix): Replicas can now be moved between stores without
entering an intermediate configuration that violates the zone constraints.
Violations may still occur during zone config changes, decommissioning, and
in the presence of dead nodes (NB: the remainder will be addressed in a
future change, so merge the corresponding release note).
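
Schematically, the swap looks like the following sketch. The types and the `changeReplicas` callback here are hypothetical; the real allocator and ChangeReplicas signatures differ.

```go
// Hypothetical sketch of an atomic swap: the add target and the removal
// candidate are submitted as one change, so the range never passes through
// an intermediate configuration that violates its zone constraints.
package main

type ReplicaTarget struct{ NodeID, StoreID int32 }

type ReplicationChange struct {
	Add    bool // true to add the target, false to remove it
	Target ReplicaTarget
}

func rebalance(
	add, remove ReplicaTarget,
	changeReplicas func(...ReplicationChange) error, // atomic when enabled
) error {
	return changeReplicas(
		ReplicationChange{Add: true, Target: add},
		ReplicationChange{Add: false, Target: remove},
	)
}

func main() {
	apply := func(changes ...ReplicationChange) error { return nil }
	_ = rebalance(
		ReplicaTarget{NodeID: 1, StoreID: 1},
		ReplicaTarget{NodeID: 2, StoreID: 2},
		apply,
	)
}
```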

40300: store: pull updateMVCCGauges out of StoreMetrics lock, use atomics r=nvanbenschoten a=nvanbenschoten

The operations it performs are already atomic, so we can use atomic add instructions to avoid any critical section.

This was responsible for 8.15% of mutex contention on a YCSB run.

The change also removes MVCCStats from the `storeMetrics` interface, which addresses a long-standing TODO.
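
The pattern, as a sketch with made-up gauge fields rather than the storage package's actual code: each counter is updated independently with `sync/atomic`, so no lock is needed.

```go
// Sketch: gauge updates via atomic adds instead of a shared mutex. Each
// field is updated independently, so no critical section is required.
package main

import "sync/atomic"

type mvccGauges struct {
	liveBytes int64 // hypothetical gauge fields for illustration
	keyBytes  int64
}

func (g *mvccGauges) update(liveDelta, keyDelta int64) {
	atomic.AddInt64(&g.liveBytes, liveDelta)
	atomic.AddInt64(&g.keyBytes, keyDelta)
}

func main() {
	var g mvccGauges
	g.update(128, 64) // no lock acquired on the hot path
	_ = atomic.LoadInt64(&g.liveBytes)
}
```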

40301: roachtest: Deflake clock jump test r=tbg a=bdarnell

These tests perform various clock jumps, then reverse them. The
reverse can cause a crash even if the original jump did not.
Add some sleeps to make things more deterministic and improve the
recovery process at the end of the test.

Fixes #38723

Release note: None

40305: exec: modify tests to catch bad selection vector access r=rafiss a=rafiss

The runTests helper will now cause a panic if a vectorized operator
tries to access a part of the selection vector that is out of bounds.
This identified bugs in the projection operator.

Release note: None
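
The idea in simplified form (made-up `batch` type, not the actual exec package types): hand operators a selection vector with no spare capacity, so any access past the valid region triggers a bounds panic.

```go
// Simplified illustration of the testing technique: the selection vector
// is allocated at exactly the batch length, so an out-of-bounds access by
// a buggy operator triggers a runtime bounds panic the test can detect.
package main

import "fmt"

type batch struct {
	n   int   // number of valid rows
	sel []int // selection vector; only sel[:n] is meaningful
}

func newTestBatch(n int) *batch {
	return &batch{n: n, sel: make([]int, n)} // deliberately no slack
}

func main() {
	b := newTestBatch(4)
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("caught bad selection vector access:", r)
		}
	}()
	_ = b.sel[b.n] // the kind of off-by-one read this setup exposes
}
```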

Co-authored-by: Solon Gordon <solon@cockroachlabs.com>
Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Ben Darnell <ben@cockroachlabs.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
6 people committed Aug 28, 2019
6 parents 8e07f50 + dfcf20f + f94a83e + 0be2cc7 + ee12856 + fa2ae02 commit f0e9166
Showing 17 changed files with 915 additions and 464 deletions.
11 changes: 10 additions & 1 deletion pkg/cmd/roachtest/clock_jump_crash.go
@@ -59,13 +59,22 @@ func runClockJump(ctx context.Context, t *test, c *cluster, tc clockJumpTestCase
// clock offset is reset or the node will crash again.
var aliveAfterOffset bool
defer func() {
if !aliveAfterOffset {
offsetInjector.recover(ctx, c.spec.NodeCount)
// Resetting the clock is a jump in the opposite direction which
// can cause a crash even if the original jump didn't. Wait a few
// seconds before checking whether the node is alive and
// restarting it if not.
time.Sleep(3 * time.Second)
if !isAlive(db) {
c.Start(ctx, t, c.Node(1))
}
}()
defer offsetInjector.recover(ctx, c.spec.NodeCount)
offsetInjector.offset(ctx, c.spec.NodeCount, tc.offset)

// Wait a few seconds to let it crash if it's going to crash.
time.Sleep(3 * time.Second)

t.Status("validating health")
aliveAfterOffset = isAlive(db)
if aliveAfterOffset != tc.aliveAfterOffset {
2 changes: 0 additions & 2 deletions pkg/server/status/recorder.go
@@ -34,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -83,7 +82,6 @@ var recordHistogramQuantiles = []quantile{
type storeMetrics interface {
StoreID() roachpb.StoreID
Descriptor(bool) (*roachpb.StoreDescriptor, error)
MVCCStats() enginepb.MVCCStats
Registry() *metric.Registry
}

6 changes: 0 additions & 6 deletions pkg/server/status/recorder_test.go
@@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -81,7 +80,6 @@ var _ sort.Interface = byStoreDescID{}
// interact with stores.
type fakeStore struct {
storeID roachpb.StoreID
stats enginepb.MVCCStats
desc roachpb.StoreDescriptor
registry *metric.Registry
}
@@ -94,10 +92,6 @@ func (fs fakeStore) Descriptor(_ bool) (*roachpb.StoreDescriptor, error) {
return &fs.desc, nil
}

func (fs fakeStore) MVCCStats() enginepb.MVCCStats {
return fs.stats
}

func (fs fakeStore) Registry() *metric.Registry {
return fs.registry
}
7 changes: 5 additions & 2 deletions pkg/sql/distsql_running.go
@@ -151,8 +151,11 @@ func (dsp *DistSQLPlanner) setupFlows(
if _, err := distsqlrun.SupportsVectorized(
ctx, &distsqlrun.FlowCtx{
EvalCtx: &evalCtx.EvalContext,
Cfg: &distsqlrun.ServerConfig{},
NodeID: -1,
Cfg: &distsqlrun.ServerConfig{
DiskMonitor: &mon.BytesMonitor{},
Settings: dsp.st,
},
NodeID: -1,
}, spec.Processors,
); err != nil {
// Vectorization attempt failed with an error.
150 changes: 119 additions & 31 deletions pkg/sql/distsqlrun/joinreader.go
@@ -17,11 +17,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/scrub"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/opentracing/opentracing-go"
@@ -32,6 +35,11 @@ import (
// nodes that "own" the respective ranges, and send out flows on those nodes.
const joinReaderBatchSize = 100

// partialJoinSentinel is used as the inputRowIdxToLookedUpRowIdx value for
// semi- and anti-joins, where we only need to know about the existence of a
// match.
var partialJoinSentinel = []int{-1}

// joinReaderState represents the state of the processor.
type joinReaderState int

@@ -57,6 +65,8 @@ type joinReader struct {
// ProcessorBase.State == StateRunning.
runningState joinReaderState

diskMonitor *mon.BytesMonitor

desc sqlbase.TableDescriptor
index *sqlbase.IndexDescriptor
colIdxMap map[sqlbase.ColumnID]int
@@ -84,23 +94,25 @@ type joinReader struct {

// State variables for each batch of input rows.
inputRows sqlbase.EncDatumRows
lookedUpRows rowcontainer.IndexedRowContainer
keyToInputRowIndices map[string][]int
// inputRowIdxToLookedUpRows is a slice of looked up rows, one per row in
// inputRows. It's populated in the jrPerformingLookup state. For non partial
// joins (everything but semi/anti join), the looked up rows are the rows that
// came back from the lookup span for each input row, without checking for
// matches with respect to the on-condition. For semi/anti join, we store at
// most one nil, indicating a matching lookup if it's present, since the right
// side of a semi/anti join is not used.
inputRowIdxToLookedUpRows []sqlbase.EncDatumRows
// inputRowIdxToLookedUpRowIdx is a multimap from input row indices to
// corresponding looked up row indices. It's populated in the
// jrPerformingLookup state. For non partial joins (everything but semi/anti
// join), the looked up rows are the rows that came back from the lookup
// span for each input row, without checking for matches with respect to the
// on-condition. For semi/anti join, we store at most one sentinel value,
// indicating a matching lookup if it's present, since the right side of a
// semi/anti join is not used.
inputRowIdxToLookedUpRowIdx [][]int
// emitCursor contains information about where the next row to emit is within
// inputRowIdxToOutputRows.
// inputRowIdxToLookedUpRowIdx.
emitCursor struct {
// inputRowIdx contains the index into inputRowIdxToOutputRows that we're
// about to emit.
// inputRowIdx contains the index into inputRowIdxToLookedUpRowIdx that
// we're about to emit.
inputRowIdx int
// outputRowIdx contains the index into the inputRowIdx'th row of
// inputRowIdxToOutputRows that we're about to emit.
// inputRowIdxToLookedUpRowIdx that we're about to emit.
outputRowIdx int
// seenMatch is true if there was a match at the current inputRowIdx. A
// match means that there's no need to output an outer or anti join row.
@@ -172,7 +184,7 @@ func newJoinReader(
ProcStateOpts{
InputsToDrain: []RowSource{jr.input},
TrailingMetaCallback: func(ctx context.Context) []distsqlpb.ProducerMetadata {
jr.InternalClose()
jr.close()
return jr.generateMeta(ctx)
},
},
@@ -214,6 +226,48 @@ func newJoinReader(
jr.neededRightCols(),
)

// Initialize memory monitors and row container for looked up rows.
st := flowCtx.Cfg.Settings
ctx := flowCtx.EvalCtx.Ctx()
if settingUseTempStorageJoins.Get(&st.SV) {
// Limit the memory use by creating a child monitor with a hard limit.
// joinReader will overflow to disk if this limit is not enough.
limit := flowCtx.Cfg.TestingKnobs.MemoryLimitBytes
if limit <= 0 {
limit = settingWorkMemBytes.Get(&st.SV)
}
limitedMon := mon.MakeMonitorInheritWithLimit("joinreader-limited", limit, flowCtx.EvalCtx.Mon)
limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
jr.MemMonitor = &limitedMon
jr.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk")
drc := rowcontainer.MakeDiskBackedIndexedRowContainer(
nil, /* ordering */
jr.desc.ColumnTypesWithMutations(returnMutations),
jr.evalCtx,
jr.flowCtx.Cfg.TempStorage,
jr.MemMonitor,
jr.diskMonitor,
0, /* rowCapacity */
)
if limit < mon.DefaultPoolAllocationSize {
// The memory limit is too low for caching, most likely to force disk
// spilling for testing.
drc.DisableCache = true
}
jr.lookedUpRows = drc
} else {
jr.MemMonitor = NewMonitor(ctx, flowCtx.EvalCtx.Mon, "joinreader-mem")
rc := rowcontainer.MemRowContainer{}
rc.InitWithMon(
nil, /* ordering */
jr.desc.ColumnTypesWithMutations(returnMutations),
jr.evalCtx,
jr.MemMonitor,
0, /* rowCapacity */
)
jr.lookedUpRows = &rc
}

// TODO(radu): verify the input types match the index key types
return jr, nil
}
@@ -358,13 +412,13 @@ func (jr *joinReader) readInput() (joinReaderState, *distsqlpb.ProducerMetadata)
// will allow us to preserve the order of the input in the face of multiple
// input rows having the same lookup keyspan, or if we're doing an outer join
// and we need to emit unmatched rows.
if cap(jr.inputRowIdxToLookedUpRows) >= len(jr.inputRows) {
jr.inputRowIdxToLookedUpRows = jr.inputRowIdxToLookedUpRows[:len(jr.inputRows)]
for i := range jr.inputRowIdxToLookedUpRows {
jr.inputRowIdxToLookedUpRows[i] = jr.inputRowIdxToLookedUpRows[i][:0]
if cap(jr.inputRowIdxToLookedUpRowIdx) >= len(jr.inputRows) {
jr.inputRowIdxToLookedUpRowIdx = jr.inputRowIdxToLookedUpRowIdx[:len(jr.inputRows)]
for i := range jr.inputRowIdxToLookedUpRowIdx {
jr.inputRowIdxToLookedUpRowIdx[i] = jr.inputRowIdxToLookedUpRowIdx[i][:0]
}
} else {
jr.inputRowIdxToLookedUpRows = make([]sqlbase.EncDatumRows, len(jr.inputRows))
jr.inputRowIdxToLookedUpRowIdx = make([][]int, len(jr.inputRows))
}

// Start the index lookup. We maintain a map from index key to the
@@ -407,13 +461,14 @@ func (jr *joinReader) readInput() (joinReaderState, *distsqlpb.ProducerMetadata)
}

// performLookup reads the next batch of index rows, joins them to the
// corresponding input rows, and adds the results to jr.inputRowIdxToOutputRows.
// corresponding input rows, and adds the results to
// jr.inputRowIdxToLookedUpRowIdx.
func (jr *joinReader) performLookup() (joinReaderState, *distsqlpb.ProducerMetadata) {
nCols := len(jr.lookupCols)

isJoinTypePartialJoin := jr.joinType == sqlbase.LeftSemiJoin || jr.joinType == sqlbase.LeftAntiJoin
// Read the entire set of rows looked up for the last input batch.
for {
for lookedUpRowIdx := 0; ; lookedUpRowIdx++ {
// Construct a "partial key" of nCols, so we can match the key format that
// was stored in our keyToInputRowIndices map. This matches the format that
// is output in jr.generateSpan.
@@ -423,6 +478,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *distsqlpb.ProducerMetadata)
return jrStateUnknown, jr.DrainHelper()
}

// Fetch the next row and copy it into the row container.
lookedUpRow, meta := jr.fetcher.Next()
if meta != nil {
jr.MoveToDraining(scrub.UnwrapScrubError(meta.Err))
@@ -432,14 +488,27 @@ func (jr *joinReader) performLookup() (joinReaderState, *distsqlpb.ProducerMetadata)
// Done with this input batch.
break
}
if !isJoinTypePartialJoin {
// Replace missing values with nulls to appease the row container.
for i := range lookedUpRow {
if lookedUpRow[i].IsUnset() {
lookedUpRow[i].Datum = tree.DNull
}
}
if err := jr.lookedUpRows.AddRow(jr.Ctx, lookedUpRow); err != nil {
jr.MoveToDraining(err)
return jrStateUnknown, jr.DrainHelper()
}
}

// Update our map from input rows to looked up rows.
for _, inputRowIdx := range jr.keyToInputRowIndices[string(key)] {
if isJoinTypePartialJoin {
// During a SemiJoin or AntiJoin, we only output if we've seen no match
// for this input row yet. Additionally, since we don't have to render
// anything to output a Semi or Anti join match, we can evaluate our
// on condition now and only buffer if we pass it.
if len(jr.inputRowIdxToLookedUpRows[inputRowIdx]) == 0 {
if len(jr.inputRowIdxToLookedUpRowIdx[inputRowIdx]) == 0 {
renderedRow, err := jr.render(jr.inputRows[inputRowIdx], lookedUpRow)
if err != nil {
jr.MoveToDraining(err)
@@ -449,13 +518,11 @@ func (jr *joinReader) performLookup() (joinReaderState, *distsqlpb.ProducerMetadata)
// We failed our on-condition - don't buffer anything.
continue
}
jr.inputRowIdxToLookedUpRows[inputRowIdx] = append(
jr.inputRowIdxToLookedUpRows[inputRowIdx], nil)
jr.inputRowIdxToLookedUpRowIdx[inputRowIdx] = partialJoinSentinel
}
} else {
jr.inputRowIdxToLookedUpRows[inputRowIdx] = append(
jr.inputRowIdxToLookedUpRows[inputRowIdx],
jr.out.rowAlloc.CopyRow(lookedUpRow))
jr.inputRowIdxToLookedUpRowIdx[inputRowIdx] = append(
jr.inputRowIdxToLookedUpRowIdx[inputRowIdx], lookedUpRowIdx)
}
}
}
Expand All @@ -471,17 +538,21 @@ func (jr *joinReader) emitRow() (
*distsqlpb.ProducerMetadata,
) {
// Loop until we find a valid row to emit, or the cursor runs off the end.
if jr.emitCursor.inputRowIdx >= len(jr.inputRowIdxToLookedUpRows) {
if jr.emitCursor.inputRowIdx >= len(jr.inputRowIdxToLookedUpRowIdx) {
// Ready for another input batch. Reset state.
jr.inputRows = jr.inputRows[:0]
jr.keyToInputRowIndices = make(map[string][]int)
jr.emitCursor.outputRowIdx = 0
jr.emitCursor.inputRowIdx = 0
jr.emitCursor.seenMatch = false
if err := jr.lookedUpRows.UnsafeReset(jr.Ctx); err != nil {
jr.MoveToDraining(err)
return jrStateUnknown, nil, jr.DrainHelper()
}
return jrReadingInput, nil, nil
}
inputRow := jr.inputRows[jr.emitCursor.inputRowIdx]
lookedUpRows := jr.inputRowIdxToLookedUpRows[jr.emitCursor.inputRowIdx]
lookedUpRows := jr.inputRowIdxToLookedUpRowIdx[jr.emitCursor.inputRowIdx]
if jr.emitCursor.outputRowIdx >= len(lookedUpRows) {
// We have no more rows for the current input row. Emit an outer or anti
// row if we didn't see a match, and bump to the next input row.
@@ -505,7 +576,7 @@ func (jr *joinReader) emitRow() (
return jrEmittingRows, nil, nil
}

lookedUpRow := lookedUpRows[jr.emitCursor.outputRowIdx]
lookedUpRowIdx := lookedUpRows[jr.emitCursor.outputRowIdx]
jr.emitCursor.outputRowIdx++
switch jr.joinType {
case sqlbase.LeftSemiJoin:
@@ -518,7 +589,12 @@ func (jr *joinReader) emitRow() (
return jrEmittingRows, nil, nil
}

outputRow, err := jr.render(inputRow, lookedUpRow)
lookedUpRow, err := jr.lookedUpRows.GetRow(jr.Ctx, lookedUpRowIdx)
if err != nil {
jr.MoveToDraining(err)
return jrStateUnknown, nil, jr.DrainHelper()
}
outputRow, err := jr.render(inputRow, lookedUpRow.(rowcontainer.IndexedRow).Row)
if err != nil {
jr.MoveToDraining(err)
return jrStateUnknown, nil, jr.DrainHelper()
@@ -550,7 +626,19 @@ func (jr *joinReader) Start(ctx context.Context) context.Context {
// ConsumerClosed is part of the RowSource interface.
func (jr *joinReader) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
jr.InternalClose()
jr.close()
}

func (jr *joinReader) close() {
if jr.InternalClose() {
if jr.lookedUpRows != nil {
jr.lookedUpRows.Close(jr.Ctx)
}
jr.MemMonitor.Stop(jr.Ctx)
if jr.diskMonitor != nil {
jr.diskMonitor.Stop(jr.Ctx)
}
}
}

var _ distsqlpb.DistSQLSpanStats = &JoinReaderStats{}