Skip to content

Commit

Permalink
Improve performance of TSI bloom filter
Browse files Browse the repository at this point in the history
This commit replaces the previous hashing algorithm used by the pkg.Filter with
one based on xxhash. Further, taking from the hashing literature, we can
represent k hashes with only two hash function, where previously Filter was using
four.

Further, unlike `murmur3`, `xxhash` is allocation-free, so allocations have
dramatically reduced when inserting and checking for hashes.
  • Loading branch information
e-dard committed Sep 20, 2017
1 parent 0feeb18 commit 094b92e
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- [#8851](https://github.com/influxdata/influxdb/pull/8851): Improve performance of `Include` and `Exclude` functions
- [#8854](https://github.com/influxdata/influxdb/pull/8854): Report the task status for a query.
- [#8853](https://github.com/influxdata/influxdb/pull/8853): Reduce allocations, improve `readEntries` performance by simplifying loop
- [#8857](https://github.com/influxdata/influxdb/pull/8857): Improve performance of Bloom Filter in TSI index.

### Bugfixes

Expand Down
64 changes: 24 additions & 40 deletions pkg/bloom/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,31 @@ package bloom

// NOTE:
// This package implements a limited bloom filter implementation based on
// Will Fitzgerald's bloom & bitset packages. It's implemented locally to
// support zero-copy memory-mapped slices.
// Will Fitzgerald's bloom & bitset packages. It uses a zero-allocation xxhash
// implementation, rather than murmur3. It's implemented locally to support
// zero-copy memory-mapped slices.
//
// This also optimizes the filter by always using a bitset size with a power of 2.

import (
"fmt"
"math"

"github.com/influxdata/influxdb/pkg/pool"
"github.com/spaolacci/murmur3"
"github.com/cespare/xxhash"
)

// Filter represents a bloom filter.
type Filter struct {
k uint64
b []byte
mask uint64
hashPool *pool.Generic
k uint64
b []byte
mask uint64
}

// NewFilter returns a new instance of Filter using m bits and k hash functions.
// If m is not a power of two then it is rounded to the next highest power of 2.
func NewFilter(m uint64, k uint64) *Filter {
m = pow2(m)

return &Filter{
k: k,
b: make([]byte, m/8),
mask: m - 1,
hashPool: pool.NewGeneric(16, func(sz int) interface{} {
return murmur3.New128()
}),
}
return &Filter{k: k, b: make([]byte, m/8), mask: m - 1}
}

// NewFilterBuffer returns a new instance of a filter using a backing buffer.
Expand All @@ -45,15 +36,7 @@ func NewFilterBuffer(buf []byte, k uint64) (*Filter, error) {
if m != uint64(len(buf))*8 {
return nil, fmt.Errorf("bloom.Filter: buffer bit count must a power of two: %d/%d", len(buf)*8, m)
}

return &Filter{
k: k,
b: buf,
mask: m - 1,
hashPool: pool.NewGeneric(16, func(sz int) interface{} {
return murmur3.New128()
}),
}, nil
return &Filter{k: k, b: buf, mask: m - 1}, nil
}

// Len returns the number of bits used in the filter.
Expand All @@ -67,7 +50,7 @@ func (f *Filter) Bytes() []byte { return f.b }

// Clone returns a copy of f.
func (f *Filter) Clone() *Filter {
other := &Filter{k: f.k, b: make([]byte, len(f.b)), mask: f.mask, hashPool: f.hashPool}
other := &Filter{k: f.k, b: make([]byte, len(f.b)), mask: f.mask}
copy(other.b, f.b)
return other
}
Expand Down Expand Up @@ -116,21 +99,22 @@ func (f *Filter) Merge(other *Filter) error {
return nil
}

// location returns the ith hashed location using the four base hash values.
func (f *Filter) location(h [4]uint64, i uint64) uint {
return uint((h[i%2] + i*h[2+(((i+(i%2))%4)/2)]) & f.mask)
// location returns the ith hashed location using two hash values.
func (f *Filter) location(h [2]uint64, i uint64) uint {
return uint((h[0] + h[1]*i) & f.mask)
}

// hash returns a set of 4 based hashes.
func (f *Filter) hash(data []byte) [4]uint64 {
h := f.hashPool.Get(0).(murmur3.Hash128)
defer f.hashPool.Put(h)
h.Reset()
h.Write(data)
v1, v2 := h.Sum128()
h.Write([]byte{1})
v3, v4 := h.Sum128()
return [4]uint64{v1, v2, v3, v4}
// hash returns two 64-bit hashes based on the output of xxhash.
func (f *Filter) hash(data []byte) [2]uint64 {
v1 := xxhash.Sum64(data)
var v2 uint64
if len(data) > 0 {
b := data[len(data)-1] // We'll put the original byte back.
data[len(data)-1] = byte(0)
v2 = xxhash.Sum64(data)
data[len(data)-1] = b
}
return [2]uint64{v1, v2}
}

// Estimate returns an estimated bit count and hash count given the element count and false positive rate.
Expand Down
157 changes: 87 additions & 70 deletions pkg/bloom/bloom_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bloom_test

import (
"encoding/binary"
"fmt"
"testing"

Expand All @@ -9,46 +10,97 @@ import (

// Ensure filter can insert values and verify they exist.
func TestFilter_InsertContains(t *testing.T) {
f := bloom.NewFilter(1000, 4)
// Short, less comprehensive test.
testShortFilter_InsertContains(t)

// Insert value and validate.
f.Insert([]byte("Bess"))
if !f.Contains([]byte("Bess")) {
t.Fatal("expected true")
if testing.Short() {
return // Just run the above short test
}

// Insert another value and test.
f.Insert([]byte("Emma"))
if !f.Contains([]byte("Emma")) {
t.Fatal("expected true")
}
// More comprehensive test for the xxhash based Bloom Filter.

// Validate that a non-existent value doesn't exist.
if f.Contains([]byte("Jane")) {
t.Fatal("expected false")
// These parameters will result, for 10M entries, with a bloom filter
// with 0.001 false positive rate (1 in 1000 values will be incorrectly
// identified as being present in the set).
filter := bloom.NewFilter(143775876, 10)
v := make([]byte, 4, 4)
for i := 0; i < 10000000; i++ {
binary.BigEndian.PutUint32(v, uint32(i))
filter.Insert(v)
}

// None of the values inserted should ever be considered "not possibly in
// the filter".
t.Run("100M", func(t *testing.T) {
for i := 0; i < 10000000; i++ {
binary.BigEndian.PutUint32(v, uint32(i))
if !filter.Contains(v) {
t.Fatalf("got false for value %q, expected true", v)
}
}

// If we check for 100,000,000 values that we know are not present in the
// filter then we might expect around 100,000 of them to be false positives.
var fp int
for i := 10000000; i < 110000000; i++ {
binary.BigEndian.PutUint32(v, uint32(i))
if filter.Contains(v) {
fp++
}
}

if fp > 1000000 {
// If we're an order of magnitude off, then it's arguable that there
// is a bug in the bloom filter.
t.Fatalf("got %d false positives which is an error rate of %f, expected error rate <=0.001", fp, float64(fp)/100000000)
}
t.Logf("Bloom false positive error rate was %f", float64(fp)/100000000)
})
}

func BenchmarkFilter_Insert(b *testing.B) {
cases := []struct {
m, k uint64
n int
}{
{m: 100, k: 4, n: 1000},
{m: 1000, k: 4, n: 1000},
{m: 10000, k: 4, n: 1000},
{m: 100000, k: 4, n: 1000},
{m: 100, k: 8, n: 1000},
{m: 1000, k: 8, n: 1000},
{m: 10000, k: 8, n: 1000},
{m: 100000, k: 8, n: 1000},
{m: 100, k: 20, n: 1000},
{m: 1000, k: 20, n: 1000},
{m: 10000, k: 20, n: 1000},
{m: 100000, k: 20, n: 1000},
}
func testShortFilter_InsertContains(t *testing.T) {
t.Run("short", func(t *testing.T) {
f := bloom.NewFilter(1000, 4)

// Insert value and validate.
f.Insert([]byte("Bess"))
if !f.Contains([]byte("Bess")) {
t.Fatal("expected true")
}

for _, c := range cases {
// Insert another value and test.
f.Insert([]byte("Emma"))
if !f.Contains([]byte("Emma")) {
t.Fatal("expected true")
}

// Validate that a non-existent value doesn't exist.
if f.Contains([]byte("Jane")) {
t.Fatal("expected false")
}
})
}

var benchCases = []struct {
m, k uint64
n int
}{
{m: 100, k: 4, n: 1000},
{m: 1000, k: 4, n: 1000},
{m: 10000, k: 4, n: 1000},
{m: 100000, k: 4, n: 1000},
{m: 100, k: 8, n: 1000},
{m: 1000, k: 8, n: 1000},
{m: 10000, k: 8, n: 1000},
{m: 100000, k: 8, n: 1000},
{m: 100, k: 20, n: 1000},
{m: 1000, k: 20, n: 1000},
{m: 10000, k: 20, n: 1000},
{m: 100000, k: 20, n: 1000},
}

func BenchmarkFilter_Insert(b *testing.B) {
for _, c := range benchCases {
data := make([][]byte, 0, c.n)
for i := 0; i < c.n; i++ {
data = append(data, []byte(fmt.Sprintf("%d", i)))
Expand All @@ -63,31 +115,14 @@ func BenchmarkFilter_Insert(b *testing.B) {
}
}
})

}
}

var okResult bool

func BenchmarkFilter_Contains(b *testing.B) {
cases := []struct {
m, k uint64
n int
}{
{m: 100, k: 4, n: 1000},
{m: 1000, k: 4, n: 1000},
{m: 10000, k: 4, n: 1000},
{m: 100000, k: 4, n: 1000},
{m: 100, k: 8, n: 1000},
{m: 1000, k: 8, n: 1000},
{m: 10000, k: 8, n: 1000},
{m: 100000, k: 8, n: 1000},
{m: 100, k: 20, n: 1000},
{m: 1000, k: 20, n: 1000},
{m: 10000, k: 20, n: 1000},
{m: 100000, k: 20, n: 1000},
}

for _, c := range cases {
for _, c := range benchCases {
data := make([][]byte, 0, c.n)
notData := make([][]byte, 0, c.n)
for i := 0; i < c.n; i++ {
Expand Down Expand Up @@ -120,25 +155,7 @@ func BenchmarkFilter_Contains(b *testing.B) {
}

func BenchmarkFilter_Merge(b *testing.B) {
cases := []struct {
m, k uint64
n int
}{
{m: 100, k: 4, n: 1000},
{m: 1000, k: 4, n: 1000},
{m: 10000, k: 4, n: 1000},
{m: 100000, k: 4, n: 1000},
{m: 100, k: 8, n: 1000},
{m: 1000, k: 8, n: 1000},
{m: 10000, k: 8, n: 1000},
{m: 100000, k: 8, n: 1000},
{m: 100, k: 20, n: 1000},
{m: 1000, k: 20, n: 1000},
{m: 10000, k: 20, n: 1000},
{m: 100000, k: 20, n: 1000},
}

for _, c := range cases {
for _, c := range benchCases {
data1 := make([][]byte, 0, c.n)
data2 := make([][]byte, 0, c.n)
for i := 0; i < c.n; i++ {
Expand Down

0 comments on commit 094b92e

Please sign in to comment.