Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

fix test race condition #9

Merged
merged 1 commit into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstore

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand All @@ -19,82 +20,95 @@ func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (
if err != nil {
return nil, err
}
bc := &bloomcache{blockstore: bs, bloom: bl}
bc.hits = metrics.NewCtx(ctx, "bloom.hits_total",
"Number of cache hits in bloom cache").Counter()
bc.total = metrics.NewCtx(ctx, "bloom_total",
"Total number of requests to bloom cache").Counter()

bc.Invalidate()
go bc.Rebuild(ctx)
if metrics.Active() {
go func() {
bc := &bloomcache{
blockstore: bs,
bloom: bl,
hits: metrics.NewCtx(ctx, "bloom.hits_total",
"Number of cache hits in bloom cache").Counter(),
total: metrics.NewCtx(ctx, "bloom_total",
"Total number of requests to bloom cache").Counter(),
buildChan: make(chan struct{}),
}
go func() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well do this all in one goroutine.

err := bc.build(ctx)
if err != nil {
select {
case <-ctx.Done():
log.Warning("Cache rebuild closed by context finishing: ", err)
default:
log.Error(err)
}
return
}
if metrics.Active() {
fill := metrics.NewCtx(ctx, "bloom_fill_ratio",
"Ratio of bloom filter fullnes, (updated once a minute)").Gauge()

<-bc.rebuildChan
t := time.NewTicker(1 * time.Minute)
defer t.Stop()
for {
select {
case <-ctx.Done():
t.Stop()
return
case <-t.C:
fill.Set(bc.bloom.FillRatio())
}
}
}()
}
}
}()
return bc, nil
}

type bloomcache struct {
bloom *bloom.Bloom
active int32

// This chan is only used for testing to wait for bloom to enable
rebuildChan chan struct{}
blockstore Blockstore
bloom *bloom.Bloom
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put the atomic first because go can be stupid about things like this.

buildErr error

buildChan chan struct{}
blockstore Blockstore

// Statistics
hits metrics.Counter
total metrics.Counter
}

func (b *bloomcache) Invalidate() {
b.rebuildChan = make(chan struct{})
atomic.StoreInt32(&b.active, 0)
}

func (b *bloomcache) BloomActive() bool {
return atomic.LoadInt32(&b.active) != 0
}

func (b *bloomcache) Rebuild(ctx context.Context) {
evt := log.EventBegin(ctx, "bloomcache.Rebuild")
func (b *bloomcache) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-b.buildChan:
return b.buildErr
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way we have a way of getting at these errors.

}
}

func (b *bloomcache) build(ctx context.Context) error {
evt := log.EventBegin(ctx, "bloomcache.build")
defer evt.Done()
defer close(b.buildChan)

ch, err := b.blockstore.AllKeysChan(ctx)
if err != nil {
log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
return
b.buildErr = fmt.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
return b.buildErr
}
finish := false
for !finish {
for {
select {
case key, ok := <-ch:
if ok {
b.bloom.AddTS(key.Bytes()) // Use binary key, the more compact the better
} else {
finish = true
if !ok {
atomic.StoreInt32(&b.active, 1)
return nil
}
b.bloom.AddTS(key.Bytes()) // Use binary key, the more compact the better
case <-ctx.Done():
log.Warning("Cache rebuild closed by context finishing.")
return
b.buildErr = ctx.Err()
return b.buildErr
}
}
close(b.rebuildChan)
atomic.StoreInt32(&b.active, 1)
}

func (b *bloomcache) DeleteBlock(k *cid.Cid) error {
Expand Down
14 changes: 5 additions & 9 deletions bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ func TestPutManyAddsToBloom(t *testing.T) {
t.Fatal(err)
}

select {
case <-cachedbs.rebuildChan:
case <-ctx.Done():
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
if err := cachedbs.Wait(ctx); err != nil {
t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded())
}

block1 := blocks.NewBlock([]byte("foo"))
Expand Down Expand Up @@ -86,10 +84,8 @@ func TestHasIsBloomCached(t *testing.T) {
t.Fatal(err)
}

select {
case <-cachedbs.rebuildChan:
case <-ctx.Done():
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
if err := cachedbs.Wait(ctx); err != nil {
t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded())
}

cacheFails := 0
Expand All @@ -102,7 +98,7 @@ func TestHasIsBloomCached(t *testing.T) {
}

if float64(cacheFails)/float64(1000) > float64(0.05) {
t.Fatal("Bloom filter has cache miss rate of more than 5%")
t.Fatalf("Bloom filter has cache miss rate of more than 5%%")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go vet wasn't happy.

}

cacheFails = 0
Expand Down