From 2c09e19a8df4eb8c2677bdcc4180bc76e4741e65 Mon Sep 17 00:00:00 2001 From: Le Zhang Date: Wed, 16 Oct 2024 10:03:55 -0400 Subject: [PATCH] Issue open-horizon#4166 - Agbot should avoid sending agreement update message if business policy is unchanged Signed-off-by: Le Zhang --- agreementbot/consumer_protocol_handler.go | 31 ++++++++++--- compcheck/comp_check.go | 2 +- exchange/service.go | 54 ++++++++++++++++++++++- policy/policy_file.go | 35 +++++++++++++++ policy/policy_manager.go | 36 ++------------- 5 files changed, 116 insertions(+), 42 deletions(-) diff --git a/agreementbot/consumer_protocol_handler.go b/agreementbot/consumer_protocol_handler.go index 0914e657c..7be4fbef7 100644 --- a/agreementbot/consumer_protocol_handler.go +++ b/agreementbot/consumer_protocol_handler.go @@ -392,18 +392,29 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId))) } + msgPrinter := i18n.GetMessagePrinter() + svcAllPol := externalpolicy.ExternalPolicy{} + svcPolicyHandler := exchange.GetHTTPServicePolicyHandler(b) + svcResolveHandler := exchange.GetHTTPServiceDefResolverHandler(b) for _, svcId := range ag.ServiceId { - if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err))) + if svcDef, err := exchange.GetServiceWithId(b, svcId); err != nil { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service %v, error: %v", svcId, err))) return false, false, false - } else if svcPol != nil { - svcAllPol.MergeWith(&svcPol.ExternalPolicy, false) + } else if svcDef != nil { + if mergedSvcPol, _, _, _, _, err := compcheck.GetServicePolicyWithDefaultProperties(svcPolicyHandler, svcResolveHandler, svcDef.URL, exchange.GetOrg(svcId), svcDef.Version, svcDef.Arch, msgPrinter); err != nil { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get merged service policy for %v, error: %v", svcId, err))) + return false, false, false + } else if mergedSvcPol != nil { + svcAllPol.MergeWith(mergedSvcPol, false) + } } } - msgPrinter := i18n.GetMessagePrinter() + if glog.V(5) { + glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("For agreement %v merged svc policy is %v", ag.CurrentAgreementId, svcAllPol))) + } busPolHandler := exchange.GetHTTPBusinessPoliciesHandler(b) _, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter) @@ -510,7 +521,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste } return true, true, false } - // new cluster namespace is still compatible + // cluster namespace remains same } } @@ -535,6 +546,13 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste } } + if same, msg := consumerPol.IsSamePolicy(oldPolicy); same { + glog.V(3).Infof("business policy(producerPol) %v content remains same with old policy", ag.PolicyName) + return true, true, true + } else { + glog.V(3).Infof("business policy %v content changed: %v", ag.PolicyName, msg) + } + newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION) if err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err))) @@ -543,6 +561,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste ag.LastPolicyUpdateTime = uint64(time.Now().Unix()) + // this function will send out "basicagreementupdate" b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph) return true, true, true diff --git a/compcheck/comp_check.go b/compcheck/comp_check.go index 6800a3f50..b8c80cf38 100644 --- a/compcheck/comp_check.go +++ b/compcheck/comp_check.go @@ -1132,7 +1132,7 @@ func GetServiceAndDeps(svcUrl, svcOrg, svcVersion, svcArch string, // not found, get it and dependents from the exchange _, depSvcs, exchTopSvc, topId, err = getServiceResolvedDef(svcUrl, svcOrg, svcVersion, svcArch) if err != nil { - return nil, "", nil, NewCompCheckError(fmt.Errorf(msgPrinter.Sprintf("Failed to find definition for dependent services of %s. Compatability of %s cannot be fully evaluated until all services are in the Exchange.", topId, externalpolicy.PROP_NODE_PRIVILEGED)), COMPCHECK_EXCHANGE_ERROR) + return nil, "", nil, NewCompCheckError(fmt.Errorf(msgPrinter.Sprintf("Failed to find definition for dependent services of %s (%s/%s/%s/%s), error: %v. Compatability of %s cannot be fully evaluated until all services are in the Exchange.", topId, svcUrl, svcOrg, svcVersion, svcArch, err, externalpolicy.PROP_NODE_PRIVILEGED)), COMPCHECK_EXCHANGE_ERROR) } topSvc = &ServiceDefinition{exchange.GetOrg(topId), *exchTopSvc} } diff --git a/exchange/service.go b/exchange/service.go index 43630628f..4902eece6 100644 --- a/exchange/service.go +++ b/exchange/service.go @@ -687,11 +687,15 @@ func ServiceDefResolver(wURL string, wOrg string, wVersion string, wArch string, // Make sure the required service has the same arch as the service. // Convert version to a version range expression (if it's not already an expression) so that the underlying GetService // will return us something in the range required by the service. + serviceVersion := sDep.Version + if serviceVersion == "" { + serviceVersion = sDep.VersionRange + } var serviceDef *ServiceDefinition if sDep.Arch != wArch { return nil, nil, nil, "", errors.New(fmt.Sprintf("service %v has a different architecture than the top level service.", sDep)) - } else if vExp, err := semanticversion.Version_Expression_Factory(sDep.Version); err != nil { - return nil, nil, nil, "", errors.New(fmt.Sprintf("unable to create version expression from %v, error %v", sDep.Version, err)) + } else if vExp, err := semanticversion.Version_Expression_Factory(serviceVersion); err != nil { + return nil, nil, nil, "", errors.New(fmt.Sprintf("unable to create version expression from version or version range %v, error %v", serviceVersion, err)) } else if apiSpecs, s_map, s_def, s_id, err := ServiceDefResolver(sDep.URL, sDep.Org, vExp.Get_expression(), sDep.Arch, serviceHandler); err != nil { return nil, nil, nil, "", err } else { @@ -722,6 +726,52 @@ func ServiceDefResolver(wURL string, wOrg string, wVersion string, wArch string, } } +// Retrieve the service object from the exchange. The service_id is prefixed with the org name. +// It returns nil if there is no such service with given service_id. Service_id is in format: / +func GetServiceWithId(ec ExchangeContext, service_id string) (*ServiceDefinition, error) { + glog.V(3).Infof(rpclogString(fmt.Sprintf("getting service policy for %v.", service_id))) + + // Get the service object. There should only be 1. + var resp interface{} + resp = new(GetServicesResponse) + var svc ServiceDefinition + + targetURL := fmt.Sprintf("%vorgs/%v/services/%v", ec.GetExchangeURL(), GetOrg(service_id), GetId(service_id)) + + retryCount := ec.GetHTTPFactory().RetryCount + retryInterval := ec.GetHTTPFactory().GetRetryInterval() + for { + if err, tpErr := InvokeExchange(ec.GetHTTPFactory().NewHTTPClient(nil), "GET", targetURL, ec.GetExchangeId(), ec.GetExchangeToken(), nil, &resp); err != nil { + glog.Errorf(rpclogString(fmt.Sprintf(err.Error()))) + return nil, err + } else if tpErr != nil { + glog.Warningf(rpclogString(fmt.Sprintf(tpErr.Error()))) + if ec.GetHTTPFactory().RetryCount == 0 { + time.Sleep(time.Duration(retryInterval) * time.Second) + continue + } else if retryCount == 0 { + return nil, fmt.Errorf("Exceeded %v retries for error: %v", ec.GetHTTPFactory().RetryCount, tpErr) + } else { + retryCount-- + time.Sleep(time.Duration(retryInterval) * time.Second) + continue + } + } else { + glog.V(3).Infof(rpclogString(fmt.Sprintf("returning service for %v.", service_id))) + services := resp.(*GetServicesResponse) + if len(services.Services) == 1 { + var cachedSvcDefs map[string]ServiceDefinition + svc = services.Services[service_id] + updateServiceDefCache(services.Services, cachedSvcDefs, GetOrg(service_id), svc.URL, svc.Arch) + } else { + glog.V(3).Infof(rpclogString(fmt.Sprintf("service %v not found.", service_id))) + return nil, nil + } + return &svc, nil + } + } +} + // This function gets the image docker auths for a service. func GetServiceDockerAuths(ec ExchangeContext, url string, org string, version string, arch string) ([]ImageDockerAuth, error) { diff --git a/policy/policy_file.go b/policy/policy_file.go index 2384ac6d4..4c2342c26 100644 --- a/policy/policy_file.go +++ b/policy/policy_file.go @@ -535,6 +535,41 @@ func (self *Policy) ShortString() string { return res } +func (self *Policy) IsSamePolicy(compare *Policy) (bool, string) { + misMatchString := "" + isSame := false + if compare == nil { + misMatchString = fmt.Sprintf("Nil policy to comapre with policy %v", self.Header) + } else if !self.Header.IsSame(compare.Header) { + misMatchString = fmt.Sprintf("Header %v mismatch with %v", self.Header, compare.Header) + } else if (len(compare.Workloads) == 0 || (len(compare.Workloads) != 0 && compare.Workloads[0].WorkloadURL == "")) && !self.APISpecs.IsSame(compare.APISpecs, true) { + misMatchString = fmt.Sprintf("API Spec %v mismatch with %v", self.APISpecs, compare.APISpecs) + } else if !self.AgreementProtocols.IsSame(compare.AgreementProtocols) { + misMatchString = fmt.Sprintf("AgreementProtocol %v mismatch with %v", self.AgreementProtocols, compare.AgreementProtocols) + } else if !self.IsSameWorkload(compare) { + misMatchString = fmt.Sprintf("Workload %v mismatch with %v", self.Workloads, compare.Workloads) + } else if !self.DataVerify.IsSame(compare.DataVerify) { + misMatchString = fmt.Sprintf("DataVerify %v mismatch with %v", self.DataVerify, compare.DataVerify) + } else if !self.Properties.IsSame(compare.Properties) { + misMatchString = fmt.Sprintf("Properties %v mismatch with %v", self.Properties, compare.Properties) + } else if !self.Constraints.IsSame(compare.Constraints) { + misMatchString = fmt.Sprintf("Constraints %v mismatch with %v", self.Constraints, compare.Constraints) + } else if self.RequiredWorkload != compare.RequiredWorkload { + misMatchString = fmt.Sprintf("RequiredWorkload %v mismatch with %v", self.RequiredWorkload, compare.RequiredWorkload) + } else if self.MaxAgreements != compare.MaxAgreements { + misMatchString = fmt.Sprintf("MaxAgreement %v mismatch with %v", self.MaxAgreements, compare.MaxAgreements) + } else if !UserInputArrayIsSame(self.UserInput, compare.UserInput) { + misMatchString = fmt.Sprintf("UserInput %v mismatch with %v", self.UserInput, compare.UserInput) + } else if !exchangecommon.SecretBindingIsSame(self.SecretBinding, compare.SecretBinding) { + misMatchString = fmt.Sprintf("SecretBinding %v mismatch with %v", self.SecretBinding, compare.SecretBinding) + } else { + isSame = true + } + + return isSame, misMatchString + +} + func (self *Policy) IsSameWorkload(compare *Policy) bool { if len(self.Workloads) != len(compare.Workloads) { return false diff --git a/policy/policy_manager.go b/policy/policy_manager.go index 2e268eb0d..c1114a584 100644 --- a/policy/policy_manager.go +++ b/policy/policy_manager.go @@ -7,7 +7,6 @@ import ( "github.com/golang/glog" "github.com/open-horizon/anax/config" "github.com/open-horizon/anax/cutil" - "github.com/open-horizon/anax/exchangecommon" "sync" ) @@ -267,42 +266,13 @@ func (self *PolicyManager) hasPolicy(org string, matchPolicy *Policy) (bool, err return false, errors.New(fmt.Sprintf("organization %v not found", org)) } + var isSame bool for _, pol := range orgArray { if errString != "" { glog.V(5).Infof("Policy Manager: Previous search loop returned: %v", errString) } - if !pol.Header.IsSame(matchPolicy.Header) { - errString = fmt.Sprintf("Header %v mismatch with %v", pol.Header, matchPolicy.Header) - continue - } else if (len(matchPolicy.Workloads) == 0 || (len(matchPolicy.Workloads) != 0 && matchPolicy.Workloads[0].WorkloadURL == "")) && !pol.APISpecs.IsSame(matchPolicy.APISpecs, true) { - errString = fmt.Sprintf("API Spec %v mismatch with %v", pol.APISpecs, matchPolicy.APISpecs) - continue - } else if !pol.AgreementProtocols.IsSame(matchPolicy.AgreementProtocols) { - errString = fmt.Sprintf("AgreementProtocol %v mismatch with %v", pol.AgreementProtocols, matchPolicy.AgreementProtocols) - continue - } else if !pol.IsSameWorkload(matchPolicy) { - errString = fmt.Sprintf("Workload %v mismatch with %v", pol.Workloads, matchPolicy.Workloads) - continue - } else if !pol.DataVerify.IsSame(matchPolicy.DataVerify) { - errString = fmt.Sprintf("DataVerify %v mismatch with %v", pol.DataVerify, matchPolicy.DataVerify) - continue - } else if !pol.Properties.IsSame(matchPolicy.Properties) { - errString = fmt.Sprintf("Properties %v mismatch with %v", pol.Properties, matchPolicy.Properties) - continue - } else if !pol.Constraints.IsSame(matchPolicy.Constraints) { - errString = fmt.Sprintf("Constraints %v mismatch with %v", pol.Constraints, matchPolicy.Constraints) - continue - } else if pol.RequiredWorkload != matchPolicy.RequiredWorkload { - errString = fmt.Sprintf("RequiredWorkload %v mismatch with %v", pol.RequiredWorkload, matchPolicy.RequiredWorkload) - continue - } else if pol.MaxAgreements != matchPolicy.MaxAgreements { - errString = fmt.Sprintf("MaxAgreement %v mismatch with %v", pol.MaxAgreements, matchPolicy.MaxAgreements) - continue - } else if !UserInputArrayIsSame(pol.UserInput, matchPolicy.UserInput) { - errString = fmt.Sprintf("UserInput %v mismatch with %v", pol.UserInput, matchPolicy.UserInput) - continue - } else if !exchangecommon.SecretBindingIsSame(pol.SecretBinding, matchPolicy.SecretBinding) { - errString = fmt.Sprintf("SecretBinding %v mismatch with %v", pol.SecretBinding, matchPolicy.SecretBinding) + + if isSame, errString = pol.IsSamePolicy(matchPolicy); !isSame { continue } else { errString = ""