Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds agreements for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Sep 4, 2015
1 parent 5084b1e commit 55c64b9
Show file tree
Hide file tree
Showing 4 changed files with 797 additions and 206 deletions.
188 changes: 165 additions & 23 deletions mgmt/tribe/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ func (t *delegate) NotifyMsg(buf []byte) {

switch msgType(buf[0]) {
case addPluginMsgType:
msg := &tribeMsg{}
msg := &pluginMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleAddPlugin(msg)
case removePluginMsgType:
msg := &tribeMsg{}
msg := &pluginMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
Expand Down Expand Up @@ -54,6 +54,18 @@ func (t *delegate) NotifyMsg(buf []byte) {
panic(err)
}
rebroadcast = t.tribe.handleLeaveAgreement(msg)
case addTaskMsgType:
msg := &taskMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleAddTask(msg)
case removeTaskMsgType:
msg := &taskMsg{}
if err := decodeMessage(buf[1:], msg); err != nil {
panic(err)
}
rebroadcast = t.tribe.handleRemoveTask(msg)
default:
logger.WithField("_block", "NotifyMsg").Errorln("NodeMeta called")
return
Expand All @@ -77,18 +89,80 @@ func (t *delegate) LocalState(join bool) []byte {
t.tribe.mutex.Lock()
defer t.tribe.mutex.Unlock()

fs := fullStateMsg{
LTime: t.tribe.clock.Time(),
PluginMsgs: t.tribe.msgBuffer,
Agreements: map[string]*agreements{},
// TODO the sizes here need to be set with a flag that is also ref in tribe.go
pluginMsgs := make([]*pluginMsg, 512)
agreementMsgs := make([]*agreementMsg, 512)
taskMsgs := make([]*taskMsg, 512)
pluginIntentMsgs := make([]*pluginMsg, 512)
agreementIntentMsgs := make([]*agreementMsg, 512)
taskIntentMsgs := make([]*taskMsg, 512)

for idx, msg := range t.tribe.msgBuffer {
if msg == nil {
continue
}
switch msg.GetType() {
case addPluginMsgType:
pluginMsgs[idx] = msg.(*pluginMsg)
case removePluginMsgType:
pluginMsgs[idx] = msg.(*pluginMsg)
case addAgreementMsgType:
agreementMsgs[idx] = msg.(*agreementMsg)
case removeAgreementMsgType:
agreementMsgs[idx] = msg.(*agreementMsg)
case joinAgreementMsgType:
agreementMsgs[idx] = msg.(*agreementMsg)
case leaveAgreementMsgType:
agreementMsgs[idx] = msg.(*agreementMsg)
case addTaskMsgType:
taskMsgs[idx] = msg.(*taskMsg)
case removeTaskMsgType:
taskMsgs[idx] = msg.(*taskMsg)
}
}

for name, agreements := range t.tribe.agreements {
agreements.PluginAgreement.mutex.Lock()
fs.Agreements[name] = agreements
agreements.PluginAgreement.mutex.Unlock()
for idx, msg := range t.tribe.intentBuffer {
if msg == nil {
continue
}
switch msg.GetType() {
case addPluginMsgType:
pluginIntentMsgs[idx] = msg.(*pluginMsg)
case removePluginMsgType:
pluginIntentMsgs[idx] = msg.(*pluginMsg)
case addAgreementMsgType:
agreementIntentMsgs[idx] = msg.(*agreementMsg)
case removeAgreementMsgType:
agreementIntentMsgs[idx] = msg.(*agreementMsg)
case joinAgreementMsgType:
agreementIntentMsgs[idx] = msg.(*agreementMsg)
case leaveAgreementMsgType:
agreementIntentMsgs[idx] = msg.(*agreementMsg)
case addTaskMsgType:
taskIntentMsgs[idx] = msg.(*taskMsg)
case removeTaskMsgType:
taskIntentMsgs[idx] = msg.(*taskMsg)
}
}

fs := fullStateMsg{
LTime: t.tribe.clock.Time(),
PluginMsgs: pluginMsgs,
AgreementMsgs: agreementMsgs,
TaskMsgs: taskMsgs,
PluginIntentMsgs: pluginIntentMsgs,
AgreementIntentMsgs: agreementIntentMsgs,
TaskIntentMsgs: taskIntentMsgs,
Agreements: t.tribe.agreements,
Members: t.tribe.members,
}

// for name, agreements := range t.tribe.agreements {
// agreements.PluginAgreement.mutex.Lock()
// fs.Agreements[name] = agreements
// agreements.PluginAgreement.mutex.Unlock()
// }

buf, err := encodeMessage(fullStateMsgType, fs)
if err != nil {
panic(err)
Expand All @@ -98,11 +172,10 @@ func (t *delegate) LocalState(join bool) []byte {
}

func (t *delegate) MergeRemoteState(buf []byte, join bool) {
logger = logger.WithField("_block", "MergeRemoteState")
logger.Debugln("calling merge")
logger.WithField("_block", "MergeRemoteState").Debugln("calling merge")

if msgType(buf[0]) != fullStateMsgType {
logger.Errorln("NodeMeta called")
logger.Errorln("Unknown message type")
return
}

Expand All @@ -117,21 +190,90 @@ func (t *delegate) MergeRemoteState(buf []byte, join bool) {
}

logger.Debugln("Updating full state")
t.tribe.mutex.Lock()
defer t.tribe.mutex.Unlock()
if join {
t.tribe.mutex.Lock()
t.tribe.agreements = fs.Agreements
t.tribe.msgBuffer = fs.PluginMsgs
t.tribe.clock.Update(fs.LTime - 1)
t.tribe.mutex.Unlock()
//todo what about the intents???
t.tribe.agreements = fs.Agreements
// TODO investigate this more ..jc
for k, v := range fs.Members {
t.tribe.members[k] = v
}
// t.tribe.members = fs.Members
for idx, pluginMsg := range fs.PluginMsgs {
if pluginMsg == nil {
continue
}
t.tribe.msgBuffer[idx] = pluginMsg
}
for idx, agreementMsg := range fs.AgreementMsgs {
if agreementMsg == nil {
continue
}
t.tribe.msgBuffer[idx] = agreementMsg
}
for idx, taskMsg := range fs.TaskMsgs {
if taskMsg == nil {
continue
}
t.tribe.msgBuffer[idx] = taskMsg
}
for idx, pluginMsg := range fs.PluginIntentMsgs {
if pluginMsg == nil {
continue
}
t.tribe.intentBuffer[idx] = pluginMsg
}
for idx, agreementMsg := range fs.AgreementIntentMsgs {
if agreementMsg == nil {
continue
}
t.tribe.intentBuffer[idx] = agreementMsg
}
for idx, taskMsg := range fs.TaskIntentMsgs {
if taskMsg == nil {
continue
}
t.tribe.intentBuffer[idx] = taskMsg
}
} else {
//Process Plugin adds
for _, m := range fs.PluginMsgs {
// if m == nil {
// continue
// }
if m == nil {
continue
}
if m.GetType() == addPluginMsgType {
t.tribe.handleAddPlugin(m.(*tribeMsg))
t.tribe.handleAddPlugin(m)
}
if m.GetType() == removePluginMsgType {
t.tribe.handleRemovePlugin(m)
}
}
for _, m := range fs.AgreementMsgs {
if m == nil {
continue
}
if m.GetType() == addAgreementMsgType {
t.tribe.handleAddAgreement(m)
}
if m.GetType() == removeAgreementMsgType {
t.tribe.handleRemoveAgreement(m)
}
if m.GetType() == joinAgreementMsgType {
t.tribe.handleJoinAgreement(m)
}
if m.GetType() == leaveAgreementMsgType {
t.tribe.handleLeaveAgreement(m)
}
}
for _, m := range fs.TaskMsgs {
if m == nil {
continue
}
if m.GetType() == addTaskMsgType {
t.tribe.handleAddTask(m)
}
if m.GetType() == removeTaskMsgType {
t.tribe.handleRemoveTask(m)
}
}
}
Expand Down
66 changes: 59 additions & 7 deletions mgmt/tribe/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tribe

import (
"bytes"
"fmt"

"github.com/hashicorp/go-msgpack/codec"
)
Expand All @@ -16,6 +17,8 @@ const (
fullStateMsgType
joinAgreementMsgType
leaveAgreementMsgType
addTaskMsgType
removeTaskMsgType
)

var msgTypes = []string{
Expand All @@ -26,6 +29,8 @@ var msgTypes = []string{
"Full state",
"Join agreement",
"Leave agreement",
"Add task",
"Remove task",
}

func (m msgType) String() string {
Expand All @@ -37,32 +42,38 @@ type msg interface {
Time() LTime
GetType() msgType //TODO rename to Type
Agreement() string
String() string
}

type tribeMsg struct {
type pluginMsg struct {
LTime LTime
Plugin plugin
UUID string
AgreementName string
Type msgType
}

func (t *tribeMsg) ID() string {
func (t *pluginMsg) ID() string {
return t.UUID
}

func (t *tribeMsg) Time() LTime {
func (t *pluginMsg) Time() LTime {
return t.LTime
}

func (t *tribeMsg) GetType() msgType {
func (t *pluginMsg) GetType() msgType {
return t.Type
}

func (t *tribeMsg) Agreement() string {
func (t *pluginMsg) Agreement() string {
return t.AgreementName
}

func (t *pluginMsg) String() string {
return fmt.Sprintf("msg type='%v' agreementName='%v' uuid='%v' plugin='%v'",
t.GetType(), t.Agreement(), t.ID(), t.Plugin)
}

type agreementMsg struct {
LTime LTime
UUID string
Expand All @@ -87,10 +98,51 @@ func (a *agreementMsg) Agreement() string {
return a.AgreementName
}

func (a *agreementMsg) String() string {
return fmt.Sprintf("msg type='%v' agreementName='%v' uuid='%v' member='%v'",
a.GetType(), a.Agreement(), a.ID(), a.MemberName)
}

type taskMsg struct {
LTime LTime
UUID string
TaskID string
AgreementName string
Type msgType
}

func (t *taskMsg) ID() string {
return t.UUID
}

func (t *taskMsg) Time() LTime {
return t.LTime
}

func (t *taskMsg) GetType() msgType {
return t.Type
}

func (t *taskMsg) Agreement() string {
return t.AgreementName
}

func (t *taskMsg) String() string {
return fmt.Sprintf("msg type='%v' agreementName='%v' uuid='%v' task='%v'",
t.GetType(), t.Agreement(), t.ID(), t.TaskID)
}

type fullStateMsg struct {
LTime LTime
PluginMsgs []msg //TODO rename Msgs
LTime LTime
PluginMsgs []*pluginMsg
AgreementMsgs []*agreementMsg
TaskMsgs []*taskMsg
PluginIntentMsgs []*pluginMsg
AgreementIntentMsgs []*agreementMsg
TaskIntentMsgs []*taskMsg

Agreements map[string]*agreements
Members map[string]*member
}

func decodeMessage(buf []byte, out interface{}) error {
Expand Down
Loading

0 comments on commit 55c64b9

Please sign in to comment.