Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #312 from jcooklin/fb/tribe
Browse files Browse the repository at this point in the history
Adds global tribe membership and agreements
  • Loading branch information
jcooklin committed Sep 4, 2015
2 parents 8df2b1f + 55c64b9 commit 315901a
Show file tree
Hide file tree
Showing 10 changed files with 2,329 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ build/
*.swp
profile.cov
gin-bin
tags

# we don't vendor godep _workspace
**/Godeps/_workspace/**
Expand Down
10 changes: 9 additions & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions mgmt/tribe/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package tribe

import "github.com/hashicorp/memberlist"

// broadcast implements memberlist.Broadcast
type broadcast struct {
msg []byte
notify chan<- struct{}
}

func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
}

func (b *broadcast) Message() []byte {
return b.msg
}

func (b *broadcast) Finished() {
if b.notify != nil {
close(b.notify)
}
}
35 changes: 35 additions & 0 deletions mgmt/tribe/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package tribe

import "sync/atomic"

type LClock struct {
Inc uint64
}

type LTime uint64

//Update
func (l *LClock) Update(lt LTime) {
for {
cur := LTime(atomic.LoadUint64(&l.Inc))
// If we are newer return now
if lt < cur {
return
}
// If we CAS successfully return else our counter changed
// and we need to try again
if atomic.CompareAndSwapUint64(&l.Inc, uint64(cur), uint64(lt)+1) {
return
}
}
}

// Increment
func (l *LClock) Increment() LTime {
return LTime(atomic.AddUint64(&l.Inc, 1))
}

// Time returns the current value of the clock
func (l *LClock) Time() LTime {
return LTime(atomic.LoadUint64(&l.Inc))
}
281 changes: 281 additions & 0 deletions mgmt/tribe/delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package tribe

type delegate struct {
tribe *tribe
}

func (t *delegate) NodeMeta(limit int) []byte {
logger.WithField("_block", "NodeMeta").Debugln("NodeMeta called")
//consider using NodeMeta to store declared, inferred and derived facts
return []byte{}
}

func (t *delegate) NotifyMsg(buf []byte) {
if len(buf) == 0 {
return
}

var rebroadcast = true

switch msgType(buf[0]) {
case addPluginMsgType:
msg := &pluginMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleAddPlugin(msg)
case removePluginMsgType:
msg := &pluginMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleRemovePlugin(msg)
case addAgreementMsgType:
msg := &agreementMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleAddAgreement(msg)
case removeAgreementMsgType:
msg := &agreementMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleRemoveAgreement(msg)
case joinAgreementMsgType:
msg := &agreementMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleJoinAgreement(msg)
case leaveAgreementMsgType:
msg := &agreementMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleLeaveAgreement(msg)
case addTaskMsgType:
msg := &taskMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleAddTask(msg)
case removeTaskMsgType:
msg := &taskMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleRemoveTask(msg)
default:
logger.WithField("_block", "NotifyMsg").Errorln("NodeMeta called")
return
}

if rebroadcast {
newBuf := make([]byte, len(buf))
copy(newBuf, buf)
t.tribe.broadcasts.QueueBroadcast(&broadcast{
msg: newBuf,
notify: nil,
})
}
}

func (t *delegate) GetBroadcasts(overhead, limit int) [][]byte {
return t.tribe.broadcasts.GetBroadcasts(overhead, limit)
}

func (t *delegate) LocalState(join bool) []byte {
t.tribe.mutex.Lock()
defer t.tribe.mutex.Unlock()

// TODO the sizes here need to be set with a flag that is also ref in tribe.go
pluginMsgs := make([]*pluginMsg, 512)
agreementMsgs := make([]*agreementMsg, 512)
taskMsgs := make([]*taskMsg, 512)
pluginIntentMsgs := make([]*pluginMsg, 512)
agreementIntentMsgs := make([]*agreementMsg, 512)
taskIntentMsgs := make([]*taskMsg, 512)

for idx, msg := range t.tribe.msgBuffer {
if msg == nil {
continue
}
switch msg.GetType() {
case addPluginMsgType:
pluginMsgs[idx] = msg.(*pluginMsg)
case removePluginMsgType:
pluginMsgs[idx] = msg.(*pluginMsg)
case addAgreementMsgType:
agreementMsgs[idx] = msg.(*agreementMsg)
case removeAgreementMsgType:
agreementMsgs[idx] = msg.(*agreementMsg)
case joinAgreementMsgType:
agreementMsgs[idx] = msg.(*agreementMsg)
case leaveAgreementMsgType:
agreementMsgs[idx] = msg.(*agreementMsg)
case addTaskMsgType:
taskMsgs[idx] = msg.(*taskMsg)
case removeTaskMsgType:
taskMsgs[idx] = msg.(*taskMsg)
}
}

for idx, msg := range t.tribe.intentBuffer {
if msg == nil {
continue
}
switch msg.GetType() {
case addPluginMsgType:
pluginIntentMsgs[idx] = msg.(*pluginMsg)
case removePluginMsgType:
pluginIntentMsgs[idx] = msg.(*pluginMsg)
case addAgreementMsgType:
agreementIntentMsgs[idx] = msg.(*agreementMsg)
case removeAgreementMsgType:
agreementIntentMsgs[idx] = msg.(*agreementMsg)
case joinAgreementMsgType:
agreementIntentMsgs[idx] = msg.(*agreementMsg)
case leaveAgreementMsgType:
agreementIntentMsgs[idx] = msg.(*agreementMsg)
case addTaskMsgType:
taskIntentMsgs[idx] = msg.(*taskMsg)
case removeTaskMsgType:
taskIntentMsgs[idx] = msg.(*taskMsg)
}
}

fs := fullStateMsg{
LTime: t.tribe.clock.Time(),
PluginMsgs: pluginMsgs,
AgreementMsgs: agreementMsgs,
TaskMsgs: taskMsgs,
PluginIntentMsgs: pluginIntentMsgs,
AgreementIntentMsgs: agreementIntentMsgs,
TaskIntentMsgs: taskIntentMsgs,
Agreements: t.tribe.agreements,
Members: t.tribe.members,
}

// for name, agreements := range t.tribe.agreements {
// agreements.PluginAgreement.mutex.Lock()
// fs.Agreements[name] = agreements
// agreements.PluginAgreement.mutex.Unlock()
// }

buf, err := encodeMessage(fullStateMsgType, fs)
if err != nil {
panic(err)
}

return buf
}

func (t *delegate) MergeRemoteState(buf []byte, join bool) {
logger.WithField("_block", "MergeRemoteState").Debugln("calling merge")

if msgType(buf[0]) != fullStateMsgType {
logger.Errorln("Unknown message type")
return
}

fs := &fullStateMsg{}
if err := decodeMessage(buf[1:], fs); err != nil {
panic(err)
}

if t.tribe.clock.Time() > fs.LTime {
//we are ahead return now
return
}

logger.Debugln("Updating full state")
t.tribe.mutex.Lock()
defer t.tribe.mutex.Unlock()
if join {
t.tribe.clock.Update(fs.LTime - 1)
t.tribe.agreements = fs.Agreements
// TODO investigate this more ..jc
for k, v := range fs.Members {
t.tribe.members[k] = v
}
// t.tribe.members = fs.Members
for idx, pluginMsg := range fs.PluginMsgs {
if pluginMsg == nil {
continue
}
t.tribe.msgBuffer[idx] = pluginMsg
}
for idx, agreementMsg := range fs.AgreementMsgs {
if agreementMsg == nil {
continue
}
t.tribe.msgBuffer[idx] = agreementMsg
}
for idx, taskMsg := range fs.TaskMsgs {
if taskMsg == nil {
continue
}
t.tribe.msgBuffer[idx] = taskMsg
}
for idx, pluginMsg := range fs.PluginIntentMsgs {
if pluginMsg == nil {
continue
}
t.tribe.intentBuffer[idx] = pluginMsg
}
for idx, agreementMsg := range fs.AgreementIntentMsgs {
if agreementMsg == nil {
continue
}
t.tribe.intentBuffer[idx] = agreementMsg
}
for idx, taskMsg := range fs.TaskIntentMsgs {
if taskMsg == nil {
continue
}
t.tribe.intentBuffer[idx] = taskMsg
}
} else {
for _, m := range fs.PluginMsgs {
if m == nil {
continue
}
if m.GetType() == addPluginMsgType {
t.tribe.handleAddPlugin(m)
}
if m.GetType() == removePluginMsgType {
t.tribe.handleRemovePlugin(m)
}
}
for _, m := range fs.AgreementMsgs {
if m == nil {
continue
}
if m.GetType() == addAgreementMsgType {
t.tribe.handleAddAgreement(m)
}
if m.GetType() == removeAgreementMsgType {
t.tribe.handleRemoveAgreement(m)
}
if m.GetType() == joinAgreementMsgType {
t.tribe.handleJoinAgreement(m)
}
if m.GetType() == leaveAgreementMsgType {
t.tribe.handleLeaveAgreement(m)
}
}
for _, m := range fs.TaskMsgs {
if m == nil {
continue
}
if m.GetType() == addTaskMsgType {
t.tribe.handleAddTask(m)
}
if m.GetType() == removeTaskMsgType {
t.tribe.handleRemoveTask(m)
}
}
}

}
Loading

0 comments on commit 315901a

Please sign in to comment.