Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): Parallel write in the CacheMultiStore #20817

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions store/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Improvements

* [#20817](https://github.com/cosmos/cosmos-sdk/pull/20817) Parallelize the `CacheMultiStore.Write` method.
* [#19770](https://github.com/cosmos/cosmos-sdk/pull/19770) Upgrade IAVL to IAVL v1.1.1.

## v1.0.2 (January 10, 2024)
Expand Down
63 changes: 63 additions & 0 deletions store/cachemulti/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package cachemulti

import (
"fmt"
"testing"

"cosmossdk.io/core/log"
"cosmossdk.io/store/iavl"
"cosmossdk.io/store/types"
dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/require"
)

func setupStore(b *testing.B, storeCount uint) (Store, map[string]types.StoreKey) {
db := dbm.NewMemDB()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing against a LSM db would be more illustrative of performance improvements, let's try leveldb and pebbledb?

Copy link
Contributor Author

@cool-develope cool-develope Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not for i/o, mostly for tree updates in memory, in WorkingHash liefcycle


storeKeys := make(map[string]types.StoreKey)
stores := make(map[types.StoreKey]types.CacheWrapper)
Comment on lines +20 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preallocate maps to improve performance.

Preallocating the storeKeys and stores maps with the known size storeCount can improve performance.

-  storeKeys := make(map[string]types.StoreKey)
-  stores := make(map[types.StoreKey]types.CacheWrapper)
+  storeKeys := make(map[string]types.StoreKey, storeCount)
+  stores := make(map[types.StoreKey]types.CacheWrapper, storeCount)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storeKeys := make(map[string]types.StoreKey)
stores := make(map[types.StoreKey]types.CacheWrapper)
storeKeys := make(map[string]types.StoreKey, storeCount)
stores := make(map[types.StoreKey]types.CacheWrapper, storeCount)

for i := uint(0); i < storeCount; i++ {
key := types.NewKVStoreKey(fmt.Sprintf("store%d", i))
storeKeys[key.Name()] = key
sdb := dbm.NewPrefixDB(db, []byte(key.Name()))
istore, err := iavl.LoadStore(sdb, log.NewNopLogger(), key, types.CommitID{}, 1000, false, nil)
require.NoError(b, err)
stores[key] = types.KVStore(istore)
}

return NewStore(db, stores, storeKeys, nil, types.TraceContext{}), storeKeys
}

func benchmarkStore(b *testing.B, storeCount, keyCount uint) {
store, storeKeys := setupStore(b, storeCount)
b.ResetTimer()

b.ReportAllocs()
for i := 0; i < b.N; i++ {
b.StopTimer()
for _, key := range storeKeys {
cstore := store.GetKVStore(key)
for j := uint(0); j < keyCount; j++ {
dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
cstore.Set([]byte(dataKey), []byte(dataValue))
}
}
b.StartTimer()
store.Write()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve readability and performance of timer management in benchmarks.

Moving the StopTimer and StartTimer calls outside the inner loop can help in accurately measuring the time taken for store.Write() without the overhead of setup operations.

- for i := 0; i < b.N; i++ {
-   b.StopTimer()
-   for _, key := range storeKeys {
-     cstore := store.GetKVStore(key)
-     for j := uint(0); j < keyCount; j++ {
-       dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
-       dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
-       cstore.Set([]byte(dataKey), []byte(dataValue))
-     }
-   }
-   b.StartTimer()
-   store.Write()
- }
+ for i := 0; i < b.N; i++ {
+   for _, key := range storeKeys {
+     cstore := store.GetKVStore(key)
+     b.StopTimer()
+     for j := uint(0); j < keyCount; j++ {
+       dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
+       dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
+       cstore.Set([]byte(dataKey), []byte(dataValue))
+     }
+     b.StartTimer()
+   }
+   store.Write()
+ }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for i := 0; i < b.N; i++ {
b.StopTimer()
for _, key := range storeKeys {
cstore := store.GetKVStore(key)
for j := uint(0); j < keyCount; j++ {
dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
cstore.Set([]byte(dataKey), []byte(dataValue))
}
}
b.StartTimer()
store.Write()
for i := 0; i < b.N; i++ {
for _, key := range storeKeys {
cstore := store.GetKVStore(key)
b.StopTimer()
for j := uint(0); j < keyCount; j++ {
dataKey := fmt.Sprintf("key%s-%d", key.Name(), j)
dataValue := fmt.Sprintf("value%s-%d", key.Name(), j)
cstore.Set([]byte(dataKey), []byte(dataValue))
}
b.StartTimer()
}
store.Write()
}

}
}

func BenchmarkCacheMultiStore(b *testing.B) {
storeCounts := []uint{2, 4, 8, 16, 32}
keyCounts := []uint{100, 1000, 10000}

for _, storeCount := range storeCounts {
for _, keyCount := range keyCounts {
b.Run(fmt.Sprintf("storeCount=%d/keyCount=%d", storeCount, keyCount), func(sub *testing.B) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One argument was that there will be a number of untouched stores where the Go routine may add significant overhead. It would be good to add that to your test for a fair comparison.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can be covered by the storeCount, for example if there are 2 significant writes among 10 stores, then it will be saved by 37% from the result. right?

benchmarkStore(sub, storeCount, keyCount)
})
}
}

}
9 changes: 8 additions & 1 deletion store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"fmt"
"io"
"sync"

dbm "github.com/cosmos/cosmos-db"

Expand Down Expand Up @@ -122,9 +123,15 @@
// Write calls Write on each underlying store.
func (cms Store) Write() {
cms.db.Write()
wg := sync.WaitGroup{}
wg.Add(len(cms.stores))
for _, store := range cms.stores {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be a concurrency limit? or is it good enough to delegate this work completely to the go scheduler?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should do some benchmarks with databases and come up with a optimal number of runners

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we can see in #20817 (comment), from 8 stores, there is not much improvement, let me benchmark with limited number of runners

store.Write()
go func(s types.CacheWrap) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

most of the time, the dirty writes are small or even empty, does it justify the overhead of goroutine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, it would be helpful if there were only two meaningful writes (two stores), since WorkingHash requires the iavl tree re-building and root hash (one of bottlenecks)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should get some benchmarks.

defer wg.Done()
s.Write()
}(store)
Fixed Show fixed Hide fixed
}
wg.Wait()
}

// Implements CacheWrapper.
Expand Down
39 changes: 38 additions & 1 deletion store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestCacheMultiStoreWithVersion(t *testing.T) {
// require we cannot commit (write) to a cache-versioned multi-store
require.Panics(t, func() {
kvStore.Set(k, []byte("newValue"))
cms.Write()
kvStore.(types.CacheWrap).Write()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I realized recovering panics from multi goroutines is tricky, @alpe have you any idea?
the above update is same check with original one, but curious

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume, you want to test for cms.Write() to panic. Your workaround with kvStore.(types.CacheWrap).Write() is pragmatic but with a cms.Write() you give away control from the main thread. Therefore the caller can not handle panics which is the bigger problem.
Another issue would be deterministic behaviour when more than one store panics.
You could recover in the go routines, collect and sort the errors. Maybe it is worth to refactor to error return values instead of panics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point!

})
}

Expand Down Expand Up @@ -1032,3 +1032,40 @@ func TestCommitStores(t *testing.T) {
})
}
}

