Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds global tribe membership and plugin agreements
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Sep 3, 2015
1 parent 1d138b9 commit 5084b1e
Show file tree
Hide file tree
Showing 10 changed files with 1,738 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ build/
*.swp
profile.cov
gin-bin
tags

# we don't vendor godep _workspace
**/Godeps/_workspace/**
Expand Down
10 changes: 9 additions & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions mgmt/tribe/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package tribe

import "github.com/hashicorp/memberlist"

// broadcast implements memberlist.Broadcast
type broadcast struct {
msg []byte
notify chan<- struct{}
}

func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
}

func (b *broadcast) Message() []byte {
return b.msg
}

func (b *broadcast) Finished() {
if b.notify != nil {
close(b.notify)
}
}
35 changes: 35 additions & 0 deletions mgmt/tribe/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package tribe

import "sync/atomic"

type LClock struct {
Inc uint64
}

type LTime uint64

//Update
func (l *LClock) Update(lt LTime) {
for {
cur := LTime(atomic.LoadUint64(&l.Inc))
// If we are newer return now
if lt < cur {
return
}
// If we CAS successfully return else our counter changed
// and we need to try again
if atomic.CompareAndSwapUint64(&l.Inc, uint64(cur), uint64(lt)+1) {
return
}
}
}

// Increment
func (l *LClock) Increment() LTime {
return LTime(atomic.AddUint64(&l.Inc, 1))
}

// Time returns the current value of the clock
func (l *LClock) Time() LTime {
return LTime(atomic.LoadUint64(&l.Inc))
}
139 changes: 139 additions & 0 deletions mgmt/tribe/delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package tribe

type delegate struct {
tribe *tribe
}

func (t *delegate) NodeMeta(limit int) []byte {
logger.WithField("_block", "NodeMeta").Debugln("NodeMeta called")
//consider using NodeMeta to store declared, inferred and derived facts
return []byte{}
}

func (t *delegate) NotifyMsg(buf []byte) {
if len(buf) == 0 {
return
}

var rebroadcast = true

switch msgType(buf[0]) {
case addPluginMsgType:
msg := &tribeMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleAddPlugin(msg)
case removePluginMsgType:
msg := &tribeMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleRemovePlugin(msg)
case addAgreementMsgType:
msg := &agreementMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleAddAgreement(msg)
case removeAgreementMsgType:
msg := &agreementMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleRemoveAgreement(msg)
case joinAgreementMsgType:
msg := &agreementMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleJoinAgreement(msg)
case leaveAgreementMsgType:
msg := &agreementMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleLeaveAgreement(msg)
default:
logger.WithField("_block", "NotifyMsg").Errorln("NodeMeta called")
return
}

if rebroadcast {
newBuf := make([]byte, len(buf))
copy(newBuf, buf)
t.tribe.broadcasts.QueueBroadcast(&broadcast{
msg: newBuf,
notify: nil,
})
}
}

func (t *delegate) GetBroadcasts(overhead, limit int) [][]byte {
return t.tribe.broadcasts.GetBroadcasts(overhead, limit)
}

func (t *delegate) LocalState(join bool) []byte {
t.tribe.mutex.Lock()
defer t.tribe.mutex.Unlock()

fs := fullStateMsg{
LTime: t.tribe.clock.Time(),
PluginMsgs: t.tribe.msgBuffer,
Agreements: map[string]*agreements{},
}

for name, agreements := range t.tribe.agreements {
agreements.PluginAgreement.mutex.Lock()
fs.Agreements[name] = agreements
agreements.PluginAgreement.mutex.Unlock()
}

buf, err := encodeMessage(fullStateMsgType, fs)
if err != nil {
panic(err)
}

return buf
}

func (t *delegate) MergeRemoteState(buf []byte, join bool) {
logger = logger.WithField("_block", "MergeRemoteState")
logger.Debugln("calling merge")

if msgType(buf[0]) != fullStateMsgType {
logger.Errorln("NodeMeta called")
return
}

fs := &fullStateMsg{}
if err := decodeMessage(buf[1:], fs); err != nil {
panic(err)
}

if t.tribe.clock.Time() > fs.LTime {
//we are ahead return now
return
}

logger.Debugln("Updating full state")
if join {
t.tribe.mutex.Lock()
t.tribe.agreements = fs.Agreements
t.tribe.msgBuffer = fs.PluginMsgs
t.tribe.clock.Update(fs.LTime - 1)
t.tribe.mutex.Unlock()
//todo what about the intents???
} else {
//Process Plugin adds
for _, m := range fs.PluginMsgs {
// if m == nil {
// continue
// }
if m.GetType() == addPluginMsgType {
t.tribe.handleAddPlugin(m.(*tribeMsg))
}
}
}

}
52 changes: 52 additions & 0 deletions mgmt/tribe/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package tribe

import (
"math/rand"
"os"

"github.com/codegangsta/cli"
)

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

var (
flTribeNodeName = cli.StringFlag{
Name: "tribe-node-name",
Usage: "Name of this node in tribe cluster (default: hostname)",
EnvVar: "PULSE_TRIBE_NODE_NAME",
Value: getHostname(),
}

flTribeSeed = cli.StringFlag{
Name: "tribe",
Usage: `IP or resolvable hostname of a seed node to join.
The default empty value assumes this is the first node in a cluster.`,
EnvVar: "PULSE_TRIBE_SEED",
Value: "",
}

flTribeAdvertisePort = cli.IntFlag{
Name: "tribe-port",
Usage: "Port tribe gossips over to maintain membership",
EnvVar: "PULSE_TRIBE_PORT",
Value: 6000,
}

Flags = []cli.Flag{flTribeNodeName, flTribeSeed, flTribeAdvertisePort}
)

func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
return randHostname(8)
}
return hostname
}

func randHostname(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}
21 changes: 21 additions & 0 deletions mgmt/tribe/member_delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package tribe

import (
"github.com/hashicorp/memberlist"
)

type memberDelegate struct {
tribe *tribe
}

func (m *memberDelegate) NotifyJoin(n *memberlist.Node) {
m.tribe.handleMemberJoin(n)
}

func (m *memberDelegate) NotifyLeave(n *memberlist.Node) {
m.tribe.handleMemberLeave(n)
}

func (m *memberDelegate) NotifyUpdate(n *memberlist.Node) {
m.tribe.handleMemberUpdate(n)
}
Loading

0 comments on commit 5084b1e

Please sign in to comment.