From ce67343dac71cffe7e9e0e33bdc802ceef28c95d Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Tue, 10 Sep 2019 13:58:11 +0800 Subject: [PATCH 1/7] executor: utilities for disk-based hash join --- util/chunk/codec.go | 8 +- util/chunk/column.go | 12 ++ util/chunk/column_test.go | 39 ++++- util/chunk/disk.go | 286 ++++++++++++++++++++++++++++++++++++ util/chunk/disk_test.go | 73 +++++++++ util/disk/tracker.go | 26 ++++ util/memory/tracker.go | 24 +-- util/memory/tracker_test.go | 4 + 8 files changed, 457 insertions(+), 15 deletions(-) create mode 100644 util/chunk/disk.go create mode 100644 util/chunk/disk_test.go create mode 100644 util/disk/tracker.go diff --git a/util/chunk/codec.go b/util/chunk/codec.go index a878ba1949c25..d666c44370aa9 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -66,7 +66,7 @@ func (c *Codec) encodeColumn(buffer []byte, col *Column) []byte { // encode offsets. if !col.isFixed() { numOffsetBytes := (col.length + 1) * 8 - offsetBytes := c.i64SliceToBytes(col.offsets) + offsetBytes := i64SliceToBytes(col.offsets) buffer = append(buffer, offsetBytes[:numOffsetBytes]...) } @@ -75,7 +75,7 @@ func (c *Codec) encodeColumn(buffer []byte, col *Column) []byte { return buffer } -func (c *Codec) i64SliceToBytes(i64s []int64) (b []byte) { +func i64SliceToBytes(i64s []int64) (b []byte) { if len(i64s) == 0 { return nil } @@ -128,7 +128,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained numDataBytes := int64(numFixedBytes * col.length) if numFixedBytes == -1 { numOffsetBytes := (col.length + 1) * 8 - col.offsets = append(col.offsets[:0], c.bytesToI64Slice(buffer[:numOffsetBytes])...) + col.offsets = append(col.offsets[:0], bytesToI64Slice(buffer[:numOffsetBytes])...) buffer = buffer[numOffsetBytes:] numDataBytes = col.offsets[col.length] } else if cap(col.elemBuf) < numFixedBytes { @@ -152,7 +152,7 @@ func (c *Codec) setAllNotNull(col *Column) { } } -func (c *Codec) bytesToI64Slice(b []byte) (i64s []int64) { +func bytesToI64Slice(b []byte) (i64s []int64) { if len(b) == 0 { return nil } diff --git a/util/chunk/column.go b/util/chunk/column.go index a1bb3e7bfabd7..7f80dc6774f7f 100644 --- a/util/chunk/column.go +++ b/util/chunk/column.go @@ -174,6 +174,18 @@ func (c *Column) AppendNull() { c.length++ } +// AppendRaw appends the raw bytes into this Column. +func (c *Column) AppendRaw(data []byte) { + c.appendNullBitmap(true) + if c.isFixed() { + c.data = append(c.data, data...) + } else { + c.data = append(c.data, data...) + c.offsets = append(c.offsets, int64(len(c.data))) + } + c.length++ +} + func (c *Column) finishAppendFixed() { c.data = append(c.data, c.elemBuf...) 
c.appendNullBitmap(true) diff --git a/util/chunk/column_test.go b/util/chunk/column_test.go index 867b5fe4e26a7..3c579aa4acdf7 100644 --- a/util/chunk/column_test.go +++ b/util/chunk/column_test.go @@ -16,6 +16,7 @@ package chunk import ( "fmt" "math/rand" + "strings" "testing" "time" "unsafe" @@ -757,12 +758,12 @@ func (s *testChunkSuite) TestGetRaw(c *check.C) { col.AppendFloat32(float32(i)) } it := NewIterator4Chunk(chk) - var i int64 + var i int for row := it.Begin(); row != it.End(); row = it.Next() { f := float32(i) b := (*[unsafe.Sizeof(f)]byte)(unsafe.Pointer(&f))[:] c.Assert(row.GetRaw(0), check.DeepEquals, b) - c.Assert(col.GetRaw(int(i)), check.DeepEquals, b) + c.Assert(col.GetRaw(i), check.DeepEquals, b) i++ } @@ -780,6 +781,40 @@ func (s *testChunkSuite) TestGetRaw(c *check.C) { } } +func (s *testChunkSuite) TestAppendRaw(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeFloat)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + f := float32(i) + raw := (*[unsafe.Sizeof(f)]byte)(unsafe.Pointer(&f))[:] + col.AppendRaw(raw) + } + it := NewIterator4Chunk(chk) + var i int + for row := it.Begin(); row != it.End(); row = it.Next() { + f := float32(i) + c.Assert(row.GetFloat32(0), check.Equals, f) + c.Assert(col.GetFloat32(i), check.Equals, f) + i++ + } + + chk = NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeVarString)}, 1024) + col = chk.Column(0) + for i := 0; i < 1024; i++ { + str := strings.Repeat(fmt.Sprint(i), i) + raw := []byte(str) + col.AppendRaw(raw) + } + it = NewIterator4Chunk(chk) + i = 0 + for row := it.Begin(); row != it.End(); row = it.Next() { + str := strings.Repeat(fmt.Sprint(i), i) + c.Assert(row.GetString(0), check.Equals, str) + c.Assert(col.GetString(i), check.Equals, str) + i++ + } +} + func BenchmarkDurationRow(b *testing.B) { chk1 := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeDuration)}, 1024) col1 := chk1.Column(0) diff --git a/util/chunk/disk.go b/util/chunk/disk.go new file mode 100644 index 0000000000000..e2700bc7f3b88 --- /dev/null +++ b/util/chunk/disk.go @@ -0,0 +1,286 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunk + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "sync" + + "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/disk" + "github.com/pingcap/tidb/util/stringutil" +) + +const ( + writeBufSize = 128 * 1024 + readBufSize = 4 * 1024 +) + +var bufWriterPool = sync.Pool{ + New: func() interface{} { + return bufio.NewWriterSize(nil, writeBufSize) + }, +} + +var bufReaderPool = sync.Pool{New: func() interface{} { + return bufio.NewReaderSize(nil, readBufSize) +}} + +var tmpDir = path.Join(os.TempDir(), "tidb-server-hashJoin") + +func init() { + _ = os.RemoveAll(tmpDir) + _ = os.Mkdir(tmpDir, 0755) +} + +// ListInDisk represents a slice of chunks storing in temporary disk. 
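+//
+// A minimal usage sketch (illustrative only; "fieldTypes" and "chk" stand in
+// for a prepared schema and a non-empty chunk, and error handling is elided):
+//
+//	l := NewListInDisk(fieldTypes)
+//	defer l.Close()
+//	_ = l.Add(chk)                                   // spill the chunk to a temp file
+//	row, _ := l.GetRow(RowPtr{ChkIdx: 0, RowIdx: 0}) // read one row back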
+type ListInDisk struct {
+	fieldTypes []*types.FieldType
+	// offsets stores the offsets in disk of all RowPtr,
+	// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx].
+	offsets [][]int64
+	// offWrite is the current offset for writing
+	offWrite int64
+
+	disk        *os.File
+	bufWriter   *bufio.Writer
+	diskTracker *disk.Tracker // track disk usage.
+}
+
+var chunkListInDiskLabel fmt.Stringer = stringutil.StringerStr("chunk.ListInDisk")
+
+// NewListInDisk creates a new ListInDisk with field types.
+func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
+	l := &ListInDisk{
+		fieldTypes: fieldTypes,
+		// TODO(fengliyuan): set the quota of disk usage.
+		diskTracker: disk.NewTracker(chunkListInDiskLabel, -1),
+	}
+	return l
+}
+
+// GetDiskTracker returns the disk tracker of this ListInDisk.
+func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
+	return l.diskTracker
+}
+
+// Add adds a chunk to the ListInDisk. Caller must make sure the input chk
+// is not empty and not used any more and has the same field types.
+func (l *ListInDisk) Add(chk *Chunk) (err error) {
+	if chk.NumRows() == 0 {
+		panic("chunk appended to List should have at least 1 row")
+	}
+	if l.disk == nil {
+		l.disk, err = ioutil.TempFile(tmpDir, "listInDisk")
+		if err != nil {
+			return
+		}
+		l.bufWriter = bufWriterPool.Get().(*bufio.Writer)
+		l.bufWriter.Reset(l.disk)
+	}
+	chk2 := chunkInDisk{Chunk: chk, off: l.offWrite}
+	n, err := chk2.WriteTo(l.bufWriter)
+	l.offWrite += n
+	if err != nil {
+		return
+	}
+	l.offsets = append(l.offsets, chk2.getOffsetsOfRows())
+	err = l.bufWriter.Flush()
+	if err == nil {
+		l.diskTracker.Consume(n)
+	}
+	return
+}
+
+// GetRow gets a Row from the ListInDisk by RowPtr.
+func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
+	off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
+	r := io.NewSectionReader(l.disk, off, l.offWrite-off)
+	bufReader := bufReaderPool.Get().(*bufio.Reader)
+	bufReader.Reset(r)
+	defer bufReaderPool.Put(bufReader)
+
+	format := rowInDisk{numCol: len(l.fieldTypes)}
+	_, err = format.ReadFrom(bufReader)
+	if err != nil {
+		return
+	}
+	row = format.toRow(l.fieldTypes)
+	return row, err
+}
+
+// NumChunks returns the number of chunks in the ListInDisk.
+func (l *ListInDisk) NumChunks() int {
+	return len(l.offsets)
+}
+
+// Close releases the disk resource.
+func (l *ListInDisk) Close() error {
+	if l.disk != nil {
+		l.diskTracker.Consume(-l.diskTracker.BytesConsumed())
+		terror.Call(l.disk.Close)
+		bufWriterPool.Put(l.bufWriter)
+		return os.Remove(l.disk.Name())
+	}
+	return nil
+}
+
+// chunkInDisk represents a chunk in disk format. Each row of the chunk
+// is serialized and stored in sequence. The format of each row follows
+// the struct diskFormatRow: the size of each column comes first, then
+// the data of each column.
+//
+// For example, for a chunk with 2 rows and 3 columns, the disk format of
+// the chunk is as follows:
+//
+// [size of row0 column0], [size of row0 column1], [size of row0 column2]
+// [data of row0 column0], [data of row0 column1], [data of row0 column2]
+// [size of row1 column0], [size of row1 column1], [size of row1 column2]
+// [data of row1 column0], [data of row1 column1], [data of row1 column2]
+//
+// If a column of a row is null, the size of it is -1 and the data is empty.
+type chunkInDisk struct {
+	*Chunk
+	off int64
+	// offsetsOfRows stores the offset of each row.
+	offsetsOfRows []int64
+}
+
+// WriteTo serializes the chunk into the format of chunkInDisk, and
+// writes to w.
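+//
+// As a concrete illustration (the values here are hypothetical): a row with
+// the columns (int64(3), NULL, "ab") is written as the three 8-byte sizes
+// [8, -1, 2], followed by the 8 raw bytes of int64(3) and the 2 bytes of
+// "ab"; the NULL column contributes no data bytes.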
+func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { + var n int64 + numRows := chk.NumRows() + chk.offsetsOfRows = make([]int64, 0, numRows) + var format *diskFormatRow + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + format = convertFromRow(chk.GetRow(rowIdx), format) + chk.offsetsOfRows = append(chk.offsetsOfRows, chk.off+written) + + n, err = chk.writeRowTo(w, format) + written += n + if err != nil { + return + } + } + return +} + +// writeRowTo serializes a row of the chunk into the format of +// diskFormatRow, and writes to w. +func (chk *chunkInDisk) writeRowTo(w io.Writer, format *diskFormatRow) (written int64, err error) { + n, err := w.Write(i64SliceToBytes(format.sizesOfColumns)) + written += int64(n) + if err != nil { + return + } + for _, data := range format.cells { + n, err = w.Write(data) + written += int64(n) + if err != nil { + return + } + } + return +} + +func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows } + +// rowInDisk represents a Row in format of diskFormatRow. +type rowInDisk struct { + numCol int + diskFormatRow +} + +// ReadFrom reads data of r, deserializes it from the format of diskFormatRow +// into Row. +func (row *rowInDisk) ReadFrom(r io.Reader) (n int64, err error) { + b := make([]byte, 8*row.numCol) + var n1 int + n1, err = io.ReadFull(r, b) + n += int64(n1) + if err != nil { + return + } + row.sizesOfColumns = bytesToI64Slice(b) + row.cells = make([][]byte, 0, row.numCol) + for _, size := range row.sizesOfColumns { + if size == -1 { + continue + } + cell := make([]byte, size) + row.cells = append(row.cells, cell) + n1, err = io.ReadFull(r, cell) + n += int64(n1) + if err != nil { + return + } + } + return +} + +// diskFormatRow represents a row in a chunk in disk format. The disk format +// of a row is described in the doc of chunkInDisk. +type diskFormatRow struct { + // sizesOfColumns stores the size of each column in a row. + // -1 means the value of this column is null. + sizesOfColumns []int64 // -1 means null + cells [][]byte +} + +// convertFromRow serializes one row of chunk to diskFormatRow +func convertFromRow(row Row, reuse *diskFormatRow) (format *diskFormatRow) { + numCols := row.Chunk().NumCols() + if reuse != nil { + format = reuse + format.sizesOfColumns = format.sizesOfColumns[:0] + format.cells = format.cells[:0] + } else { + format = &diskFormatRow{ + sizesOfColumns: make([]int64, 0, numCols), + cells: make([][]byte, 0, numCols), + } + } + for colIdx := 0; colIdx < numCols; colIdx++ { + if row.IsNull(colIdx) { + format.sizesOfColumns = append(format.sizesOfColumns, -1) + } else { + cell := row.GetRaw(colIdx) + format.sizesOfColumns = append(format.sizesOfColumns, int64(len(cell))) + format.cells = append(format.cells, cell) + } + } + return +} + +// toRow deserializes diskFormatRow to Row. +func (format *diskFormatRow) toRow(fields []*types.FieldType) Row { + chk := NewChunkWithCapacity(fields, 1) + var cellOff int + for colIdx, size := range format.sizesOfColumns { + if size == -1 { + chk.columns[colIdx].AppendNull() + } else { + chk.columns[colIdx].AppendRaw(format.cells[cellOff]) + cellOff++ + } + } + return chk.GetRow(0) +} diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go new file mode 100644 index 0000000000000..04671ff0694b1 --- /dev/null +++ b/util/chunk/disk_test.go @@ -0,0 +1,73 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunk + +import ( + "fmt" + "os" + + "github.com/pingcap/check" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/types/json" +) + +func (s *testChunkSuite) TestListInDisk(c *check.C) { + fields := []*types.FieldType{ + types.NewFieldType(mysql.TypeVarString), + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeVarString), + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeJSON), + } + l := NewListInDisk(fields) + defer func() { + err := l.Close() + c.Check(err, check.IsNil) + _, err = os.Stat(l.disk.Name()) + c.Check(os.IsNotExist(err), check.IsTrue) + }() + + numChk, numRow := 2, 2 + chks := make([]*Chunk, 0, numChk) + for chkIdx := 0; chkIdx < numChk; chkIdx++ { + chk := NewChunkWithCapacity(fields, 2) + for rowIdx := 0; rowIdx < numRow; rowIdx++ { + data := int64(chkIdx*numRow + rowIdx) + chk.AppendString(0, fmt.Sprint(data)) + chk.AppendNull(1) + chk.AppendNull(2) + chk.AppendInt64(3, data) + if chkIdx == 0 { + chk.AppendJSON(4, json.CreateBinary(fmt.Sprint(data))) + } else { + chk.AppendNull(4) + } + } + chks = append(chks, chk) + err := l.Add(chk) + c.Check(err, check.IsNil) + } + + c.Check(l.NumChunks(), check.Equals, 2) + c.Check(l.GetDiskTracker().BytesConsumed() > 0, check.IsTrue) + + for chkIdx := 0; chkIdx < numChk; chkIdx++ { + for rowIdx := 0; rowIdx < numRow; rowIdx++ { + row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) + c.Check(err, check.IsNil) + c.Check(row.GetDatumRow(fields), check.DeepEquals, chks[chkIdx].GetRow(rowIdx).GetDatumRow(fields)) + } + } +} diff --git a/util/disk/tracker.go b/util/disk/tracker.go new file mode 100644 index 0000000000000..1a94494597090 --- /dev/null +++ b/util/disk/tracker.go @@ -0,0 +1,26 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package disk + +import ( + "github.com/pingcap/tidb/util/memory" +) + +// Tracker is used to track the disk usage during query execution. +type Tracker = memory.Tracker + +// NewTracker creates a disk tracker. +// 1. "label" is the label used in the usage string. +// 2. "bytesLimit <= 0" means no limit. +var NewTracker = memory.NewTracker diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 3b935360f0fce..1d3c09bc5303f 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -35,7 +35,7 @@ import ( // // NOTE: We only protect concurrent access to "bytesConsumed" and "children", // that is to say: -// 1. Only "BytesConsumed()", "Consume()", "AttachTo()" and "Detach" are thread-safe. +// 1. Only "BytesConsumed()", "Consume()" and "AttachTo()" are thread-safe. // 2. Other operations of a Tracker tree is not thread-safe. 
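+//
+// A minimal usage sketch (illustrative only):
+//
+//	parent := NewTracker(stringutil.StringerStr("parent"), 40)
+//	child := NewTracker(stringutil.StringerStr("child"), -1)
+//	child.AttachTo(parent)
+//	child.Consume(30)          // safe to call concurrently; propagates upwards
+//	_ = parent.BytesConsumed() // 30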
 type Tracker struct {
 	mu struct {
@@ -53,7 +53,7 @@ type Tracker struct {
 
 // NewTracker creates a memory tracker.
 // 1. "label" is the label used in the usage string.
-// 2. "bytesLimit < 0" means no limit.
+// 2. "bytesLimit <= 0" means no limit.
 func NewTracker(label fmt.Stringer, bytesLimit int64) *Tracker {
 	return &Tracker{
 		label:      label,
@@ -62,7 +62,13 @@ func NewTracker(label fmt.Stringer, bytesLimit int64) *Tracker {
 	}
 }
 
-// SetActionOnExceed sets the action when memory usage is out of memory quota.
+// SetBytesLimit sets the bytes limit for this tracker.
+// "bytesLimit <= 0" means no limit.
+func (t *Tracker) SetBytesLimit(bytesLimit int64) {
+	t.bytesLimit = bytesLimit
+}
+
+// SetActionOnExceed sets the action when memory usage exceeds bytesLimit.
 func (t *Tracker) SetActionOnExceed(a ActionOnExceed) {
 	t.actionOnExceed = a
 }
@@ -131,12 +137,15 @@ func (t *Tracker) ReplaceChild(oldChild, newChild *Tracker) {
 }
 
 // Consume is used to consume a memory usage. "bytes" can be a negative value,
-// which means this is a memory release operation.
+// which means this is a memory release operation. When a tracker's usage exceeds
+// its bytesLimit, it calls its action; so does each ancestor that exceeds its own.
 func (t *Tracker) Consume(bytes int64) {
-	var rootExceed *Tracker
 	for tracker := t; tracker != nil; tracker = tracker.parent {
 		if atomic.AddInt64(&tracker.bytesConsumed, bytes) >= tracker.bytesLimit && tracker.bytesLimit > 0 {
+			// TODO(fengliyuan): try to find a way to avoid logging at each tracker in chain.
+			if tracker.actionOnExceed != nil {
+				tracker.actionOnExceed.Action(tracker)
+			}
 		}
 
 		for {
@@ -148,9 +157,6 @@ func (t *Tracker) Consume(bytes int64) {
 			break
 		}
 	}
-	if rootExceed != nil {
-		rootExceed.actionOnExceed.Action(rootExceed)
-	}
 }
 
 // BytesConsumed returns the consumed memory usage value in bytes.
diff --git a/util/memory/tracker_test.go b/util/memory/tracker_test.go
index bf7ac98ecc506..a40d6d481bbc8 100644
--- a/util/memory/tracker_test.go
+++ b/util/memory/tracker_test.go
@@ -86,6 +86,10 @@ func (s *testSuite) TestConsume(c *C) {
 
 func (s *testSuite) TestOOMAction(c *C) {
 	tracker := NewTracker(stringutil.StringerStr("oom tracker"), 100)
+	// Make sure Consume does not panic when the limit is exceeded but no action is set.
+ tracker.Consume(10000) + + tracker = NewTracker(stringutil.StringerStr("oom tracker"), 100) action := &mockAction{} tracker.SetActionOnExceed(action) From 6de458390eab7fc44cee9f4ba3af4379bfef2d59 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Tue, 10 Sep 2019 14:38:59 +0800 Subject: [PATCH 2/7] format --- util/chunk/disk.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/util/chunk/disk.go b/util/chunk/disk.go index e2700bc7f3b88..f492e70c33d59 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -34,14 +34,12 @@ const ( ) var bufWriterPool = sync.Pool{ - New: func() interface{} { - return bufio.NewWriterSize(nil, writeBufSize) - }, + New: func() interface{} { return bufio.NewWriterSize(nil, writeBufSize) }, } -var bufReaderPool = sync.Pool{New: func() interface{} { - return bufio.NewReaderSize(nil, readBufSize) -}} +var bufReaderPool = sync.Pool{ + New: func() interface{} { return bufio.NewReaderSize(nil, readBufSize) }, +} var tmpDir = path.Join(os.TempDir(), "tidb-server-hashJoin") From dcb84c979ab01dd1cf312f3fee96b463d4af4cdc Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 11 Sep 2019 11:41:55 +0800 Subject: [PATCH 3/7] address comments --- util/chunk/column.go | 12 ----- util/chunk/column_test.go | 35 -------------- util/chunk/disk.go | 51 +++++++++++++------- util/chunk/disk_test.go | 98 ++++++++++++++++++++++++++++++++------- util/chunk/mutrow.go | 1 - util/memory/tracker.go | 4 ++ 6 files changed, 119 insertions(+), 82 deletions(-) diff --git a/util/chunk/column.go b/util/chunk/column.go index 7f80dc6774f7f..a1bb3e7bfabd7 100644 --- a/util/chunk/column.go +++ b/util/chunk/column.go @@ -174,18 +174,6 @@ func (c *Column) AppendNull() { c.length++ } -// AppendRaw appends the raw bytes into this Column. -func (c *Column) AppendRaw(data []byte) { - c.appendNullBitmap(true) - if c.isFixed() { - c.data = append(c.data, data...) - } else { - c.data = append(c.data, data...) - c.offsets = append(c.offsets, int64(len(c.data))) - } - c.length++ -} - func (c *Column) finishAppendFixed() { c.data = append(c.data, c.elemBuf...) 
c.appendNullBitmap(true) diff --git a/util/chunk/column_test.go b/util/chunk/column_test.go index 3c579aa4acdf7..0ad5da90d0f09 100644 --- a/util/chunk/column_test.go +++ b/util/chunk/column_test.go @@ -16,7 +16,6 @@ package chunk import ( "fmt" "math/rand" - "strings" "testing" "time" "unsafe" @@ -781,40 +780,6 @@ func (s *testChunkSuite) TestGetRaw(c *check.C) { } } -func (s *testChunkSuite) TestAppendRaw(c *check.C) { - chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeFloat)}, 1024) - col := chk.Column(0) - for i := 0; i < 1024; i++ { - f := float32(i) - raw := (*[unsafe.Sizeof(f)]byte)(unsafe.Pointer(&f))[:] - col.AppendRaw(raw) - } - it := NewIterator4Chunk(chk) - var i int - for row := it.Begin(); row != it.End(); row = it.Next() { - f := float32(i) - c.Assert(row.GetFloat32(0), check.Equals, f) - c.Assert(col.GetFloat32(i), check.Equals, f) - i++ - } - - chk = NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeVarString)}, 1024) - col = chk.Column(0) - for i := 0; i < 1024; i++ { - str := strings.Repeat(fmt.Sprint(i), i) - raw := []byte(str) - col.AppendRaw(raw) - } - it = NewIterator4Chunk(chk) - i = 0 - for row := it.Begin(); row != it.End(); row = it.Next() { - str := strings.Repeat(fmt.Sprint(i), i) - c.Assert(row.GetString(0), check.Equals, str) - c.Assert(col.GetString(i), check.Equals, str) - i++ - } -} - func BenchmarkDurationRow(b *testing.B) { chk1 := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeDuration)}, 1024) col1 := chk1.Column(0) diff --git a/util/chunk/disk.go b/util/chunk/disk.go index f492e70c33d59..8a1b323dbc08c 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -41,10 +41,10 @@ var bufReaderPool = sync.Pool{ New: func() interface{} { return bufio.NewReaderSize(nil, readBufSize) }, } -var tmpDir = path.Join(os.TempDir(), "tidb-server-hashJoin") +var tmpDir = path.Join(os.TempDir(), "tidb-server-"+os.Args[0]) func init() { - _ = os.RemoveAll(tmpDir) + _ = os.RemoveAll(tmpDir) // clean the uncleared temp file during the last run. _ = os.Mkdir(tmpDir, 0755) } @@ -54,7 +54,7 @@ type ListInDisk struct { // offsets stores the offsets in disk of all RowPtr, // the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx]. offsets [][]int64 - // offWrite is the current offset for writing + // offWrite is the current offset for writing. offWrite int64 disk *os.File @@ -62,14 +62,14 @@ type ListInDisk struct { diskTracker *disk.Tracker // track disk usage. } -var chunkListInDiskLabel fmt.Stringer = stringutil.StringerStr("chunk.ListInDisk") +var defaultChunkListInDiskLabel fmt.Stringer = stringutil.StringerStr("chunk.ListInDisk") // NewListInDisk creates a new ListInDisk with field types. func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk { l := &ListInDisk{ fieldTypes: fieldTypes, // TODO(fengliyuan): set the quota of disk usage. 
- diskTracker: disk.NewTracker(chunkListInDiskLabel, -1), + diskTracker: disk.NewTracker(defaultChunkListInDiskLabel, -1), } return l } @@ -86,14 +86,14 @@ func (l *ListInDisk) Add(chk *Chunk) (err error) { panic("chunk appended to List should have at least 1 row") } if l.disk == nil { - l.disk, err = ioutil.TempFile(tmpDir, "listInDisk") + l.disk, err = ioutil.TempFile(tmpDir, l.diskTracker.Label().String()) if err != nil { return } l.bufWriter = bufWriterPool.Get().(*bufio.Writer) l.bufWriter.Reset(l.disk) } - chk2 := chunkInDisk{Chunk: chk, off: l.offWrite} + chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite} n, err := chk2.WriteTo(l.bufWriter) l.offWrite += n if err != nil { @@ -120,7 +120,7 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { if err != nil { return } - row = format.toRow(l.fieldTypes) + row = format.toMutRow(l.fieldTypes).ToRow() return row, err } @@ -156,7 +156,8 @@ func (l *ListInDisk) Close() error { // If a column of a row is null, the size of it is -1 and the data is empty. type chunkInDisk struct { *Chunk - off int64 + // offWrite is the current offset for writing. + offWrite int64 // offsetsOfRows stores the offset of each row. offsetsOfRows []int64 } @@ -170,7 +171,7 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { var format *diskFormatRow for rowIdx := 0; rowIdx < numRows; rowIdx++ { format = convertFromRow(chk.GetRow(rowIdx), format) - chk.offsetsOfRows = append(chk.offsetsOfRows, chk.off+written) + chk.offsetsOfRows = append(chk.offsetsOfRows, chk.offWrite+written) n, err = chk.writeRowTo(w, format) written += n @@ -268,17 +269,33 @@ func convertFromRow(row Row, reuse *diskFormatRow) (format *diskFormatRow) { return } -// toRow deserializes diskFormatRow to Row. -func (format *diskFormatRow) toRow(fields []*types.FieldType) Row { - chk := NewChunkWithCapacity(fields, 1) +// toMutRow deserializes diskFormatRow to MutRow. 
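+// The data held in format.cells is shallow copied into the returned MutRow
+// rather than duplicated.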
+func (format *diskFormatRow) toMutRow(fields []*types.FieldType) MutRow { + chk := &Chunk{columns: make([]*Column, 0, len(format.sizesOfColumns))} var cellOff int for colIdx, size := range format.sizesOfColumns { - if size == -1 { - chk.columns[colIdx].AppendNull() + col := &Column{length: 1} + elemSize := getFixedLen(fields[colIdx]) + if size == -1 { // isNull + col.nullBitmap = []byte{0} + if elemSize == varElemLen { + col.offsets = []int64{0, 0} + } else { + buf := make([]byte, elemSize) + col.data = buf + col.elemBuf = buf + } } else { - chk.columns[colIdx].AppendRaw(format.cells[cellOff]) + col.nullBitmap = []byte{1} + col.data = format.cells[cellOff] cellOff++ + if elemSize == varElemLen { + col.offsets = []int64{0, int64(len(col.data))} + } else { + col.elemBuf = col.data + } } + chk.columns = append(chk.columns, col) } - return chk.GetRow(0) + return MutRow{c: chk} } diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go index 04671ff0694b1..6171020173780 100644 --- a/util/chunk/disk_test.go +++ b/util/chunk/disk_test.go @@ -15,7 +15,9 @@ package chunk import ( "fmt" + "math/rand" "os" + "testing" "github.com/pingcap/check" "github.com/pingcap/parser/mysql" @@ -24,22 +26,45 @@ import ( ) func (s *testChunkSuite) TestListInDisk(c *check.C) { - fields := []*types.FieldType{ - types.NewFieldType(mysql.TypeVarString), - types.NewFieldType(mysql.TypeLonglong), - types.NewFieldType(mysql.TypeVarString), - types.NewFieldType(mysql.TypeLonglong), - types.NewFieldType(mysql.TypeJSON), + numChk, numRow := 2, 2 + chks, fields, err := initListInDisk(numChk, numRow) + if err != nil { + c.Fatal(err) } l := NewListInDisk(fields) defer func() { err := l.Close() c.Check(err, check.IsNil) + c.Check(l.disk, check.Not(check.IsNil)) _, err = os.Stat(l.disk.Name()) c.Check(os.IsNotExist(err), check.IsTrue) }() + for _, chk := range chks { + err := l.Add(chk) + c.Check(err, check.IsNil) + } + + c.Check(l.NumChunks(), check.Equals, numChk) + c.Check(l.GetDiskTracker().BytesConsumed() > 0, check.IsTrue) + + for chkIdx := 0; chkIdx < numChk; chkIdx++ { + for rowIdx := 0; rowIdx < numRow; rowIdx++ { + row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) + c.Check(err, check.IsNil) + c.Check(row.ToRow().GetDatumRow(fields), check.DeepEquals, chks[chkIdx].GetRow(rowIdx).GetDatumRow(fields)) + } + } +} + +func initListInDisk(numChk, numRow int) ([]*Chunk, []*types.FieldType, error) { + fields := []*types.FieldType{ + types.NewFieldType(mysql.TypeVarString), + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeVarString), + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeJSON), + } - numChk, numRow := 2, 2 chks := make([]*Chunk, 0, numChk) for chkIdx := 0; chkIdx < numChk; chkIdx++ { chk := NewChunkWithCapacity(fields, 2) @@ -49,25 +74,64 @@ func (s *testChunkSuite) TestListInDisk(c *check.C) { chk.AppendNull(1) chk.AppendNull(2) chk.AppendInt64(3, data) - if chkIdx == 0 { + if chkIdx%2 == 0 { chk.AppendJSON(4, json.CreateBinary(fmt.Sprint(data))) } else { chk.AppendNull(4) } } chks = append(chks, chk) - err := l.Add(chk) - c.Check(err, check.IsNil) } + return chks, fields, nil +} - c.Check(l.NumChunks(), check.Equals, 2) - c.Check(l.GetDiskTracker().BytesConsumed() > 0, check.IsTrue) +func BenchmarkListInDiskAdd(b *testing.B) { + numChk, numRow := 1, 2 + chks, fields, err := initListInDisk(numChk, numRow) + if err != nil { + b.Fatal(err) + } + chk := chks[0] + l := NewListInDisk(fields) + defer l.Close() - for chkIdx := 0; chkIdx < 
numChk; chkIdx++ { - for rowIdx := 0; rowIdx < numRow; rowIdx++ { - row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) - c.Check(err, check.IsNil) - c.Check(row.GetDatumRow(fields), check.DeepEquals, chks[chkIdx].GetRow(rowIdx).GetDatumRow(fields)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := l.Add(chk) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkListInDiskGetRow(b *testing.B) { + numChk, numRow := 10000, 2 + chks, fields, err := initListInDisk(numChk, numRow) + if err != nil { + b.Fatal(err) + } + l := NewListInDisk(fields) + defer l.Close() + for _, chk := range chks { + err := l.Add(chk) + if err != nil { + b.Fatal(err) + } + } + rand.Seed(0) + ptrs := make([]RowPtr, 0, b.N) + for i := 0; i < b.N; i++ { + ptrs = append(ptrs, RowPtr{ + ChkIdx: rand.Uint32() % uint32(numChk), + RowIdx: rand.Uint32() % uint32(numRow), + }) + } + b.ResetTimer() + // fmt.Println(b.N) + for i := 0; i < b.N; i++ { + _, err = l.GetRow(ptrs[i]) + if err != nil { + b.Fatal(err) } } } diff --git a/util/chunk/mutrow.go b/util/chunk/mutrow.go index bc46b448db514..c1ecb4982ca6d 100644 --- a/util/chunk/mutrow.go +++ b/util/chunk/mutrow.go @@ -195,7 +195,6 @@ func makeMutRowUint64Column(val uint64) *Column { func makeMutRowBytesColumn(bin []byte) *Column { col := newMutRowVarLenColumn(len(bin)) copy(col.data, bin) - col.nullBitmap[0] = 1 return col } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 1d3c09bc5303f..6cb80986fc32d 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -78,6 +78,10 @@ func (t *Tracker) SetLabel(label fmt.Stringer) { t.label = label } +func (t *Tracker) Label() fmt.Stringer { + return t.label +} + // AttachTo attaches this memory tracker as a child to another Tracker. If it // already has a parent, this function will remove it from the old parent. // Its consumed memory usage is used to update all its ancestors. From d329a79d73770520f69586647d420fbe6ecd4b09 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 11 Sep 2019 11:50:28 +0800 Subject: [PATCH 4/7] address comments --- util/chunk/disk.go | 12 +++++++++--- util/chunk/disk_test.go | 3 +-- util/chunk/list.go | 1 + 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/util/chunk/disk.go b/util/chunk/disk.go index 8a1b323dbc08c..1c6710d247722 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -15,6 +15,7 @@ package chunk import ( "bufio" + "errors" "fmt" "io" "io/ioutil" @@ -22,10 +23,12 @@ import ( "path" "sync" + "github.com/pingcap/log" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/stringutil" + "go.uber.org/zap" ) const ( @@ -41,11 +44,14 @@ var bufReaderPool = sync.Pool{ New: func() interface{} { return bufio.NewReaderSize(nil, readBufSize) }, } -var tmpDir = path.Join(os.TempDir(), "tidb-server-"+os.Args[0]) +var tmpDir = path.Join(os.TempDir(), "tidb-server-"+path.Base(os.Args[0])) func init() { _ = os.RemoveAll(tmpDir) // clean the uncleared temp file during the last run. - _ = os.Mkdir(tmpDir, 0755) + err := os.Mkdir(tmpDir, 0755) + if err != nil { + log.Warn("Mkdir temporary file error", zap.String("tmpDir", tmpDir), zap.Error(err)) + } } // ListInDisk represents a slice of chunks storing in temporary disk. @@ -83,7 +89,7 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker { // is not empty and not used any more and has the same field types. 
func (l *ListInDisk) Add(chk *Chunk) (err error) { if chk.NumRows() == 0 { - panic("chunk appended to List should have at least 1 row") + return errors.New("chunk appended to List should have at least 1 row") } if l.disk == nil { l.disk, err = ioutil.TempFile(tmpDir, l.diskTracker.Label().String()) diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go index 6171020173780..c73612220c41c 100644 --- a/util/chunk/disk_test.go +++ b/util/chunk/disk_test.go @@ -51,7 +51,7 @@ func (s *testChunkSuite) TestListInDisk(c *check.C) { for rowIdx := 0; rowIdx < numRow; rowIdx++ { row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) c.Check(err, check.IsNil) - c.Check(row.ToRow().GetDatumRow(fields), check.DeepEquals, chks[chkIdx].GetRow(rowIdx).GetDatumRow(fields)) + c.Check(row.GetDatumRow(fields), check.DeepEquals, chks[chkIdx].GetRow(rowIdx).GetDatumRow(fields)) } } } @@ -127,7 +127,6 @@ func BenchmarkListInDiskGetRow(b *testing.B) { }) } b.ResetTimer() - // fmt.Println(b.N) for i := 0; i < b.N; i++ { _, err = l.GetRow(ptrs[i]) if err != nil { diff --git a/util/chunk/list.go b/util/chunk/list.go index 46f226310f506..2f71febb1179d 100644 --- a/util/chunk/list.go +++ b/util/chunk/list.go @@ -100,6 +100,7 @@ func (l *List) AppendRow(row Row) RowPtr { func (l *List) Add(chk *Chunk) { // FixMe: we should avoid add a Chunk that chk.NumRows() > list.maxChunkSize. if chk.NumRows() == 0 { + // TODO: return error here. panic("chunk appended to List should have at least 1 row") } if chkIdx := len(l.chunks) - 1; l.consumedIdx != chkIdx { From 95865a6683f2840afb74439711c35ab171818f45 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 11 Sep 2019 11:53:15 +0800 Subject: [PATCH 5/7] address comments --- util/memory/tracker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 6cb80986fc32d..003a1867e3b9f 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -78,6 +78,7 @@ func (t *Tracker) SetLabel(label fmt.Stringer) { t.label = label } +// Label gets the label of a Tracker. 
func (t *Tracker) Label() fmt.Stringer { return t.label } From 7d42f05568b182f40d91b442e0c18277ef911360 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 11 Sep 2019 12:49:56 +0800 Subject: [PATCH 6/7] more benchmark --- util/chunk/disk_test.go | 64 ++++++++++++++++++++--------------------- util/chunk/list_test.go | 40 ++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 32 deletions(-) diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go index c73612220c41c..832b2110537db 100644 --- a/util/chunk/disk_test.go +++ b/util/chunk/disk_test.go @@ -25,9 +25,38 @@ import ( "github.com/pingcap/tidb/types/json" ) +func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType, error) { + fields := []*types.FieldType{ + types.NewFieldType(mysql.TypeVarString), + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeVarString), + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeJSON), + } + + chks := make([]*Chunk, 0, numChk) + for chkIdx := 0; chkIdx < numChk; chkIdx++ { + chk := NewChunkWithCapacity(fields, 2) + for rowIdx := 0; rowIdx < numRow; rowIdx++ { + data := int64(chkIdx*numRow + rowIdx) + chk.AppendString(0, fmt.Sprint(data)) + chk.AppendNull(1) + chk.AppendNull(2) + chk.AppendInt64(3, data) + if chkIdx%2 == 0 { + chk.AppendJSON(4, json.CreateBinary(fmt.Sprint(data))) + } else { + chk.AppendNull(4) + } + } + chks = append(chks, chk) + } + return chks, fields, nil +} + func (s *testChunkSuite) TestListInDisk(c *check.C) { numChk, numRow := 2, 2 - chks, fields, err := initListInDisk(numChk, numRow) + chks, fields, err := initChunks(numChk, numRow) if err != nil { c.Fatal(err) } @@ -56,38 +85,9 @@ func (s *testChunkSuite) TestListInDisk(c *check.C) { } } -func initListInDisk(numChk, numRow int) ([]*Chunk, []*types.FieldType, error) { - fields := []*types.FieldType{ - types.NewFieldType(mysql.TypeVarString), - types.NewFieldType(mysql.TypeLonglong), - types.NewFieldType(mysql.TypeVarString), - types.NewFieldType(mysql.TypeLonglong), - types.NewFieldType(mysql.TypeJSON), - } - - chks := make([]*Chunk, 0, numChk) - for chkIdx := 0; chkIdx < numChk; chkIdx++ { - chk := NewChunkWithCapacity(fields, 2) - for rowIdx := 0; rowIdx < numRow; rowIdx++ { - data := int64(chkIdx*numRow + rowIdx) - chk.AppendString(0, fmt.Sprint(data)) - chk.AppendNull(1) - chk.AppendNull(2) - chk.AppendInt64(3, data) - if chkIdx%2 == 0 { - chk.AppendJSON(4, json.CreateBinary(fmt.Sprint(data))) - } else { - chk.AppendNull(4) - } - } - chks = append(chks, chk) - } - return chks, fields, nil -} - func BenchmarkListInDiskAdd(b *testing.B) { numChk, numRow := 1, 2 - chks, fields, err := initListInDisk(numChk, numRow) + chks, fields, err := initChunks(numChk, numRow) if err != nil { b.Fatal(err) } @@ -106,7 +106,7 @@ func BenchmarkListInDiskAdd(b *testing.B) { func BenchmarkListInDiskGetRow(b *testing.B) { numChk, numRow := 10000, 2 - chks, fields, err := initListInDisk(numChk, numRow) + chks, fields, err := initChunks(numChk, numRow) if err != nil { b.Fatal(err) } diff --git a/util/chunk/list_test.go b/util/chunk/list_test.go index e1102d9c7f9a0..00a2cdd488f53 100644 --- a/util/chunk/list_test.go +++ b/util/chunk/list_test.go @@ -15,6 +15,7 @@ package chunk import ( "math" + "math/rand" "strconv" "strings" "testing" @@ -218,3 +219,42 @@ func BenchmarkPreAllocChunk(b *testing.B) { } } } + +func BenchmarkListAdd(b *testing.B) { + numChk, numRow := 1, 2 + chks, fields, err := initChunks(numChk, numRow) + if err != nil { + b.Fatal(err) + } + chk := chks[0] + l := 
NewList(fields, numRow, numRow) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + l.Add(chk) + } +} + +func BenchmarkListGetRow(b *testing.B) { + numChk, numRow := 10000, 2 + chks, fields, err := initChunks(numChk, numRow) + if err != nil { + b.Fatal(err) + } + l := NewList(fields, numRow, numRow) + for _, chk := range chks { + l.Add(chk) + } + rand.Seed(0) + ptrs := make([]RowPtr, 0, b.N) + for i := 0; i < b.N; i++ { + ptrs = append(ptrs, RowPtr{ + ChkIdx: rand.Uint32() % uint32(numChk), + RowIdx: rand.Uint32() % uint32(numRow), + }) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + l.GetRow(ptrs[i]) + } +} From 9373cddd5e8b4a41e5c2622f81e52649f6b876d9 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 11 Sep 2019 17:28:18 +0800 Subject: [PATCH 7/7] address comments --- util/chunk/disk.go | 37 +++++++++++++++++++++---------------- util/chunk/disk_test.go | 31 +++++++++++++------------------ util/chunk/list_test.go | 16 +++++++--------- 3 files changed, 41 insertions(+), 43 deletions(-) diff --git a/util/chunk/disk.go b/util/chunk/disk.go index 1c6710d247722..e84d52f77d32e 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -124,7 +124,7 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { format := rowInDisk{numCol: len(l.fieldTypes)} _, err = format.ReadFrom(bufReader) if err != nil { - return + return row, err } row = format.toMutRow(l.fieldTypes).ToRow() return row, err @@ -179,7 +179,7 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { format = convertFromRow(chk.GetRow(rowIdx), format) chk.offsetsOfRows = append(chk.offsetsOfRows, chk.offWrite+written) - n, err = chk.writeRowTo(w, format) + n, err = rowInDisk{diskFormatRow: *format}.WriteTo(w) written += n if err != nil { return @@ -188,15 +188,24 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { return } -// writeRowTo serializes a row of the chunk into the format of +// getOffsetsOfRows gets the offset of each row. +func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows } + +// rowInDisk represents a Row in format of diskFormatRow. +type rowInDisk struct { + numCol int + diskFormatRow +} + +// WriteTo serializes a row of the chunk into the format of // diskFormatRow, and writes to w. -func (chk *chunkInDisk) writeRowTo(w io.Writer, format *diskFormatRow) (written int64, err error) { - n, err := w.Write(i64SliceToBytes(format.sizesOfColumns)) +func (row rowInDisk) WriteTo(w io.Writer) (written int64, err error) { + n, err := w.Write(i64SliceToBytes(row.sizesOfColumns)) written += int64(n) if err != nil { return } - for _, data := range format.cells { + for _, data := range row.cells { n, err = w.Write(data) written += int64(n) if err != nil { @@ -206,14 +215,6 @@ func (chk *chunkInDisk) writeRowTo(w io.Writer, format *diskFormatRow) (written return } -func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows } - -// rowInDisk represents a Row in format of diskFormatRow. -type rowInDisk struct { - numCol int - diskFormatRow -} - // ReadFrom reads data of r, deserializes it from the format of diskFormatRow // into Row. func (row *rowInDisk) ReadFrom(r io.Reader) (n int64, err error) { @@ -247,10 +248,14 @@ type diskFormatRow struct { // sizesOfColumns stores the size of each column in a row. // -1 means the value of this column is null. sizesOfColumns []int64 // -1 means null - cells [][]byte + // cells represents raw data of not-null columns in one row. 
+ // In convertFromRow, data from Row is shallow copied to cells. + // In toMutRow, data in cells is shallow copied to MutRow. + cells [][]byte } -// convertFromRow serializes one row of chunk to diskFormatRow +// convertFromRow serializes one row of chunk to diskFormatRow, then +// we can use diskFormatRow to write to disk. func convertFromRow(row Row, reuse *diskFormatRow) (format *diskFormatRow) { numCols := row.Chunk().NumCols() if reuse != nil { diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go index 832b2110537db..9f6e30ee74bd4 100644 --- a/util/chunk/disk_test.go +++ b/util/chunk/disk_test.go @@ -19,13 +19,14 @@ import ( "os" "testing" + "github.com/cznic/mathutil" "github.com/pingcap/check" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" ) -func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType, error) { +func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType) { fields := []*types.FieldType{ types.NewFieldType(mysql.TypeVarString), types.NewFieldType(mysql.TypeLonglong), @@ -36,7 +37,7 @@ func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType, error) { chks := make([]*Chunk, 0, numChk) for chkIdx := 0; chkIdx < numChk; chkIdx++ { - chk := NewChunkWithCapacity(fields, 2) + chk := NewChunkWithCapacity(fields, numRow) for rowIdx := 0; rowIdx < numRow; rowIdx++ { data := int64(chkIdx*numRow + rowIdx) chk.AppendString(0, fmt.Sprint(data)) @@ -51,20 +52,17 @@ func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType, error) { } chks = append(chks, chk) } - return chks, fields, nil + return chks, fields } func (s *testChunkSuite) TestListInDisk(c *check.C) { numChk, numRow := 2, 2 - chks, fields, err := initChunks(numChk, numRow) - if err != nil { - c.Fatal(err) - } + chks, fields := initChunks(numChk, numRow) l := NewListInDisk(fields) defer func() { err := l.Close() c.Check(err, check.IsNil) - c.Check(l.disk, check.Not(check.IsNil)) + c.Check(l.disk, check.NotNil) _, err = os.Stat(l.disk.Name()) c.Check(os.IsNotExist(err), check.IsTrue) }() @@ -87,10 +85,7 @@ func (s *testChunkSuite) TestListInDisk(c *check.C) { func BenchmarkListInDiskAdd(b *testing.B) { numChk, numRow := 1, 2 - chks, fields, err := initChunks(numChk, numRow) - if err != nil { - b.Fatal(err) - } + chks, fields := initChunks(numChk, numRow) chk := chks[0] l := NewListInDisk(fields) defer l.Close() @@ -106,10 +101,7 @@ func BenchmarkListInDiskAdd(b *testing.B) { func BenchmarkListInDiskGetRow(b *testing.B) { numChk, numRow := 10000, 2 - chks, fields, err := initChunks(numChk, numRow) - if err != nil { - b.Fatal(err) - } + chks, fields := initChunks(numChk, numRow) l := NewListInDisk(fields) defer l.Close() for _, chk := range chks { @@ -120,15 +112,18 @@ func BenchmarkListInDiskGetRow(b *testing.B) { } rand.Seed(0) ptrs := make([]RowPtr, 0, b.N) - for i := 0; i < b.N; i++ { + for i := 0; i < mathutil.Min(b.N, 10000); i++ { ptrs = append(ptrs, RowPtr{ ChkIdx: rand.Uint32() % uint32(numChk), RowIdx: rand.Uint32() % uint32(numRow), }) } + for i := 10000; i < cap(ptrs); i++ { + ptrs = append(ptrs, ptrs[i%10000]) + } b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = l.GetRow(ptrs[i]) + _, err := l.GetRow(ptrs[i]) if err != nil { b.Fatal(err) } diff --git a/util/chunk/list_test.go b/util/chunk/list_test.go index 00a2cdd488f53..b7d7780ffa2f2 100644 --- a/util/chunk/list_test.go +++ b/util/chunk/list_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/cznic/mathutil" "github.com/pingcap/check" 
"github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/types" @@ -222,10 +223,7 @@ func BenchmarkPreAllocChunk(b *testing.B) { func BenchmarkListAdd(b *testing.B) { numChk, numRow := 1, 2 - chks, fields, err := initChunks(numChk, numRow) - if err != nil { - b.Fatal(err) - } + chks, fields := initChunks(numChk, numRow) chk := chks[0] l := NewList(fields, numRow, numRow) @@ -237,22 +235,22 @@ func BenchmarkListAdd(b *testing.B) { func BenchmarkListGetRow(b *testing.B) { numChk, numRow := 10000, 2 - chks, fields, err := initChunks(numChk, numRow) - if err != nil { - b.Fatal(err) - } + chks, fields := initChunks(numChk, numRow) l := NewList(fields, numRow, numRow) for _, chk := range chks { l.Add(chk) } rand.Seed(0) ptrs := make([]RowPtr, 0, b.N) - for i := 0; i < b.N; i++ { + for i := 0; i < mathutil.Min(b.N, 10000); i++ { ptrs = append(ptrs, RowPtr{ ChkIdx: rand.Uint32() % uint32(numChk), RowIdx: rand.Uint32() % uint32(numRow), }) } + for i := 10000; i < cap(ptrs); i++ { + ptrs = append(ptrs, ptrs[i%10000]) + } b.ResetTimer() for i := 0; i < b.N; i++ { l.GetRow(ptrs[i])