Skip to content

Commit

Permalink
fix(netlink): netlink fixes added
Browse files Browse the repository at this point in the history
Signed-off-by: atulpatel261194 <Atul.Patel@intel.com>
  • Loading branch information
atulpatel261194 committed May 24, 2024
1 parent 1ed4abd commit dbd0624
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 22 deletions.
18 changes: 13 additions & 5 deletions pkg/netlink/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (e *EventBus) Subscribe(eventType string) *Subscriber {

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(eventType string, data interface{}) {
e.mutex.RLock()
defer e.mutex.RUnlock()
e.mutex.Lock()
defer e.mutex.Unlock()

subscribers, ok := e.subscribers[eventType]
if !ok {
Expand All @@ -57,7 +57,15 @@ func (e *EventBus) Publish(eventType string, data interface{}) {
}

Check warning on line 57 in pkg/netlink/eventbus/eventbus.go

View check run for this annotation

Codecov / codecov/patch

pkg/netlink/eventbus/eventbus.go#L55-L57

Added lines #L55 - L57 were not covered by tests
}

// Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out)
func (s *Subscriber) Unsubscribe() {
close(s.Ch)
// Unsubscribe closes all subscriber channels and empties the subscriber map.
func (e *EventBus) Unsubscribe() {
e.mutex.Lock()
defer e.mutex.Unlock()

for eventName, subs := range e.subscribers {
for _, sub := range subs {
close(sub.Ch) // Close each channel
}
delete(e.subscribers, eventName) // Remove the entry from the map

Check warning on line 69 in pkg/netlink/eventbus/eventbus.go

View check run for this annotation

Codecov / codecov/patch

pkg/netlink/eventbus/eventbus.go#L61-L69

Added lines #L61 - L69 were not covered by tests
}
}
33 changes: 30 additions & 3 deletions pkg/netlink/netlink_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,33 @@ const (
routeTypeNeighbor = "neighbor"
)

const (
// RouteAdded event const
RouteAdded = "route_added"
// RouteUpdated event const
RouteUpdated = "route_updated"
// RouteDeleted event const
RouteDeleted = "route_deleted"
// NexthopAdded event const
NexthopAdded = "nexthop_added"
// NexthopUpdated event const
NexthopUpdated = "nexthop_updated"
// NexthopDeleted event const
NexthopDeleted = "nexthop_deleted"
// FdbEntryAdded event const
FdbEntryAdded = "fdb_entry_added"
// FdbEntryUpdated event const
FdbEntryUpdated = "fdb_entry_updated"
// FdbEntryDeleted event const
FdbEntryDeleted = "fdb_entry_deleted"
// L2NexthopAdded event const
L2NexthopAdded = "l2_nexthop_added"
// L2NexthopUpdated event const
L2NexthopUpdated = "l2_nexthop_updated"
// L2NexthopDeleted event const
L2NexthopDeleted = "l2_nexthop_deleted"
)

// getFlag gets the flag
func getFlag(s string) int {
f := 0
Expand Down Expand Up @@ -1936,13 +1963,13 @@ func monitorNetlink() {
// Inform subscribers to delete configuration for any still remaining Netlink DB objects.
log.Printf("netlink: Delete any residual objects in DB")
for _, r := range routes {
notifyAddDel(r, "route_deleted")
notifyAddDel(r, "RouteDeleted")
}
for _, nexthop := range Nexthops {
notifyAddDel(nexthop, "nexthop_deleted")
notifyAddDel(nexthop, "NexthopDeleted")
}
for _, m := range fDB {
notifyAddDel(m, "FDB_entry_deleted")
notifyAddDel(m, "fdb_entry_deleted")
}
log.Printf("netlink: DB cleanup completed.")

Check warning on line 1974 in pkg/netlink/netlink_watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/netlink/netlink_watcher.go#L1952-L1974

Added lines #L1952 - L1974 were not covered by tests
}
Expand Down
29 changes: 15 additions & 14 deletions pkg/vendor_plugins/intel-e2000/p4runtime/p4translation/p4trans.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,20 +988,18 @@ func tearDownSvi(svi *infradb.Svi) bool {
// Initialize function handles init functionality
func Initialize() {
// Netlink Listener
startSubscriber(nm.EventBus, "route_added")

startSubscriber(nm.EventBus, "route_updated")
startSubscriber(nm.EventBus, "route_deleted")
startSubscriber(nm.EventBus, "nexthop_added")
startSubscriber(nm.EventBus, "nexthop_updated")
startSubscriber(nm.EventBus, "nexthop_deleted")
startSubscriber(nm.EventBus, "fdb_entry_added")
startSubscriber(nm.EventBus, "fdb_entry_updated")
startSubscriber(nm.EventBus, "fdb_entry_deleted")
startSubscriber(nm.EventBus, "l2_nexthop_added")
startSubscriber(nm.EventBus, "l2_nexthop_updated")
startSubscriber(nm.EventBus, "l2_nexthop_deleted")

startSubscriber(nm.EventBus, nm.RouteAdded)
startSubscriber(nm.EventBus, nm.RouteUpdated)
startSubscriber(nm.EventBus, nm.RouteDeleted)
startSubscriber(nm.EventBus, nm.NexthopAdded)
startSubscriber(nm.EventBus, nm.NexthopUpdated)
startSubscriber(nm.EventBus, nm.NexthopDeleted)
startSubscriber(nm.EventBus, nm.FdbEntryAdded)
startSubscriber(nm.EventBus, nm.FdbEntryUpdated)
startSubscriber(nm.EventBus, nm.FdbEntryDeleted)
startSubscriber(nm.EventBus, nm.L2NexthopAdded)
startSubscriber(nm.EventBus, nm.L2NexthopUpdated)
startSubscriber(nm.EventBus, nm.L2NexthopDeleted)
// InfraDB Listener

eb := eventbus.EBus
Expand Down Expand Up @@ -1064,6 +1062,9 @@ func Initialize() {

// DeInitialize function handles stops functionality
func DeInitialize() {
// unsubscriber all the events
nm.EventBus.Unsubscribe()

L3entries := L3.StaticDeletions()
for _, entry := range L3entries {
if e, ok := entry.(p4client.TableEntry); ok {
Expand Down

0 comments on commit dbd0624

Please sign in to comment.