func TestCacheMultiStoreWrite(t *testing.T) {
// for testing the cacheMultiStore parallel write, file based db is used
db, err := dbm.NewDB("test", dbm.GoLevelDBBackend, t.TempDir())
require.NoError(t, err)

ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
require.NoError(t, ms.LoadLatestVersion())

cacheMulti := ms.CacheMultiStore()

toVersion := int64(100)
keyCount := 100
storeKeys := []types.StoreKey{testStoreKey1, testStoreKey2, testStoreKey3}
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
Comment on lines +1045 to +1054
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance readability by extracting the nested loops into a helper function.

This will make the test function more concise and easier to understand.

+ func populateStore(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
+     for i := int64(1); i <= toVersion; i++ {
+         for _, storeKey := range storeKeys {
+             store := cacheMulti.GetKVStore(storeKey)
+             for j := 0; j < keyCount; j++ {
+                 store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
+             }
+         }
+         cacheMulti.Write()
+         ms.Commit()
+     }
+ }

for i := int64(1); i <= toVersion; i++ {
    for _, storeKey := range storeKeys {
        store := cacheMulti.GetKVStore(storeKey)
        for j := 0; j < keyCount; j++ {
            store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
        }
    }
    cacheMulti.Write()
    ms.Commit()
}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
func populateStore(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
}
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}


// check the data
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
Comment on lines +1056 to +1066
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance readability by extracting the verification loop into a helper function.

This will make the test function more concise and easier to understand.

+ func verifyStoreData(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
+     for _, storeKey := range storeKeys {
+         store := cacheMulti.GetKVStore(storeKey)
+         for i := int64(1); i <= toVersion; i++ {
+             for j := 0; j < keyCount; j++ {
+                 key := []byte(fmt.Sprintf("key-%d-%d", i, j))
+                 value := store.Get(key)
+                 require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
+             }
+         }
+     }
+ }

for _, storeKey := range storeKeys {
    store := cacheMulti.GetKVStore(storeKey)
    for i := int64(1); i <= toVersion; i++ {
        for j := 0; j < keyCount; j++ {
            key := []byte(fmt.Sprintf("key-%d-%d", i, j))
            value := store.Get(key)
            require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
        }
    }
}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// check the data
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
func verifyStoreData(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
}
// check the data
verifyStoreData(t, cacheMulti, storeKeys, toVersion, keyCount)

}
Loading