From 829256c15ec192d2e5939f6a8a1c64bc01aa131d Mon Sep 17 00:00:00 2001 From: Dustin Xie Date: Wed, 1 May 2024 11:54:47 -0700 Subject: [PATCH 1/3] [db] KvWithVersion to handle both versioned and non-versioned namespace --- db/db_versioned.go | 3 + db/kvstore_versioned.go | 74 +++++ db/kvstore_versioned_test.go | 534 +++++++++++++++++++++++++++++++++++ 3 files changed, 611 insertions(+) create mode 100644 db/kvstore_versioned_test.go diff --git a/db/db_versioned.go b/db/db_versioned.go index 8cf057d1f3..2166ae1af2 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -41,6 +41,9 @@ type ( // Delete deletes a record by (namespace, key) Delete(uint64, string, []byte) error + // CommitBatch writes a batch to the underlying DB + CommitBatch(uint64, batch.KVStoreBatch) error + // Filter returns pair in a bucket that meet the condition Filter(uint64, string, Condition, []byte, []byte) ([][]byte, [][]byte, error) diff --git a/db/kvstore_versioned.go b/db/kvstore_versioned.go index e1e85b01b3..1c98265469 100644 --- a/db/kvstore_versioned.go +++ b/db/kvstore_versioned.go @@ -7,6 +7,9 @@ package db import ( + "context" + + "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" ) @@ -47,4 +50,75 @@ type ( // SetVersion sets the version, and returns a KVStore to call Put()/Get() SetVersion(uint64) KVStore } + + // KvWithVersion wraps the versioned DB implementation with a certain version + KvWithVersion struct { + db VersionedDB + version uint64 // the current version + } ) + +// Option sets an option +type Option func(*KvWithVersion) + +// NewKVStoreWithVersion implements a KVStore that can handle both versioned +// and non-versioned namespace +func NewKVStoreWithVersion(cfg Config, opts ...Option) *KvWithVersion { + db := NewBoltDBVersioned(cfg) + kv := KvWithVersion{ + db: db, + } + for _, opt := range opts { + opt(&kv) + } + return &kv +} + +// Start starts the DB +func (b *KvWithVersion) Start(ctx context.Context) error { + return b.db.Start(ctx) +} + +// Stop stops the DB +func (b *KvWithVersion) Stop(ctx context.Context) error { + return b.db.Stop(ctx) +} + +// Put writes a record +func (b *KvWithVersion) Put(ns string, key, value []byte) error { + return b.db.Put(b.version, ns, key, value) +} + +// Get retrieves a key's value +func (b *KvWithVersion) Get(ns string, key []byte) ([]byte, error) { + return b.db.Get(b.version, ns, key) +} + +// Delete deletes a key +func (b *KvWithVersion) Delete(ns string, key []byte) error { + return b.db.Delete(b.version, ns, key) +} + +// Filter returns pair in a bucket that meet the condition +func (b *KvWithVersion) Filter(ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) { + return b.db.Filter(b.version, ns, cond, minKey, maxKey) +} + +// WriteBatch commits a batch +func (b *KvWithVersion) WriteBatch(kvsb batch.KVStoreBatch) error { + return b.db.CommitBatch(b.version, kvsb) +} + +// Version returns the key's most recent version +func (b *KvWithVersion) Version(ns string, key []byte) (uint64, error) { + return b.db.Version(ns, key) +} + +// SetVersion sets the version, and returns a KVStore to call Put()/Get() +func (b *KvWithVersion) SetVersion(v uint64) KVStore { + kv := KvWithVersion{ + db: b.db, + version: v, + } + return &kv +} diff --git a/db/kvstore_versioned_test.go b/db/kvstore_versioned_test.go new file mode 100644 index 0000000000..9c5e396c22 --- /dev/null +++ b/db/kvstore_versioned_test.go @@ -0,0 +1,534 @@ +// Copyright (c) 2021 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package db + +import ( + "context" + "math" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/iotexproject/iotex-core/v2/db/batch" + "github.com/iotexproject/iotex-core/v2/testutil" +) + +var ( + _ns = "ns" + _errNonVersioned = "namespace ns is non-versioned" +) + +func TestKVStoreWithVersion(t *testing.T) { + r := require.New(t) + testPath, err := testutil.PathOfTempFile("test-kversion") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewKVStoreWithVersion(cfg) + ctx := context.Background() + r.NoError(db.Start(ctx)) + defer func() { + db.Stop(ctx) + }() + + // write first key + r.NoError(db.Put(_bucket1, _k2, _v2)) + v, err := db.Get(_bucket1, _k2) + r.NoError(err) + r.Equal(_v2, v) + n, err := db.Version(_bucket1, _k2) + r.NoError(err) + r.Zero(n) + // check more Put/Get + err = db.SetVersion(1).Put(_bucket1, _k10, _v1) + r.ErrorContains(err, "invalid key length, expecting 5, got 6: invalid input") + r.NoError(db.SetVersion(1).Put(_bucket1, _k2, _v1)) + r.NoError(db.SetVersion(1).Put(_bucket2, _k2, _v2)) + r.NoError(db.SetVersion(3).Put(_bucket1, _k2, _v3)) + r.NoError(db.SetVersion(3).Put(_bucket2, _k2, _v1)) + r.NoError(db.SetVersion(6).Put(_bucket1, _k2, _v2)) + r.NoError(db.SetVersion(6).Put(_bucket2, _k2, _v3)) + r.NoError(db.SetVersion(2).Put(_bucket1, _k4, _v2)) + r.NoError(db.SetVersion(4).Put(_bucket1, _k4, _v1)) + r.NoError(db.SetVersion(7).Put(_bucket1, _k4, _v3)) + // non-versioned namespace + r.NoError(db.Put(_ns, _k1, _v1)) + r.NoError(db.Put(_ns, _k2, _v2)) + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v2, 6, ""}, + {_bucket1, _k2, _v2, 7, ""}, // after last write version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v3, 7, ""}, + {_bucket1, _k4, _v3, 8, ""}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, + // bucket2 + {_bucket2, _k1, nil, 0, _errNotExist}, + {_bucket2, _k2, nil, 0, _errNotExist}, // before first write version + {_bucket2, _k2, _v2, 1, ""}, + {_bucket2, _k2, _v2, 2, ""}, + {_bucket2, _k2, _v1, 3, ""}, + {_bucket2, _k2, _v1, 5, ""}, + {_bucket2, _k2, _v3, 6, ""}, + {_bucket2, _k2, _v3, 7, ""}, // after last write version + {_bucket2, _k3, nil, 0, _errNotExist}, + {_bucket2, _k4, nil, 1, _errNotExist}, + {_bucket2, _k5, nil, 0, _errNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid.Error()}, + // non-versioned namespace + {_ns, _k1, _v1, 0, ""}, + {_ns, _k2, _v2, 0, ""}, + {_ns, _k3, nil, 0, _errNotExist}, + {_ns, _k4, nil, 0, _errNotExist}, + {_ns, _k5, nil, 0, _errNotExist}, + {_ns, _k10, nil, 0, _errNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + // overwrite the same height again + r.NoError(db.SetVersion(6).Put(_bucket1, _k2, _v4)) + r.NoError(db.SetVersion(6).Put(_bucket2, _k2, _v4)) + r.NoError(db.SetVersion(7).Put(_bucket1, _k4, _v4)) + // write to earlier version again is invalid + r.Equal(ErrInvalid, db.SetVersion(3).Put(_bucket1, _k2, _v4)) + r.Equal(ErrInvalid, db.SetVersion(4).Put(_bucket1, _k4, _v4)) + // write with same value + r.NoError(db.SetVersion(9).Put(_bucket1, _k2, _v4)) + r.NoError(db.SetVersion(10).Put(_bucket1, _k4, _v4)) + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 9, ""}, + {_bucket1, _k2, _v4, 10, ""}, // after last write version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 9, ""}, + {_bucket1, _k4, _v4, 10, ""}, + {_bucket1, _k4, _v4, 11, ""}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, + // bucket2 + {_bucket2, _k1, nil, 0, _errNotExist}, + {_bucket2, _k2, nil, 0, _errNotExist}, // before first write version + {_bucket2, _k2, _v2, 1, ""}, + {_bucket2, _k2, _v2, 2, ""}, + {_bucket2, _k2, _v1, 3, ""}, + {_bucket2, _k2, _v1, 5, ""}, + {_bucket2, _k2, _v4, 6, ""}, + {_bucket2, _k2, _v4, 7, ""}, // after last write version + {_bucket2, _k3, nil, 0, _errNotExist}, + {_bucket2, _k4, nil, 1, _errNotExist}, + {_bucket2, _k5, nil, 0, _errNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid.Error()}, + // non-versioned namespace + {_ns, _k1, _v1, 0, ""}, + {_ns, _k2, _v2, 0, ""}, + {_ns, _k3, nil, 0, _errNotExist}, + {_ns, _k4, nil, 0, _errNotExist}, + {_ns, _k5, nil, 0, _errNotExist}, + {_ns, _k10, nil, 0, _errNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + // check version + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 9, ""}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 10, ""}, + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, + // bucket2 + {_bucket2, _k1, nil, 0, _errNotExist}, + {_bucket2, _k2, nil, 6, ""}, + {_bucket2, _k3, nil, 0, _errNotExist}, + {_bucket2, _k4, nil, 0, _errNotExist}, + {_bucket2, _k5, nil, 0, _errNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid.Error()}, + // non-versioned namespace + {_ns, _k1, nil, 0, _errNonVersioned}, + {_ns, _k2, nil, 0, _errNonVersioned}, + {_ns, _k3, nil, 0, _errNonVersioned}, + {_ns, _k4, nil, 0, _errNonVersioned}, + {_ns, _k5, nil, 0, _errNonVersioned}, + {_ns, _k10, nil, 0, _errNonVersioned}, + } { + value, err := db.Version(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.height, value) + } + // test delete + kv := db.SetVersion(10) + r.Equal(ErrNotExist, errors.Cause(kv.Delete(_bucket2, _k1))) + for _, k := range [][]byte{_k2, _k4} { + r.NoError(kv.Delete(_bucket1, k)) + } + for _, k := range [][]byte{_k1, _k3, _k5} { + r.Equal(ErrNotExist, errors.Cause(kv.Delete(_bucket1, k))) + } + r.Equal(ErrInvalid, errors.Cause(kv.Delete(_bucket1, _k10))) + // key still can be read before delete version + for _, e := range []versionTest{ + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 9, ""}, // before delete version + {_bucket1, _k2, nil, 10, _errDeleted}, // after delete version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 9, ""}, // before delete version + {_bucket1, _k4, nil, 10, _errDeleted}, // after delete version + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, + // bucket2 + {_bucket2, _k1, nil, 0, _errNotExist}, + {_bucket2, _k2, nil, 0, _errNotExist}, // before first write version + {_bucket2, _k2, _v2, 1, ""}, + {_bucket2, _k2, _v2, 2, ""}, + {_bucket2, _k2, _v1, 3, ""}, + {_bucket2, _k2, _v1, 5, ""}, + {_bucket2, _k2, _v4, 6, ""}, + {_bucket2, _k2, _v4, 7, ""}, // after last write version + {_bucket2, _k3, nil, 0, _errNotExist}, + {_bucket2, _k4, nil, 1, _errNotExist}, + {_bucket2, _k5, nil, 0, _errNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid.Error()}, + // non-versioned namespace + {_ns, _k1, _v1, 0, ""}, + {_ns, _k2, _v2, 0, ""}, + {_ns, _k3, nil, 0, _errNotExist}, + {_ns, _k4, nil, 0, _errNotExist}, + {_ns, _k5, nil, 0, _errNotExist}, + {_ns, _k10, nil, 0, _errNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + // write before delete version is invalid + r.Equal(ErrInvalid, errors.Cause(db.SetVersion(9).Put(_bucket1, _k2, _k2))) + r.Equal(ErrInvalid, errors.Cause(db.SetVersion(9).Put(_bucket1, _k4, _k4))) + for _, e := range []versionTest{ + {_bucket1, _k2, _v4, 9, ""}, // before delete version + {_bucket1, _k2, nil, 10, _errDeleted}, // after delete version + {_bucket1, _k4, _v4, 9, ""}, // before delete version + {_bucket1, _k4, nil, 10, _errDeleted}, // after delete version + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + // write after delete version + r.NoError(db.SetVersion(10).Put(_bucket1, _k2, _k2)) + r.NoError(db.SetVersion(10).Put(_bucket1, _k4, _k4)) + for _, e := range []versionTest{ + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 9, ""}, // before delete version + {_bucket1, _k2, _k2, 10, ""}, // after delete version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 9, ""}, // before delete version + {_bucket1, _k4, _k4, 10, ""}, // after delete version + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, + // bucket2 + {_bucket2, _k1, nil, 0, _errNotExist}, + {_bucket2, _k2, nil, 0, _errNotExist}, // before first write version + {_bucket2, _k2, _v2, 1, ""}, + {_bucket2, _k2, _v2, 2, ""}, + {_bucket2, _k2, _v1, 3, ""}, + {_bucket2, _k2, _v1, 5, ""}, + {_bucket2, _k2, _v4, 6, ""}, + {_bucket2, _k2, _v4, 7, ""}, // after last write version + {_bucket2, _k3, nil, 0, _errNotExist}, + {_bucket2, _k4, nil, 1, _errNotExist}, + {_bucket2, _k5, nil, 0, _errNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid.Error()}, + // non-versioned namespace + {_ns, _k1, _v1, 0, ""}, + {_ns, _k2, _v2, 0, ""}, + {_ns, _k3, nil, 0, _errNotExist}, + {_ns, _k4, nil, 0, _errNotExist}, + {_ns, _k5, nil, 0, _errNotExist}, + {_ns, _k10, nil, 0, _errNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + // check version + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _k2, 10, ""}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, _k4, 10, ""}, + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, + // bucket2 + {_bucket2, _k1, nil, 0, _errNotExist}, + {_bucket2, _k2, nil, 6, ""}, + {_bucket2, _k3, nil, 0, _errNotExist}, + {_bucket2, _k4, nil, 0, _errNotExist}, + {_bucket2, _k5, nil, 0, _errNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid.Error()}, + // non-versioned namespace + {_ns, _k1, nil, 0, _errNonVersioned}, + {_ns, _k2, nil, 0, _errNonVersioned}, + {_ns, _k3, nil, 0, _errNonVersioned}, + {_ns, _k4, nil, 0, _errNonVersioned}, + {_ns, _k5, nil, 0, _errNonVersioned}, + {_ns, _k10, nil, 0, _errNonVersioned}, + } { + value, err := db.Version(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.height, value) + } +} + +func TestWriteBatch(t *testing.T) { + r := require.New(t) + testPath, err := testutil.PathOfTempFile("test-version") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewKVStoreWithVersion(cfg) + ctx := context.Background() + r.NoError(db.Start(ctx)) + defer func() { + db.Stop(ctx) + }() + + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket2, _v1, _k1, 0, ""}, + {_bucket2, _v2, _k2, 9, ""}, + {_bucket2, _v3, _k3, 3, ""}, + {_bucket1, _k1, _v1, 0, ""}, + {_bucket1, _k2, _v2, 9, ""}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + + r.NoError(db.SetVersion(1).WriteBatch(b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket2, _v1, nil, 0, _errNotExist}, + {_bucket2, _v2, nil, 0, _errNotExist}, + {_bucket2, _v3, nil, 0, _errNotExist}, + {_bucket2, _v4, nil, 0, _errNotExist}, + {_bucket2, _v1, _k1, 1, ""}, + {_bucket2, _v2, _k2, 1, ""}, + {_bucket2, _v3, _k3, 1, ""}, + {_bucket2, _v1, _k1, 2, ""}, + {_bucket2, _v2, _k2, 2, ""}, + {_bucket2, _v3, _k3, 2, ""}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 0, _errNotExist}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 0, _errNotExist}, + {_bucket1, _k1, _v1, 1, ""}, + {_bucket1, _k2, _v2, 1, ""}, + {_bucket1, _k1, _v1, 3, ""}, + {_bucket1, _k2, _v2, 3, ""}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + + // batch with wrong key length would fail + b.Put(_bucket1, _v1, _k1, "test") + r.Equal(ErrInvalid, errors.Cause(db.SetVersion(3).WriteBatch(b))) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 0, ""}, + {_bucket1, _k2, _v3, 9, ""}, + {_bucket1, _k3, _v1, 3, ""}, + {_bucket1, _k4, _v2, 1, ""}, + {_bucket2, _v1, _k3, 0, ""}, + {_bucket2, _v2, _k2, 9, ""}, + {_bucket2, _v3, _k1, 3, ""}, + {_bucket2, _v4, _k4, 1, ""}, + // non-versioned namespace + {_ns, _k1, _v1, 1, ""}, + {_ns, _k2, _v2, 1, ""}, + {_ns, _v3, _k3, 1, ""}, + {_ns, _v4, _k4, 1, ""}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + b.Delete(_bucket1, _k3, "test") + b.Delete(_bucket2, _v3, "test") + b.Delete(_ns, _v3, "test") + + r.NoError(db.SetVersion(5).WriteBatch(b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 0, _errNotExist}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 0, _errNotExist}, + {_bucket2, _v1, nil, 0, _errNotExist}, + {_bucket2, _v2, nil, 0, _errNotExist}, + {_bucket2, _v3, nil, 0, _errNotExist}, + {_bucket2, _v4, nil, 0, _errNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 1, ""}, + {_bucket1, _k2, _v2, 1, ""}, + {_bucket1, _k3, nil, 1, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, + {_bucket2, _v1, _k1, 1, ""}, + {_bucket2, _v2, _k2, 1, ""}, + {_bucket2, _v3, _k3, 1, ""}, + {_bucket2, _v4, nil, 1, _errNotExist}, + } { + for _, h := range []uint64{1, 2, 3, 4} { + value, err := db.SetVersion(h).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 5, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k3, nil, 5, _errDeleted}, + {_bucket1, _k4, _v2, 5, ""}, + {_bucket2, _v1, _k3, 5, ""}, + {_bucket2, _v2, _k2, 5, ""}, + {_bucket2, _v3, nil, 5, _errDeleted}, + {_bucket2, _v4, _k4, 5, ""}, + } { + for _, h := range []uint64{5, 16, 64, 3000, math.MaxUint64} { + value, err := db.SetVersion(h).Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + } + // non-versioned namespace + for _, e := range []versionTest{ + {_ns, _k1, _v1, 1, ""}, + {_ns, _k2, _v2, 1, ""}, + {_ns, _v3, nil, 1, _errNotExist}, + {_ns, _v4, _k4, 1, ""}, + } { + value, err := db.Get(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } +} From 048c3fd8686819a693103281dabb632df966a5b2 Mon Sep 17 00:00:00 2001 From: dustinxie Date: Thu, 9 Jan 2025 18:36:49 -0800 Subject: [PATCH 2/3] create namespace on db init --- db/kvstore_versioned.go | 20 +++++++++++++++----- db/kvstore_versioned_test.go | 12 ++++++------ 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/db/kvstore_versioned.go b/db/kvstore_versioned.go index 1c98265469..1ea2ea81f5 100644 --- a/db/kvstore_versioned.go +++ b/db/kvstore_versioned.go @@ -54,23 +54,32 @@ type ( // KvWithVersion wraps the versioned DB implementation with a certain version KvWithVersion struct { db VersionedDB - version uint64 // the current version + version uint64 // the current version + vns []Namespace // versioned namespace } ) // Option sets an option type Option func(*KvWithVersion) +func VersionedNamespaceOption(ns ...Namespace) Option { + return func(k *KvWithVersion) { + k.vns = ns + } +} + // NewKVStoreWithVersion implements a KVStore that can handle both versioned // and non-versioned namespace func NewKVStoreWithVersion(cfg Config, opts ...Option) *KvWithVersion { - db := NewBoltDBVersioned(cfg) - kv := KvWithVersion{ - db: db, - } + kv := KvWithVersion{} for _, opt := range opts { opt(&kv) } + var dbOpts []BoltDBVersionedOption + if len(kv.vns) > 0 { + dbOpts = append(dbOpts, VnsOption(kv.vns...)) + } + kv.db = NewBoltDBVersioned(cfg, dbOpts...) return &kv } @@ -119,6 +128,7 @@ func (b *KvWithVersion) SetVersion(v uint64) KVStore { kv := KvWithVersion{ db: b.db, version: v, + vns: b.vns, } return &kv } diff --git a/db/kvstore_versioned_test.go b/db/kvstore_versioned_test.go index 9c5e396c22..d5b46193ef 100644 --- a/db/kvstore_versioned_test.go +++ b/db/kvstore_versioned_test.go @@ -33,7 +33,7 @@ func TestKVStoreWithVersion(t *testing.T) { cfg := DefaultConfig cfg.DbPath = testPath - db := NewKVStoreWithVersion(cfg) + db := NewKVStoreWithVersion(cfg, VersionedNamespaceOption(Namespace{_bucket1, 5}, Namespace{_bucket2, 5})) ctx := context.Background() r.NoError(db.Start(ctx)) defer func() { @@ -116,8 +116,8 @@ func TestKVStoreWithVersion(t *testing.T) { r.NoError(db.SetVersion(6).Put(_bucket2, _k2, _v4)) r.NoError(db.SetVersion(7).Put(_bucket1, _k4, _v4)) // write to earlier version again is invalid - r.Equal(ErrInvalid, db.SetVersion(3).Put(_bucket1, _k2, _v4)) - r.Equal(ErrInvalid, db.SetVersion(4).Put(_bucket1, _k4, _v4)) + r.ErrorContains(db.SetVersion(3).Put(_bucket1, _k2, _v4), "cannot write at earlier version 3: invalid input") + r.ErrorContains(db.SetVersion(4).Put(_bucket1, _k4, _v4), "cannot write at earlier version 4: invalid input") // write with same value r.NoError(db.SetVersion(9).Put(_bucket1, _k2, _v4)) r.NoError(db.SetVersion(10).Put(_bucket1, _k4, _v4)) @@ -206,12 +206,12 @@ func TestKVStoreWithVersion(t *testing.T) { } // test delete kv := db.SetVersion(10) - r.Equal(ErrNotExist, errors.Cause(kv.Delete(_bucket2, _k1))) for _, k := range [][]byte{_k2, _k4} { r.NoError(kv.Delete(_bucket1, k)) + r.ErrorContains(db.SetVersion(9).Delete(_bucket1, k), "cannot delete at earlier version 9") } for _, k := range [][]byte{_k1, _k3, _k5} { - r.Equal(ErrNotExist, errors.Cause(kv.Delete(_bucket1, k))) + r.NoError(kv.Delete(_bucket1, k)) } r.Equal(ErrInvalid, errors.Cause(kv.Delete(_bucket1, _k10))) // key still can be read before delete version @@ -382,7 +382,7 @@ func TestWriteBatch(t *testing.T) { cfg := DefaultConfig cfg.DbPath = testPath - db := NewKVStoreWithVersion(cfg) + db := NewKVStoreWithVersion(cfg, VersionedNamespaceOption(Namespace{_bucket1, 5}, Namespace{_bucket2, 7})) ctx := context.Background() r.NoError(db.Start(ctx)) defer func() { From c6884368e546418b0c9047c1c7ee45946199a3cc Mon Sep 17 00:00:00 2001 From: dustinxie Date: Mon, 13 Jan 2025 17:05:46 -0800 Subject: [PATCH 3/3] address comment --- db/kvstore_versioned.go | 1 + 1 file changed, 1 insertion(+) diff --git a/db/kvstore_versioned.go b/db/kvstore_versioned.go index 1ea2ea81f5..f15eb790ed 100644 --- a/db/kvstore_versioned.go +++ b/db/kvstore_versioned.go @@ -62,6 +62,7 @@ type ( // Option sets an option type Option func(*KvWithVersion) +// VersionedNamespaceOption pass in versioned namespaces func VersionedNamespaceOption(ns ...Namespace) Option { return func(k *KvWithVersion) { k.vns = ns