Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: utilities for disk-based hash join #12116

Merged
merged 7 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions util/chunk/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *Codec) encodeColumn(buffer []byte, col *Column) []byte {
// encode offsets.
if !col.isFixed() {
numOffsetBytes := (col.length + 1) * 8
offsetBytes := c.i64SliceToBytes(col.offsets)
offsetBytes := i64SliceToBytes(col.offsets)
buffer = append(buffer, offsetBytes[:numOffsetBytes]...)
}

Expand All @@ -75,7 +75,7 @@ func (c *Codec) encodeColumn(buffer []byte, col *Column) []byte {
return buffer
}

func (c *Codec) i64SliceToBytes(i64s []int64) (b []byte) {
func i64SliceToBytes(i64s []int64) (b []byte) {
if len(i64s) == 0 {
return nil
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained
numDataBytes := int64(numFixedBytes * col.length)
if numFixedBytes == -1 {
numOffsetBytes := (col.length + 1) * 8
col.offsets = append(col.offsets[:0], c.bytesToI64Slice(buffer[:numOffsetBytes])...)
col.offsets = append(col.offsets[:0], bytesToI64Slice(buffer[:numOffsetBytes])...)
buffer = buffer[numOffsetBytes:]
numDataBytes = col.offsets[col.length]
} else if cap(col.elemBuf) < numFixedBytes {
Expand All @@ -152,7 +152,7 @@ func (c *Codec) setAllNotNull(col *Column) {
}
}

func (c *Codec) bytesToI64Slice(b []byte) (i64s []int64) {
func bytesToI64Slice(b []byte) (i64s []int64) {
if len(b) == 0 {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions util/chunk/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,12 +757,12 @@ func (s *testChunkSuite) TestGetRaw(c *check.C) {
col.AppendFloat32(float32(i))
}
it := NewIterator4Chunk(chk)
var i int64
var i int
for row := it.Begin(); row != it.End(); row = it.Next() {
f := float32(i)
b := (*[unsafe.Sizeof(f)]byte)(unsafe.Pointer(&f))[:]
c.Assert(row.GetRaw(0), check.DeepEquals, b)
c.Assert(col.GetRaw(int(i)), check.DeepEquals, b)
c.Assert(col.GetRaw(i), check.DeepEquals, b)
i++
}

Expand Down
312 changes: 312 additions & 0 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package chunk

import (
"bufio"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"sync"

"github.com/pingcap/log"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)

const (
	// writeBufSize is the size of the pooled bufio.Writer buffer used when
	// spilling chunks to disk.
	writeBufSize = 128 * 1024
	// readBufSize is the size of the pooled bufio.Reader buffer used when
	// reading rows back from disk.
	readBufSize = 4 * 1024
)

// bufWriterPool reuses write buffers across ListInDisk instances to avoid
// allocating a new 128KB buffer for every spill file.
var bufWriterPool = sync.Pool{
	New: func() interface{} { return bufio.NewWriterSize(nil, writeBufSize) },
}

// bufReaderPool reuses read buffers across GetRow calls.
var bufReaderPool = sync.Pool{
	New: func() interface{} { return bufio.NewReaderSize(nil, readBufSize) },
}

// tmpDir is the directory holding all temporary spill files. It embeds the
// server binary name so different binaries do not share a directory.
var tmpDir = path.Join(os.TempDir(), "tidb-server-"+path.Base(os.Args[0]))

// init prepares a clean temporary directory for spill files.
func init() {
	// Remove anything left behind by a previous run; the error is
	// intentionally ignored since the directory may simply not exist.
	_ = os.RemoveAll(tmpDir)
	if err := os.Mkdir(tmpDir, 0755); err != nil {
		log.Warn("Mkdir temporary file error", zap.String("tmpDir", tmpDir), zap.Error(err))
	}
}

// ListInDisk represents a slice of chunks storing in temporary disk.
type ListInDisk struct {
	fieldTypes []*types.FieldType
	// offsets stores the offsets in disk of all RowPtr,
	// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx].
	offsets [][]int64
	// offWrite is the current offset for writing.
	offWrite int64

	// disk is the backing temporary file; it is created lazily on the
	// first call to Add.
	disk *os.File
	// bufWriter buffers writes to disk; it is obtained from bufWriterPool
	// when disk is created and returned to the pool in Close.
	bufWriter *bufio.Writer
	diskTracker *disk.Tracker // track disk usage.
}

// defaultChunkListInDiskLabel is the default label of the disk tracker
// attached to a ListInDisk; it also names the spill temp files.
var defaultChunkListInDiskLabel fmt.Stringer = stringutil.StringerStr("chunk.ListInDisk")

// NewListInDisk creates a new ListInDisk with field types.
func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
	// TODO(fengliyuan): set the quota of disk usage.
	tracker := disk.NewTracker(defaultChunkListInDiskLabel, -1)
	return &ListInDisk{
		fieldTypes:  fieldTypes,
		diskTracker: tracker,
	}
}

// GetDiskTracker returns the disk tracker of this ListInDisk.
func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
	return l.diskTracker
}

// Add adds a chunk to the ListInDisk. Caller must make sure the input chk
// is not empty and not used any more and has the same field types.
func (l *ListInDisk) Add(chk *Chunk) (err error) {
if chk.NumRows() == 0 {
return errors.New("chunk appended to List should have at least 1 row")
}
if l.disk == nil {
l.disk, err = ioutil.TempFile(tmpDir, l.diskTracker.Label().String())
if err != nil {
return
}
l.bufWriter = bufWriterPool.Get().(*bufio.Writer)
l.bufWriter.Reset(l.disk)
}
chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite}
n, err := chk2.WriteTo(l.bufWriter)
l.offWrite += n
if err != nil {
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to directly use the return err form to improve code readability.

}
l.offsets = append(l.offsets, chk2.getOffsetsOfRows())
err = l.bufWriter.Flush()
if err == nil {
l.diskTracker.Consume(n)
}
return
}

// GetRow gets a Row from the ListInDisk by RowPtr.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
r := io.NewSectionReader(l.disk, off, l.offWrite-off)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
bufReader := bufReaderPool.Get().(*bufio.Reader)
bufReader.Reset(r)
defer bufReaderPool.Put(bufReader)

format := rowInDisk{numCol: len(l.fieldTypes)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ format/ rowInDiskFormat ?

_, err = format.ReadFrom(bufReader)
if err != nil {
return row, err
}
row = format.toMutRow(l.fieldTypes).ToRow()
return row, err
}

// NumChunks returns the number of chunks in the ListInDisk.
// One entry is appended to offsets per successful Add, so its length
// equals the number of stored chunks.
func (l *ListInDisk) NumChunks() int {
	return len(l.offsets)
}

// Close releases the disk resource. It is safe to call Close more than
// once; subsequent calls are no-ops.
func (l *ListInDisk) Close() error {
	if l.disk == nil {
		return nil
	}
	l.diskTracker.Consume(-l.diskTracker.BytesConsumed())
	terror.Call(l.disk.Close)
	bufWriterPool.Put(l.bufWriter)
	// Drop the references so a second Close cannot re-close/re-remove the
	// file, and the pooled writer cannot be reached through this list again.
	l.bufWriter = nil
	name := l.disk.Name()
	l.disk = nil
	return os.Remove(name)
}

// chunkInDisk represents a chunk in disk format. Each row of the chunk
// is serialized and in sequence ordered. The format of each row is like
// the struct diskFormatRow, put size of each column first, then the
// data of each column.
//
// For example, a chunk has 2 rows and 3 columns, the disk format of the
// chunk is as follow:
//
// [size of row0 column0], [size of row0 column1], [size of row0 column2]
// [data of row0 column0], [data of row0 column1], [data of row0 column2]
// [size of row1 column0], [size of row1 column1], [size of row1 column2]
// [data of row1 column0], [data of row1 column1], [data of row1 column2]
//
// If a column of a row is null, the size of it is -1 and the data is empty.
type chunkInDisk struct {
	*Chunk
	// offWrite is the current offset for writing.
	offWrite int64
	// offsetsOfRows stores the offset of each row; it is populated by
	// WriteTo and read back via getOffsetsOfRows.
	offsetsOfRows []int64
}

// WriteTo serializes the chunk into the format of chunkInDisk, and
// writes to w.
func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) {
var n int64
numRows := chk.NumRows()
chk.offsetsOfRows = make([]int64, 0, numRows)
var format *diskFormatRow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ format/ rowInDiskFormat ?

for rowIdx := 0; rowIdx < numRows; rowIdx++ {
format = convertFromRow(chk.GetRow(rowIdx), format)
chk.offsetsOfRows = append(chk.offsetsOfRows, chk.offWrite+written)

n, err = rowInDisk{diskFormatRow: *format}.WriteTo(w)
written += n
if err != nil {
return
}
}
return
}

// getOffsetsOfRows gets the offset of each row.
// It is only meaningful after WriteTo has been called on this chunkInDisk.
func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows }

// rowInDisk represents a Row in format of diskFormatRow.
type rowInDisk struct {
	// numCol is the number of columns expected when deserializing;
	// it must be set before calling ReadFrom.
	numCol int
	diskFormatRow
}

// WriteTo serializes a row of the chunk into the format of
// diskFormatRow, and writes to w.
func (row rowInDisk) WriteTo(w io.Writer) (written int64, err error) {
	// Write the size header first (one int64 per column), then each
	// non-null cell payload in column order.
	n, err := w.Write(i64SliceToBytes(row.sizesOfColumns))
	written = int64(n)
	if err != nil {
		return written, err
	}
	for _, cell := range row.cells {
		n, err = w.Write(cell)
		written += int64(n)
		if err != nil {
			return written, err
		}
	}
	return written, nil
}

// ReadFrom reads data of r, deserializes it from the format of diskFormatRow
// into Row.
func (row *rowInDisk) ReadFrom(r io.Reader) (n int64, err error) {
	// The header is fixed-size: 8 bytes (one int64) per column.
	header := make([]byte, 8*row.numCol)
	numRead, err := io.ReadFull(r, header)
	n = int64(numRead)
	if err != nil {
		return n, err
	}
	row.sizesOfColumns = bytesToI64Slice(header)
	row.cells = make([][]byte, 0, row.numCol)
	for _, size := range row.sizesOfColumns {
		// A size of -1 marks a null column, which stores no payload.
		if size == -1 {
			continue
		}
		cell := make([]byte, size)
		row.cells = append(row.cells, cell)
		numRead, err = io.ReadFull(r, cell)
		n += int64(numRead)
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

// diskFormatRow represents a row in a chunk in disk format. The disk format
// of a row is described in the doc of chunkInDisk.
type diskFormatRow struct {
	// sizesOfColumns stores the size of each column in a row.
	// -1 means the value of this column is null.
	sizesOfColumns []int64
	// cells represents raw data of not-null columns in one row.
	// In convertFromRow, data from Row is shallow copied to cells.
	// In toMutRow, data in cells is shallow copied to MutRow.
	cells [][]byte
}

// convertFromRow serializes one row of chunk to diskFormatRow, then
// we can use diskFormatRow to write to disk.
func convertFromRow(row Row, reuse *diskFormatRow) (format *diskFormatRow) {
numCols := row.Chunk().NumCols()
if reuse != nil {
format = reuse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ format/ rowInDiskFormat ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to keep the short-lived value short.

format.sizesOfColumns = format.sizesOfColumns[:0]
format.cells = format.cells[:0]
} else {
format = &diskFormatRow{
sizesOfColumns: make([]int64, 0, numCols),
cells: make([][]byte, 0, numCols),
}
}
for colIdx := 0; colIdx < numCols; colIdx++ {
if row.IsNull(colIdx) {
format.sizesOfColumns = append(format.sizesOfColumns, -1)
} else {
cell := row.GetRaw(colIdx)
format.sizesOfColumns = append(format.sizesOfColumns, int64(len(cell)))
format.cells = append(format.cells, cell)
}
}
return
}

// toMutRow deserializes diskFormatRow to MutRow.
// Each column of the resulting single-row chunk is rebuilt from the stored
// size and (for non-null columns) the shallow-copied cell bytes.
func (format *diskFormatRow) toMutRow(fields []*types.FieldType) MutRow {
	chk := &Chunk{columns: make([]*Column, 0, len(format.sizesOfColumns))}
	// cellOff tracks the next entry of format.cells, which only holds
	// non-null columns.
	var cellOff int
	for colIdx, size := range format.sizesOfColumns {
		col := &Column{length: 1}
		elemSize := getFixedLen(fields[colIdx])
		if size == -1 { // isNull
			// Null value: clear the null bitmap bit and allocate an empty
			// placeholder for fixed-length types.
			col.nullBitmap = []byte{0}
			if elemSize == varElemLen {
				col.offsets = []int64{0, 0}
			} else {
				buf := make([]byte, elemSize)
				col.data = buf
				col.elemBuf = buf
			}
		} else {
			// Non-null value: shallow copy the cell bytes into the column.
			col.nullBitmap = []byte{1}
			col.data = format.cells[cellOff]
			cellOff++
			if elemSize == varElemLen {
				col.offsets = []int64{0, int64(len(col.data))}
			} else {
				col.elemBuf = col.data
			}
		}
		chk.columns = append(chk.columns, col)
	}
	return MutRow{c: chk}
}
Loading