diff --git a/internal/broker/cluster/swarm.go b/internal/broker/cluster/swarm.go index 97925254..171b9502 100644 --- a/internal/broker/cluster/swarm.go +++ b/internal/broker/cluster/swarm.go @@ -151,7 +151,7 @@ func (s *Swarm) onPeerOffline(name mesh.PeerName) { dead := &deadPeer{name: name} s.state.SubscriptionsOf(name, func(ev *event.Subscription) { s.OnUnsubscribe(dead, ev) // Notify locally that the subscription is gone - s.state.Remove(ev) // Remove the state from ourselves + s.state.Del(ev) // Remove the state from ourselves }) } } @@ -248,22 +248,22 @@ func (s *Swarm) merge(buf []byte) (mesh.GossipData, error) { // Merge and get the delta delta := s.state.Merge(other) - other.Subscriptions(func(ev *event.Subscription, t event.Time) { + other.Subscriptions(func(ev *event.Subscription, v event.Value) { if ev.Peer == uint64(s.router.Ourself.Name) { return // Skip ourselves } // Find the active peer for this subscription event - encoded := ev.Encode() + key := ev.Key() peer := s.findPeer(mesh.PeerName(ev.Peer)) // If the subscription is added, notify (TODO: use channels) - if t.IsAdded() && peer.onSubscribe(encoded, ev.Ssid) && peer.IsActive() { + if v.IsAdded() && peer.onSubscribe(key, ev.Ssid) && peer.IsActive() { s.OnSubscribe(peer, ev) } // If the subscription is removed, notify (TODO: use channels) - if t.IsRemoved() && peer.onUnsubscribe(encoded, ev.Ssid) && peer.IsActive() { + if v.IsRemoved() && peer.onUnsubscribe(key, ev.Ssid) && peer.IsActive() { s.OnUnsubscribe(peer, ev) } }) @@ -348,17 +348,17 @@ func (s *Swarm) NotifyBeginOf(ev event.Event) { // NotifyEndOf notifies the swarm when an event is stopped being triggered. func (s *Swarm) NotifyEndOf(ev event.Event) { - s.state.Remove(ev) + s.state.Del(ev) // Create a delta for broadcasting just this operation op := event.NewState("") - op.Remove(ev) + op.Del(ev) s.gossip.GossipBroadcast(op) } // Contains checks whether an event is currently triggered within the cluster. func (s *Swarm) Contains(ev event.Event) bool { - return s.state.Contains(ev) + return s.state.Has(ev) } // Close terminates the connection. diff --git a/internal/event/crdt/durable.go b/internal/event/crdt/durable.go index 35f86142..fe04e12f 100644 --- a/internal/event/crdt/durable.go +++ b/internal/event/crdt/durable.go @@ -26,12 +26,12 @@ import ( "github.com/tidwall/buntdb" ) -// getTime retrieves a time from the store. -func getTime(tx *buntdb.Tx, item string) Time { +// getValue retrieves a time from the store. +func getValue(tx *buntdb.Tx, item string) Value { if t, err := tx.Get(item); err == nil { - return decodeTime(t) + return decodeValue(t) } - return Time{} + return newValue() } // Durable represents a last-write-wins CRDT set which can be persisted to disk. @@ -46,7 +46,7 @@ func NewDurable(dir string) *Durable { } // newDurableWith creates a new last-write-wins set with bias for 'add'. -func newDurableWith(path string, items map[string]Time) *Durable { +func newDurableWith(path string, items map[string]Value) *Durable { if path == "" { path = ":memory:" } @@ -72,7 +72,7 @@ func newDurableWith(path string, items map[string]Time) *Durable { } // Store stores the item into the transaction. -func (s *Durable) store(tx *buntdb.Tx, key string, t Time) { +func (s *Durable) store(tx *buntdb.Tx, key string, t Value) { var opts *buntdb.SetOptions if t.IsRemoved() { opts = &buntdb.SetOptions{ @@ -81,83 +81,91 @@ func (s *Durable) store(tx *buntdb.Tx, key string, t Time) { } } - tx.Set(key, t.Encode(), opts) + tx.Set(key, t.encode(), opts) } // Fetch fetches the item either from transaction or cache. -func (s *Durable) fetch(item string) Time { +func (s *Durable) fetch(item string) Value { cacheKey := binary.ToBytes(item) if v, err := s.cache.Get(cacheKey); err == nil { - return decodeTime(binary.ToString(&v)) + return decodeValue(binary.ToString(&v)) } tx, _ := s.db.Begin(false) defer tx.Rollback() if t, err := tx.Get(item); err == nil { s.cache.Set(cacheKey, binary.ToBytes(t), 60) - return decodeTime(t) + return decodeValue(t) } - return Time{} + return newValue() } // Add adds a value to the set. -func (s *Durable) Add(item string) { +func (s *Durable) Add(item string, value []byte) { s.db.Update(func(tx *buntdb.Tx) error { - t := getTime(tx, item) - s.store(tx, item, Time{AddTime: Now(), DelTime: t.DelTime}) + t, now := getValue(tx, item), Now() + if t.AddTime() < now { + t.setAddTime(Now()) + t.setValue(value) + s.store(tx, item, t) + } return nil }) } -// Remove removes the value from the set. -func (s *Durable) Remove(item string) { +// Del removes the value from the set. +func (s *Durable) Del(item string) { s.db.Update(func(tx *buntdb.Tx) error { - v := getTime(tx, item) - s.store(tx, item, Time{AddTime: v.AddTime, DelTime: Now()}) + t, now := getValue(tx, item), Now() + if t.DelTime() < now { + t.setDelTime(Now()) + s.store(tx, item, t) + } return nil }) } -// Contains checks if a value is present in the set. -func (s *Durable) Contains(item string) bool { +// Has checks if a value is present in the set. +func (s *Durable) Has(item string) bool { return s.fetch(item).IsAdded() } // Get retrieves the time for an item. -func (s *Durable) Get(item string) Time { +func (s *Durable) Get(item string) Value { return s.fetch(item) } // Merge merges two LWW sets. This also modifies the set being merged in // to leave only the delta. -func (s *Durable) Merge(other Set) { +func (s *Durable) Merge(other Map) { r := other.(*Volatile) r.lock.Lock() defer r.lock.Unlock() s.db.Update(func(stx *buntdb.Tx) error { for key, rt := range r.data { - st := getTime(stx, key) + st := getValue(stx, key) - // Update add time - if st.AddTime < rt.AddTime { - st.AddTime = rt.AddTime + // Update add time & value + if st.AddTime() < rt.AddTime() { + st.setAddTime(rt.AddTime()) } else { - rt.AddTime = 0 // Remove from delta + rt.setAddTime(0) // Remove from delta } // Update delete time - if st.DelTime < rt.DelTime { - st.DelTime = rt.DelTime + if st.DelTime() < rt.DelTime() { + st.setDelTime(rt.DelTime()) } else { - rt.DelTime = 0 // Remove from delta + rt.setDelTime(0) // Remove from delta } if rt.IsZero() { delete(r.data, key) // Remove from delta } else { - s.store(stx, key, st) // Merge the new value - r.data[key] = rt // Update the delta + st.setValue(rt.Value()) // Set the new value + s.store(stx, key, st) // Merge the new value + r.data[key] = rt // Update the delta } } @@ -166,21 +174,21 @@ func (s *Durable) Merge(other Set) { } // Range iterates through the events for a specific prefix. -func (s *Durable) Range(prefix []byte, f func(string, Time) bool) { +func (s *Durable) Range(prefix []byte, f func(string, Value) bool) { s.db.View(func(tx *buntdb.Tx) error { return tx.Ascend("", func(k, v string) bool { if !bytes.HasPrefix(binary.ToBytes(k), prefix) { return true } - return f(k, decodeTime(v)) + return f(k, decodeValue(v)) }) }) } // Count returns the number of items in the set. func (s *Durable) Count() (count int) { - s.Range(nil, func(k string, v Time) bool { + s.Range(nil, func(k string, v Value) bool { count++ return true }) @@ -188,9 +196,9 @@ func (s *Durable) Count() (count int) { } // ToMap converts the set to a map (useful for testing). -func (s *Durable) toMap() map[string]Time { - m := make(map[string]Time) - s.Range(nil, func(k string, v Time) bool { +func (s *Durable) toMap() map[string]Value { + m := make(map[string]Value) + s.Range(nil, func(k string, v Value) bool { m[k] = v return true }) diff --git a/internal/event/crdt/durable_test.go b/internal/event/crdt/durable_test.go index b16e1b3d..a10a1aaa 100644 --- a/internal/event/crdt/durable_test.go +++ b/internal/event/crdt/durable_test.go @@ -16,144 +16,206 @@ package crdt import ( "fmt" - "io/ioutil" "sync" "testing" - "time" - "github.com/golang/snappy" "github.com/kelindar/binary" "github.com/stretchr/testify/assert" ) -func TestDurableAddContains(t *testing.T) { - testStr := "ABCD" - - lww := NewDurable("") - assert.False(t, lww.Contains(testStr)) - - lww.Add(testStr) - assert.True(t, lww.Contains(testStr)) - - entry := lww.Get(testStr) - assert.True(t, entry.IsAdded()) - assert.False(t, entry.IsRemoved()) - assert.False(t, entry.IsZero()) -} - -func TestDurableAddRemoveContains(t *testing.T) { - lww := NewDurable("") - testStr := "object2" - - lww.Add(testStr) - time.Sleep(1 * time.Millisecond) - lww.Remove(testStr) - - assert.False(t, lww.Contains(testStr)) - - entry := lww.Get(testStr) - assert.False(t, entry.IsAdded()) - assert.True(t, entry.IsRemoved()) - assert.False(t, entry.IsZero()) -} - -func TestDurableMerge(t *testing.T) { - var T = func(add, del int64) Time { - return Time{AddTime: add, DelTime: del} +func TestAddRemove(t *testing.T) { + defer restoreClock(Now) + for _, tc := range []struct { + initial Map + expected Map + actions []Action + }{ + { + initial: mapOf(false, T("A", 10, 0, "A1")), + expected: mapOf(false, T("A", 20, 0, "A2")), + actions: []Action{T("A", 20, 0, "A2")}, + }, + { + initial: mapOf(false, T("A", 10, 0, "A1")), + expected: mapOf(false, T("A", 10, 20, "A1")), + actions: []Action{T("A", 0, 20, "A1")}, + }, + { + initial: mapOf(false, T("A", 10, 0, "A1")), + expected: mapOf(false, T("A", 20, 0, "A2")), + actions: []Action{T("A", 20, 0, "A2"), T("A", 15, 0, "A3")}, + }, + { + initial: mapOf(false, T("A", 10, 0, "A1")), + expected: mapOf(false, T("A", 10, 20, "A1")), + actions: []Action{T("A", 0, 20), T("A", 0, 15)}, + }, + { + initial: mapOf(true, T("A", 10, 0, "A1")), + expected: mapOf(true, T("A", 20, 0, "A2")), + actions: []Action{T("A", 20, 0, "A2")}, + }, + { + initial: mapOf(true, T("A", 10, 0, "A1")), + expected: mapOf(true, T("A", 10, 20, "A1")), + actions: []Action{T("A", 0, 20, "A1")}, + }, + { + initial: mapOf(true, T("A", 10, 0, "A1")), + expected: mapOf(true, T("A", 20, 0, "A2")), + actions: []Action{T("A", 20, 0, "A2"), T("A", 15, 0, "A3")}, + }, + { + initial: mapOf(true, T("A", 10, 0, "A1")), + expected: mapOf(true, T("A", 10, 20, "A1")), + actions: []Action{T("A", 0, 20), T("A", 0, 15)}, + }, + } { + for _, f := range tc.actions { + k, v := f() + if v.IsAdded() { + setClock(v.AddTime()) + tc.initial.Add(k, v.Value()) + } + if v.IsRemoved() { + setClock(v.DelTime()) + tc.initial.Del(k) + } + + equalSets(t, tc.expected, tc.initial) + assert.Equal(t, tc.expected.Count(), tc.initial.Count()) + } } +} +func TestMerge(t *testing.T) { for _, tc := range []struct { - lww1, expected *Durable - lww2, delta *Volatile + lww1, expected Map + lww2, delta Map valid, invalid []string }{ + // Volatile -> Durable + { + lww1: mapOf(true, T("A", 10, 0, "A1"), T("B", 20, 0, "B1")), + lww2: mapOf(false, T("A", 0, 20, "A2"), T("B", 0, 20, "B2")), + expected: mapOf(true, T("A", 10, 20, "A2"), T("B", 20, 20, "B2")), + delta: mapOf(false, T("A", 0, 20, "A2"), T("B", 0, 20, "B2")), + valid: []string{"B"}, + invalid: []string{"A"}, + }, + { + lww1: mapOf(true, T("A", 10, 0, "A1"), T("B", 20, 0, "B1")), + lww2: mapOf(false, T("A", 0, 20), T("B", 10, 0, "B2")), + expected: mapOf(true, T("A", 10, 20), T("B", 20, 0, "B1")), + delta: mapOf(false, T("A", 0, 20)), + valid: []string{"B"}, + invalid: []string{"A"}, + }, + { + lww1: mapOf(true, T("A", 30, 0, "A1"), T("B", 20, 0, "B1")), + lww2: mapOf(false, T("A", 20, 0, "A2"), T("B", 10, 0, "B2")), + expected: mapOf(true, T("A", 30, 0, "A1"), T("B", 20, 0, "B1")), + delta: NewVolatile(), + valid: []string{"A", "B"}, + invalid: []string{}, + }, + { + lww1: mapOf(true, T("A", 10, 0, "A1"), T("B", 0, 20)), + lww2: mapOf(false, T("C", 10, 0, "C1"), T("D", 0, 20)), + expected: mapOf(true, T("A", 10, 0, "A1"), T("B", 0, 20), T("C", 10, 0, "C1"), T("D", 0, 20)), + delta: mapOf(false, T("C", 10, 0, "C1"), T("D", 0, 20)), + valid: []string{"A", "C"}, + invalid: []string{"B", "D"}, + }, { - lww1: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(20, 0)}), - lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}), - expected: newDurableWith("", map[string]Time{"A": T(10, 20), "B": T(20, 20)}), - delta: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}), + lww1: mapOf(true, T("A", 10, 0, "A1"), T("B", 30, 0, "B1")), + lww2: mapOf(false, T("A", 20, 0, "A2"), T("B", 20, 0, "B2")), + expected: mapOf(true, T("A", 20, 0, "A2"), T("B", 30, 0, "B1")), + delta: mapOf(false, T("A", 20, 0, "A2")), + valid: []string{"A", "B"}, + invalid: []string{}, + }, + { + lww1: mapOf(true, T("A", 0, 10), T("B", 0, 30)), + lww2: mapOf(false, T("A", 0, 20), T("B", 0, 20)), + expected: mapOf(true, T("A", 0, 20), T("B", 0, 30)), + delta: mapOf(false, T("A", 0, 20)), + valid: []string{}, + invalid: []string{"A", "B"}, + }, + + // Volatile -> Volatile + { + lww1: mapOf(false, T("A", 10, 0, "A1"), T("B", 20, 0, "B1")), + lww2: mapOf(false, T("A", 0, 20, "A2"), T("B", 0, 20, "B2")), + expected: mapOf(false, T("A", 10, 20, "A2"), T("B", 20, 20, "B2")), + delta: mapOf(false, T("A", 0, 20, "A2"), T("B", 0, 20, "B2")), valid: []string{"B"}, invalid: []string{"A"}, }, { - lww1: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(20, 0)}), - lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(10, 0)}), - expected: newDurableWith("", map[string]Time{"A": T(10, 20), "B": T(20, 0)}), - delta: newVolatileWith(map[string]Time{"A": T(0, 20)}), + lww1: mapOf(false, T("A", 10, 0, "A1"), T("B", 20, 0, "B1")), + lww2: mapOf(false, T("A", 0, 20), T("B", 10, 0, "B2")), + expected: mapOf(false, T("A", 10, 20), T("B", 20, 0, "B1")), + delta: mapOf(false, T("A", 0, 20)), valid: []string{"B"}, invalid: []string{"A"}, }, { - lww1: newDurableWith("", map[string]Time{"A": T(30, 0), "B": T(20, 0)}), - lww2: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(10, 0)}), - expected: newDurableWith("", map[string]Time{"A": T(30, 0), "B": T(20, 0)}), + lww1: mapOf(false, T("A", 30, 0, "A1"), T("B", 20, 0, "B1")), + lww2: mapOf(false, T("A", 20, 0, "A2"), T("B", 10, 0, "B2")), + expected: mapOf(false, T("A", 30, 0, "A1"), T("B", 20, 0, "B1")), delta: NewVolatile(), valid: []string{"A", "B"}, invalid: []string{}, }, { - lww1: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(0, 20)}), - lww2: newVolatileWith(map[string]Time{"C": T(10, 0), "D": T(0, 20)}), - expected: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(0, 20), "C": T(10, 0), "D": T(0, 20)}), - delta: newVolatileWith(map[string]Time{"C": T(10, 0), "D": T(0, 20)}), + lww1: mapOf(false, T("A", 10, 0, "A1"), T("B", 0, 20)), + lww2: mapOf(false, T("C", 10, 0, "C1"), T("D", 0, 20)), + expected: mapOf(false, T("A", 10, 0, "A1"), T("B", 0, 20), T("C", 10, 0, "C1"), T("D", 0, 20)), + delta: mapOf(false, T("C", 10, 0, "C1"), T("D", 0, 20)), valid: []string{"A", "C"}, invalid: []string{"B", "D"}, }, { - lww1: newDurableWith("", map[string]Time{"A": T(10, 0), "B": T(30, 0)}), - lww2: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(20, 0)}), - expected: newDurableWith("", map[string]Time{"A": T(20, 0), "B": T(30, 0)}), - delta: newVolatileWith(map[string]Time{"A": T(20, 0)}), + lww1: mapOf(false, T("A", 10, 0, "A1"), T("B", 30, 0, "B1")), + lww2: mapOf(false, T("A", 20, 0, "A2"), T("B", 20, 0, "B2")), + expected: mapOf(false, T("A", 20, 0, "A2"), T("B", 30, 0, "B1")), + delta: mapOf(false, T("A", 20, 0, "A2")), valid: []string{"A", "B"}, invalid: []string{}, }, { - lww1: newDurableWith("", map[string]Time{"A": T(0, 10), "B": T(0, 30)}), - lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}), - expected: newDurableWith("", map[string]Time{"A": T(0, 20), "B": T(0, 30)}), - delta: newVolatileWith(map[string]Time{"A": T(0, 20)}), + lww1: mapOf(false, T("A", 0, 10), T("B", 0, 30)), + lww2: mapOf(false, T("A", 0, 20), T("B", 0, 20)), + expected: mapOf(false, T("A", 0, 20), T("B", 0, 30)), + delta: mapOf(false, T("A", 0, 20)), valid: []string{}, invalid: []string{"A", "B"}, }, } { tc.lww1.Merge(tc.lww2) - - assert.Equal(t, tc.expected.toMap(), tc.lww1.toMap(), "Merged set is not the same") - assert.Equal(t, tc.delta.data, tc.lww2.data, "Delta set is not the same") + equalSets(t, tc.expected, tc.lww1) + equalSets(t, tc.delta, tc.lww2) for _, obj := range tc.valid { - assert.True(t, tc.lww1.Contains(obj), fmt.Sprintf("expected merged set to contain %v", obj)) + assert.True(t, tc.lww1.Has(obj), fmt.Sprintf("expected merged set to contain %v", obj)) } for _, obj := range tc.invalid { - assert.False(t, tc.lww1.Contains(obj), fmt.Sprintf("expected merged set to NOT contain %v", obj)) + assert.False(t, tc.lww1.Has(obj), fmt.Sprintf("expected merged set to NOT contain %v", obj)) } } } -func TestDurableAll(t *testing.T) { - defer restoreClock(Now) - - setClock(0) - lww := NewDurable("") - lww.Add("A") - lww.Add("B") - lww.Add("C") - - all := lww.toMap() - assert.Equal(t, 3, len(all)) - assert.Equal(t, 3, lww.Count()) - assert.NoError(t, lww.Close()) - assert.Equal(t, 0, lww.Count()) -} - func TestDurableConcurrent(t *testing.T) { i := 0 lww := NewDurable("") + defer lww.Close() for ; i < 100; i++ { setClock(int64(i)) - lww.Add(fmt.Sprintf("%v", i)) + lww.Add(fmt.Sprintf("%v", i), nil) } go func() { @@ -170,7 +232,7 @@ func TestDurableConcurrent(t *testing.T) { for ; gi < gu; gi++ { setClock(int64(100000 + gi)) - other.Remove(fmt.Sprintf("%v", i)) + other.Del(fmt.Sprintf("%v", i)) } stop.Add(1) @@ -184,38 +246,21 @@ func TestDurableConcurrent(t *testing.T) { stop.Wait() } -// Lock for the timer -var lock sync.Mutex - -// RestoreClock restores the clock time -func restoreClock(clk clock) { - lock.Lock() - Now = clk - lock.Unlock() -} - -// SetClock sets the clock time for testing -func setClock(t int64) { - lock.Lock() - Now = func() int64 { return t } - lock.Unlock() -} - // ------------------------------------------------------------------------------------ func TestDurableRange(t *testing.T) { state := newDurableWith("", - map[string]Time{ - "AC": {AddTime: 60, DelTime: 50}, - "AB": {AddTime: 60, DelTime: 50}, - "AA": {AddTime: 10, DelTime: 50}, // Deleted - "BA": {AddTime: 60, DelTime: 50}, - "BB": {AddTime: 60, DelTime: 50}, - "BC": {AddTime: 60, DelTime: 50}, + map[string]Value{ + "AC": newTime(60, 50, nil), + "AB": newTime(60, 50, nil), + "AA": newTime(10, 50, nil), // Deleted + "BA": newTime(60, 50, nil), + "BB": newTime(60, 50, nil), + "BC": newTime(60, 50, nil), }) var count int - state.Range([]byte("A"), func(_ string, v Time) bool { + state.Range([]byte("A"), func(_ string, v Value) bool { if v.IsAdded() { count++ } @@ -224,7 +269,7 @@ func TestDurableRange(t *testing.T) { assert.Equal(t, 2, count) count = 0 - state.Range(nil, func(_ string, v Time) bool { + state.Range(nil, func(_ string, v Value) bool { if v.IsAdded() { count++ } @@ -237,14 +282,14 @@ func TestDurableRange(t *testing.T) { func TestDurableMarshal(t *testing.T) { defer restoreClock(Now) + setClock(10) - setClock(0) - state := newDurableWith("", map[string]Time{"A": {AddTime: 10, DelTime: 50}}) + state := mapOf(true, T("A", 10, 50)).(*Durable) // Encode enc, err := binary.Marshal(state) assert.NoError(t, err) - assert.Equal(t, []byte{0x1, 0x1, 0x41, 0x2, 0x14, 0x64}, enc) + assert.Equal(t, []byte{0x1, 0x1, 0x41, 0x10, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x32}, enc) // Decode dec := NewDurable("") @@ -253,68 +298,3 @@ func TestDurableMarshal(t *testing.T) { println(dec.toMap()["A"].AddTime) assert.Equal(t, state.toMap(), dec.toMap()) } - -// 15852470 -> 1866217 bytes, 11.77% -func TestDurableSizeMarshal(t *testing.T) { - state, size := loadTestData(t) - - // Encode - enc, err := binary.Marshal(state) - assert.NoError(t, err) - - fmt.Printf("%d -> %d bytes, %.2f%% \n", size, len(enc), float64(len(enc))/float64(size)*100) - assert.Greater(t, 20000000, len(enc)) - - // Decode - out := NewDurable("") - err = binary.Unmarshal(enc, out) - assert.NoError(t, err) - assert.Equal(t, 50000, len(out.toMap())) - - out.Range(nil, func(k string, _ Time) bool { - assert.True(t, out.Contains(k)) - return false - }) -} - -// Benchmark_Marshal/encode-8 21 58190505 ns/op 6761445 B/op 23 allocs/op -// Benchmark_Marshal/decode-8 66 19757589 ns/op 8966002 B/op 54439 allocs/op -func Benchmark_Marshal(b *testing.B) { - state, _ := loadTestData(b) - - // Encode - b.Run("encode", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - binary.Marshal(state) - } - }) - - // Decode - enc, err := binary.Marshal(state) - assert.NoError(b, err) - b.Run("decode", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - state := NewDurable("") - binary.Unmarshal(enc, state) - } - }) -} - -func loadTestData(t assert.TestingT) (state *Durable, size int) { - buf, err := ioutil.ReadFile("test.bin") - assert.NoError(t, err) - - decoded, err := snappy.Decode(nil, buf) - assert.NoError(t, err) - - data := make(map[string]Time) - err = binary.Unmarshal(decoded, &data) - state = newDurableWith("", data) - assert.NoError(t, err) - size = len(decoded) - return -} diff --git a/internal/event/crdt/map.go b/internal/event/crdt/map.go new file mode 100644 index 00000000..97dbf035 --- /dev/null +++ b/internal/event/crdt/map.go @@ -0,0 +1,116 @@ +/********************************************************************************** +* Copyright (c) 2009-2020 Misakai Ltd. +* This program is free software: you can redistribute it and/or modify it under the +* terms of the GNU Affero General Public License as published by the Free Software +* Foundation, either version 3 of the License, or(at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License along +* with this program. If not, see. +************************************************************************************/ + +package crdt + +import ( + "time" + + "github.com/kelindar/binary" +) + +// Map represents a contract for a CRDT map. +type Map interface { + Add(string, []byte) + Del(string) + Has(string) bool + Get(string) Value + Merge(Map) + Range([]byte, func(string, Value) bool) + Count() int +} + +// New creates a new CRDT map. +func New(durable bool, path string) Map { + if durable { + return NewDurable(path) + } + return NewVolatile() +} + +// ------------------------------------------------------------------------------------ + +// Value represents a time pair with a value. +type Value []byte + +// newValue returns zero time. +func newValue() Value { + return Value(make([]byte, 16)) +} + +// decodeValue decodes the time from a string +func decodeValue(t string) Value { + return Value(binary.ToBytes(t)) +} + +// IsZero checks if the time is zero +func (v Value) IsZero() bool { + return (v.AddTime() == 0 && v.DelTime() == 0) +} + +// IsAdded checks if add time is larger than remove time. +func (v Value) IsAdded() bool { + return v.AddTime() != 0 && v.AddTime() >= v.DelTime() +} + +// IsRemoved checks if remove time is larger than add time. +func (v Value) IsRemoved() bool { + return v.AddTime() < v.DelTime() +} + +// AddTime returns when the entry was added. +func (v Value) AddTime() int64 { + return int64(binary.BigEndian.Uint64(v[0:8])) +} + +// setAddTime sets when the entry was added. +func (v Value) setAddTime(t int64) { + binary.BigEndian.PutUint64(v[0:8], uint64(t)) +} + +// DelTime returns when the entry was removed. +func (v Value) DelTime() int64 { + return int64(binary.BigEndian.Uint64(v[8:16])) +} + +// setDelTime sets when the entry was removed. +func (v Value) setDelTime(t int64) { + binary.BigEndian.PutUint64(v[8:16], uint64(t)) +} + +// Value returns the extra value payload. +func (v Value) Value() []byte { + return v[16:] +} + +// setValue sets the extra value payload. +func (v *Value) setValue(p []byte) { + h := (*v)[:16] + *v = append(h, p...) +} + +// encode encodes the value to a string +func (v *Value) encode() string { + return binary.ToString((*[]byte)(v)) +} + +// ------------------------------------------------------------------------------------ + +// The clock for unit-testing +type clock func() int64 + +// Now gets the current time in Unix nanoseconds +var Now clock = func() int64 { + return time.Now().UnixNano() +} diff --git a/internal/event/crdt/map_test.go b/internal/event/crdt/map_test.go new file mode 100644 index 00000000..95b884d5 --- /dev/null +++ b/internal/event/crdt/map_test.go @@ -0,0 +1,135 @@ +/********************************************************************************** +* Copyright (c) 2009-2020 Misakai Ltd. +* This program is free software: you can redistribute it and/or modify it under the +* terms of the GNU Affero General Public License as published by the Free Software +* Foundation, either version 3 of the License, or(at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License along +* with this program. If not, see. +************************************************************************************/ + +package crdt + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +// Benchmark_Time/new-8 57209055 20.7 ns/op 16 B/op 1 allocs/op +func Benchmark_Time(b *testing.B) { + + // Encode + b.Run("new", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + t := newTime(10, 20, nil) + t.encode() + } + }) +} + +func TestNew(t *testing.T) { + s1 := New(true, "") + assert.IsType(t, new(Durable), s1) + + s2 := New(false, "") + assert.IsType(t, new(Volatile), s2) +} + +func TestTimeCodec(t *testing.T) { + v1 := newTime(10, 50, []byte("hello")) + enc := v1.encode() + assert.Equal(t, int64(10), v1.AddTime()) + assert.Equal(t, int64(50), v1.DelTime()) + assert.Equal(t, []byte{ + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, // 10 + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x32, // 50 + 0x68, 0x65, 0x6c, 0x6c, 0x6f, // hello + }, []byte(enc)) + + assert.Equal(t, 21, cap(v1)) + + v2 := decodeValue(enc) + assert.Equal(t, v1, v2) + +} + +func TestTime(t *testing.T) { + v := newTime(Now(), Now(), []byte("hello")) + assert.Equal(t, 21, len(v)) + assert.Equal(t, 21, cap(v)) + v.setValue([]byte("larger value")) + assert.Equal(t, 28, len(v)) + v.setValue(nil) + assert.Equal(t, 16, len(v)) +} + +// ------------------------------------------------------------------------------------ + +// Lock for the timer +var lock sync.Mutex + +// RestoreClock restores the clock time +func restoreClock(clk clock) { + lock.Lock() + Now = clk + lock.Unlock() +} + +// SetClock sets the clock time for testing +func setClock(t int64) { + lock.Lock() + Now = func() int64 { return t } + lock.Unlock() +} + +// New creates a new CRDT set. +func mapOf(durable bool, entries ...func() (string, Value)) Map { + m := make(map[string]Value, len(entries)) + for _, f := range entries { + k, v := f() + m[k] = v + } + + if durable { + return newDurableWith("", m) + } + return newVolatileWith(m) +} + +type Action = func() (string, Value) + +// T returns the entry constructor. +func T(k string, add, del int64, v ...string) Action { + if len(v) == 0 { + v = append(v, "") + } + + value := newTime(add, del, []byte(v[0])) + return func() (string, Value) { + return k, value + } +} + +func equalSets(t *testing.T, expected, current Map) { + expected.Range(nil, func(k string, v Value) bool { + assert.Equal(t, v, current.Get(k)) + return true + }) +} + +// newTime creates a new instance of time stamp. +func newTime(addTime, delTime int64, value []byte) Value { + b := Value(make([]byte, 16, 16+len(value))) + b.setAddTime(addTime) + b.setDelTime(delTime) + b.setValue(value) + return b +} diff --git a/internal/event/crdt/set.go b/internal/event/crdt/set.go deleted file mode 100644 index 59f01ffe..00000000 --- a/internal/event/crdt/set.go +++ /dev/null @@ -1,91 +0,0 @@ -/********************************************************************************** -* Copyright (c) 2009-2020 Misakai Ltd. -* This program is free software: you can redistribute it and/or modify it under the -* terms of the GNU Affero General Public License as published by the Free Software -* Foundation, either version 3 of the License, or(at your option) any later version. -* -* This program is distributed in the hope that it will be useful, but WITHOUT ANY -* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License along -* with this program. If not, see. -************************************************************************************/ - -package crdt - -import ( - bin "encoding/binary" - "time" - - "github.com/kelindar/binary" -) - -// Set represents a contract for a CRDT set. -type Set interface { - Add(string) - Remove(string) - Contains(string) bool - Get(item string) Time - Merge(Set) - Range([]byte, func(string, Time) bool) - Count() int -} - -// New creates a new CRDT set. -func New(durable bool, path string) Set { - if durable { - return NewDurable(path) - } - return NewVolatile() -} - -// ------------------------------------------------------------------------------------ - -// Time represents a time pair. -type Time struct { - AddTime int64 - DelTime int64 -} - -// IsZero checks if the time is zero -func (t Time) IsZero() bool { - return (t.AddTime == 0 && t.DelTime == 0) -} - -// IsAdded checks if add time is larger than remove time. -func (t Time) IsAdded() bool { - return t.AddTime != 0 && t.AddTime >= t.DelTime -} - -// IsRemoved checks if remove time is larger than add time. -func (t Time) IsRemoved() bool { - return t.AddTime < t.DelTime -} - -// Encode encodes the value to a string -func (t Time) Encode() string { - b := make([]byte, 20) - n1 := bin.PutVarint(b, t.AddTime) - n2 := bin.PutVarint(b[n1:], t.DelTime) - b = b[:n1+n2] - return binary.ToString(&b) -} - -// DecodeTime decodes the time from a string -func decodeTime(t string) (v Time) { - b, n := binary.ToBytes(t), 0 - v.AddTime, n = bin.Varint(b) - v.DelTime, _ = bin.Varint(b[n:]) - return -} - -// ------------------------------------------------------------------------------------ - -// The clock for unit-testing -type clock func() int64 - -// Now gets the current time in Unix nanoseconds -var Now clock = func() int64 { - return time.Now().UnixNano() -} diff --git a/internal/event/crdt/set_test.go b/internal/event/crdt/set_test.go deleted file mode 100644 index a74dfe3d..00000000 --- a/internal/event/crdt/set_test.go +++ /dev/null @@ -1,61 +0,0 @@ -/********************************************************************************** -* Copyright (c) 2009-2020 Misakai Ltd. -* This program is free software: you can redistribute it and/or modify it under the -* terms of the GNU Affero General Public License as published by the Free Software -* Foundation, either version 3 of the License, or(at your option) any later version. -* -* This program is distributed in the hope that it will be useful, but WITHOUT ANY -* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License along -* with this program. If not, see. -************************************************************************************/ - -package crdt - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func Benchmark_Time(b *testing.B) { - t := Time{10, 20} - - // Encode - b.Run("encode", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - t.Encode() - } - }) - - // Decode - enc := t.Encode() - b.Run("decode", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - decodeTime(enc) - } - }) -} - -func TestNew(t *testing.T) { - s1 := New(true, "") - assert.IsType(t, new(Durable), s1) - - s2 := New(false, "") - assert.IsType(t, new(Volatile), s2) -} - -func TestTimeCodec(t *testing.T) { - v1 := Time{AddTime: 10, DelTime: 50} - enc := v1.Encode() - assert.Equal(t, []byte{0x14, 0x64}, []byte(enc)) - - v2 := decodeTime(enc) - assert.Equal(t, v1, v2) -} diff --git a/internal/event/crdt/test.bin b/internal/event/crdt/test.bin deleted file mode 100644 index 136b447d..00000000 Binary files a/internal/event/crdt/test.bin and /dev/null differ diff --git a/internal/event/crdt/volatile.go b/internal/event/crdt/volatile.go index 6cd1e1c3..4bd7794f 100644 --- a/internal/event/crdt/volatile.go +++ b/internal/event/crdt/volatile.go @@ -24,62 +24,76 @@ import ( // Volatile represents a last-write-wins CRDT set. type Volatile struct { - lock *sync.Mutex // The associated mutex - data map[string]Time // The data containing the set + lock *sync.Mutex // The associated mutex + data map[string]Value // The data containing the set } // NewVolatile creates a new last-write-wins set with bias for 'add'. func NewVolatile() *Volatile { - return newVolatileWith(make(map[string]Time, 64)) + return newVolatileWith(make(map[string]Value, 64)) } // newVolatileWith creates a new last-write-wins set with bias for 'add'. -func newVolatileWith(items map[string]Time) *Volatile { +func newVolatileWith(items map[string]Value) *Volatile { return &Volatile{ lock: new(sync.Mutex), data: items, } } +// Fetch fetches the item from the dictionary. +func (s *Volatile) fetch(item string) Value { + if t, ok := s.data[item]; ok { + return t + } + return newValue() +} + // Add adds a value to the set. -func (s *Volatile) Add(item string) { +func (s *Volatile) Add(item string, value []byte) { s.lock.Lock() defer s.lock.Unlock() - v, _ := s.data[item] - s.data[item] = Time{AddTime: Now(), DelTime: v.DelTime} + t, now := s.fetch(item), Now() + if t.AddTime() < now { + t.setAddTime(now) + t.setValue(value) + s.data[item] = t + } } -// Remove removes the value from the set. -func (s *Volatile) Remove(item string) { +// Del removes the value from the set. +func (s *Volatile) Del(item string) { s.lock.Lock() defer s.lock.Unlock() - v, _ := s.data[item] - s.data[item] = Time{AddTime: v.AddTime, DelTime: Now()} + t, now := s.fetch(item), Now() + if t.DelTime() < now { + t.setDelTime(now) + s.data[item] = t + } } -// Contains checks if a value is present in the set. -func (s *Volatile) Contains(item string) bool { +// Has checks if a value is present in the set. +func (s *Volatile) Has(item string) bool { s.lock.Lock() defer s.lock.Unlock() - v, _ := s.data[item] - return v.IsAdded() + t := s.fetch(item) + return t.IsAdded() } // Get retrieves the time for an item. -func (s *Volatile) Get(item string) Time { +func (s *Volatile) Get(item string) Value { s.lock.Lock() defer s.lock.Unlock() - v, _ := s.data[item] - return v + return s.fetch(item) } // Merge merges two LWW sets. This also modifies the set being merged in // to leave only the delta. -func (s *Volatile) Merge(other Set) { +func (s *Volatile) Merge(other Map) { r := other.(*Volatile) s.lock.Lock() r.lock.Lock() @@ -87,31 +101,34 @@ func (s *Volatile) Merge(other Set) { defer r.lock.Unlock() for key, rt := range r.data { - t, _ := s.data[key] + st := s.fetch(key) - if t.AddTime < rt.AddTime { - t.AddTime = rt.AddTime + // Update add time & value + if st.AddTime() < rt.AddTime() { + st.setAddTime(rt.AddTime()) } else { - rt.AddTime = 0 // Remove from delta + rt.setAddTime(0) // Remove from delta } - if t.DelTime < rt.DelTime { - t.DelTime = rt.DelTime + // Update delete time + if st.DelTime() < rt.DelTime() { + st.setDelTime(rt.DelTime()) } else { - rt.DelTime = 0 // Remove from delta + rt.setDelTime(0) // Remove from delta } if rt.IsZero() { delete(r.data, key) // Remove from delta } else { - s.data[key] = t // Merge the new value - r.data[key] = rt // Update the delta + st.setValue(rt.Value()) // Set the new value + s.data[key] = st // Merge the new value + r.data[key] = rt // Update the delta } } } // Range iterates through the events for a specific prefix. -func (s *Volatile) Range(prefix []byte, f func(string, Time) bool) { +func (s *Volatile) Range(prefix []byte, f func(string, Value) bool) { s.lock.Lock() defer s.lock.Unlock() @@ -150,7 +167,7 @@ func (c *codecVolatile) EncodeTo(e *binary.Encoder, rv reflect.Value) (err error e.WriteUvarint(uint64(len(s.data))) for k, t := range s.data { e.WriteString(k) - e.WriteString(t.Encode()) + e.WriteString(t.encode()) } return } @@ -174,7 +191,7 @@ func (c *codecVolatile) DecodeTo(d *binary.Decoder, rv reflect.Value) (err error return nil } - out.data[binary.ToString(&k)] = decodeTime(binary.ToString(&v)) + out.data[binary.ToString(&k)] = decodeValue(binary.ToString(&v)) } rv.Set(reflect.ValueOf(*out)) diff --git a/internal/event/crdt/volatile_test.go b/internal/event/crdt/volatile_test.go index 60f65761..6b1f6a4f 100644 --- a/internal/event/crdt/volatile_test.go +++ b/internal/event/crdt/volatile_test.go @@ -23,121 +23,12 @@ import ( "github.com/stretchr/testify/assert" ) -func TestVolatileAddContains(t *testing.T) { - testStr := "ABCD" - - lww := NewVolatile() - assert.False(t, lww.Contains(testStr)) - - lww.Add(testStr) - assert.True(t, lww.Contains(testStr)) - - entry := lww.Get(testStr) - assert.True(t, entry.IsAdded()) - assert.False(t, entry.IsRemoved()) - assert.False(t, entry.IsZero()) -} - -func TestVolatileAddRemoveContains(t *testing.T) { - defer restoreClock(Now) - - setClock(0) - - lww := NewVolatile() - testStr := "object2" - - lww.Add(testStr) - - setClock(100) - lww.Remove(testStr) - - assert.False(t, lww.Contains(testStr)) - - entry := lww.data[testStr] - assert.False(t, entry.IsAdded()) - assert.True(t, entry.IsRemoved()) - assert.False(t, entry.IsZero()) -} - -func TestVolatileMerge(t *testing.T) { - var T = func(add, del int64) Time { - return Time{AddTime: add, DelTime: del} - } - - for _, tc := range []struct { - lww1, lww2, expected, delta *Volatile - valid, invalid []string - }{ - { - lww1: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(20, 0)}), - lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}), - expected: newVolatileWith(map[string]Time{"A": T(10, 20), "B": T(20, 20)}), - delta: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}), - valid: []string{"B"}, - invalid: []string{"A"}, - }, - { - lww1: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(20, 0)}), - lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(10, 0)}), - expected: newVolatileWith(map[string]Time{"A": T(10, 20), "B": T(20, 0)}), - delta: newVolatileWith(map[string]Time{"A": T(0, 20)}), - valid: []string{"B"}, - invalid: []string{"A"}, - }, - { - lww1: newVolatileWith(map[string]Time{"A": T(30, 0), "B": T(20, 0)}), - lww2: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(10, 0)}), - expected: newVolatileWith(map[string]Time{"A": T(30, 0), "B": T(20, 0)}), - delta: NewVolatile(), - valid: []string{"A", "B"}, - invalid: []string{}, - }, - { - lww1: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(0, 20)}), - lww2: newVolatileWith(map[string]Time{"C": T(10, 0), "D": T(0, 20)}), - expected: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(0, 20), "C": T(10, 0), "D": T(0, 20)}), - delta: newVolatileWith(map[string]Time{"C": T(10, 0), "D": T(0, 20)}), - valid: []string{"A", "C"}, - invalid: []string{"B", "D"}, - }, - { - lww1: newVolatileWith(map[string]Time{"A": T(10, 0), "B": T(30, 0)}), - lww2: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(20, 0)}), - expected: newVolatileWith(map[string]Time{"A": T(20, 0), "B": T(30, 0)}), - delta: newVolatileWith(map[string]Time{"A": T(20, 0)}), - valid: []string{"A", "B"}, - invalid: []string{}, - }, - { - lww1: newVolatileWith(map[string]Time{"A": T(0, 10), "B": T(0, 30)}), - lww2: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 20)}), - expected: newVolatileWith(map[string]Time{"A": T(0, 20), "B": T(0, 30)}), - delta: newVolatileWith(map[string]Time{"A": T(0, 20)}), - valid: []string{}, - invalid: []string{"A", "B"}, - }, - } { - - tc.lww1.Merge(tc.lww2) - assert.Equal(t, tc.expected, tc.lww1, "Merged set is not the same") - assert.Equal(t, tc.delta, tc.lww2, "Delta set is not the same") - - for _, obj := range tc.valid { - assert.True(t, tc.lww1.Contains(obj), fmt.Sprintf("expected merged set to contain %v", obj)) - } - - for _, obj := range tc.invalid { - assert.False(t, tc.lww1.Contains(obj), fmt.Sprintf("expected merged set to NOT contain %v", obj)) - } - } -} - -func TestConcurrent(t *testing.T) { +func TestVolatile_Concurrent(t *testing.T) { i := 0 lww := NewVolatile() for ; i < 100; i++ { setClock(int64(i)) - lww.Add(fmt.Sprintf("%v", i)) + lww.Add(fmt.Sprintf("%v", i), nil) } go func() { @@ -154,7 +45,7 @@ func TestConcurrent(t *testing.T) { for ; gi < gu; gi++ { setClock(int64(100000 + gi)) - other.Remove(fmt.Sprintf("%v", i)) + other.Del(fmt.Sprintf("%v", i)) } stop.Add(1) @@ -173,17 +64,17 @@ func TestConcurrent(t *testing.T) { func TestRange(t *testing.T) { state := newVolatileWith( - map[string]Time{ - "AC": {AddTime: 60, DelTime: 50}, - "AB": {AddTime: 60, DelTime: 50}, - "AA": {AddTime: 10, DelTime: 50}, // Deleted - "BA": {AddTime: 60, DelTime: 50}, - "BB": {AddTime: 60, DelTime: 50}, - "BC": {AddTime: 60, DelTime: 50}, + map[string]Value{ + "AC": newTime(60, 50, nil), + "AB": newTime(60, 50, nil), + "AA": newTime(10, 50, nil), // Deleted + "BA": newTime(60, 50, nil), + "BB": newTime(60, 50, nil), + "BC": newTime(60, 50, nil), }) var count int - state.Range([]byte("A"), func(_ string, v Time) bool { + state.Range([]byte("A"), func(_ string, v Value) bool { if v.IsAdded() { count++ } @@ -192,7 +83,7 @@ func TestRange(t *testing.T) { assert.Equal(t, 2, count) count = 0 - state.Range(nil, func(_ string, v Time) bool { + state.Range(nil, func(_ string, v Value) bool { if v.IsAdded() { count++ } @@ -208,12 +99,12 @@ func TestTempMarshal(t *testing.T) { defer restoreClock(Now) setClock(0) - state := newVolatileWith(map[string]Time{"A": {AddTime: 10, DelTime: 50}}) + state := newVolatileWith(map[string]Value{"A": newTime(10, 50, nil)}) // Encode enc, err := binary.Marshal(state) assert.NoError(t, err) - assert.Equal(t, []byte{0x1, 0x1, 0x41, 0x2, 0x14, 0x64}, enc) + assert.Equal(t, []byte{0x1, 0x1, 0x41, 0x10, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x32}, enc) // Decode dec := NewVolatile() diff --git a/internal/event/events.go b/internal/event/events.go index 73bddfaa..e27008b6 100644 --- a/internal/event/events.go +++ b/internal/event/events.go @@ -30,18 +30,19 @@ const ( // Event represents an encodable event that happened at some point in time. type Event interface { unitType() uint8 - Encode() string + Key() string + Val() []byte } // ------------------------------------------------------------------------------------ // Subscription represents a subscription event. type Subscription struct { - Peer uint64 // The name of the peer. This must be first, since we're doing prefix search. - Conn security.ID // The connection identifier. + Peer uint64 `binary:"-"` // The name of the peer. This must be first, since we're doing prefix search. + Conn security.ID `binary:"-"` // The connection identifier. + Ssid message.Ssid `binary:"-"` // The SSID for the subscription. User nocopy.String // The connection username. Channel nocopy.Bytes // The channel string. - Ssid message.Ssid // The SSID for the subscription. } // Type retuns the unit type. @@ -54,16 +55,39 @@ func (e *Subscription) ConnID() string { return e.Conn.Unique(uint64(e.Peer), "emitter") } -// Encode encodes the event to string representation. -func (e *Subscription) Encode() string { - buf, _ := binary.Marshal(e) - return string(buf) +// Key returns the event key. +func (e *Subscription) Key() string { + buffer := make([]byte, 16+4*len(e.Ssid)) + binary.BigEndian.PutUint64(buffer[0:8], e.Peer) + binary.BigEndian.PutUint64(buffer[8:16], uint64(e.Conn)) + for i := 0; i < len(e.Ssid); i++ { + binary.BigEndian.PutUint32(buffer[16+(i*4):20+(i*4)], e.Ssid[i]) + } + return binary.ToString(&buffer) +} + +// Val returns the event value. +func (e *Subscription) Val() []byte { + buffer, _ := binary.Marshal(e) + return buffer } // decodeSubscription decodes the event -func decodeSubscription(encoded string) (Subscription, error) { - var out Subscription - return out, binary.Unmarshal([]byte(encoded), &out) +func decodeSubscription(k string, v []byte) (e Subscription, err error) { + if len(v) > 0 { + err = binary.Unmarshal(v, &e) + } + + // Decode the key + buffer := binary.ToBytes(k) + e.Peer = binary.BigEndian.Uint64(buffer[0:8]) + e.Conn = security.ID(binary.BigEndian.Uint64(buffer[8:16])) + e.Ssid = make(message.Ssid, (len(buffer)-16)/4) + for i := 0; i < len(e.Ssid); i++ { + e.Ssid[i] = binary.BigEndian.Uint32(buffer[16+(i*4) : 20+(i*4)]) + } + + return e, err } // ------------------------------------------------------------------------------------ @@ -76,11 +100,16 @@ func (e *Ban) unitType() uint8 { return typeBan } -// Encode encodes the event to string representation. -func (e Ban) Encode() string { +// Key returns the event key. +func (e Ban) Key() string { return string(e) } +// Val returns the event value. +func (e Ban) Val() []byte { + return nil +} + // decodeBan decodes the event func decodeBan(encoded string) (Ban, error) { return Ban(encoded), nil diff --git a/internal/event/events_test.go b/internal/event/events_test.go index 23ad85c2..b02b92b0 100644 --- a/internal/event/events_test.go +++ b/internal/event/events_test.go @@ -18,12 +18,13 @@ import ( "testing" "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/security/hash" "github.com/stretchr/testify/assert" ) func TestEncodeSubscription(t *testing.T) { ev := Subscription{ - Ssid: message.Ssid{1, 2, 3, 4, 5}, + Ssid: message.Ssid{hash.OfString("a"), hash.OfString("b"), hash.OfString("c"), hash.OfString("d"), hash.OfString("e")}, Peer: 657, Conn: 12456, User: "hello", @@ -33,25 +34,27 @@ func TestEncodeSubscription(t *testing.T) { assert.Equal(t, "LPCGOQV6DEDQFHIRWBMKICQCZE", ev.ConnID()) // Encode - enc := ev.Encode() + k, v := ev.Key(), ev.Val() assert.Equal(t, typeSub, ev.unitType()) - assert.Equal(t, 27, len(enc)) + assert.Equal(t, 36, len(k)) assert.Equal(t, - []byte{0x91, 0x5, 0xa8, 0x61, 0x5, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0xa, 0x61, 0x2f, 0x62, 0x2f, 0x63, 0x2f, 0x64, 0x2f, 0x65, 0x2f, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, - []byte(enc), + []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x91, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x30, 0xa8, 0xc1, 0x3, 0xea, + 0xb3, 0x1d, 0xd8, 0x2e, 0x48, 0x3d, 0x43, 0x19, 0x23, 0x16, 0x1b, 0xa0, 0x5b, 0x7f, 0xd4, 0x1, 0x2}, + []byte(k), ) // Decode - dec, err := decodeSubscription(enc) + dec, err := decodeSubscription(k, v) assert.NoError(t, err) assert.Equal(t, ev, dec) } func TestEncodeBan(t *testing.T) { ev := Ban("a/b/c/d/e/") + assert.Nil(t, ev.Val()) // Encode - enc := ev.Encode() + enc := ev.Key() assert.Equal(t, typeBan, ev.unitType()) assert.Equal(t, 10, len(enc)) assert.Equal(t, @@ -65,8 +68,8 @@ func TestEncodeBan(t *testing.T) { assert.Equal(t, ev, dec) } -// Benchmark_Subscription/encode-8 4379755 270 ns/op 112 B/op 2 allocs/op -// Benchmark_Subscription/decode-8 2803533 428 ns/op 176 B/op 4 allocs/op +// Benchmark_Subscription/encode-8 5939726 199 ns/op 160 B/op 3 allocs/op +// Benchmark_Subscription/decode-8 6665554 178 ns/op 112 B/op 2 allocs/op func Benchmark_Subscription(b *testing.B) { ev := Subscription{ Ssid: message.Ssid{1, 2, 3, 4, 5}, @@ -81,17 +84,18 @@ func Benchmark_Subscription(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - ev.Encode() + ev.Key() + ev.Val() } }) // Decode - enc := ev.Encode() + k, v := ev.Key(), ev.Val() b.Run("decode", func(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - decodeSubscription(enc) + decodeSubscription(k, v) } }) diff --git a/internal/event/state.go b/internal/event/state.go index 643891c8..ccc4aa2f 100644 --- a/internal/event/state.go +++ b/internal/event/state.go @@ -15,7 +15,6 @@ package event import ( - bin "encoding/binary" "io" "path" @@ -25,13 +24,13 @@ import ( "github.com/weaveworks/mesh" ) -// Time represents an event time (CRDT metadata). -type Time = crdt.Time +// Value represents an event time & value. +type Value = crdt.Value // State represents globally synchronised state. type State struct { durable bool // Whether the state is durable or not. - subsets map[uint8]crdt.Set // The subsets of the state. + subsets map[uint8]crdt.Map // The subsets of the state. } // NewState creates a new replicated state. @@ -39,7 +38,7 @@ func NewState(dir string) *State { durable := dir != "" return &State{ durable: durable, - subsets: map[uint8]crdt.Set{ + subsets: map[uint8]crdt.Map{ typeSub: crdt.New(durable, ""), typeBan: crdt.New(durable, fileOf(dir, "ban.db")), }, @@ -118,27 +117,27 @@ func (st *State) Merge(other mesh.GossipData) mesh.GossipData { // Add adds the unit to the state. func (st *State) Add(ev Event) { set := st.subsets[ev.unitType()] - set.Add(ev.Encode()) + set.Add(ev.Key(), ev.Val()) } -// Remove removes the unit from the state. -func (st *State) Remove(ev Event) { +// Del removes the unit from the state. +func (st *State) Del(ev Event) { set := st.subsets[ev.unitType()] - set.Remove(ev.Encode()) + set.Del(ev.Key()) } -// Contains checks if the state contains an event. -func (st *State) Contains(ev Event) bool { +// Has checks if the state contains an event. +func (st *State) Has(ev Event) bool { set := st.subsets[ev.unitType()] - return set.Contains(ev.Encode()) + return set.Has(ev.Key()) } // Subscriptions iterates through all of the subscription units. This call is // blocking and will lock the entire set of subscriptions while iterating. -func (st *State) Subscriptions(f func(*Subscription, Time)) { +func (st *State) Subscriptions(f func(*Subscription, Value)) { set := st.subsets[typeSub] - set.Range(nil, func(v string, t crdt.Time) bool { - if ev, err := decodeSubscription(v); err == nil { + set.Range(nil, func(v string, t Value) bool { + if ev, err := decodeSubscription(v, t.Value()); err == nil { f(&ev, t) } return true @@ -147,16 +146,15 @@ func (st *State) Subscriptions(f func(*Subscription, Time)) { // SubscriptionsOf iterates through the subscription events for a specific peer. func (st *State) SubscriptionsOf(name mesh.PeerName, f func(*Subscription)) { - buffer := make([]byte, 10, 10) - offset := bin.PutUvarint(buffer, uint64(name)) - prefix := buffer[:offset] + prefix := make([]byte, 8) + binary.BigEndian.PutUint64(prefix, uint64(name)) // Copy since the Range() is locked var events []*Subscription set := st.subsets[typeSub] - set.Range(prefix, func(v string, t crdt.Time) bool { + set.Range(prefix, func(v string, t crdt.Value) bool { if t.IsAdded() { - if ev, err := decodeSubscription(v); err == nil { + if ev, err := decodeSubscription(v, t.Value()); err == nil { events = append(events, &ev) } } diff --git a/internal/event/state_test.go b/internal/event/state_test.go index 5fec3e5e..4fc69101 100644 --- a/internal/event/state_test.go +++ b/internal/event/state_test.go @@ -34,7 +34,7 @@ func setClock(t int64) { crdt.Now = func() int64 { return t } } -// Benchmark_State/contains-8 11535033 104 ns/op 4 B/op 1 allocs/op +// Benchmark_State/contains-8 11538494 100 ns/op 16 B/op 1 allocs/op func Benchmark_State(b *testing.B) { state := NewState(":memory:") for i := 1; i <= 20000; i++ { @@ -45,13 +45,13 @@ func Benchmark_State(b *testing.B) { // Encode target := Ban("10000") - state.Contains(&target) + state.Has(&target) time.Sleep(10 * time.Millisecond) b.Run("contains", func(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - state.Contains(&target) + state.Has(&target) } }) } @@ -70,18 +70,16 @@ func TestEncodeSubscriptionState(t *testing.T) { Channel: nocopy.Bytes("A"), } state.Add(ev) - assert.True(t, state.Contains(ev)) + assert.True(t, state.Has(ev)) // Encode / decode enc := state.Encode()[0] dec, err := DecodeState(enc) assert.NoError(t, err) - dec.Subscriptions(func(k *Subscription, v Time) { + dec.Subscriptions(func(k *Subscription, v Value) { assert.Equal(t, "A", string(k.Channel)) - assert.Equal(t, Time{ - AddTime: 10, - DelTime: 0, - }, v) + assert.Equal(t, int64(10), v.AddTime()) + assert.Equal(t, int64(0), v.DelTime()) }) state.Close() @@ -99,32 +97,27 @@ func TestMergeState(t *testing.T) { state1 := NewState("") state1.Add(&ev) - // Remove from state 2 + // Del from state 2 setClock(50) state2 := NewState("") - state2.Remove(&ev) + state2.Del(&ev) // Merge delta := state1.Merge(state2) assert.Equal(t, state2, delta) - state1.Subscriptions(func(_ *Subscription, v Time) { - assert.Equal(t, Time{ - AddTime: 20, - DelTime: 50, - }, v) + state1.Subscriptions(func(_ *Subscription, v Value) { + assert.Equal(t, int64(20), v.AddTime()) + assert.Equal(t, int64(50), v.DelTime()) }) - state2.Subscriptions(func(_ *Subscription, v Time) { - assert.Equal(t, Time{ - AddTime: 0, - DelTime: 50, - }, v) + state2.Subscriptions(func(_ *Subscription, v Value) { + assert.Equal(t, int64(50), v.DelTime()) }) // Merge with zero delta state3 := NewState("") - state3.Remove(&ev) + state3.Del(&ev) delta = state3.Merge(state2) assert.Nil(t, delta) } @@ -149,13 +142,13 @@ func TestSubscriptions(t *testing.T) { // Must have 2 keys alive after removal setClock(int64(21)) state.SubscriptionsOf(1, func(ev *Subscription) { - state.Remove(ev) + state.Del(ev) }) assert.Equal(t, 2, countAdded(state)) // Count all of the subscriptions (alive or dead) count := 0 - state.Subscriptions(func(ev *Subscription, _ Time) { + state.Subscriptions(func(ev *Subscription, _ Value) { count++ }) assert.Equal(t, 3, count) @@ -163,7 +156,7 @@ func TestSubscriptions(t *testing.T) { func countAdded(state *State) (added int) { set := state.subsets[typeSub] - set.Range(nil, func(_ string, v Time) bool { + set.Range(nil, func(_ string, v Value) bool { if v.IsAdded() { added++ }