Skip to content

Commit

Permalink
xdscache: do not notify if cluster load assignments did not change.
Browse files Browse the repository at this point in the history
Signed-off-by: Tero Saarni <tero.saarni@est.tech>
  • Loading branch information
tsaarni committed Jun 9, 2021
1 parent de4c7be commit 0be2438
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions internal/xdscache/v3/endpointstranslator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (c *EndpointsCache) SetClusters(clusters []*dag.ServiceCluster) error {
// UpdateEndpoint adds ep to the cache, or replaces it if it is
// already cached. Any ServiceClusters that are backed by a Service
// that ep belongs become stale.
func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) {
func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) bool {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -200,12 +200,15 @@ func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) {
// all as stale.
if affected := c.services[name]; len(affected) > 0 {
c.stale = append(c.stale, affected...)
return true
}

return false
}

// DeleteEndpoint deletes ep from the cache. Any ServiceClusters
// that are backed by a Service that ep belongs become stale.
func (c *EndpointsCache) DeleteEndpoint(ep *v1.Endpoints) {
func (c *EndpointsCache) DeleteEndpoint(ep *v1.Endpoints) bool {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -216,7 +219,9 @@ func (c *EndpointsCache) DeleteEndpoint(ep *v1.Endpoints) {
// all as stale.
if affected := c.services[name]; len(affected) > 0 {
c.stale = append(c.stale, affected...)
return true
}
return false
}

// NewEndpointsTranslator allocates a new endpoints translator.
Expand Down Expand Up @@ -264,7 +269,6 @@ func (e *EndpointsTranslator) Merge(entries map[string]*envoy_endpoint_v3.Cluste
func (e *EndpointsTranslator) OnChange(d *dag.DAG) {
clusters := []*dag.ServiceCluster{}
names := map[string]bool{}

var visitor func(dag.Vertex)
visitor = func(vertex dag.Vertex) {
if svc, ok := vertex.(*dag.ServiceCluster); ok {
Expand Down Expand Up @@ -338,9 +342,11 @@ func equal(a, b map[string]*envoy_endpoint_v3.ClusterLoadAssignment) bool {
func (e *EndpointsTranslator) OnAdd(obj interface{}) {
switch obj := obj.(type) {
case *v1.Endpoints:
e.cache.UpdateEndpoint(obj)
changed := e.cache.UpdateEndpoint(obj)
e.Merge(e.cache.Recalculate())
e.Notify()
if changed {
e.Notify()
}
if e.Observer != nil {
e.Observer.Refresh()
}
Expand Down Expand Up @@ -385,9 +391,11 @@ func (e *EndpointsTranslator) OnUpdate(oldObj, newObj interface{}) {
func (e *EndpointsTranslator) OnDelete(obj interface{}) {
switch obj := obj.(type) {
case *v1.Endpoints:
e.cache.DeleteEndpoint(obj)
changed := e.cache.DeleteEndpoint(obj)
e.Merge(e.cache.Recalculate())
e.Notify()
if changed {
e.Notify()
}
if e.Observer != nil {
e.Observer.Refresh()
}
Expand Down

0 comments on commit 0be2438

Please sign in to comment.