From b54f0907bef8d71c5b54ec9aa518f7cb1e7f84f5 Mon Sep 17 00:00:00 2001 From: ou yuanning <45346669+ouyuanning@users.noreply.github.com> Date: Fri, 17 Nov 2023 20:45:37 +0800 Subject: [PATCH] Refactor fuzzyfilter (#15) * refactor fuzzyfilter to use new interface * make bvt 100% pass * init bloomfilter * use new bloomfilter * minor improve * bug fixed --- pkg/common/bitmap/bitmap.go | 10 +- pkg/common/bitmap/bitmap_test.go | 2 +- pkg/common/bloomfilter/bloomfilter.go | 84 ++++++++++ pkg/common/bloomfilter/bloomfilter_test.go | 64 ++++++++ pkg/common/bloomfilter/types.go | 75 +++++++++ pkg/common/bloomfilter/util.go | 80 +++++++++ pkg/container/hashtable/hash.go | 23 ++- pkg/container/nulls/nulls.go | 10 +- pkg/pb/ctl/ctl.pb.go | 117 +++++++------ pkg/sql/colexec/fuzzyfilter/filter.go | 113 ++++++------- pkg/sql/colexec/fuzzyfilter/filter_test.go | 53 ++++-- pkg/sql/colexec/fuzzyfilter/types.go | 26 ++- pkg/sql/colexec/right/join.go | 2 +- pkg/sql/colexec/rightanti/join.go | 2 +- pkg/sql/colexec/rightsemi/join.go | 2 +- pkg/sql/compile/compile.go | 31 ++-- pkg/sql/compile/debugTools.go | 1 + pkg/sql/compile/fuzzyCheck.go | 4 +- pkg/sql/compile/operator.go | 2 + pkg/sql/compile/util.go | 4 +- pkg/sql/plan/build_delete.go | 2 +- pkg/sql/plan/build_dml_util.go | 155 +++++++----------- pkg/sql/plan/build_insert.go | 3 +- pkg/sql/plan/build_update.go | 2 +- pkg/sql/plan/stats.go | 103 +++++++++++- .../engine/tae/tables/jobs/flushTableTail.go | 2 +- proto/ctl.proto | 2 +- 27 files changed, 691 insertions(+), 283 deletions(-) create mode 100644 pkg/common/bloomfilter/bloomfilter.go create mode 100644 pkg/common/bloomfilter/bloomfilter_test.go create mode 100644 pkg/common/bloomfilter/types.go create mode 100644 pkg/common/bloomfilter/util.go diff --git a/pkg/common/bitmap/bitmap.go b/pkg/common/bitmap/bitmap.go index 149ec9390f8ca..ef96171cba808 100644 --- a/pkg/common/bitmap/bitmap.go +++ b/pkg/common/bitmap/bitmap.go @@ -63,8 +63,8 @@ func (n *Bitmap) InitWith(other *Bitmap) { n.data = append([]uint64(nil), other.data...) } -func (n *Bitmap) InitWithSize(len int) { - n.len = int64(len) +func (n *Bitmap) InitWithSize(len int64) { + n.len = len n.emptyFlag.Store(kEmptyFlagEmpty) n.data = make([]uint64, (len+63)/64) } @@ -164,8 +164,8 @@ func (n *Bitmap) Reset() { } // Len returns the number of bits in the Bitmap. -func (n *Bitmap) Len() int { - return int(n.len) +func (n *Bitmap) Len() int64 { + return n.len } // Size return number of bytes in n.data @@ -343,7 +343,7 @@ func (n *Bitmap) TryExpandWithSize(size int) { func (n *Bitmap) Filter(sels []int64) *Bitmap { var m Bitmap - m.InitWithSize(int(n.len)) + m.InitWithSize(n.len) for i, sel := range sels { if n.Contains(uint64(sel)) { m.Add(uint64(i)) diff --git a/pkg/common/bitmap/bitmap_test.go b/pkg/common/bitmap/bitmap_test.go index db738083f2f6a..712bada1f79ff 100644 --- a/pkg/common/bitmap/bitmap_test.go +++ b/pkg/common/bitmap/bitmap_test.go @@ -28,7 +28,7 @@ const ( func newBm(n int) *Bitmap { var bm Bitmap - bm.InitWithSize(n) + bm.InitWithSize(int64(n)) return &bm } diff --git a/pkg/common/bloomfilter/bloomfilter.go b/pkg/common/bloomfilter/bloomfilter.go new file mode 100644 index 0000000000000..aab01b22496dd --- /dev/null +++ b/pkg/common/bloomfilter/bloomfilter.go @@ -0,0 +1,84 @@ +// Copyright 2021 - 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bloomfilter + +import ( + "github.com/matrixorigin/matrixone/pkg/common/hashmap" + "github.com/matrixorigin/matrixone/pkg/container/hashtable" + "github.com/matrixorigin/matrixone/pkg/container/vector" +) + +func (bf *BloomFilter) Clean() { + bf.bitmap.Reset() + bf.hashSeed = nil +} + +func (bf *BloomFilter) TestAndAddForVector(v *vector.Vector, callBack func(exits bool, idx int)) { + length := v.Length() + keys := make([][]byte, hashmap.UnitLimit) + states := make([][3]uint64, hashmap.UnitLimit) + bitSize := uint64(bf.bitmap.Len()) + var val1, val2, val3 uint64 + + for i := 0; i < length; i += hashmap.UnitLimit { + n := length - i + if n > hashmap.UnitLimit { + n = hashmap.UnitLimit + } + exitsArr := make([]bool, n) + for j := 0; j < n; j++ { + keys[j] = keys[j][:0] + exitsArr[j] = true + } + encodeHashKeys(keys, v, i, n) + + for _, seed := range bf.hashSeed { + hashtable.BytesBatchGenHashStatesWithSeed(&keys[0], &states[0], n, seed) + for j := 0; j < n; j++ { + val1 = states[j][0] + if val1 > bitSize { + val1 = val1 % bitSize + } + if exitsArr[j] { + exitsArr[j] = bf.bitmap.Contains(val1) + } + bf.bitmap.Add(val1) + + val2 = states[j][1] + if val2 > bitSize { + val2 = val2 % bitSize + } + if exitsArr[j] { + exitsArr[j] = bf.bitmap.Contains(val2) + } + bf.bitmap.Add(val2) + + val3 = states[j][2] + if val3 > bitSize { + val3 = val3 % bitSize + } + if exitsArr[j] { + exitsArr[j] = bf.bitmap.Contains(val3) + } + bf.bitmap.Add(val3) + } + } + + for j := 0; j < n; j++ { + callBack(exitsArr[j], i+j) + } + + } +} diff --git a/pkg/common/bloomfilter/bloomfilter_test.go b/pkg/common/bloomfilter/bloomfilter_test.go new file mode 100644 index 0000000000000..46fbd43a0057f --- /dev/null +++ b/pkg/common/bloomfilter/bloomfilter_test.go @@ -0,0 +1,64 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bloomfilter + +import ( + "math" + "testing" + + "github.com/bits-and-blooms/bloom" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/testutil" +) + +const TEST_COUNT = 10000000 +const TEST_RATE = 0.00001 + +func TestBloomFilter(t *testing.T) { + mp := mpool.MustNewZero() + vec := testutil.NewVector(TEST_COUNT, types.New(types.T_int64, 0, 0), mp, false, nil) + + boom := New(TEST_COUNT, TEST_RATE) + boom.TestAndAddForVector(vec, func(_ bool, _ int) {}) +} + +func BenchmarkBloomFiltrer(b *testing.B) { + mp := mpool.MustNewZero() + vec := testutil.NewVector(TEST_COUNT, types.New(types.T_int64, 0, 0), mp, false, nil) + + for i := 0; i < b.N; i++ { + boom := New(TEST_COUNT, TEST_RATE) + boom.TestAndAddForVector(vec, func(_ bool, _ int) {}) + } +} + +func BenchmarkBloom(b *testing.B) { + mp := mpool.MustNewZero() + vec := testutil.NewVector(TEST_COUNT, types.New(types.T_int64, 0, 0), mp, false, nil) + k := 3 + n := float64(TEST_COUNT) + p := TEST_RATE + e := -float64(k) * math.Ceil(1.001*n) / math.Log(1-math.Pow(p, 1.0/float64(k))) + m := uint(math.Ceil(e)) + + for i := 0; i < b.N; i++ { + filter := bloom.New(m, 3) + for i := 0; i < TEST_COUNT; i++ { + var bytes = vec.GetRawBytesAt(i) + filter.TestAndAdd(bytes) + } + } +} diff --git a/pkg/common/bloomfilter/types.go b/pkg/common/bloomfilter/types.go new file mode 100644 index 0000000000000..2a0fb8b037dc2 --- /dev/null +++ b/pkg/common/bloomfilter/types.go @@ -0,0 +1,75 @@ +// Copyright 2021 - 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bloomfilter + +import ( + "math" + "math/rand" + + "github.com/matrixorigin/matrixone/pkg/common/bitmap" +) + +var constLogValue float64 + +func init() { + constLogValue = math.Log(1 / math.Pow(2, math.Log(2))) +} + +// JoinMap is used for join +type BloomFilter struct { + bitmap *bitmap.Bitmap + hashSeed []uint64 +} + +func New(rowCount int64, probability float64) *BloomFilter { + bitSize, seedCount := computeMemAndHashCount(rowCount, probability) + hashSeed := make([]uint64, seedCount) + for i := 0; i < seedCount; i++ { + hashSeed[i] = rand.Uint64() + } + bits := &bitmap.Bitmap{} + bits.InitWithSize(bitSize) + + return &BloomFilter{ + bitmap: bits, + hashSeed: hashSeed, + } +} + +func computeMemAndHashCount(rowCount int64, probability float64) (int64, int) { + if rowCount < 10001 { + return 64 * 10000, 1 + } else if rowCount < 100001 { + return 64 * 100000, 1 + } else if rowCount < 1000001 { + return 16 * 1000000, 1 + } else if rowCount < 10000001 { + return 38 * 10000000, 2 + } else if rowCount < 100000001 { + // m := ceil((rowCount * log(0.000001)) / log(1/pow(2, log(2)))) + m := math.Ceil((float64(rowCount) * math.Log(probability)) / constLogValue) + return int64(m), 3 + } else if rowCount < 1000000001 { + // m := ceil((rowCount * log(0.000001)) / log(1/pow(2, log(2)))) + m := math.Ceil((float64(rowCount) * math.Log(probability)) / constLogValue) + return int64(m), 3 + } else if rowCount < 10000000001 { + // m := ceil((rowCount * log(0.000001)) / log(1/pow(2, log(2)))) + m := math.Ceil((float64(rowCount) * math.Log(probability)) / constLogValue) + return int64(m), 4 + } else { + panic("unsupport rowCount") + } +} diff --git a/pkg/common/bloomfilter/util.go b/pkg/common/bloomfilter/util.go new file mode 100644 index 0000000000000..f6d757c83632e --- /dev/null +++ b/pkg/common/bloomfilter/util.go @@ -0,0 +1,80 @@ +// Copyright 2021 - 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bloomfilter + +import ( + "unsafe" + + "github.com/matrixorigin/matrixone/pkg/container/hashtable" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" +) + +func fillStringGroupStr(keys [][]byte, vec *vector.Vector, n int, start int) { + area := vec.GetArea() + vs := vector.MustFixedCol[types.Varlena](vec) + if !vec.GetNulls().Any() { + for i := 0; i < n; i++ { + keys[i] = append(keys[i], byte(0)) + keys[i] = append(keys[i], vs[i+start].GetByteSlice(area)...) + } + } else { + nsp := vec.GetNulls() + for i := 0; i < n; i++ { + hasNull := nsp.Contains(uint64(i + start)) + if hasNull { + keys[i] = append(keys[i], byte(1)) + } else { + keys[i] = append(keys[i], byte(0)) + keys[i] = append(keys[i], vs[i+start].GetByteSlice(area)...) + } + } + } +} + +func fillGroupStr(keys [][]byte, vec *vector.Vector, n int, sz int, start int) { + data := unsafe.Slice(vector.GetPtrAt[byte](vec, 0), (n+start)*sz) + if !vec.GetNulls().Any() { + for i := 0; i < n; i++ { + keys[i] = append(keys[i], byte(0)) + keys[i] = append(keys[i], data[(i+start)*sz:(i+start+1)*sz]...) + } + } else { + nsp := vec.GetNulls() + for i := 0; i < n; i++ { + isNull := nsp.Contains(uint64(i + start)) + if isNull { + keys[i] = append(keys[i], byte(1)) + } else { + keys[i] = append(keys[i], byte(0)) + keys[i] = append(keys[i], data[(i+start)*sz:(i+start+1)*sz]...) + } + } + } +} + +func encodeHashKeys(keys [][]byte, vec *vector.Vector, start, count int) { + if vec.GetType().IsFixedLen() { + fillGroupStr(keys, vec, count, vec.GetType().TypeSize(), start) + } else { + fillStringGroupStr(keys, vec, count, start) + } + + for i := 0; i < count; i++ { + if l := len(keys[i]); l < 16 { + keys[i] = append(keys[i], hashtable.StrKeyPadding[l:]...) + } + } +} diff --git a/pkg/container/hashtable/hash.go b/pkg/container/hashtable/hash.go index 7fbdc7344040e..3dd4f92120646 100644 --- a/pkg/container/hashtable/hash.go +++ b/pkg/container/hashtable/hash.go @@ -23,12 +23,13 @@ import ( ) var ( - Int64BatchHash = wyhashInt64Batch - Int64HashWithFixedSeed = wyhash64WithFixedSeed - BytesBatchGenHashStates = wyhashBytesBatch - Int192BatchGenHashStates = wyhashInt192Batch - Int256BatchGenHashStates = wyhashInt256Batch - Int320BatchGenHashStates = wyhashInt320Batch + Int64BatchHash = wyhashInt64Batch + Int64HashWithFixedSeed = wyhash64WithFixedSeed + BytesBatchGenHashStates = wyhashBytesBatch + BytesBatchGenHashStatesWithSeed = wyhashBytesBatchWithSeed + Int192BatchGenHashStates = wyhashInt192Batch + Int256BatchGenHashStates = wyhashInt256Batch + Int320BatchGenHashStates = wyhashInt320Batch ) // Hashing algorithm inspired by @@ -139,6 +140,16 @@ func wyhashBytesBatch(data *[]byte, states *[3]uint64, length int) { } } +func wyhashBytesBatchWithSeed(data *[]byte, states *[3]uint64, length int, seed uint64) { + dataSlice := unsafe.Slice((*[]byte)(data), length) + hashSlice := unsafe.Slice((*[3]uint64)(states), length) + for i := 0; i < length; i++ { + hashSlice[i][0] = wyhash(unsafe.Pointer(&dataSlice[i][0]), seed, uint64(len(dataSlice[i]))) + hashSlice[i][1] = wyhash(unsafe.Pointer(&dataSlice[i][0]), seed<<32, uint64(len(dataSlice[i]))) + hashSlice[i][2] = wyhash(unsafe.Pointer(&dataSlice[i][0]), seed>>32, uint64(len(dataSlice[i]))) + } +} + func wyhashInt192Batch(data *[3]uint64, states *[3]uint64, length int) { dataSlice := unsafe.Slice((*[3]uint64)(data), length) hashSlice := unsafe.Slice((*[3]uint64)(states), length) diff --git a/pkg/container/nulls/nulls.go b/pkg/container/nulls/nulls.go index b6bcb24cb4f9f..5295cd9fd4216 100644 --- a/pkg/container/nulls/nulls.go +++ b/pkg/container/nulls/nulls.go @@ -45,7 +45,7 @@ func (nsp *Nulls) InitWith(n *Nulls) { } func (nsp *Nulls) InitWithSize(size int) { - nsp.np.InitWithSize(size) + nsp.np.InitWithSize(int64(size)) } func NewWithSize(size int) *Nulls { @@ -215,7 +215,7 @@ func Range(nsp *Nulls, start, end, bias uint64, m *Nulls) { return } - m.np.InitWithSize(int(end + 1 - bias)) + m.np.InitWithSize(int64(end + 1 - bias)) for ; start < end; start++ { if nsp.np.Contains(start) { m.np.Add(start - bias) @@ -233,8 +233,8 @@ func Filter(nsp *Nulls, sels []int64, negate bool) { oldLen := nsp.np.Len() var bm bitmap.Bitmap bm.InitWithSize(oldLen) - for oldIdx, newIdx, selIdx, sel := 0, 0, 0, sels[0]; oldIdx < oldLen; oldIdx++ { - if oldIdx != int(sel) { + for oldIdx, newIdx, selIdx, sel := int64(0), 0, 0, sels[0]; oldIdx < oldLen; oldIdx++ { + if oldIdx != sel { if nsp.np.Contains(uint64(oldIdx)) { bm.Add(uint64(newIdx)) } @@ -256,7 +256,7 @@ func Filter(nsp *Nulls, sels []int64, negate bool) { nsp.np.InitWith(&bm) } else { var bm bitmap.Bitmap - bm.InitWithSize(len(sels)) + bm.InitWithSize(int64(len(sels))) upperLimit := int64(nsp.np.Len()) for i, sel := range sels { if sel >= upperLimit { diff --git a/pkg/pb/ctl/ctl.pb.go b/pkg/pb/ctl/ctl.pb.go index f880266e35b60..be619f6874076 100644 --- a/pkg/pb/ctl/ctl.pb.go +++ b/pkg/pb/ctl/ctl.pb.go @@ -62,7 +62,7 @@ const ( CmdMethod_TraceSpan CmdMethod = 13 // for CN launches storage usage query to TN, // not used in mo_ctl for now - CmdMethod_CmdMethod_StorageUsage CmdMethod = 14 + CmdMethod_StorageUsage CmdMethod = 14 ) var CmdMethod_name = map[int32]string{ @@ -80,25 +80,25 @@ var CmdMethod_name = map[int32]string{ 11: "AddFaultPoint", 12: "Backup", 13: "TraceSpan", - 14: "CmdMethod_StorageUsage", + 14: "StorageUsage", } var CmdMethod_value = map[string]int32{ - "Ping": 0, - "Flush": 1, - "Task": 2, - "Checkpoint": 3, - "UseSnapshot": 4, - "GetSnapshot": 5, - "ForceGC": 6, - "Inspect": 7, - "Label": 8, - "SyncCommit": 9, - "GetCommit": 10, - "AddFaultPoint": 11, - "Backup": 12, - "TraceSpan": 13, - "CmdMethod_StorageUsage": 14, + "Ping": 0, + "Flush": 1, + "Task": 2, + "Checkpoint": 3, + "UseSnapshot": 4, + "GetSnapshot": 5, + "ForceGC": 6, + "Inspect": 7, + "Label": 8, + "SyncCommit": 9, + "GetCommit": 10, + "AddFaultPoint": 11, + "Backup": 12, + "TraceSpan": 13, + "StorageUsage": 14, } func (x CmdMethod) String() string { @@ -645,48 +645,47 @@ func init() { func init() { proto.RegisterFile("ctl.proto", fileDescriptor_0646114e50303026) } var fileDescriptor_0646114e50303026 = []byte{ - // 645 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0xc1, 0x4e, 0xdb, 0x4a, - 0x14, 0x86, 0x31, 0x38, 0x04, 0x9f, 0x10, 0x33, 0x8c, 0x10, 0x37, 0x42, 0x57, 0xb9, 0xc8, 0x8b, - 0x2b, 0x74, 0x05, 0xc9, 0x15, 0xdd, 0x55, 0x6d, 0x25, 0x92, 0x14, 0x14, 0x09, 0x10, 0xb2, 0x83, - 0xaa, 0xb2, 0xa9, 0x26, 0xf6, 0xd4, 0xb1, 0x62, 0x7b, 0xdc, 0x99, 0x71, 0x55, 0x5e, 0xa9, 0x4f, - 0xc2, 0xae, 0x3c, 0x41, 0xd5, 0xb2, 0xa9, 0xd4, 0xa7, 0xa8, 0x3c, 0x76, 0x62, 0x37, 0x59, 0xb4, - 0x95, 0xd8, 0xcd, 0xf9, 0xe7, 0x3f, 0x67, 0xfe, 0x6f, 0x46, 0x36, 0x18, 0xae, 0x0c, 0x3b, 0x09, - 0x67, 0x92, 0xe1, 0x35, 0x57, 0x86, 0x7b, 0x47, 0x7e, 0x20, 0x27, 0xe9, 0xb8, 0xe3, 0xb2, 0xa8, - 0xeb, 0x33, 0x9f, 0x75, 0xd5, 0xde, 0x38, 0x7d, 0xab, 0x2a, 0x55, 0xa8, 0x55, 0xde, 0xb3, 0xb7, - 0x25, 0x83, 0x88, 0x0a, 0x49, 0xa2, 0x24, 0x17, 0xac, 0x23, 0x68, 0x8e, 0x2e, 0xaf, 0x82, 0xd8, - 0xb7, 0xe9, 0xbb, 0x94, 0x0a, 0x89, 0xff, 0x06, 0x23, 0x21, 0x9c, 0x44, 0x54, 0x52, 0xde, 0xd2, - 0xf6, 0xb5, 0x03, 0xc3, 0x2e, 0x05, 0xeb, 0xa3, 0x06, 0xe6, 0xcc, 0x2f, 0x12, 0x16, 0x0b, 0x8a, - 0x5b, 0x50, 0x17, 0x92, 0x71, 0x3a, 0x1c, 0x14, 0xf6, 0x59, 0x89, 0xff, 0x05, 0x53, 0x50, 0xfe, - 0x3e, 0x70, 0xe9, 0x89, 0xe7, 0x71, 0x2a, 0x44, 0x6b, 0x55, 0x19, 0x16, 0x54, 0x35, 0x61, 0x42, - 0xb8, 0x37, 0x1c, 0xb4, 0xd6, 0xf6, 0xb5, 0x03, 0xdd, 0x9e, 0x95, 0x59, 0x18, 0x4e, 0x93, 0x30, - 0x70, 0xc9, 0x70, 0xd0, 0xd2, 0xd5, 0x5e, 0x29, 0xe0, 0x36, 0x40, 0xc8, 0x7c, 0xa7, 0x68, 0xad, - 0xa9, 0xed, 0x8a, 0x62, 0xfd, 0x0f, 0x68, 0x74, 0xe9, 0x48, 0x5e, 0x4d, 0xab, 0x26, 0xca, 0x94, - 0xc7, 0x8e, 0x9c, 0xe3, 0xcd, 0x05, 0xeb, 0x93, 0x06, 0xf5, 0xca, 0x45, 0x14, 0xcb, 0x82, 0x4c, - 0xb7, 0x4b, 0x01, 0x1f, 0x82, 0xd1, 0xbf, 0x18, 0x5c, 0x50, 0x39, 0x61, 0x9e, 0xc2, 0x32, 0x8f, - 0xcd, 0x4e, 0xf6, 0x36, 0xfd, 0xc8, 0xcb, 0x55, 0xbb, 0x34, 0xe0, 0x67, 0x00, 0xce, 0xad, 0x1b, - 0xf7, 0x59, 0x14, 0x05, 0x52, 0x41, 0x36, 0x8e, 0x77, 0x95, 0xdd, 0xb9, 0x8d, 0xdd, 0x5c, 0x2e, - 0x66, 0xf7, 0xf4, 0xbb, 0xcf, 0xff, 0xac, 0xd8, 0x15, 0x3f, 0x7e, 0x0a, 0xc6, 0x19, 0x95, 0x45, - 0xb3, 0xfe, 0x1b, 0xcd, 0xa5, 0xdd, 0xfa, 0xa6, 0xc1, 0x46, 0x15, 0xfe, 0xd1, 0x90, 0x76, 0xa0, - 0xf6, 0x92, 0x73, 0xc6, 0x15, 0xcd, 0xa6, 0x9d, 0x17, 0xf8, 0xf9, 0x4f, 0xa0, 0x79, 0xd6, 0xbf, - 0x96, 0xb2, 0xe6, 0x71, 0x7e, 0x45, 0x5a, 0xab, 0x90, 0xce, 0xd5, 0x85, 0xe6, 0x0a, 0xe9, 0x2b, - 0xd8, 0x5e, 0xba, 0x0f, 0xdc, 0x03, 0xf3, 0x9c, 0x48, 0x2a, 0x0a, 0xd3, 0xc8, 0x51, 0xd8, 0x8d, - 0xe3, 0x9d, 0x4e, 0xf9, 0x21, 0x8c, 0x66, 0xab, 0x62, 0xe6, 0x42, 0x87, 0x75, 0x03, 0x78, 0x39, - 0x3c, 0x1e, 0xc0, 0x56, 0x3f, 0xe5, 0x9c, 0xc6, 0x7f, 0x32, 0x7a, 0xb1, 0xc5, 0xc2, 0x80, 0x2a, - 0x68, 0x2a, 0xb3, 0xf5, 0x1a, 0xb6, 0x97, 0x70, 0x1f, 0xe7, 0xb8, 0xff, 0xbe, 0x6b, 0x60, 0xcc, - 0x5f, 0x13, 0x6f, 0x80, 0x9e, 0x7d, 0xc9, 0x68, 0x05, 0x1b, 0x50, 0x3b, 0x0d, 0x53, 0x31, 0x41, - 0x5a, 0x26, 0x8e, 0x88, 0x98, 0xa2, 0x55, 0x6c, 0x02, 0xf4, 0x27, 0xd4, 0x9d, 0x26, 0x2c, 0x88, - 0x25, 0x5a, 0xc3, 0x5b, 0xd0, 0xb8, 0x16, 0xd4, 0x89, 0x49, 0x22, 0x26, 0x4c, 0x22, 0x3d, 0x13, - 0xce, 0xa8, 0x9c, 0x0b, 0x35, 0xdc, 0x80, 0xfa, 0x29, 0xe3, 0x2e, 0x3d, 0xeb, 0xa3, 0xf5, 0xac, - 0x18, 0xc6, 0x22, 0xa1, 0xae, 0x44, 0xf5, 0xec, 0x80, 0x73, 0x32, 0xa6, 0x21, 0xda, 0xc8, 0xc6, - 0x96, 0xd7, 0x89, 0x0c, 0xdc, 0xac, 0xbc, 0x39, 0x02, 0xbc, 0x0d, 0xcd, 0x13, 0xcf, 0x3b, 0x25, - 0x69, 0x28, 0xaf, 0xd4, 0xc1, 0x0d, 0x0c, 0xb0, 0xde, 0x23, 0xee, 0x34, 0x4d, 0xd0, 0x66, 0xe6, - 0x1e, 0x71, 0xe2, 0x52, 0x27, 0x21, 0x31, 0x6a, 0xe2, 0x3d, 0xd8, 0x9d, 0xf3, 0xbc, 0x71, 0x24, - 0xe3, 0xc4, 0xa7, 0xd7, 0x82, 0xf8, 0x14, 0x99, 0xbd, 0x17, 0xf7, 0x5f, 0xdb, 0xda, 0xdd, 0x43, - 0x5b, 0xbb, 0x7f, 0x68, 0x6b, 0x5f, 0x1e, 0xda, 0xda, 0xcd, 0x61, 0xe5, 0x67, 0x19, 0x11, 0xc9, - 0x83, 0x0f, 0x8c, 0x07, 0x7e, 0x10, 0xcf, 0x8a, 0x98, 0x76, 0x93, 0xa9, 0xdf, 0x4d, 0xc6, 0x5d, - 0x57, 0x86, 0xe3, 0x75, 0xf5, 0x87, 0x7c, 0xf2, 0x23, 0x00, 0x00, 0xff, 0xff, 0xa2, 0x9e, 0xc7, - 0xab, 0x73, 0x05, 0x00, 0x00, + // 638 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0xd1, 0x4e, 0xdb, 0x3e, + 0x14, 0xc6, 0x09, 0xa4, 0x94, 0x9c, 0xd2, 0x60, 0x2c, 0xf4, 0xff, 0x57, 0x68, 0xea, 0x50, 0x2e, + 0x26, 0x34, 0x41, 0x3b, 0xb1, 0xbb, 0x69, 0x9b, 0x44, 0xdb, 0x81, 0x2a, 0x01, 0x42, 0x49, 0xd1, + 0x34, 0xee, 0xdc, 0xc4, 0x4b, 0xa3, 0x26, 0x71, 0x66, 0x3b, 0xd3, 0x78, 0xa5, 0x3d, 0x09, 0x77, + 0xe3, 0x09, 0x26, 0xc6, 0xcd, 0x5e, 0x63, 0x8a, 0x93, 0x36, 0x59, 0x7b, 0xb1, 0x4d, 0xe2, 0xce, + 0xe7, 0xf3, 0x77, 0x8e, 0xbf, 0x9f, 0xad, 0x04, 0x0c, 0x57, 0x86, 0x9d, 0x84, 0x33, 0xc9, 0xf0, + 0x9a, 0x2b, 0xc3, 0xdd, 0x43, 0x3f, 0x90, 0x93, 0x74, 0xdc, 0x71, 0x59, 0xd4, 0xf5, 0x99, 0xcf, + 0xba, 0x6a, 0x6f, 0x9c, 0x7e, 0x54, 0x95, 0x2a, 0xd4, 0x2a, 0xef, 0xd9, 0xdd, 0x92, 0x41, 0x44, + 0x85, 0x24, 0x51, 0x92, 0x0b, 0xd6, 0x21, 0x34, 0x47, 0x17, 0x97, 0x41, 0xec, 0xdb, 0xf4, 0x53, + 0x4a, 0x85, 0xc4, 0x4f, 0xc0, 0x48, 0x08, 0x27, 0x11, 0x95, 0x94, 0xb7, 0xb4, 0x3d, 0x6d, 0xdf, + 0xb0, 0x4b, 0xc1, 0xfa, 0xaa, 0x81, 0x39, 0xf3, 0x8b, 0x84, 0xc5, 0x82, 0xe2, 0x16, 0xd4, 0x85, + 0x64, 0x9c, 0x0e, 0x07, 0x85, 0x7d, 0x56, 0xe2, 0x67, 0x60, 0x0a, 0xca, 0x3f, 0x07, 0x2e, 0x3d, + 0xf6, 0x3c, 0x4e, 0x85, 0x68, 0xad, 0x2a, 0xc3, 0x82, 0xaa, 0x26, 0x4c, 0x08, 0xf7, 0x86, 0x83, + 0xd6, 0xda, 0x9e, 0xb6, 0xaf, 0xdb, 0xb3, 0x32, 0x0b, 0xc3, 0x69, 0x12, 0x06, 0x2e, 0x19, 0x0e, + 0x5a, 0xba, 0xda, 0x2b, 0x05, 0xdc, 0x06, 0x08, 0x99, 0xef, 0x14, 0xad, 0x35, 0xb5, 0x5d, 0x51, + 0xac, 0x17, 0x80, 0x46, 0x17, 0x8e, 0xe4, 0xd5, 0xb4, 0x6a, 0xa2, 0x4c, 0x79, 0xec, 0xc8, 0x39, + 0xde, 0x5c, 0xb0, 0xbe, 0x69, 0x50, 0xaf, 0x5c, 0x44, 0xb1, 0x2c, 0xc8, 0x74, 0xbb, 0x14, 0xf0, + 0x01, 0x18, 0xfd, 0xf3, 0xc1, 0x39, 0x95, 0x13, 0xe6, 0x29, 0x2c, 0xf3, 0xc8, 0xec, 0x64, 0x6f, + 0xd3, 0x8f, 0xbc, 0x5c, 0xb5, 0x4b, 0x03, 0x7e, 0x0d, 0xe0, 0xdc, 0xb8, 0x71, 0x9f, 0x45, 0x51, + 0x20, 0x15, 0x64, 0xe3, 0xe8, 0x3f, 0x65, 0x77, 0x6e, 0x62, 0x37, 0x97, 0x8b, 0xd9, 0x3d, 0xfd, + 0xf6, 0xfb, 0xd3, 0x15, 0xbb, 0xe2, 0xc7, 0xaf, 0xc0, 0x38, 0xa5, 0xb2, 0x68, 0xd6, 0xff, 0xa2, + 0xb9, 0xb4, 0x5b, 0x3f, 0x35, 0xd8, 0xa8, 0xc2, 0x3f, 0x1a, 0xd2, 0x0e, 0xd4, 0xde, 0x71, 0xce, + 0xb8, 0xa2, 0xd9, 0xb4, 0xf3, 0x02, 0xbf, 0xf9, 0x0d, 0x34, 0xcf, 0xfa, 0xff, 0x52, 0xd6, 0x3c, + 0xce, 0x9f, 0x48, 0x6b, 0x15, 0xd2, 0xb9, 0xba, 0xd0, 0x5c, 0x21, 0x7d, 0x0f, 0xdb, 0x4b, 0xf7, + 0x81, 0x7b, 0x60, 0x9e, 0x11, 0x49, 0x45, 0x61, 0x1a, 0x39, 0x0a, 0xbb, 0x71, 0xb4, 0xd3, 0x29, + 0x3f, 0x84, 0xd1, 0x6c, 0x55, 0xcc, 0x5c, 0xe8, 0xb0, 0xae, 0x01, 0x2f, 0x87, 0xc7, 0x03, 0xd8, + 0xea, 0xa7, 0x9c, 0xd3, 0xf8, 0x5f, 0x46, 0x2f, 0xb6, 0x58, 0x18, 0x50, 0x05, 0x4d, 0x65, 0xb6, + 0x3e, 0xc0, 0xf6, 0x12, 0xee, 0xe3, 0x1c, 0xf7, 0xfc, 0x5e, 0x03, 0x63, 0xfe, 0x9a, 0x78, 0x03, + 0xf4, 0xec, 0x4b, 0x46, 0x2b, 0xd8, 0x80, 0xda, 0x49, 0x98, 0x8a, 0x09, 0xd2, 0x32, 0x71, 0x44, + 0xc4, 0x14, 0xad, 0x62, 0x13, 0xa0, 0x3f, 0xa1, 0xee, 0x34, 0x61, 0x41, 0x2c, 0xd1, 0x1a, 0xde, + 0x82, 0xc6, 0x95, 0xa0, 0x4e, 0x4c, 0x12, 0x31, 0x61, 0x12, 0xe9, 0x99, 0x70, 0x4a, 0xe5, 0x5c, + 0xa8, 0xe1, 0x06, 0xd4, 0x4f, 0x18, 0x77, 0xe9, 0x69, 0x1f, 0xad, 0x67, 0xc5, 0x30, 0x16, 0x09, + 0x75, 0x25, 0xaa, 0x67, 0x07, 0x9c, 0x91, 0x31, 0x0d, 0xd1, 0x46, 0x36, 0xb6, 0xbc, 0x4e, 0x64, + 0xe0, 0x66, 0xe5, 0xcd, 0x11, 0xe0, 0x6d, 0x68, 0x1e, 0x7b, 0xde, 0x09, 0x49, 0x43, 0x79, 0xa9, + 0x0e, 0x6e, 0x60, 0x80, 0xf5, 0x1e, 0x71, 0xa7, 0x69, 0x82, 0x36, 0x33, 0xf7, 0x88, 0x13, 0x97, + 0x3a, 0x09, 0x89, 0x51, 0x13, 0x23, 0xd8, 0x74, 0x24, 0xe3, 0xc4, 0xa7, 0x57, 0x82, 0xf8, 0x14, + 0x99, 0xbd, 0xb7, 0x77, 0x3f, 0xda, 0xda, 0xed, 0x43, 0x5b, 0xbb, 0x7b, 0x68, 0x6b, 0xf7, 0x0f, + 0x6d, 0xed, 0xfa, 0xa0, 0xf2, 0x8b, 0x8c, 0x88, 0xe4, 0xc1, 0x17, 0xc6, 0x03, 0x3f, 0x88, 0x67, + 0x45, 0x4c, 0xbb, 0xc9, 0xd4, 0xef, 0x26, 0xe3, 0xae, 0x2b, 0xc3, 0xf1, 0xba, 0xfa, 0x2f, 0xbe, + 0xfc, 0x15, 0x00, 0x00, 0xff, 0xff, 0xe8, 0x76, 0x33, 0xd0, 0x69, 0x05, 0x00, 0x00, } func (m *TNPingRequest) Marshal() (dAtA []byte, err error) { diff --git a/pkg/sql/colexec/fuzzyfilter/filter.go b/pkg/sql/colexec/fuzzyfilter/filter.go index fc749d2034de0..8bd16ecf1ba1d 100644 --- a/pkg/sql/colexec/fuzzyfilter/filter.go +++ b/pkg/sql/colexec/fuzzyfilter/filter.go @@ -16,12 +16,10 @@ package fuzzyfilter import ( "bytes" // "fmt" - "math" - "github.com/bits-and-blooms/bloom" - "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/common/bloomfilter" "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/vm" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -47,91 +45,82 @@ Note: on duplicate key update */ -const ( - // Probability of false positives - p float64 = 0.00001 - // Number of hash functions - k uint = 3 -) - -// EstimateBitsNeed return the Number of bits should have in the filter -// by the formula: p = pow(1 - exp(-k / (m / n)), k) -// -// ==> m = - kn / ln(1 - p^(1/k)), use k * (1.001 * n) instead of kn to overcome floating point errors -func EstimateBitsNeed(n float64, k uint, p float64) float64 { - return -float64(k) * math.Ceil(1.001*n) / math.Log(1-math.Pow(p, 1.0/float64(k))) -} - -func String(_ any, buf *bytes.Buffer) { +func (arg *Argument) String(buf *bytes.Buffer) { buf.WriteString(" fuzzy check duplicate constraint") } -func Prepare(proc *process.Process, arg any) (err error) { - ap := arg.(*Argument) - e := EstimateBitsNeed(ap.N, k, p) - m := uint(math.Ceil(e)) - if float64(m) < e { - return moerr.NewInternalErrorNoCtx("Overflow occurred when estimating size of fuzzy filter") +func (arg *Argument) Prepare(proc *process.Process) (err error) { + rowCount := int64(arg.N * 1.1) + if rowCount < 10000 { + rowCount = 10000 + } + probability := 0.00001 + if rowCount < 10000001 { + probability = 0.00001 + } else if rowCount < 100000001 { + probability = 0.000001 + } else if rowCount < 1000000001 { + probability = 0.0000005 + } else { + probability = 0.0000001 } - ap.filter = bloom.New(m, k) + arg.filter = bloomfilter.New(rowCount, probability) + return nil } -func Call(idx int, proc *process.Process, arg any, isFirst bool, isLast bool) (process.ExecStatus, error) { - anal := proc.GetAnalyze(idx) +func (arg *Argument) Call(proc *process.Process) (vm.CallResult, error) { + anal := proc.GetAnalyze(arg.info.Idx) anal.Start() defer anal.Stop() - ap := arg.(*Argument) - bat := proc.InputBatch() - anal.Input(bat, isFirst) + result, err := arg.children[0].Call(proc) + if err != nil { + result.Status = vm.ExecStop + return result, err + } + bat := result.Batch if bat == nil { // this will happen in such case:create unique index from a table that unique col have no data - if ap.rbat == nil { - proc.SetInputBatch(nil) - return process.ExecStop, nil + if arg.rbat == nil || arg.collisionCnt == 0 { + result.Status = vm.ExecStop + return result, nil } // fmt.Printf("Estimated row count is %f, collisionCnt is %d, fp is %f\n", ap.N, ap.collisionCnt, float64(ap.collisionCnt)/float64(ap.N)) - ap.rbat.SetRowCount(ap.collisionCnt) - if ap.collisionCnt == 0 { - // case 1: pass duplicate constraint - proc.SetInputBatch(nil) - return process.ExecStop, nil - } else { - // case 2: send collisionKeys to output operator to run background SQL - proc.SetInputBatch(ap.rbat) - return process.ExecStop, nil - } + // send collisionKeys to output operator to run background SQL + arg.rbat.SetRowCount(arg.rbat.Vecs[0].Length()) + result.Batch = arg.rbat + arg.collisionCnt = 0 + result.Status = vm.ExecStop + return result, nil } + anal.Input(bat, arg.info.IsFirst) + rowCnt := bat.RowCount() if rowCnt == 0 { - proc.PutBatch(bat) - proc.SetInputBatch(batch.EmptyBatch) - return process.ExecNext, nil + return result, nil } - if ap.rbat == nil { - if err := generateRbat(proc, ap, bat); err != nil { - return process.ExecStop, err + if arg.rbat == nil { + if err := generateRbat(proc, arg, bat); err != nil { + result.Status = vm.ExecStop + return result, err } } pkCol := bat.GetVector(0) - for i := 0; i < rowCnt; i++ { - var bytes = pkCol.GetRawBytesAt(i) - if ap.filter.Test(bytes) { - appendCollisionKey(proc, ap, i, bat) - ap.collisionCnt++ - } else { - ap.filter.Add(bytes) + arg.filter.TestAndAddForVector(pkCol, func(exist bool, i int) { + if exist { + appendCollisionKey(proc, arg, i, bat) + arg.collisionCnt++ } - } + }) - proc.SetInputBatch(batch.EmptyBatch) - return process.ExecNext, nil + result.Batch = batch.EmptyBatch + return result, nil } // appendCollisionKey will append collision key into rbat @@ -143,7 +132,7 @@ func appendCollisionKey(proc *process.Process, arg *Argument, idx int, bat *batc // rbat will contain the keys that have hash collisions func generateRbat(proc *process.Process, arg *Argument, bat *batch.Batch) error { rbat := batch.NewWithSize(1) - rbat.SetVector(0, vector.NewVec(*bat.GetVector(0).GetType())) + rbat.SetVector(0, proc.GetVector(*bat.GetVector(0).GetType())) arg.rbat = rbat return nil } diff --git a/pkg/sql/colexec/fuzzyfilter/filter_test.go b/pkg/sql/colexec/fuzzyfilter/filter_test.go index 3605e73969735..078d7d6b68da2 100644 --- a/pkg/sql/colexec/fuzzyfilter/filter_test.go +++ b/pkg/sql/colexec/fuzzyfilter/filter_test.go @@ -20,7 +20,9 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/value_scan" "github.com/matrixorigin/matrixone/pkg/testutil" + "github.com/matrixorigin/matrixone/pkg/vm" "github.com/matrixorigin/matrixone/pkg/vm/process" "github.com/stretchr/testify/require" ) @@ -40,7 +42,7 @@ var ( ) func init() { - rowCnts = []float64{100000, 500000, 1000000, 5000000, 10000000} + rowCnts = []float64{100000, 1000000, 10000000} // https://hur.st/bloomfilter/?n=100000&p=0.00001&m=&k=3 referM = []float64{ @@ -93,36 +95,41 @@ func init() { func TestString(t *testing.T) { for _, tc := range tcs { buf := new(bytes.Buffer) - String(tc.arg, buf) + tc.arg.String(buf) require.Equal(t, " fuzzy check duplicate constraint", buf.String()) } } func TestPrepare(t *testing.T) { for _, tc := range tcs { - err := Prepare(tc.proc, tc.arg) + err := tc.arg.Prepare(tc.proc) require.NoError(t, err) } } -func TestEstimate(t *testing.T) { - for i, r := range rowCnts { - m := EstimateBitsNeed(r, k, p) - require.LessOrEqual(t, referM[i], m, "The estimated number of bits required is too small") - } -} - func TestFuzzyFilter(t *testing.T) { for _, tc := range tcs { for _, r := range rowCnts { tc.arg.N = r - err := Prepare(tc.proc, tc.arg) - require.NoError(t, err) - tc.proc.Reg.InputBatch = newBatch(t, tc.types, tc.proc, int64(r)) - _, err = Call(0, tc.proc, tc.arg, false, false) + tc.arg.info = &vm.OperatorInfo{ + Idx: 0, + IsFirst: false, + IsLast: false, + } + err := tc.arg.Prepare(tc.proc) require.NoError(t, err) - t.Logf("Estimated row count is %f, collisionCnt is %d, fp is %f", tc.arg.N, tc.arg.collisionCnt, float64(tc.arg.collisionCnt)/float64(tc.arg.N)) - require.LessOrEqual(t, float64(tc.arg.collisionCnt)/float64(tc.arg.N), 1.1*p, "collision cnt is too high, sth must went wrong") + bat := newBatch(t, tc.types, tc.proc, int64(r)) + resetChildren(tc.arg, []*batch.Batch{bat}) + for { + result, err := tc.arg.Call(tc.proc) + require.NoError(t, err) + if result.Status == vm.ExecStop { + tc.arg.Free(tc.proc, false, err) + tc.arg.children[0].Free(tc.proc, false, err) + require.Equal(t, int64(0), tc.proc.Mp().CurrNB()) + break + } + } } } } @@ -136,3 +143,17 @@ func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) bat.SetAttributes(pkAttr) return bat } + +func resetChildren(arg *Argument, bats []*batch.Batch) { + if len(arg.children) == 0 { + arg.AppendChild(&value_scan.Argument{ + Batchs: bats, + }) + + } else { + arg.children = arg.children[:0] + arg.AppendChild(&value_scan.Argument{ + Batchs: bats, + }) + } +} diff --git a/pkg/sql/colexec/fuzzyfilter/types.go b/pkg/sql/colexec/fuzzyfilter/types.go index 0aef5bce5effe..09dc2bbf8aaf5 100644 --- a/pkg/sql/colexec/fuzzyfilter/types.go +++ b/pkg/sql/colexec/fuzzyfilter/types.go @@ -14,24 +14,40 @@ package fuzzyfilter import ( - "github.com/bits-and-blooms/bloom" + "github.com/matrixorigin/matrixone/pkg/common/bloomfilter" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/vm" "github.com/matrixorigin/matrixone/pkg/vm/process" ) +var _ vm.Operator = new(Argument) + type Argument struct { // Number of items in the filter N float64 collisionCnt int - filter *bloom.BloomFilter + filter *bloomfilter.BloomFilter rbat *batch.Batch + + info *vm.OperatorInfo + children []vm.Operator +} + +func (arg *Argument) SetInfo(info *vm.OperatorInfo) { + arg.info = info } -func (arg *Argument) Free(proc *process.Process, pipelineFailed bool) { - arg.filter.ClearAll() - arg.filter = nil +func (arg *Argument) AppendChild(child vm.Operator) { + arg.children = append(arg.children, child) +} + +func (arg *Argument) Free(proc *process.Process, pipelineFailed bool, err error) { + if arg.filter != nil { + arg.filter.Clean() + } if arg.rbat != nil { arg.rbat.Clean(proc.GetMPool()) + arg.rbat = nil } } diff --git a/pkg/sql/colexec/right/join.go b/pkg/sql/colexec/right/join.go index a821d4c67b06f..e7eb932915738 100644 --- a/pkg/sql/colexec/right/join.go +++ b/pkg/sql/colexec/right/join.go @@ -136,7 +136,7 @@ func (ctr *container) build(ap *Argument, proc *process.Process, analyze process ctr.bat = bat ctr.mp = bat.DupJmAuxData() ctr.matched = &bitmap.Bitmap{} - ctr.matched.InitWithSize(bat.RowCount()) + ctr.matched.InitWithSize(int64(bat.RowCount())) analyze.Alloc(ctr.mp.Size()) } return nil diff --git a/pkg/sql/colexec/rightanti/join.go b/pkg/sql/colexec/rightanti/join.go index 5b158c6ad19ea..4e4164a05613e 100644 --- a/pkg/sql/colexec/rightanti/join.go +++ b/pkg/sql/colexec/rightanti/join.go @@ -136,7 +136,7 @@ func (ctr *container) build(ap *Argument, proc *process.Process, analyze process ctr.bat = bat ctr.mp = bat.DupJmAuxData() ctr.matched = &bitmap.Bitmap{} - ctr.matched.InitWithSize(bat.RowCount()) + ctr.matched.InitWithSize(int64(bat.RowCount())) analyze.Alloc(ctr.mp.Size()) } return nil diff --git a/pkg/sql/colexec/rightsemi/join.go b/pkg/sql/colexec/rightsemi/join.go index 90f2feb3faf28..d052593e97f00 100644 --- a/pkg/sql/colexec/rightsemi/join.go +++ b/pkg/sql/colexec/rightsemi/join.go @@ -138,7 +138,7 @@ func (ctr *container) build(ap *Argument, proc *process.Process, analyze process ctr.bat = bat ctr.mp = bat.DupJmAuxData() ctr.matched = &bitmap.Bitmap{} - ctr.matched.InitWithSize(bat.RowCount()) + ctr.matched.InitWithSize(int64(bat.RowCount())) analyze.Alloc(ctr.mp.Size()) } return nil diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index f1274c0941dac..89e400ae9d08f 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -374,17 +374,21 @@ func (c *Compile) run(s *Scope) error { } // Run is an important function of the compute-layer, it executes a single sql according to its scope -func (c *Compile) Run(_ uint64) (*util2.RunResult, error) { +func (c *Compile) Run(_ uint64) (result *util2.RunResult, err error) { start := time.Now() v2.TxnStatementExecuteLatencyDurationHistogram.Observe(start.Sub(c.startAt).Seconds()) defer func() { v2.TxnStatementExecuteDurationHistogram.Observe(time.Since(start).Seconds()) }() + if strings.Contains(c.sql, "generate_series") { + fmt.Printf("%s\n", DebugShowScopes(c.scope)) + } + var span trace.Span var cc *Compile // compile structure for rerun. - var result = &util2.RunResult{} - var err error + result = &util2.RunResult{} + // var err error sp := c.proc.GetStmtProfile() c.ctx, span = trace.Start(c.ctx, "Compile.Run", trace.WithKind(trace.SpanKindStatement)) @@ -418,7 +422,7 @@ func (c *Compile) Run(_ uint64) (*util2.RunResult, error) { c.fatalLog(0, err) if !c.ifNeedRerun(err) { - return nil, err + return } v2.TxnStatementRetryCounter.Inc() @@ -427,12 +431,14 @@ func (c *Compile) Run(_ uint64) (*util2.RunResult, error) { // clear the workspace of the failed statement if e := c.proc.TxnOperator.GetWorkspace().RollbackLastStatement(c.ctx); e != nil { - return nil, e + err = e + return } // increase the statement id if e := c.proc.TxnOperator.GetWorkspace().IncrStatementID(c.ctx, false); e != nil { - return nil, e + err = e + return } // FIXME: the current retry method is quite bad, the overhead is relatively large, and needs to be @@ -447,29 +453,30 @@ func (c *Compile) Run(_ uint64) (*util2.RunResult, error) { } if err = cc.Compile(c.proc.Ctx, c.pn, c.u, c.fill); err != nil { c.fatalLog(1, err) - return nil, err + return } if err = cc.runOnce(); err != nil { c.fatalLog(1, err) - return nil, err + return } err = c.proc.TxnOperator.GetWorkspace().Adjust() if err != nil { c.fatalLog(1, err) - return nil, err + return } // set affectedRows to old compile to return c.setAffectedRows(cc.getAffectedRows()) } if c.shouldReturnCtxErr() { - return nil, c.proc.Ctx.Err() + err = c.proc.Ctx.Err() + return } result.AffectRows = c.getAffectedRows() if c.proc.TxnOperator != nil { - return result, c.proc.TxnOperator.GetWorkspace().Adjust() + err = c.proc.TxnOperator.GetWorkspace().Adjust() } - return result, err + return } // if the error is ErrTxnNeedRetry and the transaction is RC isolation, we need to retry the statement diff --git a/pkg/sql/compile/debugTools.go b/pkg/sql/compile/debugTools.go index 9d3e7f89a4c68..ed127e3e76a63 100644 --- a/pkg/sql/compile/debugTools.go +++ b/pkg/sql/compile/debugTools.go @@ -72,6 +72,7 @@ var debugInstructionNames = map[vm.OpType]string{ vm.MergeDelete: "merge delete", vm.LockOp: "lockop", vm.MergeBlock: "merge block", + vm.FuzzyFilter: "fuzzy filter", } var debugMagicNames = map[magicType]string{ diff --git a/pkg/sql/compile/fuzzyCheck.go b/pkg/sql/compile/fuzzyCheck.go index deafd00f7cff0..d9f069ed5a617 100644 --- a/pkg/sql/compile/fuzzyCheck.go +++ b/pkg/sql/compile/fuzzyCheck.go @@ -120,6 +120,7 @@ func (f *fuzzyCheck) fill(ctx context.Context, bat *batch.Batch) error { for i = 0; i < lastRow; i++ { one.Reset() + one.WriteByte('(') // one compound primary key has multiple conditions, use "and" to join them for j = 0; j < len(cAttrs)-1; j++ { one.WriteString(fmt.Sprintf("%s = %s and ", cAttrs[j], pkeys[j][i])) @@ -127,6 +128,7 @@ func (f *fuzzyCheck) fill(ctx context.Context, bat *batch.Batch) error { // the last condition does not need to be followed by "and" one.WriteString(fmt.Sprintf("%s = %s", cAttrs[j], pkeys[j][i])) + one.WriteByte(')') one.WriteTo(&all) // use or join each compound primary keys @@ -223,7 +225,7 @@ func (f *fuzzyCheck) backgroundSQLCheck(c *Compile) error { cAttrs[k] = c.Name } attrs := strings.Join(cAttrs, ", ") - duplicateCheckSql = fmt.Sprintf(fuzzyCompoundCheck, attrs, f.db, f.tbl, f.condition, cAttrs[0], attrs) + duplicateCheckSql = fmt.Sprintf(fuzzyCompoundCheck, attrs, f.db, f.tbl, f.condition, attrs) } res, err := c.runSqlWithResult(duplicateCheckSql) diff --git a/pkg/sql/compile/operator.go b/pkg/sql/compile/operator.go index 760260ad78fde..e02e1de9863d4 100644 --- a/pkg/sql/compile/operator.go +++ b/pkg/sql/compile/operator.go @@ -481,6 +481,8 @@ func dupInstruction(sourceIns *vm.Instruction, regMap map[*process.WaitRegister] res.Arg = arg case vm.FuzzyFilter: t := sourceIns.Arg.(*fuzzyfilter.Argument) + arg := new(fuzzyfilter.Argument) + *arg = *t res.Arg = t default: panic(fmt.Sprintf("unexpected instruction type '%d' to dup", sourceIns.Op)) diff --git a/pkg/sql/compile/util.go b/pkg/sql/compile/util.go index f363cf58d17da..fcd2e91bf6490 100644 --- a/pkg/sql/compile/util.go +++ b/pkg/sql/compile/util.go @@ -59,8 +59,8 @@ const ( var ( selectOriginTableConstraintFormat = "select serial(%s) from %s.%s group by serial(%s) having count(*) > 1 and serial(%s) is not null;" // see the comment in fuzzyCheck func genCondition for the reason why has to be two SQLs - fuzzyNonCompoundCheck = "select %s from %s.%s where %s in (%s) group by %s having count(*) > 1;" - fuzzyCompoundCheck = "select serial(%s) from %s.%s where %s group by %s having count(*) > 1 and serial(%s) is not null;" + fuzzyNonCompoundCheck = "select %s from %s.%s where %s in (%s) group by %s having count(*) > 1 limit 1;" + fuzzyCompoundCheck = "select serial(%s) from %s.%s where %s group by serial(%s) having count(*) > 1 limit 1;" ) var ( diff --git a/pkg/sql/plan/build_delete.go b/pkg/sql/plan/build_delete.go index 931a8c0756b88..36296cbd7f848 100644 --- a/pkg/sql/plan/build_delete.go +++ b/pkg/sql/plan/build_delete.go @@ -78,7 +78,7 @@ func buildDelete(stmt *tree.Delete, ctx CompilerContext, isPrepareStmt bool) (*P lastNodeId = appendSinkNode(builder, deleteBindCtx, lastNodeId) nextSourceStep := builder.appendStep(lastNodeId) delPlanCtx.sourceStep = nextSourceStep - err = buildDeletePlans(ctx, builder, deleteBindCtx, delPlanCtx, false) + err = buildDeletePlans(ctx, builder, deleteBindCtx, delPlanCtx) if err != nil { return nil, err } diff --git a/pkg/sql/plan/build_dml_util.go b/pkg/sql/plan/build_dml_util.go index 98e848a3fe099..f7ce6ef91a9de 100644 --- a/pkg/sql/plan/build_dml_util.go +++ b/pkg/sql/plan/build_dml_util.go @@ -114,7 +114,7 @@ func buildInsertPlans( } // buildUpdatePlans build update plan. -func buildUpdatePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindContext, updatePlanCtx *dmlPlanCtx, hasOnDup bool) error { +func buildUpdatePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindContext, updatePlanCtx *dmlPlanCtx) error { var err error // sink_scan -> project -> [agg] -> [filter] -> sink lastNodeId := appendSinkScanNode(builder, bindCtx, updatePlanCtx.sourceStep) @@ -127,7 +127,7 @@ func buildUpdatePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC updatePlanCtx.sourceStep = nextSourceStep // build delete plans - err = buildDeletePlans(ctx, builder, bindCtx, updatePlanCtx, hasOnDup) + err = buildDeletePlans(ctx, builder, bindCtx, updatePlanCtx) if err != nil { return err } @@ -217,7 +217,7 @@ func getStepByNodeId(builder *QueryBuilder, nodeId int32) int { [o1]sink_scan -> join[f1 inner join c4 on f1.id = c4.fid, get c3.*] -> sink ...(like delete) // delete stmt: if have refChild table with cascade [o1]sink_scan -> join[f1 inner join c4 on f1.id = c4.fid, get c3.*, update cols] -> sink ...(like update) // update stmt: if have refChild table with cascade */ -func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindContext, delCtx *dmlPlanCtx, hasOnDup bool) error { +func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindContext, delCtx *dmlPlanCtx) error { if sinkOrUnionNodeId, ok := builder.deleteNode[delCtx.tableDef.TblId]; ok { sinkOrUnionNode := builder.qry.Nodes[sinkOrUnionNodeId] if sinkOrUnionNode.NodeType == plan.Node_SINK { @@ -636,7 +636,7 @@ func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC upPlanCtx.isFkRecursionCall = true upPlanCtx.updatePkCol = updatePk - err = buildUpdatePlans(ctx, builder, bindCtx, upPlanCtx, false) + err = buildUpdatePlans(ctx, builder, bindCtx, upPlanCtx) putDmlPlanCtx(upPlanCtx) if err != nil { return err @@ -679,7 +679,7 @@ func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC upPlanCtx.isFkRecursionCall = true upPlanCtx.updatePkCol = updatePk - err = buildUpdatePlans(ctx, builder, bindCtx, upPlanCtx, false) + err = buildUpdatePlans(ctx, builder, bindCtx, upPlanCtx) putDmlPlanCtx(upPlanCtx) if err != nil { return err @@ -709,7 +709,7 @@ func buildDeletePlans(ctx CompilerContext, builder *QueryBuilder, bindCtx *BindC upPlanCtx.beginIdx = 0 upPlanCtx.allDelTableIDs = allDelTableIDs - err := buildDeletePlans(ctx, builder, bindCtx, upPlanCtx, hasOnDup) + err := buildDeletePlans(ctx, builder, bindCtx, upPlanCtx) putDmlPlanCtx(upPlanCtx) if err != nil { return err @@ -1001,115 +1001,78 @@ func makeInsertPlan( } lastNodeId = builder.appendNode(filterNode, bindCtx) builder.appendStep(lastNodeId) - } else { - // need to check insert data and existing data both - pkList := []*Expr{ - &plan.Expr{ - Typ: pkTyp, - Expr: &plan.Expr_Col{ - Col: &plan.ColRef{ - ColPos: int32(pkPos), - Name: tableDef.Pkey.PkeyColName, - }, - }, - }, - } - + } else if !isUpdate { // sink_scan sinkScanNode := &Node{ - NodeType: plan.Node_SINK_SCAN, - Stats: &plan.Stats{}, - SourceStep: []int32{sourceStep}, - ObjRef: objRef, - TableDef: tableDef, - ProjectList: pkList, - } - lastNodeId = builder.appendNode(sinkScanNode, bindCtx) - sinkScanId := lastNodeId - - // table_scan - var tableScanId int32 - if len(pkFilterExprs) > 0 { - scanTableDef := DeepCopyTableDef(tableDef, true) - // scanTableDef.Cols = []*ColDef{scanTableDef.Cols[pkPos]} - pkNameMap := make(map[string]int) - for i, n := range tableDef.Pkey.Names { - pkNameMap[n] = i - } - newCols := make([]*ColDef, len(scanTableDef.Pkey.Names)) - for _, def := range scanTableDef.Cols { - if i, ok := pkNameMap[def.Name]; ok { - newCols[i] = def - } - } - if len(newCols) > 1 { - for _, col := range scanTableDef.Cols { - if col.Name == scanTableDef.Pkey.PkeyColName { - newCols = append(newCols, col) - break - } - } - } - scanTableDef.Cols = newCols - scanNode := &plan.Node{ - NodeType: plan.Node_TABLE_SCAN, - Stats: &plan.Stats{}, - ObjRef: objRef, - TableDef: scanTableDef, - ProjectList: []*Expr{{ + NodeType: plan.Node_SINK_SCAN, + Stats: &plan.Stats{}, + SourceStep: []int32{sourceStep}, + ProjectList: []*Expr{ + &plan.Expr{ Typ: pkTyp, Expr: &plan.Expr_Col{ - Col: &ColRef{ - ColPos: int32(len(scanTableDef.Cols) - 1), + Col: &plan.ColRef{ + ColPos: int32(pkPos), Name: tableDef.Pkey.PkeyColName, }, }, - }}, + }, + }, + } + lastNodeId = builder.appendNode(sinkScanNode, bindCtx) + + pkNameMap := make(map[string]int) + for i, n := range tableDef.Pkey.Names { + pkNameMap[n] = i + } + pkSize := len(tableDef.Pkey.Names) + if pkSize > 1 { + pkSize++ + } + scanTableDef := DeepCopyTableDef(tableDef, false) + scanTableDef.Cols = make([]*ColDef, pkSize) + for _, col := range tableDef.Cols { + if i, ok := pkNameMap[col.Name]; ok { + scanTableDef.Cols[i] = DeepCopyColDef(col) + } else if col.Name == scanTableDef.Pkey.PkeyColName { + scanTableDef.Cols[pkSize-1] = DeepCopyColDef(col) + break } + } + scanNode := &plan.Node{ + NodeType: plan.Node_TABLE_SCAN, + Stats: &plan.Stats{}, + ObjRef: objRef, + TableDef: scanTableDef, + ProjectList: []*Expr{{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &ColRef{ + ColPos: int32(len(scanTableDef.Cols) - 1), + Name: tableDef.Pkey.PkeyColName, + }, + }, + }}, + } + var blockFilterList []*Expr + if len(pkFilterExprs) > 0 { scanNode.FilterList = pkFilterExprs - blockFilterList := make([]*Expr, len(pkFilterExprs)) + blockFilterList = make([]*Expr, len(pkFilterExprs)) for i, e := range pkFilterExprs { blockFilterList[i] = DeepCopyExpr(e) } + } + tableScanId := builder.appendNode(scanNode, bindCtx) + if len(blockFilterList) > 0 { scanNode.BlockFilterList = blockFilterList - lastNodeId = builder.appendNode(scanNode, bindCtx) - tableScanId = lastNodeId - } else { - rfTag := builder.genNewTag() - scanNode := &plan.Node{ - NodeType: plan.Node_TABLE_SCAN, - Stats: &plan.Stats{}, - ObjRef: objRef, - TableDef: tableDef, - ProjectList: pkList, - RuntimeFilterProbeList: []*plan.RuntimeFilterSpec{ - { - Tag: rfTag, - Expr: &plan.Expr{ - Typ: DeepCopyType(pkTyp), - Expr: &plan.Expr_Col{ - Col: &plan.ColRef{ - RelPos: 0, - ColPos: 0, - Name: tableDef.Pkey.PkeyColName, - }, - }, - }, - }, - }, - } - lastNodeId = builder.appendNode(scanNode, bindCtx) - tableScanId = lastNodeId } // table_scan -> union_all // sink_scan ---^ unionAllNode := &plan.Node{ NodeType: plan.Node_UNION_ALL, - TableDef: tableDef, - ObjRef: objRef, - Children: []int32{sinkScanId, tableScanId}, + Children: []int32{lastNodeId, tableScanId}, } lastNodeId = builder.appendNode(unionAllNode, bindCtx) @@ -1123,8 +1086,6 @@ func makeInsertPlan( lastNodeId = builder.appendNode(fuzzyFilterNode, bindCtx) builder.appendStep(lastNodeId) - - ReCalcNodeStats(lastNodeId, builder, true, true) } } diff --git a/pkg/sql/plan/build_insert.go b/pkg/sql/plan/build_insert.go index 023730da1b339..53d816fab8723 100644 --- a/pkg/sql/plan/build_insert.go +++ b/pkg/sql/plan/build_insert.go @@ -236,8 +236,7 @@ func buildInsert(stmt *tree.Insert, ctx CompilerContext, isReplace bool, isPrepa upPlanCtx.updateColPosMap = updateColPosMap upPlanCtx.checkInsertPkDup = checkInsertPkDup - hasOnDup := true - err = buildUpdatePlans(ctx, builder, updateBindCtx, upPlanCtx, hasOnDup) + err = buildUpdatePlans(ctx, builder, updateBindCtx, upPlanCtx) if err != nil { return nil, err } diff --git a/pkg/sql/plan/build_update.go b/pkg/sql/plan/build_update.go index 88eb31b378851..53bc555a298b2 100644 --- a/pkg/sql/plan/build_update.go +++ b/pkg/sql/plan/build_update.go @@ -65,7 +65,7 @@ func buildTableUpdate(stmt *tree.Update, ctx CompilerContext, isPrepareStmt bool updateBindCtx := NewBindContext(builder, nil) beginIdx = beginIdx + upPlanCtx.updateColLength + len(tableDef.Cols) - err = buildUpdatePlans(ctx, builder, updateBindCtx, upPlanCtx, false) + err = buildUpdatePlans(ctx, builder, updateBindCtx, upPlanCtx) if err != nil { return nil, err } diff --git a/pkg/sql/plan/stats.go b/pkg/sql/plan/stats.go index 9221cad7d9af9..f593069ed8e6f 100644 --- a/pkg/sql/plan/stats.go +++ b/pkg/sql/plan/stats.go @@ -18,11 +18,12 @@ import ( "bytes" "context" "fmt" - "github.com/matrixorigin/matrixone/pkg/sql/util" "math" "sort" "strings" + "github.com/matrixorigin/matrixone/pkg/sql/util" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -651,10 +652,12 @@ func ReCalcNodeStats(nodeID int32, builder *QueryBuilder, recursive bool, leafNo } */ case plan.Node_SINK_SCAN: - node.Stats = builder.qry.Nodes[node.GetSourceStep()[0]].Stats + sourceNode := builder.qry.Steps[node.GetSourceStep()[0]] + node.Stats = builder.qry.Nodes[sourceNode].Stats case plan.Node_RECURSIVE_SCAN: - node.Stats = builder.qry.Nodes[node.GetSourceStep()[0]].Stats + sourceNode := builder.qry.Steps[node.GetSourceStep()[0]] + node.Stats = builder.qry.Nodes[sourceNode].Stats case plan.Node_EXTERNAL_SCAN: //calc for external scan is heavy, avoid recalc of this @@ -679,6 +682,15 @@ func ReCalcNodeStats(nodeID int32, builder *QueryBuilder, recursive bool, leafNo node.Stats.Cost = childStats.Cost node.Stats.Selectivity = 0.05 + case plan.Node_FUNCTION_SCAN: + if !computeFunctionScan(node.TableDef.TblFunc.Name, node.TblFuncExprList, node.Stats) { + if len(node.Children) > 0 && childStats != nil { + node.Stats.Outcnt = childStats.Outcnt + node.Stats.Cost = childStats.Outcnt + node.Stats.Selectivity = childStats.Selectivity + } + } + default: if len(node.Children) > 0 && childStats != nil { node.Stats.Outcnt = childStats.Outcnt @@ -697,6 +709,91 @@ func ReCalcNodeStats(nodeID int32, builder *QueryBuilder, recursive bool, leafNo } } +func computeFunctionScan(name string, exprs []*Expr, nodeStat *Stats) bool { + if name != "generate_series" { + return false + } + var cost float64 + var canGetCost bool + if len(exprs) == 2 { + if exprs[0].Typ.Id != exprs[1].Typ.Id { + return false + } + cost, canGetCost = getCost(exprs[0], exprs[1], nil) + } else if len(exprs) == 3 { + if !(exprs[0].Typ.Id == exprs[1].Typ.Id && exprs[1].Typ.Id == exprs[2].Typ.Id) { + return false + } + cost, canGetCost = getCost(exprs[0], exprs[1], exprs[2]) + } else { + return false + } + if !canGetCost { + return false + } + nodeStat.Outcnt = cost + nodeStat.TableCnt = cost + nodeStat.Cost = cost + nodeStat.Selectivity = 1 + return true +} + +func getCost(start *Expr, end *Expr, step *Expr) (float64, bool) { + var startNum, endNum, stepNum float64 + var flag1, flag2, flag3 bool + getInt32Val := func(e *Expr) (float64, bool) { + if s, ok := e.Expr.(*plan.Expr_C); ok { + if v, ok := s.C.Value.(*plan.Const_I32Val); ok && !s.C.Isnull { + return float64(v.I32Val), true + } + } + return 0, false + } + getInt64Val := func(e *Expr) (float64, bool) { + if s, ok := e.Expr.(*plan.Expr_C); ok { + if v, ok := s.C.Value.(*plan.Const_I64Val); ok && !s.C.Isnull { + return float64(v.I64Val), true + } + } + return 0, false + } + + switch start.Typ.Id { + case int32(types.T_int32): + startNum, flag1 = getInt32Val(start) + endNum, flag2 = getInt32Val(end) + flag3 = true + if step != nil { + stepNum, flag3 = getInt32Val(step) + } + if !(flag1 && flag2 && flag3) { + return 0, false + } + case int32(types.T_int64): + startNum, flag1 = getInt64Val(start) + endNum, flag2 = getInt64Val(end) + flag3 = true + if step != nil { + stepNum, flag3 = getInt64Val(step) + } + if !(flag1 && flag2 && flag3) { + return 0, false + } + } + if step == nil { + if startNum > endNum { + stepNum = -1 + } else { + stepNum = 1 + } + } + ret := (endNum - startNum) / stepNum + if ret < 0 { + return 0, false + } + return ret, true +} + func foldTableScanFilters(proc *process.Process, qry *Query, nodeId int32) error { node := qry.Nodes[nodeId] if node.NodeType == plan.Node_TABLE_SCAN && len(node.FilterList) > 0 { diff --git a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go index 255660dbbdcc0..4d3fe79af214e 100644 --- a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go +++ b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go @@ -672,7 +672,7 @@ func (task *flushTableTailTask) flushAllDeletesFromDelSrc(ctx context.Context) ( if deletes == nil || deletes.Length() == 0 { if emtpyDelBlkIdx == nil { emtpyDelBlkIdx = &bitmap.Bitmap{} - emtpyDelBlkIdx.InitWithSize(len(task.delSrcMetas)) + emtpyDelBlkIdx.InitWithSize(int64(len(task.delSrcMetas))) } emtpyDelBlkIdx.Add(uint64(i)) continue diff --git a/proto/ctl.proto b/proto/ctl.proto index 3cffa690e8c4a..3f5033fc4b791 100644 --- a/proto/ctl.proto +++ b/proto/ctl.proto @@ -59,7 +59,7 @@ enum CmdMethod { TraceSpan = 13; // for CN launches storage usage query to TN, // not used in mo_ctl for now - CmdMethod_StorageUsage = 14; + StorageUsage = 14; } // TNPingRequest ping request