From e26470de6f0dac832295009ae8636bb93b7c67f2 Mon Sep 17 00:00:00 2001 From: gatici Date: Mon, 5 Aug 2024 16:38:49 +0300 Subject: [PATCH] feat: Adding nrfcaching library Signed-off-by: gatici --- nrfcache/match_filters.go | 258 ++++++++++++++++++++++++++ nrfcache/nrfcache.go | 368 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 626 insertions(+) create mode 100644 nrfcache/match_filters.go create mode 100644 nrfcache/nrfcache.go diff --git a/nrfcache/match_filters.go b/nrfcache/match_filters.go new file mode 100644 index 0000000..be11900 --- /dev/null +++ b/nrfcache/match_filters.go @@ -0,0 +1,258 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: 2024 Canonical Ltd. +/* + * Match the NF profiles based on the parameters + */ + +// This file contains apis to match the nf profiles based on the parameters provided in the +// Nnrf_NFDiscovery.SearchNFInstancesParamOpts. There is a match function provided for each NF type +// which must be updated with logic to compare profiles based on the applicable params in +// Nnrf_NFDiscovery.SearchNFInstancesParamOpts + +package nrfcache + +import ( + "encoding/json" + "fmt" + "regexp" + + "openapi/Nnrf_NFDiscovery" + "openapi/models" +) + +type MatchFilter func(profile *models.NfProfile, opts *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (bool, error) + +type MatchFilters map[models.NfType]MatchFilter + +var matchFilters = MatchFilters{ + models.NfType_SMF: MatchSmfProfile, + models.NfType_AUSF: MatchAusfProfile, + models.NfType_PCF: MatchPcfProfile, + models.NfType_NSSF: MatchNssfProfile, + models.NfType_UDM: MatchUdmProfile, + models.NfType_AMF: MatchAmfProfile, +} + +func MatchSmfProfile(profile *models.NfProfile, opts *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (bool, error) { + + if opts.ServiceNames.IsSet() { + reqServiceNames := opts.ServiceNames.Value().([]models.ServiceName) + matchCount := 0 + for _, sn := range reqServiceNames { + for i := 0; i < len(*profile.NfServices); i++ { + if (*profile.NfServices)[i].ServiceName == sn { + matchCount++ + break + } + } + } + + if matchCount == 0 { + return false, nil + } + } + + if opts.Snssais.IsSet() { + reqSnssais := opts.Snssais.Value().([]string) + matchCount := 0 + + for _, reqSnssai := range reqSnssais { + var snssai models.Snssai + err := json.Unmarshal([]byte(reqSnssai), &snssai) + if err != nil { + return false, err + } + + // Snssai in the smfInfo has priority + if profile.SmfInfo != nil && profile.SmfInfo.SNssaiSmfInfoList != nil { + for _, s := range *profile.SmfInfo.SNssaiSmfInfoList { + if s.SNssai != nil && (*s.SNssai) == snssai { + matchCount++ + } + } + } else if profile.AllowedNssais != nil { + for _, s := range *profile.AllowedNssais { + if s == snssai { + matchCount++ + } + } + } + + } + + // if at least one matching snssai has been found + if matchCount == 0 { + return false, nil + } + + } + + // validate dnn + if opts.Dnn.IsSet() { + // if a dnn is provided by the upper layer, check for the exact match + // or wild card match + dnnMatched := false + + if profile.SmfInfo != nil && profile.SmfInfo.SNssaiSmfInfoList != nil { + matchDnnLoop: + for _, s := range *profile.SmfInfo.SNssaiSmfInfoList { + if s.DnnSmfInfoList != nil { + for _, d := range *s.DnnSmfInfoList { + if d.Dnn == opts.Dnn.Value() || d.Dnn == "*" { + dnnMatched = true + break matchDnnLoop + } + } + } + } + } + + if dnnMatched == false { + return false, nil + } + } + fmt.Printf("SMF match found, nfInstance Id %v", profile.NfInstanceId) + return true, nil +} + +func MatchSupiRange(supi string, supiRange []models.SupiRange) bool { + matchFound := false + for _, s := range supiRange { + if len(s.Pattern) > 0 { + r, _ := regexp.Compile(s.Pattern) + if r.MatchString(supi) { + matchFound = true + break + } + + } else if s.Start <= supi && supi <= s.End { + matchFound = true + break + } + } + + return matchFound +} + +func MatchAusfProfile(profile *models.NfProfile, opts *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (bool, error) { + matchFound := true + if opts.Supi.IsSet() { + if profile.AusfInfo != nil && len(profile.AusfInfo.SupiRanges) > 0 { + matchFound = MatchSupiRange(opts.Supi.Value(), profile.AusfInfo.SupiRanges) + } + } + fmt.Printf("Ausf match found = %v", matchFound) + return matchFound, nil +} + +func MatchNssfProfile(profile *models.NfProfile, opts *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (bool, error) { + fmt.Println("Nssf match found ") + return true, nil +} + +func MatchAmfProfile(profile *models.NfProfile, opts *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (bool, error) { + + if opts.TargetPlmnList.IsSet() { + if profile.PlmnList != nil { + plmnMatchCount := 0 + + targetPlmnList := opts.TargetPlmnList.Value().([]string) + for _, targetPlmn := range targetPlmnList { + var plmn models.PlmnId + err := json.Unmarshal([]byte(targetPlmn), &plmn) + + if err != nil { + fmt.Printf("Error Unmarshaling plmn : %+v", err) + return false, err + } + + for _, profilePlmn := range *profile.PlmnList { + if profilePlmn == plmn { + plmnMatchCount++ + break + } + } + } + if plmnMatchCount == 0 { + return false, nil + } + } + } + + if profile.AmfInfo != nil { + if opts.Guami.IsSet() { + if profile.AmfInfo.GuamiList != nil { + guamiMatchCount := 0 + + guamiList := opts.Guami.Value().([]string) + for _, guami := range guamiList { + var guamiOpt models.Guami + err := json.Unmarshal([]byte(guami), &guamiOpt) + + if err != nil { + fmt.Printf("Error Unmarshaling guami : %+v", err) + return false, err + } + + for _, guami := range *profile.AmfInfo.GuamiList { + if guamiOpt == guami { + guamiMatchCount++ + break + } + } + } + if guamiMatchCount == 0 { + return false, nil + } + } + } + + if opts.AmfRegionId.IsSet() { + if len(profile.AmfInfo.AmfRegionId) > 0 { + if profile.AmfInfo.AmfRegionId != opts.AmfRegionId.Value() { + return false, nil + } + } + } + + if opts.AmfSetId.IsSet() { + if len(profile.AmfInfo.AmfSetId) > 0 { + if profile.AmfInfo.AmfSetId != opts.AmfSetId.Value() { + return false, nil + } + } + } + + if opts.TargetNfInstanceId.IsSet() { + if profile.NfInstanceId != "" { + if profile.NfInstanceId != opts.TargetNfInstanceId.Value() { + return false, nil + } + } + } + } + fmt.Printf("Amf match found = %v", profile.NfInstanceId) + return true, nil +} + +func MatchPcfProfile(profile *models.NfProfile, opts *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (bool, error) { + matchFound := true + if opts.Supi.IsSet() { + if profile.PcfInfo != nil && len(profile.PcfInfo.SupiRanges) > 0 { + matchFound = MatchSupiRange(opts.Supi.Value(), profile.PcfInfo.SupiRanges) + } + } + fmt.Printf("PCF match found = %v", matchFound) + return matchFound, nil +} + +func MatchUdmProfile(profile *models.NfProfile, opts *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (bool, error) { + matchFound := true + if opts.Supi.IsSet() { + if profile.UdmInfo != nil && len(profile.UdmInfo.SupiRanges) > 0 { + matchFound = MatchSupiRange(opts.Supi.Value(), profile.UdmInfo.SupiRanges) + } + } + fmt.Printf("UDM match found = %v", matchFound) + return matchFound, nil +} diff --git a/nrfcache/nrfcache.go b/nrfcache/nrfcache.go new file mode 100644 index 0000000..77d5193 --- /dev/null +++ b/nrfcache/nrfcache.go @@ -0,0 +1,368 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: 2024 Canonical Ltd. +/* + * NRF Caching library + */ + +package nrfcache + +import ( + "container/heap" + "fmt" + "sync" + "time" + + "openapi/Nnrf_NFDiscovery" + "openapi/models" +) + +const defaultCacheTTl = time.Hour +const defaultNfProfileTTl = time.Minute + +type NfProfileItem struct { + nfProfile *models.NfProfile + expiryTime time.Time + ttl time.Duration + index int // index of the entry in the priority queue +} + +// isExpired - returns true if the expiry time has passed. +func (item *NfProfileItem) isExpired() bool { + return item.expiryTime.Before(time.Now()) +} + +// updateExpiryTime - sets new expiry time based on the current time +func (item *NfProfileItem) updateExpiryTime() { + item.expiryTime = time.Now().Add(time.Second * item.ttl) +} + +func newNfProfileItem(profile *models.NfProfile, ttl time.Duration) *NfProfileItem { + item := &NfProfileItem{ + nfProfile: profile, + ttl: ttl, + } + item.updateExpiryTime() + return item +} + +// NfProfilePriorityQ : Priority Queue to store the profile. Queue is ordered by expiry time +type NfProfilePriorityQ []*NfProfileItem + +// Len - Number of entries in the priority queue +func (npq NfProfilePriorityQ) Len() int { + return len(npq) +} + +// Less - Comparator for the sort interface used by the heap. +// entries will be sorted by increasing order of expiry time +func (npq NfProfilePriorityQ) Less(i, j int) bool { + return npq[i].expiryTime.Before(npq[j].expiryTime) +} + +// Swap - implemented for the sort interface used by the heap pkg. +// swaps the element at i and j. +func (npq NfProfilePriorityQ) Swap(i, j int) { + npq[i], npq[j] = npq[j], npq[i] + npq[i].index = i + npq[j].index = j +} + +// at - returns the element at index i +func (npq NfProfilePriorityQ) at(index int) *NfProfileItem { + return npq[index] +} + +// push - adds an entry to the priority queue. Invokes heap api to +// push the entry to the correct location in the queue +func (npq *NfProfilePriorityQ) push(item interface{}) { + heap.Push(npq, item) +} + +// update - update fields of existing entry. Invokes heap.Fix to re-establish the ordering. +func (npq *NfProfilePriorityQ) update(item *NfProfileItem, value *models.NfProfile, ttl time.Duration) { + item.nfProfile = value + item.ttl = ttl + item.updateExpiryTime() + heap.Fix(npq, item.index) +} + +// remove -removes an entry at given index. +func (npq *NfProfilePriorityQ) remove(item *NfProfileItem) { + heap.Remove(npq, item.index) +} + +// Push - implemented for heap interface. appends an element to the priority queue +func (npq *NfProfilePriorityQ) Push(item interface{}) { + n := len(*npq) + entry := item.(*NfProfileItem) + entry.index = n + *npq = append(*npq, entry) +} + +// Pop - implemented for heap interface. Removes the entry with expiry time +func (npq *NfProfilePriorityQ) Pop() interface{} { + old := *npq + n := len(old) + item := old[n-1] + old[n-1] = nil + item.index = -1 + *npq = old[0 : n-1] + return item +} + +// newNfProfilePriorityQ - New priority queue for storing NF Profiles. +func newNfProfilePriorityQ() *NfProfilePriorityQ { + q := &NfProfilePriorityQ{} + heap.Init(q) + return q +} + +// NrfCache : cache of nf profiles +type NrfCache struct { + cache map[string]*NfProfileItem // map[nf-instance-id] =*NfProfile + + priorityQ *NfProfilePriorityQ // sorted by expiry time + + evictionTicker *time.Ticker + + done chan struct{} + + nrfDiscoveryQueryCb NrfDiscoveryQueryCb // nrf query callback + + evictionInterval time.Duration // timer interval in which the cache is checked for eviction of expired entries + + mutex sync.RWMutex +} + +// handleLookup - Checks if the cache has nf cache entry corresponding to the parameters specified. +// If entry does not exist, perform nrf discovery query. To avoid concurrency issues, +// nrf discovery query is mutex protected. +func (c *NrfCache) handleLookup(nrfUri string, targetNfType, requestNfType models.NfType, param *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (models.SearchResult, error) { + var searchResult models.SearchResult + var err error + + c.mutex.RLock() + searchResult.NfInstances = c.get(param) + c.mutex.RUnlock() + + if len(searchResult.NfInstances) == 0 { + fmt.Printf("Cache miss for nftype %s", targetNfType) + c.mutex.Lock() + defer c.mutex.Unlock() + searchResult.NfInstances = c.get(param) + if len(searchResult.NfInstances) == 0 { + searchResult, err = c.nrfDiscoveryQueryCb(nrfUri, targetNfType, requestNfType, param) + if err != nil { + return searchResult, err + } + + for i := 0; i < len(searchResult.NfInstances); i++ { + c.set(&searchResult.NfInstances[i], time.Duration(searchResult.ValidityPeriod)) + } + } + } + return searchResult, err +} + +// set - Adds nf profile entry to the map and the priority queue +func (c *NrfCache) set(nfProfile *models.NfProfile, ttl time.Duration) { + if ttl == 0 { + ttl = defaultNfProfileTTl + } + + item, exists := c.cache[nfProfile.NfInstanceId] + if exists { + // if item.isExpired() + c.priorityQ.update(item, nfProfile, ttl) + } else { + newItem := newNfProfileItem(nfProfile, ttl) + c.cache[nfProfile.NfInstanceId] = newItem + c.priorityQ.push(newItem) + } +} + +// get - checks if nf profile corresponding to the search opts exist in the cache. +func (c *NrfCache) get(opts *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) []models.NfProfile { + var nfProfiles []models.NfProfile + + for _, element := range c.cache { + if !element.isExpired() { + if opts != nil { + cb, ok := matchFilters[element.nfProfile.NfType] + if ok { + matchFound, err := cb(element.nfProfile, opts) + if err != nil { + fmt.Printf("match filter returned error %v", err) + } else if matchFound { + nfProfiles = append(nfProfiles, *(element.nfProfile)) + } + } + } else { + nfProfiles = append(nfProfiles, *(element.nfProfile)) + } + } + } + return nfProfiles +} + +// removeByNfInstanceId - removes nf profile with nfInstanceId from the cache and queue +func (c *NrfCache) removeByNfInstanceId(nfInstanceId string) bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + NfProfileItem, rc := c.cache[nfInstanceId] + if rc { + c.remove(NfProfileItem) + } + return rc +} + +// remove - +func (c *NrfCache) remove(item *NfProfileItem) { + c.priorityQ.remove(item) + delete(c.cache, item.nfProfile.NfInstanceId) +} + +// cleanupExpiredItems - removes the profiles with expired TTLs +func (c *NrfCache) cleanupExpiredItems() { + fmt.Println("nrf cache: cleanup expired items") + for item := c.priorityQ.at(0); item.isExpired(); { + fmt.Printf("evicted nf instance %s", item.nfProfile.NfInstanceId) + c.remove(item) + if c.priorityQ.Len() == 0 { + break + } else { + item = c.priorityQ.at(0) + } + } +} + +// purge - release the cache and its resources. +func (c *NrfCache) purge() { + c.mutex.Lock() + defer c.mutex.Unlock() + + close(c.done) + c.priorityQ = newNfProfilePriorityQ() + c.cache = make(map[string]*NfProfileItem) + c.evictionTicker.Stop() +} + +func (c *NrfCache) startExpiryProcessing() { + for { + select { + case <-c.evictionTicker.C: + c.mutex.Lock() + if c.priorityQ.Len() == 0 { + c.mutex.Unlock() + continue + } + + c.cleanupExpiredItems() + c.mutex.Unlock() + + case <-c.done: + return + } + } +} + +func NewNrfCache(duration time.Duration, dbqueryCb NrfDiscoveryQueryCb) *NrfCache { + cache := &NrfCache{ + cache: make(map[string]*NfProfileItem), + priorityQ: newNfProfilePriorityQ(), + evictionInterval: defaultCacheTTl, + nrfDiscoveryQueryCb: dbqueryCb, + done: make(chan struct{}), + } + + cache.evictionTicker = time.NewTicker(duration) + + go cache.startExpiryProcessing() + + return cache +} + +type NrfMasterCache struct { + nrfDiscoveryQueryCb NrfDiscoveryQueryCb + nfTypeToCacheMap map[models.NfType]*NrfCache + evictionInterval time.Duration + mutex sync.Mutex +} + +func (c *NrfMasterCache) GetNrfCacheInstance(targetNfType models.NfType) *NrfCache { + c.mutex.Lock() + defer c.mutex.Unlock() + + cache, exists := c.nfTypeToCacheMap[targetNfType] + if exists == false { + fmt.Printf("Creating cache for nftype %v", targetNfType) + cache = NewNrfCache(c.evictionInterval, c.nrfDiscoveryQueryCb) + c.nfTypeToCacheMap[targetNfType] = cache + } + return cache +} + +func (c *NrfMasterCache) clearNrfMasterCache() { + c.mutex.Lock() + defer c.mutex.Unlock() + + for k, cache := range c.nfTypeToCacheMap { + cache.purge() + delete(c.nfTypeToCacheMap, k) + } +} + +func (c *NrfMasterCache) removeNfProfile(nfInstanceId string) bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + var ok bool + for _, nrfCache := range c.nfTypeToCacheMap { + if ok = nrfCache.removeByNfInstanceId(nfInstanceId); ok { + break + } + } + return ok +} + +var masterCache *NrfMasterCache + +type NrfDiscoveryQueryCb func(nrfUri string, targetNfType, requestNfType models.NfType, param *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (models.SearchResult, error) + +func InitNrfCaching(interval time.Duration, cb NrfDiscoveryQueryCb) { + m := &NrfMasterCache{ + nfTypeToCacheMap: make(map[models.NfType]*NrfCache), + evictionInterval: interval, + nrfDiscoveryQueryCb: cb, + } + masterCache = m +} + +func disableNrfCaching() { + masterCache.clearNrfMasterCache() + masterCache = nil +} + +func SearchNFInstances(nrfUri string, targetNfType, requestNfType models.NfType, param *Nnrf_NFDiscovery.SearchNFInstancesParamOpts) (models.SearchResult, error) { + + var searchResult models.SearchResult + var err error + + c := masterCache.GetNrfCacheInstance(targetNfType) + if c != nil { + searchResult, err = c.handleLookup(nrfUri, targetNfType, requestNfType, param) + } else { + fmt.Println("SearchNFInstances nrf cache") + + } + for _, np := range searchResult.NfInstances { + fmt.Printf("%v", np) + } + return searchResult, err + +} + +func RemoveNfProfileFromNrfCache(nfInstanceId string) bool { + return masterCache.removeNfProfile(nfInstanceId) +}