Skip to content

Commit

Permalink
Refactor fuzzyfilter (#15)
Browse files Browse the repository at this point in the history
* refactor fuzzyfilter to use new interface

* make bvt 100% pass

* init bloomfilter

* use new bloomfilter

* minor improve

* bug fixed
  • Loading branch information
ouyuanning authored Nov 17, 2023
1 parent 0c4f851 commit b54f090
Show file tree
Hide file tree
Showing 27 changed files with 691 additions and 283 deletions.
10 changes: 5 additions & 5 deletions pkg/common/bitmap/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (n *Bitmap) InitWith(other *Bitmap) {
n.data = append([]uint64(nil), other.data...)
}

func (n *Bitmap) InitWithSize(len int) {
n.len = int64(len)
func (n *Bitmap) InitWithSize(len int64) {
n.len = len
n.emptyFlag.Store(kEmptyFlagEmpty)
n.data = make([]uint64, (len+63)/64)
}
Expand Down Expand Up @@ -164,8 +164,8 @@ func (n *Bitmap) Reset() {
}

// Len returns the number of bits in the Bitmap.
func (n *Bitmap) Len() int {
return int(n.len)
func (n *Bitmap) Len() int64 {
return n.len
}

// Size return number of bytes in n.data
Expand Down Expand Up @@ -343,7 +343,7 @@ func (n *Bitmap) TryExpandWithSize(size int) {

func (n *Bitmap) Filter(sels []int64) *Bitmap {
var m Bitmap
m.InitWithSize(int(n.len))
m.InitWithSize(n.len)
for i, sel := range sels {
if n.Contains(uint64(sel)) {
m.Add(uint64(i))
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/bitmap/bitmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (

func newBm(n int) *Bitmap {
var bm Bitmap
bm.InitWithSize(n)
bm.InitWithSize(int64(n))
return &bm
}

Expand Down
84 changes: 84 additions & 0 deletions pkg/common/bloomfilter/bloomfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2021 - 2023 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bloomfilter

import (
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/hashtable"
"github.com/matrixorigin/matrixone/pkg/container/vector"
)

// Clean calls Reset on the underlying bitmap and drops the hash seeds.
// With hashSeed set to nil, a later TestAndAddForVector runs zero hashing
// rounds and therefore reports every row as already present, so the filter
// is not meant to be used again after Clean.
func (bf *BloomFilter) Clean() {
bf.bitmap.Reset()
bf.hashSeed = nil
}

// TestAndAddForVector tests every row of v against the filter and inserts it
// in the same pass.
//
// Rows are processed in batches of at most hashmap.UnitLimit. For each batch
// the rows are encoded into byte keys, then every seed in bf.hashSeed produces
// three hash values per key; each value is reduced to a bit position, checked,
// and set. After a batch has been hashed with all seeds, callBack is invoked
// once per row, in row order, with:
//
//   - exits: true only if every probed bit was already set before this row
//     set it (the row was possibly seen before); false means the row is
//     definitely new.
//   - idx: the row's index in v.
//
// Rows of one batch are inserted before callBack runs for that batch, so a
// duplicate inside a batch is reported as existing for its later occurrence.
func (bf *BloomFilter) TestAndAddForVector(v *vector.Vector, callBack func(exits bool, idx int)) {
	length := v.Length()
	bitSize := uint64(bf.bitmap.Len())

	// Scratch buffers are sized for a full batch and reused across batches.
	keys := make([][]byte, hashmap.UnitLimit)
	states := make([][3]uint64, hashmap.UnitLimit)
	exists := make([]bool, hashmap.UnitLimit)

	for i := 0; i < length; i += hashmap.UnitLimit {
		n := length - i
		if n > hashmap.UnitLimit {
			n = hashmap.UnitLimit
		}
		for j := 0; j < n; j++ {
			keys[j] = keys[j][:0] // keep capacity, drop the previous batch's bytes
			exists[j] = true
		}
		encodeHashKeys(keys, v, i, n)

		for _, seed := range bf.hashSeed {
			hashtable.BytesBatchGenHashStatesWithSeed(&keys[0], &states[0], n, seed)
			for j := 0; j < n; j++ {
				for _, hashVal := range states[j] {
					// Always reduce into [0, bitSize). The previous
					// "hashVal > bitSize" guard let hashVal == bitSize
					// through, which is one past the last valid bit.
					pos := hashVal % bitSize
					if exists[j] {
						exists[j] = bf.bitmap.Contains(pos)
					}
					bf.bitmap.Add(pos)
				}
			}
		}

		for j := 0; j < n; j++ {
			callBack(exists[j], i+j)
		}
	}
}
64 changes: 64 additions & 0 deletions pkg/common/bloomfilter/bloomfilter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bloomfilter

import (
"math"
"testing"

"github.com/bits-and-blooms/bloom"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/testutil"
)

// TEST_COUNT is the number of rows in the vector built by the test and the
// benchmarks, and the row count passed to New.
const TEST_COUNT = 10000000
// TEST_RATE is the probability argument passed to New and used as the target
// false-positive rate when sizing the reference filter in BenchmarkBloom.
const TEST_RATE = 0.00001

// TestBloomFilter inserts a TEST_COUNT-row int64 vector into a fresh filter
// and then probes the same vector again. A Bloom filter may report false
// positives but never false negatives, so the second pass must report every
// row as already present.
func TestBloomFilter(t *testing.T) {
	mp := mpool.MustNewZero()
	vec := testutil.NewVector(TEST_COUNT, types.New(types.T_int64, 0, 0), mp, false, nil)

	boom := New(TEST_COUNT, TEST_RATE)
	// First pass only populates the filter; its results are not checked
	// because false positives are allowed.
	boom.TestAndAddForVector(vec, func(_ bool, _ int) {})

	// Second pass: every bit for every row was set above.
	missing := 0
	boom.TestAndAddForVector(vec, func(exists bool, _ int) {
		if !exists {
			missing++
		}
	})
	if missing != 0 {
		t.Fatalf("bloom filter reported %d false negatives out of %d rows", missing, TEST_COUNT)
	}
}

// BenchmarkBloomFiltrer measures creating a filter with New and running
// TestAndAddForVector over a TEST_COUNT-row int64 vector.
func BenchmarkBloomFiltrer(b *testing.B) {
	mp := mpool.MustNewZero()
	vec := testutil.NewVector(TEST_COUNT, types.New(types.T_int64, 0, 0), mp, false, nil)

	// Building the input vector is setup and must not count toward the result.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		boom := New(TEST_COUNT, TEST_RATE)
		boom.TestAndAddForVector(vec, func(_ bool, _ int) {})
	}
}

// BenchmarkBloom is the reference benchmark: it runs the same workload as
// BenchmarkBloomFiltrer through github.com/bits-and-blooms/bloom, calling
// TestAndAdd once per row with k hash functions and a bit count m derived
// from TEST_COUNT and TEST_RATE.
func BenchmarkBloom(b *testing.B) {
	mp := mpool.MustNewZero()
	vec := testutil.NewVector(TEST_COUNT, types.New(types.T_int64, 0, 0), mp, false, nil)
	k := 3
	n := float64(TEST_COUNT)
	p := TEST_RATE
	// Bit count for n items (padded by 0.1%), k hashes and false-positive rate p.
	e := -float64(k) * math.Ceil(1.001*n) / math.Log(1-math.Pow(p, 1.0/float64(k)))
	m := uint(math.Ceil(e))

	// Building the input vector is setup and must not count toward the result.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		filter := bloom.New(m, uint(k))
		for row := 0; row < TEST_COUNT; row++ {
			filter.TestAndAdd(vec.GetRawBytesAt(row))
		}
	}
}
75 changes: 75 additions & 0 deletions pkg/common/bloomfilter/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021 - 2023 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bloomfilter

import (
"math"
"math/rand"

"github.com/matrixorigin/matrixone/pkg/common/bitmap"
)

// constLogValue is ln(1 / 2^ln(2)), i.e. -(ln 2)^2. It is the divisor used by
// computeMemAndHashCount when it derives a bit count from the row count and
// the requested probability.
var constLogValue = math.Log(1 / math.Pow(2, math.Log(2)))

// BloomFilter is a Bloom filter backed by a bitmap. TestAndAddForVector runs
// one hashing round per entry of hashSeed, and each round tests and sets
// three bit positions per key.
type BloomFilter struct {
// bitmap is the bit array; its length is the filter size in bits.
bitmap *bitmap.Bitmap
// hashSeed holds one seed per hashing round; New fills it with random values.
hashSeed []uint64
}

// New builds a BloomFilter for roughly rowCount rows. The bitmap size and the
// number of hashing rounds come from computeMemAndHashCount; each round gets
// its own random seed, so two filters built with the same arguments do not
// share bit patterns.
func New(rowCount int64, probability float64) *BloomFilter {
	bitSize, seedCount := computeMemAndHashCount(rowCount, probability)

	seeds := make([]uint64, seedCount)
	for idx := range seeds {
		seeds[idx] = rand.Uint64()
	}

	bm := new(bitmap.Bitmap)
	bm.InitWithSize(bitSize)

	return &BloomFilter{bitmap: bm, hashSeed: seeds}
}

// computeMemAndHashCount returns the bitmap size in bits and the number of
// hash seeds to use for a filter expected to hold rowCount rows.
//
// Up to 10,000,000 rows the result is a fixed size per tier and probability
// is ignored. Above that the bit count is
//
//	ceil(rowCount * ln(probability) / constLogValue)
//
// with 3 seeds up to 1,000,000,000 rows and 4 seeds up to 10,000,000,000.
// Larger row counts panic.
func computeMemAndHashCount(rowCount int64, probability float64) (int64, int) {
	switch {
	case rowCount <= 10000:
		return 64 * 10000, 1
	case rowCount <= 100000:
		return 64 * 100000, 1
	case rowCount <= 1000000:
		return 16 * 1000000, 1
	case rowCount <= 10000000:
		return 38 * 10000000, 2
	case rowCount > 10000000000:
		panic("unsupport rowCount")
	}

	// Remaining range: 10,000,000 < rowCount <= 10,000,000,000.
	bits := int64(math.Ceil((float64(rowCount) * math.Log(probability)) / constLogValue))
	if rowCount <= 1000000000 {
		return bits, 3
	}
	return bits, 4
}
80 changes: 80 additions & 0 deletions pkg/common/bloomfilter/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 - 2023 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bloomfilter

import (
"unsafe"

"github.com/matrixorigin/matrixone/pkg/container/hashtable"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
)

// fillStringGroupStr appends the encoding of rows [start, start+n) of the
// variable-length vector vec to keys[0..n). A null row is encoded as the
// single byte 1; any other row as the byte 0 followed by its raw bytes.
func fillStringGroupStr(keys [][]byte, vec *vector.Vector, n int, start int) {
	area := vec.GetArea()
	cols := vector.MustFixedCol[types.Varlena](vec)
	nulls := vec.GetNulls()
	anyNull := nulls.Any()

	for k := 0; k < n; k++ {
		row := k + start
		// Only consult the null set when the vector has nulls at all.
		if anyNull && nulls.Contains(uint64(row)) {
			keys[k] = append(keys[k], byte(1))
			continue
		}
		keys[k] = append(keys[k], byte(0))
		keys[k] = append(keys[k], cols[row].GetByteSlice(area)...)
	}
}

// fillGroupStr appends the encoding of rows [start, start+n) of the
// fixed-width vector vec to keys[0..n), where sz is the width of one value in
// bytes. A null row is encoded as the single byte 1; any other row as the
// byte 0 followed by its sz raw bytes.
func fillGroupStr(keys [][]byte, vec *vector.Vector, n int, sz int, start int) {
	// View the first start+n values of the vector as one flat byte slice.
	raw := unsafe.Slice(vector.GetPtrAt[byte](vec, 0), (n+start)*sz)
	nulls := vec.GetNulls()
	anyNull := nulls.Any()

	for k := 0; k < n; k++ {
		row := k + start
		// Only consult the null set when the vector has nulls at all.
		if anyNull && nulls.Contains(uint64(row)) {
			keys[k] = append(keys[k], byte(1))
			continue
		}
		off := row * sz
		keys[k] = append(keys[k], byte(0))
		keys[k] = append(keys[k], raw[off:off+sz]...)
	}
}

// encodeHashKeys encodes rows [start, start+count) of vec into keys[0..count),
// choosing the fixed-width or variable-length encoder from the vector's type.
// Keys shorter than 16 bytes are then extended with the tail of
// hashtable.StrKeyPadding starting at the key's current length.
func encodeHashKeys(keys [][]byte, vec *vector.Vector, start, count int) {
	if !vec.GetType().IsFixedLen() {
		fillStringGroupStr(keys, vec, count, start)
	} else {
		width := vec.GetType().TypeSize()
		fillGroupStr(keys, vec, count, width, start)
	}

	for k := 0; k < count; k++ {
		keyLen := len(keys[k])
		if keyLen >= 16 {
			continue
		}
		keys[k] = append(keys[k], hashtable.StrKeyPadding[keyLen:]...)
	}
}
23 changes: 17 additions & 6 deletions pkg/container/hashtable/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
)

var (
Int64BatchHash = wyhashInt64Batch
Int64HashWithFixedSeed = wyhash64WithFixedSeed
BytesBatchGenHashStates = wyhashBytesBatch
Int192BatchGenHashStates = wyhashInt192Batch
Int256BatchGenHashStates = wyhashInt256Batch
Int320BatchGenHashStates = wyhashInt320Batch
Int64BatchHash = wyhashInt64Batch
Int64HashWithFixedSeed = wyhash64WithFixedSeed
BytesBatchGenHashStates = wyhashBytesBatch
BytesBatchGenHashStatesWithSeed = wyhashBytesBatchWithSeed
Int192BatchGenHashStates = wyhashInt192Batch
Int256BatchGenHashStates = wyhashInt256Batch
Int320BatchGenHashStates = wyhashInt320Batch
)

// Hashing algorithm inspired by
Expand Down Expand Up @@ -139,6 +140,16 @@ func wyhashBytesBatch(data *[]byte, states *[3]uint64, length int) {
}
}

// wyhashBytesBatchWithSeed hashes length byte keys starting at data and writes
// three hash values per key into the array starting at states. The three
// values come from wyhash over the same bytes with seed, seed<<32 and
// seed>>32. Each key must be non-empty, because its first byte is addressed
// directly.
func wyhashBytesBatchWithSeed(data *[]byte, states *[3]uint64, length int, seed uint64) {
	keys := unsafe.Slice(data, length)
	out := unsafe.Slice(states, length)
	for i := 0; i < length; i++ {
		ptr := unsafe.Pointer(&keys[i][0])
		size := uint64(len(keys[i]))
		out[i][0] = wyhash(ptr, seed, size)
		out[i][1] = wyhash(ptr, seed<<32, size)
		out[i][2] = wyhash(ptr, seed>>32, size)
	}
}

func wyhashInt192Batch(data *[3]uint64, states *[3]uint64, length int) {
dataSlice := unsafe.Slice((*[3]uint64)(data), length)
hashSlice := unsafe.Slice((*[3]uint64)(states), length)
Expand Down
Loading

0 comments on commit b54f090

Please sign in to comment.