Skip to content

Commit

Permalink
Issue open-horizon#4166 - Agbot should avoid sending agreement update…
Browse files Browse the repository at this point in the history
… message if business policy is unchanged

Signed-off-by: Le Zhang <zhangl@us.ibm.com>
  • Loading branch information
LiilyZhang committed Dec 13, 2024
1 parent 2a70088 commit bd01dc4
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 42 deletions.
31 changes: 25 additions & 6 deletions agreementbot/consumer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,29 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId)))
}

msgPrinter := i18n.GetMessagePrinter()

svcAllPol := externalpolicy.ExternalPolicy{}
svcPolicyHandler := exchange.GetHTTPServicePolicyHandler(b)
svcResolveHandler := exchange.GetHTTPServiceDefResolverHandler(b)

for _, svcId := range ag.ServiceId {
if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err)))
if svcDef, err := exchange.GetServiceWithId(b, svcId); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service %v, error: %v", svcId, err)))
return false, false, false
} else if svcPol != nil {
svcAllPol.MergeWith(&svcPol.ExternalPolicy, false)
} else if svcDef != nil {
if mergedSvcPol, _, _, _, _, err := compcheck.GetServicePolicyWithDefaultProperties(svcPolicyHandler, svcResolveHandler, svcDef.URL, exchange.GetOrg(svcId), svcDef.Version, svcDef.Arch, msgPrinter); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get merged service policy for %v, error: %v", svcId, err)))
return false, false, false
} else if mergedSvcPol != nil {
svcAllPol.MergeWith(mergedSvcPol, false)
}
}
}

msgPrinter := i18n.GetMessagePrinter()
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("For agreement %v merged svc policy is %v", ag.CurrentAgreementId, svcAllPol)))
}

busPolHandler := exchange.GetHTTPBusinessPoliciesHandler(b)
_, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter)
Expand Down Expand Up @@ -510,7 +521,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
}
return true, true, false
}
// new cluster namespace is still compatible
// cluster namespace remains same
}
}

Expand All @@ -535,6 +546,13 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
}
}

if same, msg := consumerPol.IsSamePolicy(oldPolicy); same {
glog.V(3).Infof("business policy(producerPol) %v content remains same with old policy; no update to agreement %s", ag.PolicyName, ag.CurrentAgreementId)
return true, true, true
} else {
glog.V(3).Infof("business policy %v content is changed in agreement %v: %v", ag.PolicyName, ag.CurrentAgreementId, msg)
}

newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err)))
Expand All @@ -543,6 +561,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste

ag.LastPolicyUpdateTime = uint64(time.Now().Unix())

// this function will send out "basicagreementupdate"
b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph)

