Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): Parallel write in the CacheMultiStore #20817

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions store/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Improvements

* [#20817](https://github.com/cosmos/cosmos-sdk/pull/20817) Parallelize the `CacheMultiStore.Write` method.
* [#19770](https://github.com/cosmos/cosmos-sdk/pull/19770) Upgrade IAVL to IAVL v1.1.1.

## v1.0.2 (January 10, 2024)
Expand Down
65 changes: 65 additions & 0 deletions store/cachemulti/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cachemulti

import (
"fmt"
"testing"

dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/require"

"cosmossdk.io/core/log"
"cosmossdk.io/store/iavl"
"cosmossdk.io/store/types"
)

func setupStore(b *testing.B, storeCount uint) (Store, map[string]types.StoreKey) {
b.Helper()

db := dbm.NewMemDB()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing against a LSM db would be more illustrative of performance improvements, let's try leveldb and pebbledb?

Copy link
Contributor Author

@cool-develope cool-develope Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not for i/o, mostly for tree updates in memory, in WorkingHash liefcycle

storeKeys := make(map[string]types.StoreKey)
stores := make(map[types.StoreKey]types.CacheWrapper)
Comment on lines +20 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preallocate maps to improve performance.

Preallocating the storeKeys and stores maps with the known size storeCount can improve performance.

-  storeKeys := make(map[string]types.StoreKey)
-  stores := make(map[types.StoreKey]types.CacheWrapper)
+  storeKeys := make(map[string]types.StoreKey, storeCount)
+  stores := make(map[types.StoreKey]types.CacheWrapper, storeCount)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storeKeys := make(map[string]types.StoreKey)
stores := make(map[types.StoreKey]types.CacheWrapper)
storeKeys := make(map[string]types.StoreKey, storeCount)
stores := make(map[types.StoreKey]types.CacheWrapper, storeCount)

for i := uint(0); i < storeCount; i++ {
key := types.NewKVStoreKey(fmt.Sprintf("store%d", i))
storeKeys[key.Name()] = key
sdb := dbm.NewPrefixDB(db, []byte(key.Name()))
istore, err := iavl.LoadStore(sdb, log.NewNopLogger(), key, types.CommitID{}, 1000, false, nil)
require.NoError(b, err)
stores[key] = types.KVStore(istore)
}

return NewStore(db, stores, storeKeys, nil, types.TraceContext{}), storeKeys
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider preallocating maps for performance optimization.

Given the known size of storeCount, preallocating the storeKeys and stores maps can improve performance by reducing the need for dynamic memory allocation.

- storeKeys := make(map[string]types.StoreKey)
- stores := make(map[types.StoreKey]types.CacheWrapper)
+ storeKeys := make(map[string]types.StoreKey, storeCount)
+ stores := make(map[types.StoreKey]types.CacheWrapper, storeCount)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func setupStore(b *testing.B, storeCount uint) (Store, map[string]types.StoreKey) {
b.Helper()
db := dbm.NewMemDB()
storeKeys := make(map[string]types.StoreKey)
stores := make(map[types.StoreKey]types.CacheWrapper)
for i := uint(0); i < storeCount; i++ {
key := types.NewKVStoreKey(fmt.Sprintf("store%d", i))
storeKeys[key.Name()] = key
sdb := dbm.NewPrefixDB(db, []byte(key.Name()))
istore, err := iavl.LoadStore(sdb, log.NewNopLogger(), key, types.CommitID{}, 1000, false, nil)
require.NoError(b, err)
stores[key] = types.KVStore(istore)
}
return NewStore(db, stores, storeKeys, nil, types.TraceContext{}), storeKeys
func setupStore(b *testing.B, storeCount uint) (Store, map[string]types.StoreKey) {
b.Helper()
db := dbm.NewMemDB()
storeKeys := make(map[string]types.StoreKey, storeCount)
stores := make(map[types.StoreKey]types.CacheWrapper, storeCount)
for i := uint(0); i < storeCount; i++ {
key := types.NewKVStoreKey(fmt.Sprintf("store%d", i))
storeKeys[key.Name()] = key
sdb := dbm.NewPrefixDB(db, []byte(key.Name()))
istore, err := iavl.LoadStore(sdb, log.NewNopLogger(), key, types.CommitID{}, 1000, false, nil)
require.NoError(b, err)
stores[key] = types.KVStore(istore)
}
return NewStore(db, stores, storeKeys, nil, types.TraceContext{}), storeKeys

}

func benchmarkStore(b *testing.B, storeCount, keyCount uint) {
b.Helper()
store, storeKeys := setupStore(b, storeCount)
b.ResetTimer()

b.ReportAllocs()
for i := 0; i < b.N; i++ {
b.StopTimer()
for _, key := range storeKeys {
cstore := store.GetKVStore(key)
for j := uint(0); j < keyCount; j++ {
dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
cstore.Set([]byte(dataKey), []byte(dataValue))
}
}
b.StartTimer()
store.Write()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve readability and performance of timer management in benchmarks.

Moving the StopTimer and StartTimer calls outside the inner loop can help in accurately measuring the time taken for store.Write() without the overhead of setup operations.

- for i := 0; i < b.N; i++ {
-   b.StopTimer()
-   for _, key := range storeKeys {
-     cstore := store.GetKVStore(key)
-     for j := uint(0); j < keyCount; j++ {
-       dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
-       dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
-       cstore.Set([]byte(dataKey), []byte(dataValue))
-     }
-   }
-   b.StartTimer()
-   store.Write()
- }
+ for i := 0; i < b.N; i++ {
+   for _, key := range storeKeys {
+     cstore := store.GetKVStore(key)
+     b.StopTimer()
+     for j := uint(0); j < keyCount; j++ {
+       dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
+       dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
+       cstore.Set([]byte(dataKey), []byte(dataValue))
+     }
+     b.StartTimer()
+   }
+   store.Write()
+ }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for i := 0; i < b.N; i++ {
b.StopTimer()
for _, key := range storeKeys {
cstore := store.GetKVStore(key)
for j := uint(0); j < keyCount; j++ {
dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
cstore.Set([]byte(dataKey), []byte(dataValue))
}
}
b.StartTimer()
store.Write()
for i := 0; i < b.N; i++ {
for _, key := range storeKeys {
cstore := store.GetKVStore(key)
b.StopTimer()
for j := uint(0); j < keyCount; j++ {
dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
cstore.Set([]byte(dataKey), []byte(dataValue))
}
b.StartTimer()
}
store.Write()
}

}
}

func BenchmarkCacheMultiStore(b *testing.B) {
storeCounts := []uint{2, 4, 8, 16, 32}
keyCounts := []uint{100, 1000, 10000}

for _, storeCount := range storeCounts {
for _, keyCount := range keyCounts {
b.Run(fmt.Sprintf("storeCount=%d/keyCount=%d", storeCount, keyCount), func(sub *testing.B) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One argument was that there will be a number of untouched stores where the Go routine may add significant overhead. It would be good to add that to your test for a fair comparison.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can be covered by the storeCount, for example if there are 2 significant writes among 10 stores, then it will be saved by 37% from the result. right?

benchmarkStore(sub, storeCount, keyCount)
})
}
}
}
16 changes: 15 additions & 1 deletion store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"

dbm "github.com/cosmos/cosmos-db"
"golang.org/x/sync/errgroup"

"cosmossdk.io/store/cachekv"
"cosmossdk.io/store/dbadapter"
Expand Down Expand Up @@ -122,8 +123,21 @@ func (cms Store) GetStoreType() types.StoreType {
// Write calls Write on each underlying store.
func (cms Store) Write() {
cms.db.Write()
eg := new(errgroup.Group)
for _, store := range cms.stores {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be a concurrency limit? or is it good enough to delegate this work completely to the go scheduler?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should do some benchmarks with databases and come up with a optimal number of runners

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we can see in #20817 (comment), from 8 stores, there is not much improvement, let me benchmark with limited number of runners

store.Write()
s := store // https://golang.org/doc/faq#closures_and_goroutines
eg.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic in Write: %v", r)
}
}()
s.Write()
return nil
})
}
if err := eg.Wait(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if the panic is handled somewhere else in case: this is not deterministic for multiple errors.

In the godoc for Wait:

returns the first non-nil error (if any) from them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to care about the multiple errors, the Write is the memory action and it won't affect the finalized state

panic(err)
}
}

Expand Down
1 change: 1 addition & 0 deletions store/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/stretchr/testify v1.9.0
github.com/tidwall/btree v1.7.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/sync v0.7.0
google.golang.org/grpc v1.64.1
google.golang.org/protobuf v1.34.2
gotest.tools/v3 v3.5.1
Expand Down
37 changes: 37 additions & 0 deletions store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,3 +1032,40 @@ func TestCommitStores(t *testing.T) {
})
}
}

