Skip to content

Commit

Permalink
release version v1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
datdao authored and datdao committed Jan 5, 2025
0 parents commit 5a50f13
Show file tree
Hide file tree
Showing 12 changed files with 1,912 additions and 0 deletions.
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

.vscode
/vendor
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# EventHorizon Scylla
Build scalable eventstore with scylla

## Features
### Multi bounded-context
Take advantage of NoSQL. We can use event store as a single source of truth throughout all contexts in your domain without scaling issues.
### Partitioning vs. Sharding
To improve query performance, the data is partitioned and sharded based on the ***bounded_context***, ***aggregate_id***, and ***aggregate_type*** columns, also sorted by ***event_version*** column; this means that events belonging to one aggregate always live in a specific node.
### Transaction
By using LTW (Lightweight transaction), consistency level, and batch query, we archive the **Atomicity** property on a single partition.
### Snapshot
Snapshot logic is triggered when the aggregate store saves or loads events. Furthermore, you can customize the snapshot strategy.
### Compaction
Event_store table uses [Size-tiered compaction strategy](https://docs.scylladb.com/kb/compaction/#size-tiered-compaction-strategy-stcs) to improve read speed.

Aggregate_snapshot table uses [Leveled compaction strategy](https://docs.scylladb.com/kb/compaction/#leveled-compaction-strategy-lcs) to reduce disk size and improve read speed.

## Installation

```bash
go get github.com/datdao/eh-scylla
```

## Usage

### Run migration script
```sql
CREATE KEYSPACE event_sourcing WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 3};

CREATE TABLE event_store (bounded_context varchar, aggregate_id uuid, aggregate_type varchar, event_id uuid, event_type varchar, event_version int, event_data varchar, event_timestamp timestamp, event_metadata varchar, PRIMARY KEY ((bounded_context, aggregate_id, aggregate_type), event_version));

CREATE TABLE aggregate_snapshot (bounded_context varchar, aggregate_id uuid, aggregate_type varchar, snapshot_data varchar, snapshot_version int, snapshot_timestamp timestamp, snapshot_metadata varchar, PRIMARY KEY ((bounded_context, aggregate_id, aggregate_type))) WITH compaction = { 'class' : 'LeveledCompactionStrategy' };
```

### Create AggregateStore

```golang
hosts := strings.Split(os.Getenv("SCYLLA_HOSTS"), ",")
username := os.Getenv("SCYLLA_USERNAME")
password := os.Getenv("SCYLLA_PASSWORD")
hostSelectionPolicy := os.Getenv("SCYLLA_HOSTSELECTIONPOLICY")

var cluster = gocql.NewCluster(hosts...)
cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
cluster.PoolConfig.HostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(hostSelectionPolicy)
cluster.Keyspace = "event_sourcing"
cluster.Consistency = gocql.Quorum

// Create gocql cluster.
session, err := cluster.CreateSession()
if err != nil {
panic(err)
}

// Create aggreagate store
aggStore, err := ehScylla.NewAggregateStoreWithBoundedContext(session, "order", nil, nil)
if err != nil {
panic(err)
}

```

### Custom Snapshot strategy

```golang
var StrategySnapshotCustom = func(aggregate eh.Aggregate) bool {
return aggregate.EntityID().Version()%20 == 0
}

ehScylla.NewAggregateStoreWithBoundedContext(session, "order", nil, StrategySnapshotCustom)
```
127 changes: 127 additions & 0 deletions aggregate_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package ehscylla

import (
"context"
"encoding/json"
"errors"
"time"

"github.com/gocql/gocql"
eh "github.com/looplab/eventhorizon"
)

type AggregateSnapshotError struct {
Err error
}

// Error implements the Error method of the error interface.
func (e *AggregateSnapshotError) Error() string {
str := "aggregate snapshot: "

if e.Err != nil {
str += e.Err.Error()
} else {
str += "unknown error"
}

return str
}

var ErrUpdateSnapshot = errors.New("could not update snapshot")

type AggregateSnapshot interface {
Restore(ctx context.Context, aggregate eh.Aggregate) (eh.Aggregate, error)
Store(ctx context.Context, aggregate eh.Aggregate) error
}

type aggregateSnapshot struct {
session *gocql.Session
boundedContext string
}

func NewAggregateSnapshot(session *gocql.Session, boundedContext string) (AggregateSnapshot, error) {
return &aggregateSnapshot{
session: session,
boundedContext: boundedContext,
}, nil
}

func (a *aggregateSnapshot) Restore(ctx context.Context, aggregate eh.Aggregate) (eh.Aggregate, error) {
aggSnapshotSupported, ok := aggregate.(Aggregate)
if !ok {
return aggregate, nil
}

var snapshotData string
var snapshotVersion int
var snapshotMetaData string

err := a.session.Query(`
SELECT
snapshot_data,
snapshot_version,
snapshot_metadata
FROM aggregate_snapshot WHERE
bounded_context = ?
AND aggregate_id = ?
AND aggregate_type = ?
`,
a.boundedContext, aggregate.EntityID().String(), aggregate.AggregateType()).Consistency(gocql.One).Scan(&snapshotData, &snapshotVersion, &snapshotMetaData)
if err != nil {
if err == gocql.ErrNotFound {
return aggSnapshotSupported, nil
}

return aggSnapshotSupported, &AggregateSnapshotError{
Err: err,
}
}

err = json.Unmarshal([]byte(snapshotData), aggSnapshotSupported.SnapshotData())
if err != nil {
return aggSnapshotSupported, &AggregateSnapshotError{
Err: err,
}
}

// Increment Version to snapshot version
aggSnapshotSupported.SetAggregateVersion(snapshotVersion)

return aggSnapshotSupported, nil
}

func (a *aggregateSnapshot) Store(ctx context.Context, aggregate eh.Aggregate) error {
aggSnapshotSupported, ok := aggregate.(Aggregate)
if !ok {
return nil
}

batch := a.session.NewBatch(gocql.LoggedBatch)
data, err := json.Marshal(aggSnapshotSupported.SnapshotData())
if err != nil {
return &eh.AggregateStoreError{
Err: err,
}
}

batch.Query(`INSERT INTO aggregate_snapshot (
bounded_context,
aggregate_id,
aggregate_type,
snapshot_data,
snapshot_version,
snapshot_timestamp,
snapshot_metadata)
VALUES (?,?,?,?,?,?,?)`, a.boundedContext, aggregate.EntityID().String(),
aggregate.AggregateType(), data, aggSnapshotSupported.AggregateVersion(), time.Now().UTC(), "")

err = a.session.ExecuteBatch(batch)

if err != nil {
return &AggregateSnapshotError{
Err: err,
}
}

return nil
}
7 changes: 7 additions & 0 deletions aggregate_snapshot_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ehscylla

import eh "github.com/looplab/eventhorizon"

var StrategySnapshotDefault = func(aggregate eh.Aggregate) bool {
return aggregate.EntityID().Version()%5 == 0
}
Loading

0 comments on commit 5a50f13

Please sign in to comment.