return true, true, true
Expand Down
2 changes: 1 addition & 1 deletion compcheck/comp_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ func GetServiceAndDeps(svcUrl, svcOrg, svcVersion, svcArch string,
// not found, get it and dependents from the exchange
_, depSvcs, exchTopSvc, topId, err = getServiceResolvedDef(svcUrl, svcOrg, svcVersion, svcArch)
if err != nil {
return nil, "", nil, NewCompCheckError(fmt.Errorf(msgPrinter.Sprintf("Failed to find definition for dependent services of %s. Compatability of %s cannot be fully evaluated until all services are in the Exchange.", topId, externalpolicy.PROP_NODE_PRIVILEGED)), COMPCHECK_EXCHANGE_ERROR)
return nil, "", nil, NewCompCheckError(fmt.Errorf(msgPrinter.Sprintf("Failed to find definition for dependent services of %s (%s/%s/%s/%s), error: %v. Compatability of %s cannot be fully evaluated until all services are in the Exchange.", topId, svcUrl, svcOrg, svcVersion, svcArch, err, externalpolicy.PROP_NODE_PRIVILEGED)), COMPCHECK_EXCHANGE_ERROR)
}
topSvc = &ServiceDefinition{exchange.GetOrg(topId), *exchTopSvc}
}
Expand Down
54 changes: 52 additions & 2 deletions exchange/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,15 @@ func ServiceDefResolver(wURL string, wOrg string, wVersion string, wArch string,
// Make sure the required service has the same arch as the service.
// Convert version to a version range expression (if it's not already an expression) so that the underlying GetService
// will return us something in the range required by the service.
serviceVersion := sDep.Version
if serviceVersion == "" {
serviceVersion = sDep.VersionRange
}
var serviceDef *ServiceDefinition
if sDep.Arch != wArch {
return nil, nil, nil, "", errors.New(fmt.Sprintf("service %v has a different architecture than the top level service.", sDep))
} else if vExp, err := semanticversion.Version_Expression_Factory(sDep.Version); err != nil {
return nil, nil, nil, "", errors.New(fmt.Sprintf("unable to create version expression from %v, error %v", sDep.Version, err))
} else if vExp, err := semanticversion.Version_Expression_Factory(serviceVersion); err != nil {
return nil, nil, nil, "", errors.New(fmt.Sprintf("unable to create version expression from version or version range %v, error %v", serviceVersion, err))
} else if apiSpecs, s_map, s_def, s_id, err := ServiceDefResolver(sDep.URL, sDep.Org, vExp.Get_expression(), sDep.Arch, serviceHandler); err != nil {
return nil, nil, nil, "", err
} else {
Expand Down Expand Up @@ -722,6 +726,52 @@ func ServiceDefResolver(wURL string, wOrg string, wVersion string, wArch string,
}
}

// Retrieve the service object from the exchange. The service_id is prefixed with the org name.
// It returns nil if there is no such service with given service_id. Service_id is in format: <org>/<Id>
func GetServiceWithId(ec ExchangeContext, service_id string) (*ServiceDefinition, error) {
glog.V(3).Infof(rpclogString(fmt.Sprintf("getting service policy for %v.", service_id)))

// Get the service object. There should only be 1.
var resp interface{}
resp = new(GetServicesResponse)
var svc ServiceDefinition

targetURL := fmt.Sprintf("%vorgs/%v/services/%v", ec.GetExchangeURL(), GetOrg(service_id), GetId(service_id))

retryCount := ec.GetHTTPFactory().RetryCount
retryInterval := ec.GetHTTPFactory().GetRetryInterval()
for {
if err, tpErr := InvokeExchange(ec.GetHTTPFactory().NewHTTPClient(nil), "GET", targetURL, ec.GetExchangeId(), ec.GetExchangeToken(), nil, &resp); err != nil {
glog.Errorf(rpclogString(fmt.Sprintf(err.Error())))
return nil, err
} else if tpErr != nil {
glog.Warningf(rpclogString(fmt.Sprintf(tpErr.Error())))
if ec.GetHTTPFactory().RetryCount == 0 {
time.Sleep(time.Duration(retryInterval) * time.Second)
continue
} else if retryCount == 0 {
return nil, fmt.Errorf("Exceeded %v retries for error: %v", ec.GetHTTPFactory().RetryCount, tpErr)
} else {
retryCount--
time.Sleep(time.Duration(retryInterval) * time.Second)
continue
}
} else {
glog.V(3).Infof(rpclogString(fmt.Sprintf("returning service for %v.", service_id)))
services := resp.(*GetServicesResponse)
if len(services.Services) == 1 {
var cachedSvcDefs map[string]ServiceDefinition
svc = services.Services[service_id]
updateServiceDefCache(services.Services, cachedSvcDefs, GetOrg(service_id), svc.URL, svc.Arch)
} else {
glog.V(3).Infof(rpclogString(fmt.Sprintf("service %v not found.", service_id)))
return nil, nil
}
return &svc, nil
}
}
}

// This function gets the image docker auths for a service.
func GetServiceDockerAuths(ec ExchangeContext, url string, org string, version string, arch string) ([]ImageDockerAuth, error) {

Expand Down
35 changes: 35 additions & 0 deletions policy/policy_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,41 @@ func (self *Policy) ShortString() string {
return res
}

func (self *Policy) IsSamePolicy(compare *Policy) (bool, string) {
misMatchString := ""
isSame := false
if compare == nil {
misMatchString = fmt.Sprintf("Nil policy to comapre with policy %v", self.Header)
} else if !self.Header.IsSame(compare.Header) {
misMatchString = fmt.Sprintf("Header %v mismatch with %v", self.Header, compare.Header)
} else if (len(compare.Workloads) == 0 || (len(compare.Workloads) != 0 && compare.Workloads[0].WorkloadURL == "")) && !self.APISpecs.IsSame(compare.APISpecs, true) {
misMatchString = fmt.Sprintf("API Spec %v mismatch with %v", self.APISpecs, compare.APISpecs)
} else if !self.AgreementProtocols.IsSame(compare.AgreementProtocols) {
misMatchString = fmt.Sprintf("AgreementProtocol %v mismatch with %v", self.AgreementProtocols, compare.AgreementProtocols)
} else if !self.IsSameWorkload(compare) {
misMatchString = fmt.Sprintf("Workload %v mismatch with %v", self.Workloads, compare.Workloads)
} else if !self.DataVerify.IsSame(compare.DataVerify) {
misMatchString = fmt.Sprintf("DataVerify %v mismatch with %v", self.DataVerify, compare.DataVerify)
} else if !self.Properties.IsSame(compare.Properties) {
misMatchString = fmt.Sprintf("Properties %v mismatch with %v", self.Properties, compare.Properties)
} else if !self.Constraints.IsSame(compare.Constraints) {
misMatchString = fmt.Sprintf("Constraints %v mismatch with %v", self.Constraints, compare.Constraints)
} else if self.RequiredWorkload != compare.RequiredWorkload {
misMatchString = fmt.Sprintf("RequiredWorkload %v mismatch with %v", self.RequiredWorkload, compare.RequiredWorkload)
} else if self.MaxAgreements != compare.MaxAgreements {
misMatchString = fmt.Sprintf("MaxAgreement %v mismatch with %v", self.MaxAgreements, compare.MaxAgreements)
} else if !UserInputArrayIsSame(self.UserInput, compare.UserInput) {
misMatchString = fmt.Sprintf("UserInput %v mismatch with %v", self.UserInput, compare.UserInput)
} else if !exchangecommon.SecretBindingIsSame(self.SecretBinding, compare.SecretBinding) {
misMatchString = fmt.Sprintf("SecretBinding %v mismatch with %v", self.SecretBinding, compare.SecretBinding)
} else {
isSame = true
}

return isSame, misMatchString

}

func (self *Policy) IsSameWorkload(compare *Policy) bool {
if len(self.Workloads) != len(compare.Workloads) {
return false
Expand Down
36 changes: 3 additions & 33 deletions policy/policy_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/golang/glog"
"github.com/open-horizon/anax/config"
"github.com/open-horizon/anax/cutil"
"github.com/open-horizon/anax/exchangecommon"
"sync"
)

Expand Down Expand Up @@ -267,42 +266,13 @@ func (self *PolicyManager) hasPolicy(org string, matchPolicy *Policy) (bool, err
return false, errors.New(fmt.Sprintf("organization %v not found", org))
}

var isSame bool
for _, pol := range orgArray {
if errString != "" {
glog.V(5).Infof("Policy Manager: Previous search loop returned: %v", errString)
}
if !pol.Header.IsSame(matchPolicy.Header) {
errString = fmt.Sprintf("Header %v mismatch with %v", pol.Header, matchPolicy.Header)
continue
} else if (len(matchPolicy.Workloads) == 0 || (len(matchPolicy.Workloads) != 0 && matchPolicy.Workloads[0].WorkloadURL == "")) && !pol.APISpecs.IsSame(matchPolicy.APISpecs, true) {
errString = fmt.Sprintf("API Spec %v mismatch with %v", pol.APISpecs, matchPolicy.APISpecs)
continue
} else if !pol.AgreementProtocols.IsSame(matchPolicy.AgreementProtocols) {
errString = fmt.Sprintf("AgreementProtocol %v mismatch with %v", pol.AgreementProtocols, matchPolicy.AgreementProtocols)
continue
} else if !pol.IsSameWorkload(matchPolicy) {
errString = fmt.Sprintf("Workload %v mismatch with %v", pol.Workloads, matchPolicy.Workloads)
continue
} else if !pol.DataVerify.IsSame(matchPolicy.DataVerify) {
errString = fmt.Sprintf("DataVerify %v mismatch with %v", pol.DataVerify, matchPolicy.DataVerify)
continue
} else if !pol.Properties.IsSame(matchPolicy.Properties) {
errString = fmt.Sprintf("Properties %v mismatch with %v", pol.Properties, matchPolicy.Properties)
continue
} else if !pol.Constraints.IsSame(matchPolicy.Constraints) {
errString = fmt.Sprintf("Constraints %v mismatch with %v", pol.Constraints, matchPolicy.Constraints)
continue
} else if pol.RequiredWorkload != matchPolicy.RequiredWorkload {
errString = fmt.Sprintf("RequiredWorkload %v mismatch with %v", pol.RequiredWorkload, matchPolicy.RequiredWorkload)
continue
} else if pol.MaxAgreements != matchPolicy.MaxAgreements {
errString = fmt.Sprintf("MaxAgreement %v mismatch with %v", pol.MaxAgreements, matchPolicy.MaxAgreements)
continue
} else if !UserInputArrayIsSame(pol.UserInput, matchPolicy.UserInput) {
errString = fmt.Sprintf("UserInput %v mismatch with %v", pol.UserInput, matchPolicy.UserInput)
continue
} else if !exchangecommon.SecretBindingIsSame(pol.SecretBinding, matchPolicy.SecretBinding) {
errString = fmt.Sprintf("SecretBinding %v mismatch with %v", pol.SecretBinding, matchPolicy.SecretBinding)

if isSame, errString = pol.IsSamePolicy(matchPolicy); !isSame {
continue
} else {
errString = ""
Expand Down

0 comments on commit bd01dc4

Please sign in to comment.