Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kv): clear RPC method which completely cleans storage #736

Merged
merged 6 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 0 additions & 96 deletions .github/workflows/windows.yml

This file was deleted.

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ v2.3.1 (_.06.2021)
## 👀 New:

- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` folder. [PR](https://github.com/spiral/roadrunner/pull/732)
- ✏️ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736)

## 🩹 Fixes:

Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
<a href="https://packagist.org/packages/spiral/roadrunner"><img src="https://poser.pugx.org/spiral/roadrunner/version"></a>
<a href="https://pkg.go.dev/github.com/spiral/roadrunner/v2?tab=doc"><img src="https://godoc.org/github.com/spiral/roadrunner/v2?status.svg"></a>
<a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Linux/badge.svg" alt=""></a>
<a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Windows/badge.svg" alt=""></a>
<a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Linters/badge.svg" alt=""></a>
<a href="https://goreportcard.com/report/github.com/spiral/roadrunner"><img src="https://goreportcard.com/badge/github.com/spiral/roadrunner"></a>
<a href="https://scrutinizer-ci.com/g/spiral/roadrunner/?branch=master"><img src="https://scrutinizer-ci.com/g/spiral/roadrunner/badges/quality-score.png"></a>
Expand Down
34 changes: 34 additions & 0 deletions plugins/kv/drivers/boltdb/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
)

type Driver struct {
clearMu sync.RWMutex
// db instance
DB *bolt.DB
// name should be UTF-8
Expand Down Expand Up @@ -373,6 +374,35 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) {
return m, nil
}

func (d *Driver) Clear() error {
err := d.DB.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket(d.bucket)
if err != nil {
d.log.Error("boltdb delete bucket", "error", err)
return err
}

_, err = tx.CreateBucket(d.bucket)
if err != nil {
d.log.Error("boltdb create bucket", "error", err)
return err
}

return nil
})

if err != nil {
d.log.Error("clear transaction failed", "error", err)
return err
}

d.clearMu.Lock()
d.gc = sync.Map{}
d.clearMu.Unlock()

return nil
}

// ========================= PRIVATE =================================

func (d *Driver) startGCLoop() { //nolint:gocognit
Expand All @@ -382,6 +412,8 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
for {
select {
case <-t.C:
d.clearMu.RLock()

// calculate current time before loop started to be fair
now := time.Now()
d.gc.Range(func(key, value interface{}) bool {
Expand Down Expand Up @@ -414,6 +446,8 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
}
return true
})

d.clearMu.RUnlock()
case <-d.stop:
err := d.DB.Close()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions plugins/kv/drivers/memcached/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,13 @@ func (d *Driver) Delete(keys ...string) error {
}
return nil
}

func (d *Driver) Clear() error {
err := d.client.DeleteAll()
if err != nil {
d.log.Error("flush_all operation failed", "error", err)
return err
}

return nil
}
5 changes: 4 additions & 1 deletion plugins/kv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ type Storage interface {
MExpire(items ...*kvv1.Item) error

// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
// Not supported for the memcached
TTL(keys ...string) (map[string]string, error)

// Clear clean the entire storage
Clear() error

// Delete one or multiple keys.
Delete(keys ...string) error
}
Expand Down
16 changes: 16 additions & 0 deletions plugins/kv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,19 @@ func (r *rpc) Delete(in *kvv1.Request, _ *kvv1.Response) error {

return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}

// Clear clean the storage
func (r *rpc) Clear(in *kvv1.Request, _ *kvv1.Response) error {
const op = errors.Op("rcp_delete")

if st, exists := r.storages[in.GetStorage()]; exists {
err := st.Clear()
if err != nil {
return errors.E(op, err)
}

return nil
}

return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
16 changes: 15 additions & 1 deletion plugins/memory/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

type Driver struct {
heap sync.Map
clearMu sync.RWMutex
heap sync.Map
// stop is used to stop keys GC and close boltdb connection
stop chan struct{}
log logger.Logger
Expand Down Expand Up @@ -203,6 +204,14 @@ func (s *Driver) Delete(keys ...string) error {
return nil
}

func (s *Driver) Clear() error {
s.clearMu.Lock()
s.heap = sync.Map{}
s.clearMu.Unlock()

return nil
}

// ================================== PRIVATE ======================================

func (s *Driver) gc() {
Expand All @@ -213,6 +222,9 @@ func (s *Driver) gc() {
ticker.Stop()
return
case now := <-ticker.C:
// mutes needed to clear the map
s.clearMu.RLock()

// check every second
s.heap.Range(func(key, value interface{}) bool {
v := value.(*kvv1.Item)
Expand All @@ -231,6 +243,8 @@ func (s *Driver) gc() {
}
return true
})

s.clearMu.RUnlock()
}
}
}
9 changes: 9 additions & 0 deletions plugins/redis/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,12 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) {
}
return m, nil
}

func (d *Driver) Clear() error {
fdb := d.universalClient.FlushDB(context.Background())
if fdb.Err() != nil {
return fdb.Err()
}

return nil
}
6 changes: 4 additions & 2 deletions tests/plugins/broadcast/broadcast_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ func TestBroadcastSameSubscriber(t *testing.T) {
cfg,
&broadcast.Plugin{},
&rpcPlugin.Plugin{},
mockLogger,
&logger.ZapLogger{},
// mockLogger,
&server.Plugin{},
&redis.Plugin{},
&websockets.Plugin{},
Expand Down Expand Up @@ -314,7 +315,8 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
cfg,
&broadcast.Plugin{},
&rpcPlugin.Plugin{},
mockLogger,
&logger.ZapLogger{},
// mockLogger,
&server.Plugin{},
&redis.Plugin{},
&websockets.Plugin{},
Expand Down
Loading