func TestCacheMultiStoreWrite(t *testing.T) {
// for testing the cacheMultiStore parallel write, file based db is used
db, err := dbm.NewDB("test", dbm.GoLevelDBBackend, t.TempDir())
require.NoError(t, err)

ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
require.NoError(t, ms.LoadLatestVersion())

cacheMulti := ms.CacheMultiStore()

toVersion := int64(100)
keyCount := 100
storeKeys := []types.StoreKey{testStoreKey1, testStoreKey2, testStoreKey3}
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
Comment on lines +1045 to +1054
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance readability by extracting the nested loops into a helper function.

This will make the test function more concise and easier to understand.

+ func populateStore(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
+     for i := int64(1); i <= toVersion; i++ {
+         for _, storeKey := range storeKeys {
+             store := cacheMulti.GetKVStore(storeKey)
+             for j := 0; j < keyCount; j++ {
+                 store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
+             }
+         }
+         cacheMulti.Write()
+         ms.Commit()
+     }
+ }

for i := int64(1); i <= toVersion; i++ {
    for _, storeKey := range storeKeys {
        store := cacheMulti.GetKVStore(storeKey)
        for j := 0; j < keyCount; j++ {
            store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
        }
    }
    cacheMulti.Write()
    ms.Commit()
}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
func populateStore(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
}
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}


// check the data
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
Comment on lines +1056 to +1066
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance readability by extracting the verification loop into a helper function.

This will make the test function more concise and easier to understand.

+ func verifyStoreData(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
+     for _, storeKey := range storeKeys {
+         store := cacheMulti.GetKVStore(storeKey)
+         for i := int64(1); i <= toVersion; i++ {
+             for j := 0; j < keyCount; j++ {
+                 key := []byte(fmt.Sprintf("key-%d-%d", i, j))
+                 value := store.Get(key)
+                 require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
+             }
+         }
+     }
+ }

for _, storeKey := range storeKeys {
    store := cacheMulti.GetKVStore(storeKey)
    for i := int64(1); i <= toVersion; i++ {
        for j := 0; j < keyCount; j++ {
            key := []byte(fmt.Sprintf("key-%d-%d", i, j))
            value := store.Get(key)
            require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
        }
    }
}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// check the data
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
func verifyStoreData(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
}
// check the data
verifyStoreData(t, cacheMulti, storeKeys, toVersion, keyCount)

}
Loading