Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed CRDT Set to a Map #320

Merged
merged 1 commit into from
May 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions internal/broker/cluster/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Swarm) onPeerOffline(name mesh.PeerName) {
dead := &deadPeer{name: name}
s.state.SubscriptionsOf(name, func(ev *event.Subscription) {
s.OnUnsubscribe(dead, ev) // Notify locally that the subscription is gone
s.state.Remove(ev) // Remove the state from ourselves
s.state.Del(ev) // Remove the state from ourselves
})
}
}
Expand Down Expand Up @@ -248,22 +248,22 @@ func (s *Swarm) merge(buf []byte) (mesh.GossipData, error) {

// Merge and get the delta
delta := s.state.Merge(other)
other.Subscriptions(func(ev *event.Subscription, t event.Time) {
other.Subscriptions(func(ev *event.Subscription, v event.Value) {
if ev.Peer == uint64(s.router.Ourself.Name) {
return // Skip ourselves
}

// Find the active peer for this subscription event
encoded := ev.Encode()
key := ev.Key()
peer := s.findPeer(mesh.PeerName(ev.Peer))

// If the subscription is added, notify (TODO: use channels)
if t.IsAdded() && peer.onSubscribe(encoded, ev.Ssid) && peer.IsActive() {
if v.IsAdded() && peer.onSubscribe(key, ev.Ssid) && peer.IsActive() {
s.OnSubscribe(peer, ev)
}

// If the subscription is removed, notify (TODO: use channels)
if t.IsRemoved() && peer.onUnsubscribe(encoded, ev.Ssid) && peer.IsActive() {
if v.IsRemoved() && peer.onUnsubscribe(key, ev.Ssid) && peer.IsActive() {
s.OnUnsubscribe(peer, ev)
}
})
Expand Down Expand Up @@ -348,17 +348,17 @@ func (s *Swarm) NotifyBeginOf(ev event.Event) {

// NotifyEndOf notifies the swarm when an event is stopped being triggered.
func (s *Swarm) NotifyEndOf(ev event.Event) {
s.state.Remove(ev)
s.state.Del(ev)

// Create a delta for broadcasting just this operation
op := event.NewState("")
op.Remove(ev)
op.Del(ev)
s.gossip.GossipBroadcast(op)
}

// Contains checks whether an event is currently triggered within the cluster.
func (s *Swarm) Contains(ev event.Event) bool {
return s.state.Contains(ev)
return s.state.Has(ev)
}

// Close terminates the connection.
Expand Down
84 changes: 46 additions & 38 deletions internal/event/crdt/durable.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"github.com/tidwall/buntdb"
)

// getTime retrieves a time from the store.
func getTime(tx *buntdb.Tx, item string) Time {
// getValue retrieves a time from the store.
func getValue(tx *buntdb.Tx, item string) Value {
if t, err := tx.Get(item); err == nil {
return decodeTime(t)
return decodeValue(t)
}
return Time{}
return newValue()
}

// Durable represents a last-write-wins CRDT set which can be persisted to disk.
Expand All @@ -46,7 +46,7 @@ func NewDurable(dir string) *Durable {
}

// newDurableWith creates a new last-write-wins set with bias for 'add'.
func newDurableWith(path string, items map[string]Time) *Durable {
func newDurableWith(path string, items map[string]Value) *Durable {
if path == "" {
path = ":memory:"
}
Expand All @@ -72,7 +72,7 @@ func newDurableWith(path string, items map[string]Time) *Durable {
}

// Store stores the item into the transaction.
func (s *Durable) store(tx *buntdb.Tx, key string, t Time) {
func (s *Durable) store(tx *buntdb.Tx, key string, t Value) {
var opts *buntdb.SetOptions
if t.IsRemoved() {
opts = &buntdb.SetOptions{
Expand All @@ -81,83 +81,91 @@ func (s *Durable) store(tx *buntdb.Tx, key string, t Time) {
}
}

tx.Set(key, t.Encode(), opts)
tx.Set(key, t.encode(), opts)
}

// Fetch fetches the item either from transaction or cache.
func (s *Durable) fetch(item string) Time {
func (s *Durable) fetch(item string) Value {
cacheKey := binary.ToBytes(item)
if v, err := s.cache.Get(cacheKey); err == nil {
return decodeTime(binary.ToString(&v))
return decodeValue(binary.ToString(&v))
}

tx, _ := s.db.Begin(false)
defer tx.Rollback()
if t, err := tx.Get(item); err == nil {
s.cache.Set(cacheKey, binary.ToBytes(t), 60)
return decodeTime(t)
return decodeValue(t)
}
return Time{}
return newValue()
}

// Add adds a value to the set.
func (s *Durable) Add(item string) {
func (s *Durable) Add(item string, value []byte) {
s.db.Update(func(tx *buntdb.Tx) error {
t := getTime(tx, item)
s.store(tx, item, Time{AddTime: Now(), DelTime: t.DelTime})
t, now := getValue(tx, item), Now()
if t.AddTime() < now {
t.setAddTime(Now())
t.setValue(value)
s.store(tx, item, t)
}
return nil
})
}

// Remove removes the value from the set.
func (s *Durable) Remove(item string) {
// Del removes the value from the set.
func (s *Durable) Del(item string) {
s.db.Update(func(tx *buntdb.Tx) error {
v := getTime(tx, item)
s.store(tx, item, Time{AddTime: v.AddTime, DelTime: Now()})
t, now := getValue(tx, item), Now()
if t.DelTime() < now {
t.setDelTime(Now())
s.store(tx, item, t)
}
return nil
})
}

// Contains checks if a value is present in the set.
func (s *Durable) Contains(item string) bool {
// Has checks if a value is present in the set.
func (s *Durable) Has(item string) bool {
return s.fetch(item).IsAdded()
}

// Get retrieves the time for an item.
func (s *Durable) Get(item string) Time {
func (s *Durable) Get(item string) Value {
return s.fetch(item)
}

// Merge merges two LWW sets. This also modifies the set being merged in
// to leave only the delta.
func (s *Durable) Merge(other Set) {
func (s *Durable) Merge(other Map) {
r := other.(*Volatile)
r.lock.Lock()
defer r.lock.Unlock()

s.db.Update(func(stx *buntdb.Tx) error {
for key, rt := range r.data {
st := getTime(stx, key)
st := getValue(stx, key)

// Update add time
if st.AddTime < rt.AddTime {
st.AddTime = rt.AddTime
// Update add time & value
if st.AddTime() < rt.AddTime() {
st.setAddTime(rt.AddTime())
} else {
rt.AddTime = 0 // Remove from delta
rt.setAddTime(0) // Remove from delta
}

// Update delete time
if st.DelTime < rt.DelTime {
st.DelTime = rt.DelTime
if st.DelTime() < rt.DelTime() {
st.setDelTime(rt.DelTime())
} else {
rt.DelTime = 0 // Remove from delta
rt.setDelTime(0) // Remove from delta
}

if rt.IsZero() {
delete(r.data, key) // Remove from delta
} else {
s.store(stx, key, st) // Merge the new value
r.data[key] = rt // Update the delta
st.setValue(rt.Value()) // Set the new value
s.store(stx, key, st) // Merge the new value
r.data[key] = rt // Update the delta
}
}

Expand All @@ -166,31 +174,31 @@ func (s *Durable) Merge(other Set) {
}

// Range iterates through the events for a specific prefix.
func (s *Durable) Range(prefix []byte, f func(string, Time) bool) {
func (s *Durable) Range(prefix []byte, f func(string, Value) bool) {
s.db.View(func(tx *buntdb.Tx) error {
return tx.Ascend("", func(k, v string) bool {
if !bytes.HasPrefix(binary.ToBytes(k), prefix) {
return true
}

return f(k, decodeTime(v))
return f(k, decodeValue(v))
})
})
}

// Count returns the number of items in the set.
func (s *Durable) Count() (count int) {
s.Range(nil, func(k string, v Time) bool {
s.Range(nil, func(k string, v Value) bool {
count++
return true
})
return
}

// ToMap converts the set to a map (useful for testing).
func (s *Durable) toMap() map[string]Time {
m := make(map[string]Time)
s.Range(nil, func(k string, v Time) bool {
func (s *Durable) toMap() map[string]Value {
m := make(map[string]Value)
s.Range(nil, func(k string, v Value) bool {
m[k] = v
return true
})
Expand Down
Loading