-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregate_snapshot.go
127 lines (104 loc) · 2.83 KB
/
aggregate_snapshot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package ehscylla
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/gocql/gocql"
eh "github.com/looplab/eventhorizon"
)
type AggregateSnapshotError struct {
Err error
}
// Error implements the Error method of the error interface.
func (e *AggregateSnapshotError) Error() string {
str := "aggregate snapshot: "
if e.Err != nil {
str += e.Err.Error()
} else {
str += "unknown error"
}
return str
}
var ErrUpdateSnapshot = errors.New("could not update snapshot")
type AggregateSnapshot interface {
Restore(ctx context.Context, aggregate eh.Aggregate) (eh.Aggregate, error)
Store(ctx context.Context, aggregate eh.Aggregate) error
}
type aggregateSnapshot struct {
session *gocql.Session
boundedContext string
}
func NewAggregateSnapshot(session *gocql.Session, boundedContext string) (AggregateSnapshot, error) {
return &aggregateSnapshot{
session: session,
boundedContext: boundedContext,
}, nil
}
func (a *aggregateSnapshot) Restore(ctx context.Context, aggregate eh.Aggregate) (eh.Aggregate, error) {
aggSnapshotSupported, ok := aggregate.(Aggregate)
if !ok {
return aggregate, nil
}
var snapshotData string
var snapshotVersion int
var snapshotMetaData string
err := a.session.Query(`
SELECT
snapshot_data,
snapshot_version,
snapshot_metadata
FROM aggregate_snapshot WHERE
bounded_context = ?
AND aggregate_id = ?
AND aggregate_type = ?
`,
a.boundedContext, aggregate.EntityID().String(), aggregate.AggregateType()).Consistency(gocql.One).Scan(&snapshotData, &snapshotVersion, &snapshotMetaData)
if err != nil {
if err == gocql.ErrNotFound {
return aggSnapshotSupported, nil
}
return aggSnapshotSupported, &AggregateSnapshotError{
Err: err,
}
}
err = json.Unmarshal([]byte(snapshotData), aggSnapshotSupported.SnapshotData())
if err != nil {
return aggSnapshotSupported, &AggregateSnapshotError{
Err: err,
}
}
// Increment Version to snapshot version
aggSnapshotSupported.SetAggregateVersion(snapshotVersion)
return aggSnapshotSupported, nil
}
func (a *aggregateSnapshot) Store(ctx context.Context, aggregate eh.Aggregate) error {
aggSnapshotSupported, ok := aggregate.(Aggregate)
if !ok {
return nil
}
batch := a.session.NewBatch(gocql.LoggedBatch)
data, err := json.Marshal(aggSnapshotSupported.SnapshotData())
if err != nil {
return &eh.AggregateStoreError{
Err: err,
}
}
batch.Query(`INSERT INTO aggregate_snapshot (
bounded_context,
aggregate_id,
aggregate_type,
snapshot_data,
snapshot_version,
snapshot_timestamp,
snapshot_metadata)
VALUES (?,?,?,?,?,?,?)`, a.boundedContext, aggregate.EntityID().String(),
aggregate.AggregateType(), data, aggSnapshotSupported.AggregateVersion(), time.Now().UTC(), "")
err = a.session.ExecuteBatch(batch)
if err != nil {
return &AggregateSnapshotError{
Err: err,
}
}
return nil
}