Skip to content

Commit

Permalink
distsql: add missing MoveToDraining calls
Browse files Browse the repository at this point in the history
I found several places where processors were not checking for error
metadata from their inputs and draining appropriately. The most
egregious of these is in TableReader, which was not calling
MoveToDraining when it encountered an error from the underlying
RowFetcher. This meant that rather than draining it would continue
calling NextRow and generating errors in a tight loop, which caused the
query to hang as well as high CPU and memory usage on the affected node.

Other affected processors are Distinct, SortChunks, and SortTopK.

Fixes cockroachdb#29374
Fixes cockroachdb#29978

Release note: None
  • Loading branch information
solongordon committed Sep 12, 2018
1 parent 35100f0 commit 2145501
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/sql/distsqlrun/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ func (d *Distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
for d.State == StateRunning {
row, meta := d.input.Next()
if meta != nil {
if meta.Err != nil {
d.MoveToDraining(nil /* err */)
}
return nil, meta
}
if row == nil {
Expand Down Expand Up @@ -282,6 +285,9 @@ func (d *SortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
for d.State == StateRunning {
row, meta := d.input.Next()
if meta != nil {
if meta.Err != nil {
d.MoveToDraining(nil /* err */)
}
return nil, meta
}
if row == nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/distsqlrun/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ func (s *sortTopKProcessor) Start(ctx context.Context) context.Context {
row, meta := s.input.Next()
if meta != nil {
s.trailingMeta = append(s.trailingMeta, *meta)
if meta.Err != nil {
s.MoveToDraining(nil /* err */)
break
}
continue
}
if row == nil {
Expand Down Expand Up @@ -549,6 +553,9 @@ func (s *sortChunksProcessor) fill() (bool, error) {

if meta != nil {
s.trailingMeta = append(s.trailingMeta, *meta)
if meta.Err != nil {
return false, nil
}
continue
}
if nextChunkRow == nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/distsqlrun/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ func (tr *tableReader) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
row, meta := tr.input.Next()

if meta != nil {
if meta.Err != nil {
tr.MoveToDraining(nil /* err */)
}
return nil, meta
}
if row == nil {
Expand Down

0 comments on commit 2145501

Please sign in to comment.