From dbd062451e779d9083571ba0a5d4f01b733d9c0b Mon Sep 17 00:00:00 2001 From: atulpatel261194 Date: Fri, 24 May 2024 12:48:10 -0700 Subject: [PATCH] fix(netlink): netlink fixes added Signed-off-by: atulpatel261194 --- pkg/netlink/eventbus/eventbus.go | 18 +++++++--- pkg/netlink/netlink_watcher.go | 33 +++++++++++++++++-- .../p4runtime/p4translation/p4trans.go | 29 ++++++++-------- 3 files changed, 58 insertions(+), 22 deletions(-) diff --git a/pkg/netlink/eventbus/eventbus.go b/pkg/netlink/eventbus/eventbus.go index 37e4461f..f05c6e7f 100644 --- a/pkg/netlink/eventbus/eventbus.go +++ b/pkg/netlink/eventbus/eventbus.go @@ -44,8 +44,8 @@ func (e *EventBus) Subscribe(eventType string) *Subscriber { // Publish api notifies the subscribers with certain eventType func (e *EventBus) Publish(eventType string, data interface{}) { - e.mutex.RLock() - defer e.mutex.RUnlock() + e.mutex.Lock() + defer e.mutex.Unlock() subscribers, ok := e.subscribers[eventType] if !ok { @@ -57,7 +57,15 @@ func (e *EventBus) Publish(eventType string, data interface{}) { } } -// Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out) -func (s *Subscriber) Unsubscribe() { - close(s.Ch) +// Unsubscribe closes all subscriber channels and empties the subscriber map. +func (e *EventBus) Unsubscribe() { + e.mutex.Lock() + defer e.mutex.Unlock() + + for eventName, subs := range e.subscribers { + for _, sub := range subs { + close(sub.Ch) // Close each channel + } + delete(e.subscribers, eventName) // Remove the entry from the map + } } diff --git a/pkg/netlink/netlink_watcher.go b/pkg/netlink/netlink_watcher.go index 787287e7..74c8e832 100644 --- a/pkg/netlink/netlink_watcher.go +++ b/pkg/netlink/netlink_watcher.go @@ -424,6 +424,33 @@ const ( routeTypeNeighbor = "neighbor" ) +const ( + // RouteAdded event const + RouteAdded = "route_added" + // RouteUpdated event const + RouteUpdated = "route_updated" + // RouteDeleted event const + RouteDeleted = "route_deleted" + // NexthopAdded event const + NexthopAdded = "nexthop_added" + // NexthopUpdated event const + NexthopUpdated = "nexthop_updated" + // NexthopDeleted event const + NexthopDeleted = "nexthop_deleted" + // FdbEntryAdded event const + FdbEntryAdded = "fdb_entry_added" + // FdbEntryUpdated event const + FdbEntryUpdated = "fdb_entry_updated" + // FdbEntryDeleted event const + FdbEntryDeleted = "fdb_entry_deleted" + // L2NexthopAdded event const + L2NexthopAdded = "l2_nexthop_added" + // L2NexthopUpdated event const + L2NexthopUpdated = "l2_nexthop_updated" + // L2NexthopDeleted event const + L2NexthopDeleted = "l2_nexthop_deleted" +) + // getFlag gets the flag func getFlag(s string) int { f := 0 @@ -1936,13 +1963,13 @@ func monitorNetlink() { // Inform subscribers to delete configuration for any still remaining Netlink DB objects. log.Printf("netlink: Delete any residual objects in DB") for _, r := range routes { - notifyAddDel(r, "route_deleted") + notifyAddDel(r, "RouteDeleted") } for _, nexthop := range Nexthops { - notifyAddDel(nexthop, "nexthop_deleted") + notifyAddDel(nexthop, "NexthopDeleted") } for _, m := range fDB { - notifyAddDel(m, "FDB_entry_deleted") + notifyAddDel(m, "fdb_entry_deleted") } log.Printf("netlink: DB cleanup completed.") } diff --git a/pkg/vendor_plugins/intel-e2000/p4runtime/p4translation/p4trans.go b/pkg/vendor_plugins/intel-e2000/p4runtime/p4translation/p4trans.go index 7ba63f3d..054bcaa2 100644 --- a/pkg/vendor_plugins/intel-e2000/p4runtime/p4translation/p4trans.go +++ b/pkg/vendor_plugins/intel-e2000/p4runtime/p4translation/p4trans.go @@ -988,20 +988,18 @@ func tearDownSvi(svi *infradb.Svi) bool { // Initialize function handles init functionality func Initialize() { // Netlink Listener - startSubscriber(nm.EventBus, "route_added") - - startSubscriber(nm.EventBus, "route_updated") - startSubscriber(nm.EventBus, "route_deleted") - startSubscriber(nm.EventBus, "nexthop_added") - startSubscriber(nm.EventBus, "nexthop_updated") - startSubscriber(nm.EventBus, "nexthop_deleted") - startSubscriber(nm.EventBus, "fdb_entry_added") - startSubscriber(nm.EventBus, "fdb_entry_updated") - startSubscriber(nm.EventBus, "fdb_entry_deleted") - startSubscriber(nm.EventBus, "l2_nexthop_added") - startSubscriber(nm.EventBus, "l2_nexthop_updated") - startSubscriber(nm.EventBus, "l2_nexthop_deleted") - + startSubscriber(nm.EventBus, nm.RouteAdded) + startSubscriber(nm.EventBus, nm.RouteUpdated) + startSubscriber(nm.EventBus, nm.RouteDeleted) + startSubscriber(nm.EventBus, nm.NexthopAdded) + startSubscriber(nm.EventBus, nm.NexthopUpdated) + startSubscriber(nm.EventBus, nm.NexthopDeleted) + startSubscriber(nm.EventBus, nm.FdbEntryAdded) + startSubscriber(nm.EventBus, nm.FdbEntryUpdated) + startSubscriber(nm.EventBus, nm.FdbEntryDeleted) + startSubscriber(nm.EventBus, nm.L2NexthopAdded) + startSubscriber(nm.EventBus, nm.L2NexthopUpdated) + startSubscriber(nm.EventBus, nm.L2NexthopDeleted) // InfraDB Listener eb := eventbus.EBus @@ -1064,6 +1062,9 @@ func Initialize() { // DeInitialize function handles stops functionality func DeInitialize() { + // unsubscriber all the events + nm.EventBus.Unsubscribe() + L3entries := L3.StaticDeletions() for _, entry := range L3entries { if e, ok := entry.(p4client.TableEntry); ok {