-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: utilities for disk-based hash join #12116
Changes from all commits
ce67343
6de4583
dcb84c9
d329a79
95865a6
7d42f05
9373cdd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,312 @@ | ||
// Copyright 2019 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License. | ||
|
||
package chunk | ||
|
||
import ( | ||
"bufio" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"os" | ||
"path" | ||
"sync" | ||
|
||
"github.com/pingcap/log" | ||
"github.com/pingcap/parser/terror" | ||
"github.com/pingcap/tidb/types" | ||
"github.com/pingcap/tidb/util/disk" | ||
"github.com/pingcap/tidb/util/stringutil" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const (
	// writeBufSize is the size of the pooled write buffers used when
	// serializing chunks to the temporary file.
	writeBufSize = 128 * 1024
	// readBufSize is the size of the pooled read buffers used when
	// deserializing a single row back from disk.
	readBufSize = 4 * 1024
)
|
||
// bufWriterPool reuses large write buffers across ListInDisk instances
// so each list does not allocate a fresh 128 KiB buffer.
var bufWriterPool = sync.Pool{
	New: func() interface{} { return bufio.NewWriterSize(nil, writeBufSize) },
}

// bufReaderPool reuses read buffers across GetRow calls.
var bufReaderPool = sync.Pool{
	New: func() interface{} { return bufio.NewReaderSize(nil, readBufSize) },
}

// tmpDir is the directory holding all temporary files created by this
// package. It embeds the executable base name so different binaries
// sharing one OS temp dir use distinct directories.
var tmpDir = path.Join(os.TempDir(), "tidb-server-"+path.Base(os.Args[0]))
|
||
func init() { | ||
_ = os.RemoveAll(tmpDir) // clean the uncleared temp file during the last run. | ||
err := os.Mkdir(tmpDir, 0755) | ||
if err != nil { | ||
log.Warn("Mkdir temporary file error", zap.String("tmpDir", tmpDir), zap.Error(err)) | ||
} | ||
} | ||
|
||
// ListInDisk represents a slice of chunks storing in temporary disk.
// Chunks are appended with Add and individual rows are fetched back
// with GetRow; the backing file is created lazily on the first Add.
type ListInDisk struct {
	fieldTypes []*types.FieldType
	// offsets stores the offsets in disk of all RowPtr,
	// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx].
	offsets [][]int64
	// offWrite is the current offset for writing.
	offWrite int64

	// disk is the backing temporary file; nil until the first Add.
	disk *os.File
	// bufWriter wraps disk; borrowed from bufWriterPool while the file
	// is open and returned on Close.
	bufWriter   *bufio.Writer
	diskTracker *disk.Tracker // track disk usage.
}
|
||
// defaultChunkListInDiskLabel is the label reported by the disk tracker
// of a ListInDisk; it is also used as the temp-file name prefix.
var defaultChunkListInDiskLabel fmt.Stringer = stringutil.StringerStr("chunk.ListInDisk")
|
||
// NewListInDisk creates a new ListInDisk with field types. | ||
func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk { | ||
l := &ListInDisk{ | ||
fieldTypes: fieldTypes, | ||
// TODO(fengliyuan): set the quota of disk usage. | ||
diskTracker: disk.NewTracker(defaultChunkListInDiskLabel, -1), | ||
} | ||
return l | ||
} | ||
|
||
// GetDiskTracker returns the disk tracker of this List.
func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
	return l.diskTracker
}
|
||
// Add adds a chunk to the ListInDisk. Caller must make sure the input chk | ||
// is not empty and not used any more and has the same field types. | ||
func (l *ListInDisk) Add(chk *Chunk) (err error) { | ||
if chk.NumRows() == 0 { | ||
return errors.New("chunk appended to List should have at least 1 row") | ||
} | ||
if l.disk == nil { | ||
l.disk, err = ioutil.TempFile(tmpDir, l.diskTracker.Label().String()) | ||
if err != nil { | ||
return | ||
} | ||
l.bufWriter = bufWriterPool.Get().(*bufio.Writer) | ||
l.bufWriter.Reset(l.disk) | ||
} | ||
chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite} | ||
n, err := chk2.WriteTo(l.bufWriter) | ||
l.offWrite += n | ||
if err != nil { | ||
return | ||
} | ||
l.offsets = append(l.offsets, chk2.getOffsetsOfRows()) | ||
err = l.bufWriter.Flush() | ||
if err == nil { | ||
l.diskTracker.Consume(n) | ||
} | ||
return | ||
} | ||
|
||
// GetRow gets a Row from the ListInDisk by RowPtr. | ||
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { | ||
off := l.offsets[ptr.ChkIdx][ptr.RowIdx] | ||
r := io.NewSectionReader(l.disk, off, l.offWrite-off) | ||
qw4990 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bufReader := bufReaderPool.Get().(*bufio.Reader) | ||
bufReader.Reset(r) | ||
defer bufReaderPool.Put(bufReader) | ||
|
||
format := rowInDisk{numCol: len(l.fieldTypes)} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/ format/ rowInDiskFormat ? |
||
_, err = format.ReadFrom(bufReader) | ||
if err != nil { | ||
return row, err | ||
} | ||
row = format.toMutRow(l.fieldTypes).ToRow() | ||
return row, err | ||
} | ||
|
||
// NumChunks returns the number of chunks in the ListInDisk.
// One offset slice is recorded per chunk added, so the length of
// offsets equals the chunk count.
func (l *ListInDisk) NumChunks() int {
	return len(l.offsets)
}
|
||
// Close releases the disk resource. | ||
func (l *ListInDisk) Close() error { | ||
if l.disk != nil { | ||
l.diskTracker.Consume(-l.diskTracker.BytesConsumed()) | ||
terror.Call(l.disk.Close) | ||
bufWriterPool.Put(l.bufWriter) | ||
return os.Remove(l.disk.Name()) | ||
} | ||
return nil | ||
} | ||
|
||
// chunkInDisk represents a chunk in disk format. Each row of the chunk
// is serialized and in sequence ordered. The format of each row is like
// the struct diskFormatRow, put size of each column first, then the
// data of each column.
//
// For example, a chunk has 2 rows and 3 columns, the disk format of the
// chunk is as follow:
//
// [size of row0 column0], [size of row0 column1], [size of row0 column2]
// [data of row0 column0], [data of row0 column1], [data of row0 column2]
// [size of row1 column0], [size of row1 column1], [size of row1 column2]
// [data of row1 column0], [data of row1 column1], [data of row1 column2]
//
// If a column of a row is null, the size of it is -1 and the data is empty.
type chunkInDisk struct {
	*Chunk
	// offWrite is the current offset for writing.
	offWrite int64
	// offsetsOfRows stores the offset of each row.
	offsetsOfRows []int64
}
|
||
// WriteTo serializes the chunk into the format of chunkInDisk, and | ||
// writes to w. | ||
func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { | ||
var n int64 | ||
numRows := chk.NumRows() | ||
chk.offsetsOfRows = make([]int64, 0, numRows) | ||
var format *diskFormatRow | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/ format/ rowInDiskFormat ? |
||
for rowIdx := 0; rowIdx < numRows; rowIdx++ { | ||
format = convertFromRow(chk.GetRow(rowIdx), format) | ||
chk.offsetsOfRows = append(chk.offsetsOfRows, chk.offWrite+written) | ||
|
||
n, err = rowInDisk{diskFormatRow: *format}.WriteTo(w) | ||
written += n | ||
if err != nil { | ||
return | ||
} | ||
} | ||
return | ||
} | ||
|
||
// getOffsetsOfRows gets the offset of each row. | ||
func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows } | ||
|
||
// rowInDisk represents a Row in format of diskFormatRow.
// numCol is required by ReadFrom to know how many size entries to
// expect in the header; WriteTo does not use it.
type rowInDisk struct {
	numCol int
	diskFormatRow
}
|
||
// WriteTo serializes a row of the chunk into the format of | ||
// diskFormatRow, and writes to w. | ||
func (row rowInDisk) WriteTo(w io.Writer) (written int64, err error) { | ||
n, err := w.Write(i64SliceToBytes(row.sizesOfColumns)) | ||
written += int64(n) | ||
if err != nil { | ||
return | ||
} | ||
for _, data := range row.cells { | ||
n, err = w.Write(data) | ||
written += int64(n) | ||
if err != nil { | ||
return | ||
} | ||
} | ||
return | ||
} | ||
|
||
// ReadFrom reads data of r, deserializes it from the format of diskFormatRow | ||
// into Row. | ||
func (row *rowInDisk) ReadFrom(r io.Reader) (n int64, err error) { | ||
b := make([]byte, 8*row.numCol) | ||
var n1 int | ||
n1, err = io.ReadFull(r, b) | ||
n += int64(n1) | ||
if err != nil { | ||
return | ||
} | ||
row.sizesOfColumns = bytesToI64Slice(b) | ||
row.cells = make([][]byte, 0, row.numCol) | ||
for _, size := range row.sizesOfColumns { | ||
if size == -1 { | ||
continue | ||
} | ||
cell := make([]byte, size) | ||
row.cells = append(row.cells, cell) | ||
n1, err = io.ReadFull(r, cell) | ||
n += int64(n1) | ||
if err != nil { | ||
return | ||
} | ||
} | ||
return | ||
} | ||
|
||
// diskFormatRow represents a row in a chunk in disk format. The disk format
// of a row is described in the doc of chunkInDisk.
type diskFormatRow struct {
	// sizesOfColumns stores the size of each column in a row.
	// -1 means the value of this column is null.
	sizesOfColumns []int64 // -1 means null
	// cells represents raw data of not-null columns in one row.
	// In convertFromRow, data from Row is shallow copied to cells.
	// In toMutRow, data in cells is shallow copied to MutRow.
	cells [][]byte
}
|
||
// convertFromRow serializes one row of chunk to diskFormatRow, then | ||
// we can use diskFormatRow to write to disk. | ||
func convertFromRow(row Row, reuse *diskFormatRow) (format *diskFormatRow) { | ||
numCols := row.Chunk().NumCols() | ||
if reuse != nil { | ||
format = reuse | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/ format/ rowInDiskFormat ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's better to keep the short-lived value short. |
||
format.sizesOfColumns = format.sizesOfColumns[:0] | ||
format.cells = format.cells[:0] | ||
} else { | ||
format = &diskFormatRow{ | ||
sizesOfColumns: make([]int64, 0, numCols), | ||
cells: make([][]byte, 0, numCols), | ||
} | ||
} | ||
for colIdx := 0; colIdx < numCols; colIdx++ { | ||
if row.IsNull(colIdx) { | ||
format.sizesOfColumns = append(format.sizesOfColumns, -1) | ||
} else { | ||
cell := row.GetRaw(colIdx) | ||
format.sizesOfColumns = append(format.sizesOfColumns, int64(len(cell))) | ||
format.cells = append(format.cells, cell) | ||
} | ||
} | ||
return | ||
} | ||
|
||
// toMutRow deserializes diskFormatRow to MutRow. | ||
func (format *diskFormatRow) toMutRow(fields []*types.FieldType) MutRow { | ||
chk := &Chunk{columns: make([]*Column, 0, len(format.sizesOfColumns))} | ||
var cellOff int | ||
for colIdx, size := range format.sizesOfColumns { | ||
col := &Column{length: 1} | ||
elemSize := getFixedLen(fields[colIdx]) | ||
if size == -1 { // isNull | ||
col.nullBitmap = []byte{0} | ||
if elemSize == varElemLen { | ||
col.offsets = []int64{0, 0} | ||
} else { | ||
buf := make([]byte, elemSize) | ||
col.data = buf | ||
col.elemBuf = buf | ||
} | ||
} else { | ||
col.nullBitmap = []byte{1} | ||
col.data = format.cells[cellOff] | ||
cellOff++ | ||
if elemSize == varElemLen { | ||
col.offsets = []int64{0, int64(len(col.data))} | ||
} else { | ||
col.elemBuf = col.data | ||
} | ||
} | ||
chk.columns = append(chk.columns, col) | ||
} | ||
return MutRow{c: chk} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to directly use the
return err
form to improve code readability.