diff --git a/agreementbot/agreementworker.go b/agreementbot/agreementworker.go index 10bb64c6e..90312544f 100644 --- a/agreementbot/agreementworker.go +++ b/agreementbot/agreementworker.go @@ -1084,60 +1084,7 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler, // Only non-pattern based agreements can use MMS object policy. if agreement.GetDeviceType() == persistence.DEVICE_TYPE_DEVICE { if b.GetCSSURL() != "" && agreement.Pattern == "" { - - // Retrieve the node policy. - nodePolicyHandler := exchange.GetHTTPNodePolicyHandler(b) - msgPrinter := i18n.GetMessagePrinter() - _, nodePolicy, err := compcheck.GetNodePolicy(nodePolicyHandler, agreement.DeviceId, msgPrinter) - if err != nil { - glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("%v", err))) - } else if nodePolicy == nil { - glog.Warning(BAWlogstring(workerId, fmt.Sprintf("cannot find node policy for this node %v.", agreement.DeviceId))) - } else { - if glog.V(5) { - glog.Infof(BAWlogstring(workerId, fmt.Sprintf("retrieved node policy: %v", nodePolicy))) - } - } - - // Query the MMS cache to find objects with policies that refer to the agreed-to service(s). Service IDs are - // a concatenation of org '/' service name, hardware architecture and version, separated by underscores. We need - // all 3 pieces. - for _, serviceId := range agreement.ServiceId { - - // Array to contain the decomposed pieces from the agreementServiceID - serviceNamePieces := make([]string, 3) - - // Break the service id into the individual tuple pieces, service name (which includes org), arch and version. - // The id will be in the format of name_version_arch (ie. hello-world_1.0.0_am64) - // We can't just split the agreementServiceID with "_" since the name can contain "_" characters too. - // Instead, look at the last index and grab the arch first, then the version, and then the name - tmpServiceId := serviceId - for i := 2; i > 0; i-- { - idx := strings.LastIndex(tmpServiceId, "_") - serviceNamePieces[i] = tmpServiceId[idx+1:] - // Strip off the last _ found - tmpServiceId = tmpServiceId[0:idx] - } - // What remains is the service name - serviceNamePieces[0] = tmpServiceId - - objPolicies := b.mmsObjMgr.GetObjectPolicies(agreement.Org, serviceNamePieces[0], serviceNamePieces[2], serviceNamePieces[1]) - - destsToAddMap := make(map[string]*exchange.ObjectDestinationsToAdd, 0) - destsToDeleteMap := make(map[string]*exchange.ObjectDestinationsToDelete, 0) - - if err := AssignObjectToNodes(b, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, destsToDeleteMap, nil, false); err != nil { - glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to assign object(s) to node %v, error %v", agreement.DeviceId, err))) - } - - if len(destsToAddMap) > 0 { - AddDestinationsForObjects(b, destsToAddMap) - } - if len(destsToDeleteMap) > 0 { - DeleteDestinationsForObjects(b, destsToDeleteMap) - } - - } + AgreementHandleMMSObjectPolicy(b, b.mmsObjMgr, *agreement, workerId, BAWlogstring) } else if b.GetCSSURL() == "" { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to evaluate object placement because there is no CSS URL configured in this agbot"))) } @@ -1524,3 +1471,59 @@ func GetHAPartners(nodeID string, haGroupName string, httpClient *http.Client, u return haPartners, nil } + +func AgreementHandleMMSObjectPolicy(ec exchange.ExchangeContext, mmsObjMgr *MMSObjectPolicyManager, agreement persistence.Agreement, workerId string, logFunction func(string, interface{}) string) { + // Retrieve the node policy. + nodePolicyHandler := exchange.GetHTTPNodePolicyHandler(ec) + msgPrinter := i18n.GetMessagePrinter() + _, nodePolicy, err := compcheck.GetNodePolicy(nodePolicyHandler, agreement.DeviceId, msgPrinter) + if err != nil { + glog.Errorf(logFunction(workerId, fmt.Sprintf("%v", err))) + } else if nodePolicy == nil { + glog.Warning(logFunction(workerId, fmt.Sprintf("cannot find node policy for this node %v.", agreement.DeviceId))) + } else { + if glog.V(5) { + glog.Infof(logFunction(workerId, fmt.Sprintf("retrieved node policy: %v", nodePolicy))) + } + } + + // Query the MMS cache to find objects with policies that refer to the agreed-to service(s). Service IDs are + // a concatenation of org '/' service name, hardware architecture and version, separated by underscores. We need + // all 3 pieces. + for _, serviceId := range agreement.ServiceId { + + // Array to contain the decomposed pieces from the agreementServiceID + serviceNamePieces := make([]string, 3) + + // Break the service id into the individual tuple pieces, service name (which includes org), arch and version. + // The id will be in the format of name_version_arch (ie. hello-world_1.0.0_am64) + // We can't just split the agreementServiceID with "_" since the name can contain "_" characters too. + // Instead, look at the last index and grab the arch first, then the version, and then the name + tmpServiceId := serviceId + for i := 2; i > 0; i-- { + idx := strings.LastIndex(tmpServiceId, "_") + serviceNamePieces[i] = tmpServiceId[idx+1:] + // Strip off the last _ found + tmpServiceId = tmpServiceId[0:idx] + } + // What remains is the service name + serviceNamePieces[0] = tmpServiceId + + objPolicies := mmsObjMgr.GetObjectPolicies(agreement.Org, serviceNamePieces[0], serviceNamePieces[2], serviceNamePieces[1]) + + destsToAddMap := make(map[string]*exchange.ObjectDestinationsToAdd, 0) + destsToDeleteMap := make(map[string]*exchange.ObjectDestinationsToDelete, 0) + + if err := AssignObjectToNodes(ec, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, destsToDeleteMap, nil, false); err != nil { + glog.Errorf(logFunction(workerId, fmt.Sprintf("unable to assign object(s) to node %v, error %v", agreement.DeviceId, err))) + } + + if len(destsToAddMap) > 0 { + AddDestinationsForObjects(ec, destsToAddMap) + } + if len(destsToDeleteMap) > 0 { + DeleteDestinationsForObjects(ec, destsToDeleteMap) + } + + } +} diff --git a/agreementbot/consumer_protocol_handler.go b/agreementbot/consumer_protocol_handler.go index b092ab5a6..2d762cdd6 100644 --- a/agreementbot/consumer_protocol_handler.go +++ b/agreementbot/consumer_protocol_handler.go @@ -620,6 +620,7 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol } func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyChangedCommand, cph ConsumerProtocolHandler) { + // TODO: Lily check here for handle node policy change. Need to get model policy re-evaluated if glog.V(5) { glog.Infof(BCPHlogstring(b.Name(), "recieved node policy change command.")) } @@ -636,6 +637,9 @@ func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyCha if !agStillValid { glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a node policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId))) b.CancelAgreement(ag, TERM_REASON_POLICY_CHANGED, cph, policyMatches) + } else { + // If the agreement is still valid, then handlePolicyChangeFor MMS object + b.HandlePolicyChangeForMMSObject(ag, cph) } } } @@ -685,6 +689,23 @@ func (b *BaseConsumerProtocolHandler) HandleMMSObjectPolicy(cmd *MMSObjectPolicy } } +// HandlePolicyChangeForMMSObject need to: +// 1. for each service in agreement, grab object policy using service info +// 2. AssignObjectToNode func (nodePlicy, objectPolicy ...), then add/delete destination +func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForMMSObject(agreement persistence.Agreement, cph ConsumerProtocolHandler) { + if glog.V(5) { + glog.Infof(BCPHlogstring(b.Name(), "handle node policy change for MMS object.")) + } + + if agreement.GetDeviceType() == persistence.DEVICE_TYPE_DEVICE { + if b.GetCSSURL() != "" && agreement.Pattern == "" { + AgreementHandleMMSObjectPolicy(b, b.mmsObjMgr, agreement, b.Name(), BCPHlogstring) + } else if b.GetCSSURL() == "" { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("unable to re-evaluate object placement because there is no CSS URL configured in this agbot"))) + } + } +} + // Note: Multiple agbot could call this function at the same time (for different agreement). // // Table workloadusage is partitioned. So one agbot could only see the workloadusage in