Add snapshot and restore methods #39

Merged
merged 10 commits into from
Dec 22, 2021
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
*.test
*.out
snapshot.bin

# Vendor and workspace
vendor/
go.work
52 changes: 25 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@ This package contains a **high-performance, columnar, in-memory storage engine**

## Features

* Optimized, cache-friendly **columnar data layout** that minimizes cache-misses.
* Optimized for **zero heap allocation** during querying (see benchmarks below).
* Optimized **batch updates/deletes**, an update during a transaction takes around `12ns`.
* Support for **SIMD-enabled filtering** (i.e. "where" clause) by leveraging [bitmap indexing](https://github.com/kelindar/bitmap).
* Support for **columnar projection** (i.e. "select" clause) for fast retrieval.
* Support for **computed indexes** that are dynamically calculated based on provided predicate.
* Support for **concurrent updates** using sharded latches to keep things fast.
* Support for **transaction isolation**, allowing you to create transactions and commit/rollback.
* Support for **expiration** of rows based on time-to-live or expiration column.
* Support for **atomic increment/decrement** of numerical values, transactionally.
* Support for **change data stream** that streams all commits consistently.
- Optimized, cache-friendly **columnar data layout** that minimizes cache-misses.
- Optimized for **zero heap allocation** during querying (see benchmarks below).
- Optimized **batch updates/deletes**: an update during a transaction takes around `12ns`.
- Support for **SIMD-enabled filtering** (i.e. "where" clause) by leveraging [bitmap indexing](https://github.com/kelindar/bitmap).
- Support for **columnar projection** (i.e. "select" clause) for fast retrieval.
- Support for **computed indexes** that are dynamically calculated based on provided predicate.
- Support for **concurrent updates** using sharded latches to keep things fast.
- Support for **transaction isolation**, allowing you to create transactions and commit/rollback.
- Support for **expiration** of rows based on time-to-live or expiration column.
- Support for **atomic increment/decrement** of numerical values, transactionally.
- Support for **change data stream** that streams all commits consistently.
- Support for **concurrent snapshotting**, allowing you to store the entire collection in a file.

## Documentation

The general idea is to leverage cache-friendly ways of organizing data in [structures of arrays (SoA)](https://en.wikipedia.org/wiki/AoS_and_SoA), otherwise known as "columnar" storage in database design. This, in turn, allows us to iterate and filter over columns very efficiently. On top of that, this package also adds [bitmap indexing](https://en.wikipedia.org/wiki/Bitmap_index) to the columnar storage, allowing you to build filter queries using binary `and`, `and not`, `or` and `xor` (see [kelindar/bitmap](https://github.com/kelindar/bitmap) with SIMD support).
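
A minimal plain-Go sketch of the SoA idea: each attribute lives in its own slice, so a filter only walks the one column it inspects. The `players` type and `where` helper below are illustrative, not part of the package:

```go
package main

import "fmt"

// Struct-of-arrays layout: each attribute lives in its own contiguous
// slice, so a filter touches only the memory of the column it inspects.
type players struct {
	names   []string
	classes []string
	ages    []float64
}

// where scans a single column and returns the matching row offsets,
// mirroring how a columnar "where" clause walks one slice only.
func (p *players) where(predicate func(i int) bool) (rows []int) {
	for i := range p.names {
		if predicate(i) {
			rows = append(rows, i)
		}
	}
	return
}

func main() {
	p := &players{
		names:   []string{"merlin", "mordred", "morgana"},
		classes: []string{"mage", "rogue", "mage"},
		ages:    []float64{55, 31, 42},
	}

	// Full scan of the "class" column only; "name" and "age" are untouched.
	rogues := p.where(func(i int) bool { return p.classes[i] == "rogue" })
	fmt.Println(len(rogues)) // 1
}
```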

- [Collection and Columns](#collection-and-columns)
- [Querying and Indexing](#querying-and-indexing)
Expand All @@ -43,7 +44,7 @@ The general idea is to leverage cache-friendly ways of organizing data in [struc

## Collection and Columns

In order to get data into the store, you'll first need to create a `Collection` by calling the `NewCollection()` method. Each collection requires a schema, which can either be specified manually by calling `CreateColumn()` multiple times or automatically inferred from an object by calling the `CreateColumnsOf()` function.

In the example below we're loading some `JSON` data by using `json.Unmarshal()` and auto-creating columns based on the first element of the loaded slice. After this is done, we can then load our data by inserting the objects one by one into the collection. This is accomplished by repeatedly calling the `Insert()` method on the collection itself.

Expand Down Expand Up @@ -89,7 +90,7 @@ players.Query(func(txn *Txn) error {

## Querying and Indexing

The store allows you to query the data based on the presence of certain attributes or their values. In the example below we are querying our collection and applying a *filtering* operation by using the `WithValue()` method on the transaction. This method scans the values and checks whether a certain predicate evaluates to `true`. In this case, we're scanning through all of the players and looking up their `class`; if their class is equal to "rogue", we'll take it. At the end, we call the `Count()` method, which simply counts the result set.
The store allows you to query the data based on the presence of certain attributes or their values. In the example below we are querying our collection and applying a _filtering_ operation by using the `WithValue()` method on the transaction. This method scans the values and checks whether a certain predicate evaluates to `true`. In this case, we're scanning through all of the players and looking up their `class`; if their class is equal to "rogue", we'll take it. At the end, we call the `Count()` method, which simply counts the result set.

```go
// This query performs a full scan of "class" column
Expand All @@ -101,7 +102,7 @@ players.Query(func(txn *column.Txn) error {
})
```

Now, what if we need to run this query very often? It is possible to simply *create an index* with the same predicate and have this computation applied every time (a) an object is inserted into the collection and (b) a value of the dependent column is updated. Let's look at the example below: we first create a `rogue` index which depends on the "class" column. This index applies the same predicate, which only returns `true` if a class is "rogue". We can then query this by simply calling the `With()` method and providing the index name.
Now, what if we need to run this query very often? It is possible to simply _create an index_ with the same predicate and have this computation applied every time (a) an object is inserted into the collection and (b) a value of the dependent column is updated. Let's look at the example below: we first create a `rogue` index which depends on the "class" column. This index applies the same predicate, which only returns `true` if a class is "rogue". We can then query this by simply calling the `With()` method and providing the index name.

An index is essentially akin to a boolean column, so you could technically also select its value when querying it. Now, in this example the query would be around `10-100x` faster to execute, as behind the scenes it uses [bitmap indexing](https://github.com/kelindar/bitmap) for the "rogue" index and performs a simple logical `AND` operation on two bitmaps when querying. This avoids scanning and applying a predicate during the `Query`.

Expand Down Expand Up @@ -140,7 +141,7 @@ players.Query(func(txn *Txn) error {
})
```
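
The bitmap mechanics behind an indexed query can be sketched in a few lines of plain Go: the index keeps one bit per row, and `With()` reduces to a bitwise `AND`. The tiny `bitmap` type below is illustrative; the real package uses [kelindar/bitmap](https://github.com/kelindar/bitmap) with SIMD acceleration:

```go
package main

import (
	"fmt"
	"math/bits"
)

// bitmap is a minimal uint64-backed bitmap, in the spirit of
// kelindar/bitmap but without the SIMD acceleration.
type bitmap []uint64

// set marks row i as matching the index predicate.
func (b bitmap) set(i int) { b[i>>6] |= 1 << (i & 63) }

// and intersects the receiver with another bitmap in place.
func (b bitmap) and(o bitmap) {
	for i := range b {
		b[i] &= o[i]
	}
}

// count returns the number of set bits (matching rows).
func (b bitmap) count() (n int) {
	for _, v := range b {
		n += bits.OnesCount64(v)
	}
	return
}

func main() {
	// Index bitmaps maintained on insert: bit i is set when row i matches.
	rogue := make(bitmap, 1)
	old := make(bitmap, 1)
	rogue.set(1) // row 1 is a rogue
	old.set(1)   // row 1 is over 30
	old.set(2)   // row 2 is over 30

	// With("rogue", "old") boils down to a logical AND of two bitmaps,
	// with no per-row predicate evaluation at query time.
	rogue.and(old)
	fmt.Println(rogue.count()) // 1
}
```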

Now, you can combine all of these methods to keep building more complex queries. When querying indexed and non-indexed fields together, it is important to know that every scan applies only to the current selection, speeding up the query. So if you have a filter on a specific index that selects 50% of players and you then perform a scan on that selection (e.g. `WithValue()`), it will only scan 50% of the players and hence will be 2x faster.

```go
// How many rogues that are over 30 years old?
Expand All @@ -156,8 +157,8 @@ players.Query(func(txn *Txn) error {

In all of the previous examples, we've only been doing the `Count()` operation, which counts the number of elements in the result set. In this section we'll look at how we can iterate over the result set. In short, there are two main methods that allow us to do it:

1. `Range()` method, which takes a column name as an argument and allows faster get/set of the values for that column.
2. `Select()` method, which doesn't pre-select any specific column, so it's usually a bit slower, and it also does not allow any updates.

Let's first examine the `Range()` method. In the example below we select all of the rogues from our collection and print out their name by using the `Range()` method and providing "name" column to it. The callback containing the `Cursor` allows us to quickly get the value of the column by calling `String()` method to retrieve a string value. It also contains methods such as `Int()`, `Uint()`, `Float()` or more generic `Value()` to pull data of different types.

Expand Down Expand Up @@ -194,7 +195,6 @@ players.Query(func(txn *Txn) error {
})
```


Now, what if you need to quickly delete some of the data in the collection? In this case the `DeleteAll()` or `DeleteIf()` methods come in handy. These methods are very fast (especially `DeleteAll()`) and allow you to quickly delete the appropriate results, transactionally. In the example below we delete all of the rogues from the collection by simply selecting them in the transaction and calling the `DeleteAll()` method.

```go
Expand All @@ -220,7 +220,7 @@ players.Query(func(txn *Txn) error {
})
```
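
Why is `DeleteAll()` so fast? Conceptually, deleting rows amounts to clearing bits in the occupancy (fill) bitmap with a single `AND NOT` pass, rather than touching rows one by one. The sketch below illustrates that idea and is not the library's internals verbatim:

```go
package main

import (
	"fmt"
	"math/bits"
)

// deleteAll clears every selected row from the fill bitmap at once.
// Deletion is just bit clearing, which is why it is so cheap.
func deleteAll(fill, selection []uint64) {
	for i := range fill {
		fill[i] &^= selection[i] // AND NOT clears all selected rows in one pass
	}
}

// count returns the number of occupied row slots.
func count(b []uint64) (n int) {
	for _, v := range b {
		n += bits.OnesCount64(v)
	}
	return
}

func main() {
	fill := []uint64{0b1111}      // four occupied rows
	selection := []uint64{0b0101} // rows 0 and 2 selected (the "rogues")

	deleteAll(fill, selection)
	fmt.Println(count(fill)) // 2 rows remain
}
```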

In certain cases, you might want to atomically increment or decrement numerical values. To accomplish this, you can use the provided `Add..()` or `Add..At()` operations of the `Cursor` or `Selector`. Note that the indexes will also be updated accordingly and the predicates re-evaluated with the most up-to-date values. In the example below we're incrementing the balance of all our rogues by *500* atomically.
In certain cases, you might want to atomically increment or decrement numerical values. To accomplish this, you can use the provided `Add..()` or `Add..At()` operations of the `Cursor` or `Selector`. Note that the indexes will also be updated accordingly and the predicates re-evaluated with the most up-to-date values. In the example below we're incrementing the balance of all our rogues by _500_ atomically.

```go
players.Query(func(txn *Txn) error {
Expand All @@ -235,7 +235,7 @@ players.Query(func(txn *Txn) error {

Sometimes, it is useful to automatically delete certain rows when you no longer need them. In order to do this, the library automatically adds an `expire` column to each new collection and asynchronously starts a cleanup goroutine that runs periodically and cleans up the expired objects. In order to set this up, you can simply use the `InsertWithTTL()` method on the collection, which allows you to insert an object with a defined time-to-live duration.

In the example below we are inserting an object into the collection and setting the time-to-live to *5 seconds* from the current time. After this time, the object will be automatically evicted from the collection and its space can be reclaimed.
In the example below we are inserting an object into the collection and setting the time-to-live to _5 seconds_ from the current time. After this time, the object will be automatically evicted from the collection and its space can be reclaimed.

```go
players.InsertWithTTL(map[string]interface{}{
Expand Down Expand Up @@ -284,11 +284,10 @@ players.Query(func(txn *column.Txn) error {
})

// Returns an error, transaction will be rolled back
return fmt.Errorf("bug")
return fmt.Errorf("bug")
})
```


## Streaming Changes

This library also supports streaming out all transaction commits consistently, as they happen. This allows you to implement your own change data capture (CDC) listeners, or stream data into Kafka or a remote database for durability. In order to enable it, you can simply provide an implementation of the `commit.Writer` interface during the creation of the collection.
Expand Down Expand Up @@ -334,7 +333,6 @@ go func() {
}()
```
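
One common shape for such a writer is a channel-backed implementation that fans commits out to a consumer goroutine. The `commitEvent` type and `Write` signature below are hypothetical simplifications of the real `commit.Writer` contract:

```go
package main

import "fmt"

// commitEvent is a hypothetical stand-in for the library's commit type;
// the real commit.Writer interface receives richer change information.
type commitEvent struct {
	ID      uint64
	Updates int
}

// channelWriter forwards every committed transaction to a channel, the
// pattern used to fan commits out to a CDC consumer or a message queue.
type channelWriter chan commitEvent

// Write implements the (simplified) writer contract.
func (w channelWriter) Write(commit commitEvent) error {
	w <- commit
	return nil
}

func main() {
	writer := make(channelWriter, 8)

	// Producer side: the store would call Write() on every commit.
	writer.Write(commitEvent{ID: 1, Updates: 3})
	writer.Write(commitEvent{ID: 2, Updates: 1})
	close(writer)

	// Consumer side: stream the commits out, in order.
	for c := range writer {
		fmt.Println(c.ID, c.Updates)
	}
}
```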


## Complete Example

```go
Expand Down Expand Up @@ -365,7 +363,7 @@ func main(){
players.Insert(v)
}

// This performs a full scan on 3 different columns and compares them given the
// specified predicates. This is not indexed, but does a columnar scan which is
// cache-friendly.
players.Query(func(txn *column.Txn) error {
Expand All @@ -388,7 +386,7 @@ func main(){
return nil
})

// Same condition as above, but we also select the actual names of those
// players and iterate through them.
players.Query(func(txn *column.Txn) error {
txn.With("human", "mage", "old").Range("name", func(v column.Cursor) {
Expand Down Expand Up @@ -444,7 +442,7 @@ running update of balance of everyone...

running update of age of mages...
-> updated 6040000 rows
-> update took 85.669422ms
```

## Contributing
Expand Down
53 changes: 36 additions & 17 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,25 @@ const (

// Collection represents a collection of objects in a columnar format
type Collection struct {
count uint64 // The current count of elements
txns *txnPool // The transaction pool
lock sync.RWMutex // The mutex to guard the fill-list
slock *smutex.SMutex128 // The sharded mutex for the collection
cols columns // The map of columns
fill bitmap.Bitmap // The fill-list
opts Options // The options configured
codec codec // The compression codec
writer commit.Writer // The commit writer
pk *columnKey // The primary key column
cancel context.CancelFunc // The cancellation function for the context
count uint64 // The current count of elements
txns *txnPool // The transaction pool
lock sync.RWMutex // The mutex to guard the fill-list
slock *smutex.SMutex128 // The sharded mutex for the collection
cols columns // The map of columns
fill bitmap.Bitmap // The fill-list
opts Options // The options configured
codec codec // The compression codec
logger commit.Logger // The commit logger for CDC
record *commit.Log // The commit logger for snapshot
pk *columnKey // The primary key column
cancel context.CancelFunc // The cancellation function for the context
commits sync.Map // The array of commit IDs for corresponding chunk
}

// Options represents the options for a collection.
type Options struct {
Capacity int // The initial capacity when creating columns
Writer commit.Writer // The writer for the commit log (optional)
Writer commit.Logger // The writer for the commit log (optional)
Vacuum time.Duration // The interval at which the vacuum of expired entries will be done
}

Expand Down Expand Up @@ -76,7 +78,7 @@ func NewCollection(opts ...Options) *Collection {
opts: options,
slock: new(smutex.SMutex128),
fill: make(bitmap.Bitmap, 0, options.Capacity>>6),
writer: options.Writer,
logger: options.Writer,
codec: newCodec(&options),
cancel: cancel,
}
Expand Down Expand Up @@ -227,13 +229,24 @@ func (c *Collection) createColumnKey(columnName string, column *columnKey) error
// CreateColumnsOf registers a set of columns that are present in the target object.
func (c *Collection) CreateColumnsOf(object Object) error {
for k, v := range object {
c.CreateColumn(k, ForKind(reflect.TypeOf(v).Kind()))
column, err := ForKind(reflect.TypeOf(v).Kind())
if err != nil {
return err
}

if err := c.CreateColumn(k, column); err != nil {
return err
}
}
return nil
}

// CreateColumn creates a column of a specified type and adds it to the collection.
func (c *Collection) CreateColumn(columnName string, column Column) error {
if _, ok := c.cols.Load(columnName); ok {
return fmt.Errorf("column: unable to create column '%s', already exists", columnName)
}

column.Grow(uint32(c.opts.Capacity))
c.cols.Store(columnName, columnFor(columnName, column))

Expand Down Expand Up @@ -327,6 +340,7 @@ func (c *Collection) Query(fn func(txn *Txn) error) error {
// Close closes the collection and clears up all of the resources.
func (c *Collection) Close() error {
c.cancel()

return nil
}

Expand Down Expand Up @@ -373,10 +387,15 @@ type columnEntry struct {
cols []*column // The columns and its computed
}

// Count returns the number of columns
func (c *columns) Count() int {
// Count returns the number of columns, excluding indexes.
func (c *columns) Count() (count int) {
cols := c.cols.Load().([]columnEntry)
return len(cols)
for _, v := range cols {
if !v.cols[0].IsIndex() {
count++
}
}
return
}

// Range iterates over columns in the registry. This is faster than RangeUntil
Expand Down
48 changes: 37 additions & 11 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,48 @@ func TestInsertWithTTL(t *testing.T) {
})
}

func TestCreateColumnsOfInvalidKind(t *testing.T) {
obj := map[string]interface{}{
"name": complex64(1),
}

col := NewCollection()
assert.Error(t, col.CreateColumnsOf(obj))
}

func TestCreateColumnsOfDuplicate(t *testing.T) {
obj := map[string]interface{}{
"name": "Roman",
}

col := NewCollection()
assert.NoError(t, col.CreateColumnsOf(obj))
assert.Error(t, col.CreateColumnsOf(obj))
}

// --------------------------- Mocks & Fixtures ----------------------------

// loadPlayers loads a list of players from the fixture
func loadPlayers(amount int) *Collection {
out := newEmpty(amount)

// Load and copy until we reach the amount required
data := loadFixture("players.json")
for i := 0; i < amount/len(data); i++ {
out.Query(func(txn *Txn) error {
for _, p := range data {
txn.InsertObject(p)
}
return nil
})
}
return out
}

// newEmpty creates a new empty collection for the fixture
func newEmpty(capacity int) *Collection {
out := NewCollection(Options{
Capacity: amount,
Capacity: capacity,
Vacuum: 500 * time.Millisecond,
Writer: new(noopWriter),
})
Expand Down Expand Up @@ -504,16 +540,6 @@ func loadPlayers(amount int) *Collection {
return r.Float() >= 30
})

// Load and copy until we reach the amount required
data := loadFixture("players.json")
for i := 0; i < amount/len(data); i++ {
out.Query(func(txn *Txn) error {
for _, p := range data {
txn.InsertObject(p)
}
return nil
})
}
return out
}

Expand Down
Loading