diff --git a/internal/broker/cluster/swarm.go b/internal/broker/cluster/swarm.go
index 97925254..171b9502 100644
--- a/internal/broker/cluster/swarm.go
+++ b/internal/broker/cluster/swarm.go
@@ -151,7 +151,7 @@ func (s *Swarm) onPeerOffline(name mesh.PeerName) {
dead := &deadPeer{name: name}
s.state.SubscriptionsOf(name, func(ev *event.Subscription) {
s.OnUnsubscribe(dead, ev) // Notify locally that the subscription is gone
- s.state.Remove(ev) // Remove the state from ourselves
+ s.state.Del(ev) // Remove the state from ourselves
})
}
}
@@ -248,22 +248,22 @@ func (s *Swarm) merge(buf []byte) (mesh.GossipData, error) {
// Merge and get the delta
delta := s.state.Merge(other)
- other.Subscriptions(func(ev *event.Subscription, t event.Time) {
+ other.Subscriptions(func(ev *event.Subscription, v event.Value) {
if ev.Peer == uint64(s.router.Ourself.Name) {
return // Skip ourselves
}
// Find the active peer for this subscription event
- encoded := ev.Encode()
+ key := ev.Key()
peer := s.findPeer(mesh.PeerName(ev.Peer))
// If the subscription is added, notify (TODO: use channels)
- if t.IsAdded() && peer.onSubscribe(encoded, ev.Ssid) && peer.IsActive() {
+ if v.IsAdded() && peer.onSubscribe(key, ev.Ssid) && peer.IsActive() {
s.OnSubscribe(peer, ev)
}
// If the subscription is removed, notify (TODO: use channels)
- if t.IsRemoved() && peer.onUnsubscribe(encoded, ev.Ssid) && peer.IsActive() {
+ if v.IsRemoved() && peer.onUnsubscribe(key, ev.Ssid) && peer.IsActive() {
s.OnUnsubscribe(peer, ev)
}
})
@@ -348,17 +348,17 @@ func (s *Swarm) NotifyBeginOf(ev event.Event) {
// NotifyEndOf notifies the swarm when an event is stopped being triggered.
func (s *Swarm) NotifyEndOf(ev event.Event) {
- s.state.Remove(ev)
+ s.state.Del(ev)
// Create a delta for broadcasting just this operation
op := event.NewState("")
- op.Remove(ev)
+ op.Del(ev)
s.gossip.GossipBroadcast(op)
}
// Contains checks whether an event is currently triggered within the cluster.
func (s *Swarm) Contains(ev event.Event) bool {
- return s.state.Contains(ev)
+ return s.state.Has(ev)
}
// Close terminates the connection.
diff --git a/internal/event/crdt/durable.go b/internal/event/crdt/durable.go
index 35f86142..fe04e12f 100644
--- a/internal/event/crdt/durable.go
+++ b/internal/event/crdt/durable.go
@@ -26,12 +26,12 @@ import (
"github.com/tidwall/buntdb"
)
-// getTime retrieves a time from the store.
-func getTime(tx *buntdb.Tx, item string) Time {
+// getValue retrieves a time from the store.
+func getValue(tx *buntdb.Tx, item string) Value {
if t, err := tx.Get(item); err == nil {
- return decodeTime(t)
+ return decodeValue(t)
}
- return Time{}
+ return newValue()
}
// Durable represents a last-write-wins CRDT set which can be persisted to disk.
@@ -46,7 +46,7 @@ func NewDurable(dir string) *Durable {
}
// newDurableWith creates a new last-write-wins set with bias for 'add'.
-func newDurableWith(path string, items map[string]Time) *Durable {
+func newDurableWith(path string, items map[string]Value) *Durable {
if path == "" {
path = ":memory:"
}
@@ -72,7 +72,7 @@ func newDurableWith(path string, items map[string]Time) *Durable {
}
// Store stores the item into the transaction.
-func (s *Durable) store(tx *buntdb.Tx, key string, t Time) {
+func (s *Durable) store(tx *buntdb.Tx, key string, t Value) {
var opts *buntdb.SetOptions
if t.IsRemoved() {
opts = &buntdb.SetOptions{
@@ -81,83 +81,91 @@ func (s *Durable) store(tx *buntdb.Tx, key string, t Time) {
}
}
- tx.Set(key, t.Encode(), opts)
+ tx.Set(key, t.encode(), opts)
}
// Fetch fetches the item either from transaction or cache.
-func (s *Durable) fetch(item string) Time {
+func (s *Durable) fetch(item string) Value {
cacheKey := binary.ToBytes(item)
if v, err := s.cache.Get(cacheKey); err == nil {
- return decodeTime(binary.ToString(&v))
+ return decodeValue(binary.ToString(&v))
}
tx, _ := s.db.Begin(false)
defer tx.Rollback()
if t, err := tx.Get(item); err == nil {
s.cache.Set(cacheKey, binary.ToBytes(t), 60)
- return decodeTime(t)
+ return decodeValue(t)
}
- return Time{}
+ return newValue()
}
// Add adds a value to the set.
-func (s *Durable) Add(item string) {
+func (s *Durable) Add(item string, value []byte) {
s.db.Update(func(tx *buntdb.Tx) error {
- t := getTime(tx, item)
- s.store(tx, item, Time{AddTime: Now(), DelTime: t.DelTime})
+ t, now := getValue(tx, item), Now()
+ if t.AddTime() < now {
+ t.setAddTime(Now())
+ t.setValue(value)
+ s.store(tx, item, t)
+ }
return nil
})
}
-// Remove removes the value from the set.
-func (s *Durable) Remove(item string) {
+// Del removes the value from the set.
+func (s *Durable) Del(item string) {
s.db.Update(func(tx *buntdb.Tx) error {
- v := getTime(tx, item)
- s.store(tx, item, Time{AddTime: v.AddTime, DelTime: Now()})
+ t, now := getValue(tx, item), Now()
+ if t.DelTime() < now {
+ t.setDelTime(Now())
+ s.store(tx, item, t)
+ }
return nil
})
}
-// Contains checks if a value is present in the set.
-func (s *Durable) Contains(item string) bool {
+// Has checks if a value is present in the set.
+func (s *Durable) Has(item string) bool {
return s.fetch(item).IsAdded()
}
// Get retrieves the time for an item.
-func (s *Durable) Get(item string) Time {
+func (s *Durable) Get(item string) Value {
return s.fetch(item)
}
// Merge merges two LWW sets. This also modifies the set being merged in
// to leave only the delta.
-func (s *Durable) Merge(other Set) {
+func (s *Durable) Merge(other Map) {
r := other.(*Volatile)
r.lock.Lock()
defer r.lock.Unlock()
s.db.Update(func(stx *buntdb.Tx) error {
for key, rt := range r.data {
- st := getTime(stx, key)
+ st := getValue(stx, key)
- // Update add time
- if st.AddTime < rt.AddTime {
- st.AddTime = rt.AddTime
+ // Update add time & value
+ if st.AddTime() < rt.AddTime() {
+ st.setAddTime(rt.AddTime())
} else {
- rt.AddTime = 0 // Remove from delta
+ rt.setAddTime(0) // Remove from delta
}
// Update delete time
- if st.DelTime < rt.DelTime {
- st.DelTime = rt.DelTime
+ if st.DelTime() < rt.DelTime() {
+ st.setDelTime(rt.DelTime())
} else {
- rt.DelTime = 0 // Remove from delta
+ rt.setDelTime(0) // Remove from delta
}
if rt.IsZero() {
delete(r.data, key) // Remove from delta
} else {
- s.store(stx, key, st) // Merge the new value
- r.data[key] = rt // Update the delta
+ st.setValue(rt.Value()) // Set the new value
+ s.store(stx, key, st) // Merge the new value
+ r.data[key] = rt // Update the delta
}
}
@@ -166,21 +174,21 @@ func (s *Durable) Merge(other Set) {
}
// Range iterates through the events for a specific prefix.
-func (s *Durable) Range(prefix []byte, f func(string, Time) bool) {
+func (s *Durable) Range(prefix []byte, f func(string, Value) bool) {
s.db.View(func(tx *buntdb.Tx) error {
return tx.Ascend("", func(k, v string) bool {
if !bytes.HasPrefix(binary.ToBytes(k), prefix) {
return true
}
- return f(k, decodeTime(v))
+ return f(k, decodeValue(v))
})
})
}
// Count returns the number of items in the set.
func (s *Durable) Count() (count int) {
- s.Range(nil, func(k string, v Time) bool {
+ s.Range(nil, func(k string, v Value) bool {
count++
return true
})
@@ -188,9 +196,9 @@ func (s *Durable) Count() (count int) {
}
// ToMap converts the set to a map (useful for testing).
-func (s *Durable) toMap() map[string]Time {
- m := make(map[string]Time)
- s.Range(nil, func(k string, v Time) bool {
+func (s *Durable) toMap() map[string]Value {
+ m := make(map[string]Value)
+ s.Range(nil, func(k string, v Value) bool {
m[k] = v
return true
})
diff --git a/internal/event/crdt/durable_test.go b/internal/event/crdt/durable_test.go
index b16e1b3d..a10a1aaa 100644
--- a/internal/event/crdt/durable_test.go
+++ b/internal/event/crdt/durable_test.go
@@ -16,144 +16,206 @@ package crdt
import (
"fmt"
- "io/ioutil"
"sync"
"testing"
- "time"
- "github.com/golang/snappy"
"github.com/kelindar/binary"
"github.com/stretchr/testify/assert"
)
-func TestDurableAddContains(t *testing.T) {
- testStr := "ABCD"
-
- lww := NewDurable("")
- assert.False(t, lww.Contains(testStr))
-
- lww.Add(testStr)
- assert.True(t, lww.Contains(testStr))
-
- entry := lww.Get(testStr)
- assert.True(t, entry.IsAdded())
- assert.False(t, entry.IsRemoved())
- assert.False(t, entry.IsZero())
-}
-
-func TestDurableAddRemoveContains(t *testing.T) {
- lww := NewDurable("")
- testStr := "object2"
-
- lww.Add(testStr)
- time.Sleep(1 * time.Millisecond)
- lww.Remove(testStr)
-
- assert.False(t, lww.Contains(testStr))
-
- entry := lww.Get(testStr)
- assert.False(t, entry.IsAdded())
- assert.True(t, entry.IsRemoved())
- assert.False(t, entry.IsZero())
-}
-
-func TestDurableMerge(t *testing.T) {
- var T = func(add, del int64) Time {
- return Time{AddTime: add, DelTime: del}
+func TestAddRemove(t *testing.T) {
+ defer restoreClock(Now)
+ for _, tc := range []struct {
+ initial Map
+ expected Map
+ actions []Action
+ }{
+ {
+ initial: mapOf(false, T("A", 10, 0, "A1")),
+ expected: mapOf(false, T("A", 20, 0, "A2")),
+ actions: []Action{T("A", 20, 0, "A2")},
+ },
+ {
+ initial: mapOf(false, T("A", 10, 0, "A1")),
+ expected: mapOf(false, T("A", 10, 20, "A1")),
+ actions: []Action{T("A", 0, 20, "A1")},
+ },
+ {
+ initial: mapOf(false, T("A", 10, 0, "A1")),
+ expected: mapOf(false, T("A", 20, 0, "A2")),
+ actions: []Action{T("A", 20, 0, "A2"), T("A", 15, 0, "A3")},
+ },
+ {
+ initial: mapOf(false, T("A", 10, 0, "A1")),
+ expected: mapOf(false, T("A", 10, 20, "A1")),
+ actions: []Action{T("A", 0, 20), T("A", 0, 15)},
+ },
+ {
+ initial: mapOf(true, T("A", 10, 0, "A1")),
+ expected: mapOf(true, T("A", 20, 0, "A2")),
+ actions: []Action{T("A", 20, 0, "A2")},
+ },
+ {
+ initial: mapOf(true, T("A", 10, 0, "A1")),
+ expected: mapOf(true, T("A", 10, 20, "A1")),
+ actions: []Action{T("A", 0, 20, "A1")},
+ },
+ {
+ initial: mapOf(true, T("A", 10, 0, "A1")),
+ expected: mapOf(true, T("A", 20, 0, "A2")),
+ actions: []Action{T("A", 20, 0, "A2"), T("A", 15, 0, "A3")},
+ },
+ {
+ initial: mapOf(true, T("A", 10, 0, "A1")),
+ expected: mapOf(true, T("A", 10, 20, "A1")),
+ actions: []Action{T("A", 0, 20), T("A", 0, 15)},
+ },
+ } {
+ for _, f := range tc.actions {
+ k, v := f()
+ if v.IsAdded() {
+ setClock(v.AddTime())
+ tc.initial.Add(k, v.Value())
+ }
+ if v.IsRemoved() {
+ setClock(v.DelTime())
+ tc.initial.Del(k)
+ }
+
+ equalSets(t, tc.expected, tc.initial)
+ assert.Equal(t, tc.expected.Count(), tc.initial.Count())
+ }
}
+}
+func TestMerge(t *testing.T) {
for _, tc := range []struct {
- lww1, expected *Durable
- lww2, delta *Volatile
+ lww1, expected Map
+ lww2, delta Map
valid, invalid []string
}{
+ // Volatile -> Durable
+ {
+ lww1: mapOf(true, T("A", 10, 0, "A1"), T("B", 20, 0, "B1")),
+ lww2: mapOf(false, T("A", 0, 20, "A2"), T("B", 0, 20, "B2")),
+ expected: mapOf(true, T("A", 10, 20, "A2"), T("B", 20, 20, "B2")),
+ delta: mapOf(false, T("A", 0, 20, "A2"), T("B", 0, 20, "B2")),
+ valid: []string{"B"},
+ invalid: []string{"A"},
+ },
+ {
+ lww1: mapOf(true, T("A", 10, 0, "A1"), T("B", 20, 0, "B1")),
+ lww2: mapOf(false, T("A", 0, 20), T("B", 10, 0, "B2")),
+ expected: mapOf(true, T("A", 10, 20), T("B", 20, 0, "B1")),
+ delta: mapOf(false, T("A", 0, 20)),
+ valid: []string{"B"},
+ invalid: []string{"A"},
+ },
+ {
+ lww1: mapOf(true, T("A", 30, 0, "A1"), T("B", 20, 0, "B1")),
+ lww2: mapOf(false, T("A", 20, 0, "A2"), T("B", 10, 0, "B2")),
+ expected: mapOf(true, T("A", 30, 0, "A1"), T("B", 20, 0, "B1")),
+ delta: NewVolatile(),
+ valid: []string{"A", "B"},
+ invalid: []string{},
+ },
+ {
+ lww1: mapOf(true, T("A", 10, 0, "A1"), T("B", 0, 20)),
+ lww2: mapOf(false, T("C", 10, 0, "C1"), T("D", 0, 20)),
+ expected: mapOf(true, T("A", 10, 0, "A1"), T("B", 0, 20), T("C", 10, 0, "C1"), T("D", 0, 20)),
+ delta: mapOf(false, T("C", 10, 0, "C1"), T("D", 0, 20)),
+ valid: []string{"A", "C"},
+ invalid: []string{"B", "D"},
+ },
{
- lww1: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(20, 0)}),
- lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}),
- expected: newDurableWith("", map[string]Time{"A": T(10, 20), "B": T(20, 20)}),
- delta: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}),
+ lww1: mapOf(true, T("A", 10, 0, "A1"), T("B", 30, 0, "B1")),
+ lww2: mapOf(false, T("A", 20, 0, "A2"), T("B", 20, 0, "B2")),
+ expected: mapOf(true, T("A", 20, 0, "A2"), T("B", 30, 0, "B1")),
+ delta: mapOf(false, T("A", 20, 0, "A2")),
+ valid: []string{"A", "B"},
+ invalid: []string{},
+ },
+ {
+ lww1: mapOf(true, T("A", 0, 10), T("B", 0, 30)),
+ lww2: mapOf(false, T("A", 0, 20), T("B", 0, 20)),
+ expected: mapOf(true, T("A", 0, 20), T("B", 0, 30)),
+ delta: mapOf(false, T("A", 0, 20)),
+ valid: []string{},
+ invalid: []string{"A", "B"},
+ },
+
+ // Volatile -> Volatile
+ {
+ lww1: mapOf(false, T("A", 10, 0, "A1"), T("B", 20, 0, "B1")),
+ lww2: mapOf(false, T("A", 0, 20, "A2"), T("B", 0, 20, "B2")),
+ expected: mapOf(false, T("A", 10, 20, "A2"), T("B", 20, 20, "B2")),
+ delta: mapOf(false, T("A", 0, 20, "A2"), T("B", 0, 20, "B2")),
valid: []string{"B"},
invalid: []string{"A"},
},
{
- lww1: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(20, 0)}),
- lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(10, 0)}),
- expected: newDurableWith("", map[string]Time{"A": T(10, 20), "B": T(20, 0)}),
- delta: newVolatileWith(map[string]Time{"A": T(0, 20)}),
+ lww1: mapOf(false, T("A", 10, 0, "A1"), T("B", 20, 0, "B1")),
+ lww2: mapOf(false, T("A", 0, 20), T("B", 10, 0, "B2")),
+ expected: mapOf(false, T("A", 10, 20), T("B", 20, 0, "B1")),
+ delta: mapOf(false, T("A", 0, 20)),
valid: []string{"B"},
invalid: []string{"A"},
},
{
- lww1: newDurableWith("", map[string]Time{"A": T(30, 0), "B": T(20, 0)}),
- lww2: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(10, 0)}),
- expected: newDurableWith("", map[string]Time{"A": T(30, 0), "B": T(20, 0)}),
+ lww1: mapOf(false, T("A", 30, 0, "A1"), T("B", 20, 0, "B1")),
+ lww2: mapOf(false, T("A", 20, 0, "A2"), T("B", 10, 0, "B2")),
+ expected: mapOf(false, T("A", 30, 0, "A1"), T("B", 20, 0, "B1")),
delta: NewVolatile(),
valid: []string{"A", "B"},
invalid: []string{},
},
{
- lww1: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(0, 20)}),
- lww2: newVolatileWith(map[string]Time{"C": T(10, 0), "D": T(0, 20)}),
- expected: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(0, 20), "C": T(10, 0), "D": T(0, 20)}),
- delta: newVolatileWith(map[string]Time{"C": T(10, 0), "D": T(0, 20)}),
+ lww1: mapOf(false, T("A", 10, 0, "A1"), T("B", 0, 20)),
+ lww2: mapOf(false, T("C", 10, 0, "C1"), T("D", 0, 20)),
+ expected: mapOf(false, T("A", 10, 0, "A1"), T("B", 0, 20), T("C", 10, 0, "C1"), T("D", 0, 20)),
+ delta: mapOf(false, T("C", 10, 0, "C1"), T("D", 0, 20)),
valid: []string{"A", "C"},
invalid: []string{"B", "D"},
},
{
- lww1: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(30, 0)}),
- lww2: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(20, 0)}),
- expected: newDurableWith("", map[string]Time{"A": T(20, 0), "B": T(30, 0)}),
- delta: newVolatileWith(map[string]Time{"A": T(20, 0)}),
+ lww1: mapOf(false, T("A", 10, 0, "A1"), T("B", 30, 0, "B1")),
+ lww2: mapOf(false, T("A", 20, 0, "A2"), T("B", 20, 0, "B2")),
+ expected: mapOf(false, T("A", 20, 0, "A2"), T("B", 30, 0, "B1")),
+ delta: mapOf(false, T("A", 20, 0, "A2")),
valid: []string{"A", "B"},
invalid: []string{},
},
{
- lww1: newDurableWith("", map[string]Time{"A": T(0, 10), "B": T(0, 30)}),
- lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}),
- expected: newDurableWith("", map[string]Time{"A": T(0, 20), "B": T(0, 30)}),
- delta: newVolatileWith(map[string]Time{"A": T(0, 20)}),
+ lww1: mapOf(false, T("A", 0, 10), T("B", 0, 30)),
+ lww2: mapOf(false, T("A", 0, 20), T("B", 0, 20)),
+ expected: mapOf(false, T("A", 0, 20), T("B", 0, 30)),
+ delta: mapOf(false, T("A", 0, 20)),
valid: []string{},
invalid: []string{"A", "B"},
},
} {
tc.lww1.Merge(tc.lww2)
-
- assert.Equal(t, tc.expected.toMap(), tc.lww1.toMap(), "Merged set is not the same")
- assert.Equal(t, tc.delta.data, tc.lww2.data, "Delta set is not the same")
+ equalSets(t, tc.expected, tc.lww1)
+ equalSets(t, tc.delta, tc.lww2)
for _, obj := range tc.valid {
- assert.True(t, tc.lww1.Contains(obj), fmt.Sprintf("expected merged set to contain %v", obj))
+ assert.True(t, tc.lww1.Has(obj), fmt.Sprintf("expected merged set to contain %v", obj))
}
for _, obj := range tc.invalid {
- assert.False(t, tc.lww1.Contains(obj), fmt.Sprintf("expected merged set to NOT contain %v", obj))
+ assert.False(t, tc.lww1.Has(obj), fmt.Sprintf("expected merged set to NOT contain %v", obj))
}
}
}
-func TestDurableAll(t *testing.T) {
- defer restoreClock(Now)
-
- setClock(0)
- lww := NewDurable("")
- lww.Add("A")
- lww.Add("B")
- lww.Add("C")
-
- all := lww.toMap()
- assert.Equal(t, 3, len(all))
- assert.Equal(t, 3, lww.Count())
- assert.NoError(t, lww.Close())
- assert.Equal(t, 0, lww.Count())
-}
-
func TestDurableConcurrent(t *testing.T) {
i := 0
lww := NewDurable("")
+ defer lww.Close()
for ; i < 100; i++ {
setClock(int64(i))
- lww.Add(fmt.Sprintf("%v", i))
+ lww.Add(fmt.Sprintf("%v", i), nil)
}
go func() {
@@ -170,7 +232,7 @@ func TestDurableConcurrent(t *testing.T) {
for ; gi < gu; gi++ {
setClock(int64(100000 + gi))
- other.Remove(fmt.Sprintf("%v", i))
+ other.Del(fmt.Sprintf("%v", i))
}
stop.Add(1)
@@ -184,38 +246,21 @@ func TestDurableConcurrent(t *testing.T) {
stop.Wait()
}
-// Lock for the timer
-var lock sync.Mutex
-
-// RestoreClock restores the clock time
-func restoreClock(clk clock) {
- lock.Lock()
- Now = clk
- lock.Unlock()
-}
-
-// SetClock sets the clock time for testing
-func setClock(t int64) {
- lock.Lock()
- Now = func() int64 { return t }
- lock.Unlock()
-}
-
// ------------------------------------------------------------------------------------
func TestDurableRange(t *testing.T) {
state := newDurableWith("",
- map[string]Time{
- "AC": {AddTime: 60, DelTime: 50},
- "AB": {AddTime: 60, DelTime: 50},
- "AA": {AddTime: 10, DelTime: 50}, // Deleted
- "BA": {AddTime: 60, DelTime: 50},
- "BB": {AddTime: 60, DelTime: 50},
- "BC": {AddTime: 60, DelTime: 50},
+ map[string]Value{
+ "AC": newTime(60, 50, nil),
+ "AB": newTime(60, 50, nil),
+ "AA": newTime(10, 50, nil), // Deleted
+ "BA": newTime(60, 50, nil),
+ "BB": newTime(60, 50, nil),
+ "BC": newTime(60, 50, nil),
})
var count int
- state.Range([]byte("A"), func(_ string, v Time) bool {
+ state.Range([]byte("A"), func(_ string, v Value) bool {
if v.IsAdded() {
count++
}
@@ -224,7 +269,7 @@ func TestDurableRange(t *testing.T) {
assert.Equal(t, 2, count)
count = 0
- state.Range(nil, func(_ string, v Time) bool {
+ state.Range(nil, func(_ string, v Value) bool {
if v.IsAdded() {
count++
}
@@ -237,14 +282,14 @@ func TestDurableRange(t *testing.T) {
func TestDurableMarshal(t *testing.T) {
defer restoreClock(Now)
+ setClock(10)
- setClock(0)
- state := newDurableWith("", map[string]Time{"A": {AddTime: 10, DelTime: 50}})
+ state := mapOf(true, T("A", 10, 50)).(*Durable)
// Encode
enc, err := binary.Marshal(state)
assert.NoError(t, err)
- assert.Equal(t, []byte{0x1, 0x1, 0x41, 0x2, 0x14, 0x64}, enc)
+ assert.Equal(t, []byte{0x1, 0x1, 0x41, 0x10, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x32}, enc)
// Decode
dec := NewDurable("")
@@ -253,68 +298,3 @@ func TestDurableMarshal(t *testing.T) {
println(dec.toMap()["A"].AddTime)
assert.Equal(t, state.toMap(), dec.toMap())
}
-
-// 15852470 -> 1866217 bytes, 11.77%
-func TestDurableSizeMarshal(t *testing.T) {
- state, size := loadTestData(t)
-
- // Encode
- enc, err := binary.Marshal(state)
- assert.NoError(t, err)
-
- fmt.Printf("%d -> %d bytes, %.2f%% \n", size, len(enc), float64(len(enc))/float64(size)*100)
- assert.Greater(t, 20000000, len(enc))
-
- // Decode
- out := NewDurable("")
- err = binary.Unmarshal(enc, out)
- assert.NoError(t, err)
- assert.Equal(t, 50000, len(out.toMap()))
-
- out.Range(nil, func(k string, _ Time) bool {
- assert.True(t, out.Contains(k))
- return false
- })
-}
-
-// Benchmark_Marshal/encode-8 21 58190505 ns/op 6761445 B/op 23 allocs/op
-// Benchmark_Marshal/decode-8 66 19757589 ns/op 8966002 B/op 54439 allocs/op
-func Benchmark_Marshal(b *testing.B) {
- state, _ := loadTestData(b)
-
- // Encode
- b.Run("encode", func(b *testing.B) {
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- binary.Marshal(state)
- }
- })
-
- // Decode
- enc, err := binary.Marshal(state)
- assert.NoError(b, err)
- b.Run("decode", func(b *testing.B) {
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- state := NewDurable("")
- binary.Unmarshal(enc, state)
- }
- })
-}
-
-func loadTestData(t assert.TestingT) (state *Durable, size int) {
- buf, err := ioutil.ReadFile("test.bin")
- assert.NoError(t, err)
-
- decoded, err := snappy.Decode(nil, buf)
- assert.NoError(t, err)
-
- data := make(map[string]Time)
- err = binary.Unmarshal(decoded, &data)
- state = newDurableWith("", data)
- assert.NoError(t, err)
- size = len(decoded)
- return
-}
diff --git a/internal/event/crdt/map.go b/internal/event/crdt/map.go
new file mode 100644
index 00000000..97dbf035
--- /dev/null
+++ b/internal/event/crdt/map.go
@@ -0,0 +1,116 @@
+/**********************************************************************************
+* Copyright (c) 2009-2020 Misakai Ltd.
+* This program is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Affero General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or(at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License along
+* with this program. If not, see.
+************************************************************************************/
+
+package crdt
+
+import (
+ "time"
+
+ "github.com/kelindar/binary"
+)
+
+// Map represents a contract for a CRDT map.
+type Map interface {
+ Add(string, []byte)
+ Del(string)
+ Has(string) bool
+ Get(string) Value
+ Merge(Map)
+ Range([]byte, func(string, Value) bool)
+ Count() int
+}
+
+// New creates a new CRDT map.
+func New(durable bool, path string) Map {
+ if durable {
+ return NewDurable(path)
+ }
+ return NewVolatile()
+}
+
+// ------------------------------------------------------------------------------------
+
+// Value represents a time pair with a value.
+type Value []byte
+
+// newValue returns zero time.
+func newValue() Value {
+ return Value(make([]byte, 16))
+}
+
+// decodeValue decodes the time from a string
+func decodeValue(t string) Value {
+ return Value(binary.ToBytes(t))
+}
+
+// IsZero checks if the time is zero
+func (v Value) IsZero() bool {
+ return (v.AddTime() == 0 && v.DelTime() == 0)
+}
+
+// IsAdded checks if add time is larger than remove time.
+func (v Value) IsAdded() bool {
+ return v.AddTime() != 0 && v.AddTime() >= v.DelTime()
+}
+
+// IsRemoved checks if remove time is larger than add time.
+func (v Value) IsRemoved() bool {
+ return v.AddTime() < v.DelTime()
+}
+
+// AddTime returns when the entry was added.
+func (v Value) AddTime() int64 {
+ return int64(binary.BigEndian.Uint64(v[0:8]))
+}
+
+// setAddTime sets when the entry was added.
+func (v Value) setAddTime(t int64) {
+ binary.BigEndian.PutUint64(v[0:8], uint64(t))
+}
+
+// DelTime returns when the entry was removed.
+func (v Value) DelTime() int64 {
+ return int64(binary.BigEndian.Uint64(v[8:16]))
+}
+
+// setDelTime sets when the entry was removed.
+func (v Value) setDelTime(t int64) {
+ binary.BigEndian.PutUint64(v[8:16], uint64(t))
+}
+
+// Value returns the extra value payload.
+func (v Value) Value() []byte {
+ return v[16:]
+}
+
+// setValue sets the extra value payload.
+func (v *Value) setValue(p []byte) {
+ h := (*v)[:16]
+ *v = append(h, p...)
+}
+
+// encode encodes the value to a string
+func (v *Value) encode() string {
+ return binary.ToString((*[]byte)(v))
+}
+
+// ------------------------------------------------------------------------------------
+
+// The clock for unit-testing
+type clock func() int64
+
+// Now gets the current time in Unix nanoseconds
+var Now clock = func() int64 {
+ return time.Now().UnixNano()
+}
diff --git a/internal/event/crdt/map_test.go b/internal/event/crdt/map_test.go
new file mode 100644
index 00000000..95b884d5
--- /dev/null
+++ b/internal/event/crdt/map_test.go
@@ -0,0 +1,135 @@
+/**********************************************************************************
+* Copyright (c) 2009-2020 Misakai Ltd.
+* This program is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Affero General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or(at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License along
+* with this program. If not, see.
+************************************************************************************/
+
+package crdt
+
+import (
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// Benchmark_Time/new-8 57209055 20.7 ns/op 16 B/op 1 allocs/op
+func Benchmark_Time(b *testing.B) {
+
+ // Encode
+ b.Run("new", func(b *testing.B) {
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ t := newTime(10, 20, nil)
+ t.encode()
+ }
+ })
+}
+
+func TestNew(t *testing.T) {
+ s1 := New(true, "")
+ assert.IsType(t, new(Durable), s1)
+
+ s2 := New(false, "")
+ assert.IsType(t, new(Volatile), s2)
+}
+
+func TestTimeCodec(t *testing.T) {
+ v1 := newTime(10, 50, []byte("hello"))
+ enc := v1.encode()
+ assert.Equal(t, int64(10), v1.AddTime())
+ assert.Equal(t, int64(50), v1.DelTime())
+ assert.Equal(t, []byte{
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, // 10
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x32, // 50
+ 0x68, 0x65, 0x6c, 0x6c, 0x6f, // hello
+ }, []byte(enc))
+
+ assert.Equal(t, 21, cap(v1))
+
+ v2 := decodeValue(enc)
+ assert.Equal(t, v1, v2)
+
+}
+
+func TestTime(t *testing.T) {
+ v := newTime(Now(), Now(), []byte("hello"))
+ assert.Equal(t, 21, len(v))
+ assert.Equal(t, 21, cap(v))
+ v.setValue([]byte("larger value"))
+ assert.Equal(t, 28, len(v))
+ v.setValue(nil)
+ assert.Equal(t, 16, len(v))
+}
+
+// ------------------------------------------------------------------------------------
+
+// Lock for the timer
+var lock sync.Mutex
+
+// RestoreClock restores the clock time
+func restoreClock(clk clock) {
+ lock.Lock()
+ Now = clk
+ lock.Unlock()
+}
+
+// SetClock sets the clock time for testing
+func setClock(t int64) {
+ lock.Lock()
+ Now = func() int64 { return t }
+ lock.Unlock()
+}
+
+// New creates a new CRDT set.
+func mapOf(durable bool, entries ...func() (string, Value)) Map {
+ m := make(map[string]Value, len(entries))
+ for _, f := range entries {
+ k, v := f()
+ m[k] = v
+ }
+
+ if durable {
+ return newDurableWith("", m)
+ }
+ return newVolatileWith(m)
+}
+
+type Action = func() (string, Value)
+
+// T returns the entry constructor.
+func T(k string, add, del int64, v ...string) Action {
+ if len(v) == 0 {
+ v = append(v, "")
+ }
+
+ value := newTime(add, del, []byte(v[0]))
+ return func() (string, Value) {
+ return k, value
+ }
+}
+
+func equalSets(t *testing.T, expected, current Map) {
+ expected.Range(nil, func(k string, v Value) bool {
+ assert.Equal(t, v, current.Get(k))
+ return true
+ })
+}
+
+// newTime creates a new instance of time stamp.
+func newTime(addTime, delTime int64, value []byte) Value {
+ b := Value(make([]byte, 16, 16+len(value)))
+ b.setAddTime(addTime)
+ b.setDelTime(delTime)
+ b.setValue(value)
+ return b
+}
diff --git a/internal/event/crdt/set.go b/internal/event/crdt/set.go
deleted file mode 100644
index 59f01ffe..00000000
--- a/internal/event/crdt/set.go
+++ /dev/null
@@ -1,91 +0,0 @@
-/**********************************************************************************
-* Copyright (c) 2009-2020 Misakai Ltd.
-* This program is free software: you can redistribute it and/or modify it under the
-* terms of the GNU Affero General Public License as published by the Free Software
-* Foundation, either version 3 of the License, or(at your option) any later version.
-*
-* This program is distributed in the hope that it will be useful, but WITHOUT ANY
-* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
-* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License along
-* with this program. If not, see.
-************************************************************************************/
-
-package crdt
-
-import (
- bin "encoding/binary"
- "time"
-
- "github.com/kelindar/binary"
-)
-
-// Set represents a contract for a CRDT set.
-type Set interface {
- Add(string)
- Remove(string)
- Contains(string) bool
- Get(item string) Time
- Merge(Set)
- Range([]byte, func(string, Time) bool)
- Count() int
-}
-
-// New creates a new CRDT set.
-func New(durable bool, path string) Set {
- if durable {
- return NewDurable(path)
- }
- return NewVolatile()
-}
-
-// ------------------------------------------------------------------------------------
-
-// Time represents a time pair.
-type Time struct {
- AddTime int64
- DelTime int64
-}
-
-// IsZero checks if the time is zero
-func (t Time) IsZero() bool {
- return (t.AddTime == 0 && t.DelTime == 0)
-}
-
-// IsAdded checks if add time is larger than remove time.
-func (t Time) IsAdded() bool {
- return t.AddTime != 0 && t.AddTime >= t.DelTime
-}
-
-// IsRemoved checks if remove time is larger than add time.
-func (t Time) IsRemoved() bool {
- return t.AddTime < t.DelTime
-}
-
-// Encode encodes the value to a string
-func (t Time) Encode() string {
- b := make([]byte, 20)
- n1 := bin.PutVarint(b, t.AddTime)
- n2 := bin.PutVarint(b[n1:], t.DelTime)
- b = b[:n1+n2]
- return binary.ToString(&b)
-}
-
-// DecodeTime decodes the time from a string
-func decodeTime(t string) (v Time) {
- b, n := binary.ToBytes(t), 0
- v.AddTime, n = bin.Varint(b)
- v.DelTime, _ = bin.Varint(b[n:])
- return
-}
-
-// ------------------------------------------------------------------------------------
-
-// The clock for unit-testing
-type clock func() int64
-
-// Now gets the current time in Unix nanoseconds
-var Now clock = func() int64 {
- return time.Now().UnixNano()
-}
diff --git a/internal/event/crdt/set_test.go b/internal/event/crdt/set_test.go
deleted file mode 100644
index a74dfe3d..00000000
--- a/internal/event/crdt/set_test.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/**********************************************************************************
-* Copyright (c) 2009-2020 Misakai Ltd.
-* This program is free software: you can redistribute it and/or modify it under the
-* terms of the GNU Affero General Public License as published by the Free Software
-* Foundation, either version 3 of the License, or(at your option) any later version.
-*
-* This program is distributed in the hope that it will be useful, but WITHOUT ANY
-* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
-* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License along
-* with this program. If not, see.
-************************************************************************************/
-
-package crdt
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Benchmark_Time(b *testing.B) {
- t := Time{10, 20}
-
- // Encode
- b.Run("encode", func(b *testing.B) {
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- t.Encode()
- }
- })
-
- // Decode
- enc := t.Encode()
- b.Run("decode", func(b *testing.B) {
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- decodeTime(enc)
- }
- })
-}
-
-func TestNew(t *testing.T) {
- s1 := New(true, "")
- assert.IsType(t, new(Durable), s1)
-
- s2 := New(false, "")
- assert.IsType(t, new(Volatile), s2)
-}
-
-func TestTimeCodec(t *testing.T) {
- v1 := Time{AddTime: 10, DelTime: 50}
- enc := v1.Encode()
- assert.Equal(t, []byte{0x14, 0x64}, []byte(enc))
-
- v2 := decodeTime(enc)
- assert.Equal(t, v1, v2)
-}
diff --git a/internal/event/crdt/test.bin b/internal/event/crdt/test.bin
deleted file mode 100644
index 136b447d..00000000
Binary files a/internal/event/crdt/test.bin and /dev/null differ
diff --git a/internal/event/crdt/volatile.go b/internal/event/crdt/volatile.go
index 6cd1e1c3..4bd7794f 100644
--- a/internal/event/crdt/volatile.go
+++ b/internal/event/crdt/volatile.go
@@ -24,62 +24,76 @@ import (
// Volatile represents a last-write-wins CRDT set.
type Volatile struct {
- lock *sync.Mutex // The associated mutex
- data map[string]Time // The data containing the set
+ lock *sync.Mutex // The associated mutex
+ data map[string]Value // The data containing the set
}
// NewVolatile creates a new last-write-wins set with bias for 'add'.
func NewVolatile() *Volatile {
- return newVolatileWith(make(map[string]Time, 64))
+ return newVolatileWith(make(map[string]Value, 64))
}
// newVolatileWith creates a new last-write-wins set with bias for 'add'.
-func newVolatileWith(items map[string]Time) *Volatile {
+func newVolatileWith(items map[string]Value) *Volatile {
return &Volatile{
lock: new(sync.Mutex),
data: items,
}
}
+// Fetch fetches the item from the dictionary.
+func (s *Volatile) fetch(item string) Value {
+ if t, ok := s.data[item]; ok {
+ return t
+ }
+ return newValue()
+}
+
// Add adds a value to the set.
-func (s *Volatile) Add(item string) {
+func (s *Volatile) Add(item string, value []byte) {
s.lock.Lock()
defer s.lock.Unlock()
- v, _ := s.data[item]
- s.data[item] = Time{AddTime: Now(), DelTime: v.DelTime}
+ t, now := s.fetch(item), Now()
+ if t.AddTime() < now {
+ t.setAddTime(now)
+ t.setValue(value)
+ s.data[item] = t
+ }
}
-// Remove removes the value from the set.
-func (s *Volatile) Remove(item string) {
+// Del removes the value from the set.
+func (s *Volatile) Del(item string) {
s.lock.Lock()
defer s.lock.Unlock()
- v, _ := s.data[item]
- s.data[item] = Time{AddTime: v.AddTime, DelTime: Now()}
+ t, now := s.fetch(item), Now()
+ if t.DelTime() < now {
+ t.setDelTime(now)
+ s.data[item] = t
+ }
}
-// Contains checks if a value is present in the set.
-func (s *Volatile) Contains(item string) bool {
+// Has checks if a value is present in the set.
+func (s *Volatile) Has(item string) bool {
s.lock.Lock()
defer s.lock.Unlock()
- v, _ := s.data[item]
- return v.IsAdded()
+ t := s.fetch(item)
+ return t.IsAdded()
}
// Get retrieves the time for an item.
-func (s *Volatile) Get(item string) Time {
+func (s *Volatile) Get(item string) Value {
s.lock.Lock()
defer s.lock.Unlock()
- v, _ := s.data[item]
- return v
+ return s.fetch(item)
}
// Merge merges two LWW sets. This also modifies the set being merged in
// to leave only the delta.
-func (s *Volatile) Merge(other Set) {
+func (s *Volatile) Merge(other Map) {
r := other.(*Volatile)
s.lock.Lock()
r.lock.Lock()
@@ -87,31 +101,34 @@ func (s *Volatile) Merge(other Set) {
defer r.lock.Unlock()
for key, rt := range r.data {
- t, _ := s.data[key]
+ st := s.fetch(key)
- if t.AddTime < rt.AddTime {
- t.AddTime = rt.AddTime
+ // Update add time & value
+ if st.AddTime() < rt.AddTime() {
+ st.setAddTime(rt.AddTime())
} else {
- rt.AddTime = 0 // Remove from delta
+ rt.setAddTime(0) // Remove from delta
}
- if t.DelTime < rt.DelTime {
- t.DelTime = rt.DelTime
+ // Update delete time
+ if st.DelTime() < rt.DelTime() {
+ st.setDelTime(rt.DelTime())
} else {
- rt.DelTime = 0 // Remove from delta
+ rt.setDelTime(0) // Remove from delta
}
if rt.IsZero() {
delete(r.data, key) // Remove from delta
} else {
- s.data[key] = t // Merge the new value
- r.data[key] = rt // Update the delta
+ st.setValue(rt.Value()) // Set the new value
+ s.data[key] = st // Merge the new value
+ r.data[key] = rt // Update the delta
}
}
}
// Range iterates through the events for a specific prefix.
-func (s *Volatile) Range(prefix []byte, f func(string, Time) bool) {
+func (s *Volatile) Range(prefix []byte, f func(string, Value) bool) {
s.lock.Lock()
defer s.lock.Unlock()
@@ -150,7 +167,7 @@ func (c *codecVolatile) EncodeTo(e *binary.Encoder, rv reflect.Value) (err error
e.WriteUvarint(uint64(len(s.data)))
for k, t := range s.data {
e.WriteString(k)
- e.WriteString(t.Encode())
+ e.WriteString(t.encode())
}
return
}
@@ -174,7 +191,7 @@ func (c *codecVolatile) DecodeTo(d *binary.Decoder, rv reflect.Value) (err error
return nil
}
- out.data[binary.ToString(&k)] = decodeTime(binary.ToString(&v))
+ out.data[binary.ToString(&k)] = decodeValue(binary.ToString(&v))
}
rv.Set(reflect.ValueOf(*out))
diff --git a/internal/event/crdt/volatile_test.go b/internal/event/crdt/volatile_test.go
index 60f65761..6b1f6a4f 100644
--- a/internal/event/crdt/volatile_test.go
+++ b/internal/event/crdt/volatile_test.go
@@ -23,121 +23,12 @@ import (
"github.com/stretchr/testify/assert"
)
-func TestVolatileAddContains(t *testing.T) {
- testStr := "ABCD"
-
- lww := NewVolatile()
- assert.False(t, lww.Contains(testStr))
-
- lww.Add(testStr)
- assert.True(t, lww.Contains(testStr))
-
- entry := lww.Get(testStr)
- assert.True(t, entry.IsAdded())
- assert.False(t, entry.IsRemoved())
- assert.False(t, entry.IsZero())
-}
-
-func TestVolatileAddRemoveContains(t *testing.T) {
- defer restoreClock(Now)
-
- setClock(0)
-
- lww := NewVolatile()
- testStr := "object2"
-
- lww.Add(testStr)
-
- setClock(100)
- lww.Remove(testStr)
-
- assert.False(t, lww.Contains(testStr))
-
- entry := lww.data[testStr]
- assert.False(t, entry.IsAdded())
- assert.True(t, entry.IsRemoved())
- assert.False(t, entry.IsZero())
-}
-
-func TestVolatileMerge(t *testing.T) {
- var T = func(add, del int64) Time {
- return Time{AddTime: add, DelTime: del}
- }
-
- for _, tc := range []struct {
- lww1, lww2, expected, delta *Volatile
- valid, invalid []string
- }{
- {
- lww1: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(20, 0)}),
- lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}),
- expected: newVolatileWith(map[string]Time{"A": T(10, 20), "B": T(20, 20)}),
- delta: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}),
- valid: []string{"B"},
- invalid: []string{"A"},
- },
- {
- lww1: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(20, 0)}),
- lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(10, 0)}),
- expected: newVolatileWith(map[string]Time{"A": T(10, 20), "B": T(20, 0)}),
- delta: newVolatileWith(map[string]Time{"A": T(0, 20)}),
- valid: []string{"B"},
- invalid: []string{"A"},
- },
- {
- lww1: newVolatileWith(map[string]Time{"A": T(30, 0), "B": T(20, 0)}),
- lww2: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(10, 0)}),
- expected: newVolatileWith(map[string]Time{"A": T(30, 0), "B": T(20, 0)}),
- delta: NewVolatile(),
- valid: []string{"A", "B"},
- invalid: []string{},
- },
- {
- lww1: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(0, 20)}),
- lww2: newVolatileWith(map[string]Time{"C": T(10, 0), "D": T(0, 20)}),
- expected: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(0, 20), "C": T(10, 0), "D": T(0, 20)}),
- delta: newVolatileWith(map[string]Time{"C": T(10, 0), "D": T(0, 20)}),
- valid: []string{"A", "C"},
- invalid: []string{"B", "D"},
- },
- {
- lww1: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(30, 0)}),
- lww2: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(20, 0)}),
- expected: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(30, 0)}),
- delta: newVolatileWith(map[string]Time{"A": T(20, 0)}),
- valid: []string{"A", "B"},
- invalid: []string{},
- },
- {
- lww1: newVolatileWith(map[string]Time{"A": T(0, 10), "B": T(0, 30)}),
- lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}),
- expected: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 30)}),
- delta: newVolatileWith(map[string]Time{"A": T(0, 20)}),
- valid: []string{},
- invalid: []string{"A", "B"},
- },
- } {
-
- tc.lww1.Merge(tc.lww2)
- assert.Equal(t, tc.expected, tc.lww1, "Merged set is not the same")
- assert.Equal(t, tc.delta, tc.lww2, "Delta set is not the same")
-
- for _, obj := range tc.valid {
- assert.True(t, tc.lww1.Contains(obj), fmt.Sprintf("expected merged set to contain %v", obj))
- }
-
- for _, obj := range tc.invalid {
- assert.False(t, tc.lww1.Contains(obj), fmt.Sprintf("expected merged set to NOT contain %v", obj))
- }
- }
-}
-
-func TestConcurrent(t *testing.T) {
+func TestVolatile_Concurrent(t *testing.T) {
i := 0
lww := NewVolatile()
for ; i < 100; i++ {
setClock(int64(i))
- lww.Add(fmt.Sprintf("%v", i))
+ lww.Add(fmt.Sprintf("%v", i), nil)
}
go func() {
@@ -154,7 +45,7 @@ func TestConcurrent(t *testing.T) {
for ; gi < gu; gi++ {
setClock(int64(100000 + gi))
- other.Remove(fmt.Sprintf("%v", i))
+ other.Del(fmt.Sprintf("%v", i))
}
stop.Add(1)
@@ -173,17 +64,17 @@ func TestConcurrent(t *testing.T) {
func TestRange(t *testing.T) {
state := newVolatileWith(
- map[string]Time{
- "AC": {AddTime: 60, DelTime: 50},
- "AB": {AddTime: 60, DelTime: 50},
- "AA": {AddTime: 10, DelTime: 50}, // Deleted
- "BA": {AddTime: 60, DelTime: 50},
- "BB": {AddTime: 60, DelTime: 50},
- "BC": {AddTime: 60, DelTime: 50},
+ map[string]Value{
+ "AC": newTime(60, 50, nil),
+ "AB": newTime(60, 50, nil),
+ "AA": newTime(10, 50, nil), // Deleted
+ "BA": newTime(60, 50, nil),
+ "BB": newTime(60, 50, nil),
+ "BC": newTime(60, 50, nil),
})
var count int
- state.Range([]byte("A"), func(_ string, v Time) bool {
+ state.Range([]byte("A"), func(_ string, v Value) bool {
if v.IsAdded() {
count++
}
@@ -192,7 +83,7 @@ func TestRange(t *testing.T) {
assert.Equal(t, 2, count)
count = 0
- state.Range(nil, func(_ string, v Time) bool {
+ state.Range(nil, func(_ string, v Value) bool {
if v.IsAdded() {
count++
}
@@ -208,12 +99,12 @@ func TestTempMarshal(t *testing.T) {
defer restoreClock(Now)
setClock(0)
- state := newVolatileWith(map[string]Time{"A": {AddTime: 10, DelTime: 50}})
+ state := newVolatileWith(map[string]Value{"A": newTime(10, 50, nil)})
// Encode
enc, err := binary.Marshal(state)
assert.NoError(t, err)
- assert.Equal(t, []byte{0x1, 0x1, 0x41, 0x2, 0x14, 0x64}, enc)
+ assert.Equal(t, []byte{0x1, 0x1, 0x41, 0x10, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x32}, enc)
// Decode
dec := NewVolatile()
diff --git a/internal/event/events.go b/internal/event/events.go
index 73bddfaa..e27008b6 100644
--- a/internal/event/events.go
+++ b/internal/event/events.go
@@ -30,18 +30,19 @@ const (
// Event represents an encodable event that happened at some point in time.
type Event interface {
unitType() uint8
- Encode() string
+ Key() string
+ Val() []byte
}
// ------------------------------------------------------------------------------------
// Subscription represents a subscription event.
type Subscription struct {
- Peer uint64 // The name of the peer. This must be first, since we're doing prefix search.
- Conn security.ID // The connection identifier.
+ Peer uint64 `binary:"-"` // The name of the peer. This must be first, since we're doing prefix search.
+ Conn security.ID `binary:"-"` // The connection identifier.
+ Ssid message.Ssid `binary:"-"` // The SSID for the subscription.
User nocopy.String // The connection username.
Channel nocopy.Bytes // The channel string.
- Ssid message.Ssid // The SSID for the subscription.
}
// Type retuns the unit type.
@@ -54,16 +55,39 @@ func (e *Subscription) ConnID() string {
return e.Conn.Unique(uint64(e.Peer), "emitter")
}
-// Encode encodes the event to string representation.
-func (e *Subscription) Encode() string {
- buf, _ := binary.Marshal(e)
- return string(buf)
+// Key returns the event key.
+func (e *Subscription) Key() string {
+ buffer := make([]byte, 16+4*len(e.Ssid))
+ binary.BigEndian.PutUint64(buffer[0:8], e.Peer)
+ binary.BigEndian.PutUint64(buffer[8:16], uint64(e.Conn))
+ for i := 0; i < len(e.Ssid); i++ {
+ binary.BigEndian.PutUint32(buffer[16+(i*4):20+(i*4)], e.Ssid[i])
+ }
+ return binary.ToString(&buffer)
+}
+
+// Val returns the event value.
+func (e *Subscription) Val() []byte {
+ buffer, _ := binary.Marshal(e)
+ return buffer
}
// decodeSubscription decodes the event
-func decodeSubscription(encoded string) (Subscription, error) {
- var out Subscription
- return out, binary.Unmarshal([]byte(encoded), &out)
+func decodeSubscription(k string, v []byte) (e Subscription, err error) {
+ if len(v) > 0 {
+ err = binary.Unmarshal(v, &e)
+ }
+
+ // Decode the key
+ buffer := binary.ToBytes(k)
+ e.Peer = binary.BigEndian.Uint64(buffer[0:8])
+ e.Conn = security.ID(binary.BigEndian.Uint64(buffer[8:16]))
+ e.Ssid = make(message.Ssid, (len(buffer)-16)/4)
+ for i := 0; i < len(e.Ssid); i++ {
+ e.Ssid[i] = binary.BigEndian.Uint32(buffer[16+(i*4) : 20+(i*4)])
+ }
+
+ return e, err
}
// ------------------------------------------------------------------------------------
@@ -76,11 +100,16 @@ func (e *Ban) unitType() uint8 {
return typeBan
}
-// Encode encodes the event to string representation.
-func (e Ban) Encode() string {
+// Key returns the event key.
+func (e Ban) Key() string {
return string(e)
}
+// Val returns the event value.
+func (e Ban) Val() []byte {
+ return nil
+}
+
// decodeBan decodes the event
func decodeBan(encoded string) (Ban, error) {
return Ban(encoded), nil
diff --git a/internal/event/events_test.go b/internal/event/events_test.go
index 23ad85c2..b02b92b0 100644
--- a/internal/event/events_test.go
+++ b/internal/event/events_test.go
@@ -18,12 +18,13 @@ import (
"testing"
"github.com/emitter-io/emitter/internal/message"
+ "github.com/emitter-io/emitter/internal/security/hash"
"github.com/stretchr/testify/assert"
)
func TestEncodeSubscription(t *testing.T) {
ev := Subscription{
- Ssid: message.Ssid{1, 2, 3, 4, 5},
+ Ssid: message.Ssid{hash.OfString("a"), hash.OfString("b"), hash.OfString("c"), hash.OfString("d"), hash.OfString("e")},
Peer: 657,
Conn: 12456,
User: "hello",
@@ -33,25 +34,27 @@ func TestEncodeSubscription(t *testing.T) {
assert.Equal(t, "LPCGOQV6DEDQFHIRWBMKICQCZE", ev.ConnID())
// Encode
- enc := ev.Encode()
+ k, v := ev.Key(), ev.Val()
assert.Equal(t, typeSub, ev.unitType())
- assert.Equal(t, 27, len(enc))
+ assert.Equal(t, 36, len(k))
assert.Equal(t,
- []byte{0x91, 0x5, 0xa8, 0x61, 0x5, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0xa, 0x61, 0x2f, 0x62, 0x2f, 0x63, 0x2f, 0x64, 0x2f, 0x65, 0x2f, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5},
- []byte(enc),
+ []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x91, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x30, 0xa8, 0xc1, 0x3, 0xea,
+ 0xb3, 0x1d, 0xd8, 0x2e, 0x48, 0x3d, 0x43, 0x19, 0x23, 0x16, 0x1b, 0xa0, 0x5b, 0x7f, 0xd4, 0x1, 0x2},
+ []byte(k),
)
// Decode
- dec, err := decodeSubscription(enc)
+ dec, err := decodeSubscription(k, v)
assert.NoError(t, err)
assert.Equal(t, ev, dec)
}
func TestEncodeBan(t *testing.T) {
ev := Ban("a/b/c/d/e/")
+ assert.Nil(t, ev.Val())
// Encode
- enc := ev.Encode()
+ enc := ev.Key()
assert.Equal(t, typeBan, ev.unitType())
assert.Equal(t, 10, len(enc))
assert.Equal(t,
@@ -65,8 +68,8 @@ func TestEncodeBan(t *testing.T) {
assert.Equal(t, ev, dec)
}
-// Benchmark_Subscription/encode-8 4379755 270 ns/op 112 B/op 2 allocs/op
-// Benchmark_Subscription/decode-8 2803533 428 ns/op 176 B/op 4 allocs/op
+// Benchmark_Subscription/encode-8 5939726 199 ns/op 160 B/op 3 allocs/op
+// Benchmark_Subscription/decode-8 6665554 178 ns/op 112 B/op 2 allocs/op
func Benchmark_Subscription(b *testing.B) {
ev := Subscription{
Ssid: message.Ssid{1, 2, 3, 4, 5},
@@ -81,17 +84,18 @@ func Benchmark_Subscription(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
- ev.Encode()
+ ev.Key()
+ ev.Val()
}
})
// Decode
- enc := ev.Encode()
+ k, v := ev.Key(), ev.Val()
b.Run("decode", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
- decodeSubscription(enc)
+ decodeSubscription(k, v)
}
})
diff --git a/internal/event/state.go b/internal/event/state.go
index 643891c8..ccc4aa2f 100644
--- a/internal/event/state.go
+++ b/internal/event/state.go
@@ -15,7 +15,6 @@
package event
import (
- bin "encoding/binary"
"io"
"path"
@@ -25,13 +24,13 @@ import (
"github.com/weaveworks/mesh"
)
-// Time represents an event time (CRDT metadata).
-type Time = crdt.Time
+// Value represents an event time & value.
+type Value = crdt.Value
// State represents globally synchronised state.
type State struct {
durable bool // Whether the state is durable or not.
- subsets map[uint8]crdt.Set // The subsets of the state.
+ subsets map[uint8]crdt.Map // The subsets of the state.
}
// NewState creates a new replicated state.
@@ -39,7 +38,7 @@ func NewState(dir string) *State {
durable := dir != ""
return &State{
durable: durable,
- subsets: map[uint8]crdt.Set{
+ subsets: map[uint8]crdt.Map{
typeSub: crdt.New(durable, ""),
typeBan: crdt.New(durable, fileOf(dir, "ban.db")),
},
@@ -118,27 +117,27 @@ func (st *State) Merge(other mesh.GossipData) mesh.GossipData {
// Add adds the unit to the state.
func (st *State) Add(ev Event) {
set := st.subsets[ev.unitType()]
- set.Add(ev.Encode())
+ set.Add(ev.Key(), ev.Val())
}
-// Remove removes the unit from the state.
-func (st *State) Remove(ev Event) {
+// Del removes the unit from the state.
+func (st *State) Del(ev Event) {
set := st.subsets[ev.unitType()]
- set.Remove(ev.Encode())
+ set.Del(ev.Key())
}
-// Contains checks if the state contains an event.
-func (st *State) Contains(ev Event) bool {
+// Has checks if the state contains an event.
+func (st *State) Has(ev Event) bool {
set := st.subsets[ev.unitType()]
- return set.Contains(ev.Encode())
+ return set.Has(ev.Key())
}
// Subscriptions iterates through all of the subscription units. This call is
// blocking and will lock the entire set of subscriptions while iterating.
-func (st *State) Subscriptions(f func(*Subscription, Time)) {
+func (st *State) Subscriptions(f func(*Subscription, Value)) {
set := st.subsets[typeSub]
- set.Range(nil, func(v string, t crdt.Time) bool {
- if ev, err := decodeSubscription(v); err == nil {
+ set.Range(nil, func(v string, t Value) bool {
+ if ev, err := decodeSubscription(v, t.Value()); err == nil {
f(&ev, t)
}
return true
@@ -147,16 +146,15 @@ func (st *State) Subscriptions(f func(*Subscription, Time)) {
// SubscriptionsOf iterates through the subscription events for a specific peer.
func (st *State) SubscriptionsOf(name mesh.PeerName, f func(*Subscription)) {
- buffer := make([]byte, 10, 10)
- offset := bin.PutUvarint(buffer, uint64(name))
- prefix := buffer[:offset]
+ prefix := make([]byte, 8)
+ binary.BigEndian.PutUint64(prefix, uint64(name))
// Copy since the Range() is locked
var events []*Subscription
set := st.subsets[typeSub]
- set.Range(prefix, func(v string, t crdt.Time) bool {
+ set.Range(prefix, func(v string, t crdt.Value) bool {
if t.IsAdded() {
- if ev, err := decodeSubscription(v); err == nil {
+ if ev, err := decodeSubscription(v, t.Value()); err == nil {
events = append(events, &ev)
}
}
diff --git a/internal/event/state_test.go b/internal/event/state_test.go
index 5fec3e5e..4fc69101 100644
--- a/internal/event/state_test.go
+++ b/internal/event/state_test.go
@@ -34,7 +34,7 @@ func setClock(t int64) {
crdt.Now = func() int64 { return t }
}
-// Benchmark_State/contains-8 11535033 104 ns/op 4 B/op 1 allocs/op
+// Benchmark_State/contains-8 11538494 100 ns/op 16 B/op 1 allocs/op
func Benchmark_State(b *testing.B) {
state := NewState(":memory:")
for i := 1; i <= 20000; i++ {
@@ -45,13 +45,13 @@ func Benchmark_State(b *testing.B) {
// Encode
target := Ban("10000")
- state.Contains(&target)
+ state.Has(&target)
time.Sleep(10 * time.Millisecond)
b.Run("contains", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
- state.Contains(&target)
+ state.Has(&target)
}
})
}
@@ -70,18 +70,16 @@ func TestEncodeSubscriptionState(t *testing.T) {
Channel: nocopy.Bytes("A"),
}
state.Add(ev)
- assert.True(t, state.Contains(ev))
+ assert.True(t, state.Has(ev))
// Encode / decode
enc := state.Encode()[0]
dec, err := DecodeState(enc)
assert.NoError(t, err)
- dec.Subscriptions(func(k *Subscription, v Time) {
+ dec.Subscriptions(func(k *Subscription, v Value) {
assert.Equal(t, "A", string(k.Channel))
- assert.Equal(t, Time{
- AddTime: 10,
- DelTime: 0,
- }, v)
+ assert.Equal(t, int64(10), v.AddTime())
+ assert.Equal(t, int64(0), v.DelTime())
})
state.Close()
@@ -99,32 +97,27 @@ func TestMergeState(t *testing.T) {
state1 := NewState("")
state1.Add(&ev)
- // Remove from state 2
+ // Del from state 2
setClock(50)
state2 := NewState("")
- state2.Remove(&ev)
+ state2.Del(&ev)
// Merge
delta := state1.Merge(state2)
assert.Equal(t, state2, delta)
- state1.Subscriptions(func(_ *Subscription, v Time) {
- assert.Equal(t, Time{
- AddTime: 20,
- DelTime: 50,
- }, v)
+ state1.Subscriptions(func(_ *Subscription, v Value) {
+ assert.Equal(t, int64(20), v.AddTime())
+ assert.Equal(t, int64(50), v.DelTime())
})
- state2.Subscriptions(func(_ *Subscription, v Time) {
- assert.Equal(t, Time{
- AddTime: 0,
- DelTime: 50,
- }, v)
+ state2.Subscriptions(func(_ *Subscription, v Value) {
+ assert.Equal(t, int64(50), v.DelTime())
})
// Merge with zero delta
state3 := NewState("")
- state3.Remove(&ev)
+ state3.Del(&ev)
delta = state3.Merge(state2)
assert.Nil(t, delta)
}
@@ -149,13 +142,13 @@ func TestSubscriptions(t *testing.T) {
// Must have 2 keys alive after removal
setClock(int64(21))
state.SubscriptionsOf(1, func(ev *Subscription) {
- state.Remove(ev)
+ state.Del(ev)
})
assert.Equal(t, 2, countAdded(state))
// Count all of the subscriptions (alive or dead)
count := 0
- state.Subscriptions(func(ev *Subscription, _ Time) {
+ state.Subscriptions(func(ev *Subscription, _ Value) {
count++
})
assert.Equal(t, 3, count)
@@ -163,7 +156,7 @@ func TestSubscriptions(t *testing.T) {
func countAdded(state *State) (added int) {
set := state.subsets[typeSub]
- set.Range(nil, func(_ string, v Time) bool {
+ set.Range(nil, func(_ string, v Value) bool {
if v.IsAdded() {
added++
}