Skip to content

Commit

Permalink
Add comments and make minor corrections
Browse files Browse the repository at this point in the history
  • Loading branch information
Saartank committed Dec 30, 2024
1 parent 6134693 commit cad799c
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 30 deletions.
35 changes: 23 additions & 12 deletions director/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,19 +980,26 @@ func registerServeAd(engineCtx context.Context, ctx *gin.Context, sType server_s
return
}

// Check if the allowed prefixes for caches data from the registry
// have been initialized in the director
if sType == server_structs.CacheType {
// If the allowed prefix for caches data is not initialized,
// wait for it to be initialized within 3 seconds.
if allowedPrefixesForCachesLastSetTimestamp.Load() == 0 {
log.Warning("Allowed prefixes for caches data is not set, waiting for it to be set before continuing with processing cache server ad.")
log.Warning("Allowed prefixes for caches data is not initialized. Waiting for initialization before continuing with processing cache server advertisement.")
start := time.Now()
// Wait until last set timestamp is updated
for allowedPrefixesForCachesLastSetTimestamp.Load() == 0 {
if time.Since(start) >= 3*time.Second {
log.Error("Allowed prefixes for caches data was not set within the 3-second timeout")
log.Error("Allowed prefix for caches data was initialized within the 3-second timeout")
break
}
time.Sleep(100 * time.Millisecond)
}
}

// If the allowed prefix for caches data is stale (older than 15 minutes),
// fail the server registration.
if time.Since(time.Unix(allowedPrefixesForCachesLastSetTimestamp.Load(), 0)) >= 15*time.Minute {
log.Error("Allowed prefixes for caches data is outdated, rejecting cache server ad.")
ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{
Expand Down Expand Up @@ -1022,32 +1029,36 @@ func registerServeAd(engineCtx context.Context, ctx *gin.Context, sType server_s
adV2 = server_structs.ConvertOriginAdV1ToV2(ad)
}

// FilterAdvertisedNamespaces filters the advertised namespaces based on the
// allowed prefixes for cache data retrieved from the registry.
// Filter the advertised prefixes in the cache server advertisement
// based on the allowed prefix for caches data.
if sType == server_structs.CacheType {
// Parse URL to extract hostname
parsedURL, err := url.Parse(adV2.DataURL)
if err != nil {
log.Warnf("Invalid URL: %s, error: %v", adV2.DataURL, err)
ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: fmt.Sprintf("Invalid %s registration: %s", sType, err.Error()),
Msg: fmt.Sprintf("Invalid data URL %s: %s", adV2.DataURL, err.Error()),
})
return
}
cacheHostname := parsedURL.Hostname()

currentPrefixesMap := allowedPrefixesForCaches.Load()
if currentPrefixesMap == nil {
log.Warn("Allowed prefixes for caches data is not initialized")
allowedPrefixesMap := allowedPrefixesForCaches.Load()
if allowedPrefixesMap == nil {
log.Warn("Allowed prefix for caches data is not initialized, failing server ad registration")
ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: fmt.Sprintf("Allowed prefixes for caches data is not initialized for %s", sType),
Msg: "Something is wrong with the registry or the director",
})
return
}

if prefixes, exists := (*currentPrefixesMap)[cacheHostname]; exists {
// If the cache hostname is present in the allowed prefixes map,
// filter the advertised prefixes. If the cache hostname is not present,
// do nothing. This is the default behavior where all prefixes are allowed.
//
// `prefixes` is a set of prefixes that the given cache is allowed to serve.
if prefixes, exists := (*allowedPrefixesMap)[cacheHostname]; exists {
filteredNamespaces := []server_structs.NamespaceAdV2{}

for _, namespace := range adV2.Namespaces {
Expand All @@ -1061,7 +1072,7 @@ func registerServeAd(engineCtx context.Context, ctx *gin.Context, sType server_s
if _, allowed := prefixes[namespace.Path]; allowed {
filteredNamespaces = append(filteredNamespaces, namespace)
} else {
log.Warnf("Rejected namespace: cache hostname=%s, namespace path=%s", cacheHostname, namespace.Path)
log.Infof("Filtered out prefix: %s in the server ad for cache %s", namespace.Path, cacheHostname)
}
}

Expand Down
17 changes: 6 additions & 11 deletions director/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ func TestDirectorRegistration(t *testing.T) {
jsonad, err := json.Marshal(ad)
assert.NoError(t, err, "Error marshalling OriginAdvertise")

// Reset the timestamp so the cache server ad is not rejected.
allowedPrefixesForCachesLastSetTimestamp.Store(time.Now().Unix())

setupRequest(c, r, jsonad, token, server_structs.CacheType)
Expand All @@ -619,8 +620,9 @@ func TestDirectorRegistration(t *testing.T) {
teardown()
})

// Verify if the prefixes in the cache server ad are correctly filtered
// based on the allowed prefix for caches data.
t.Run("cache-with-not-allowed-prefix-in-ad", func(t *testing.T) {
// Setup context and dependencies
c, r, w := setupContext()
pKey, token, _ := generateToken()
publicKey, err := jwk.PublicKeyOf(pKey)
Expand All @@ -639,7 +641,6 @@ func TestDirectorRegistration(t *testing.T) {
allowedPrefixesForCaches.Store(&allowedPrefixes)
allowedPrefixesForCachesLastSetTimestamp.Store(time.Now().Unix())

// Create advertisement with namespaces
ad := server_structs.OriginAdvertiseV2{
Name: "Human-readable name", // For web UI display
RegistryPrefix: "/caches/test", // For registry lookup
Expand All @@ -660,17 +661,11 @@ func TestDirectorRegistration(t *testing.T) {
jsonad, err := json.Marshal(ad)
assert.NoError(t, err, "Error marshalling OriginAdvertise")

// Initialize serverAds cache
serverAds = ttlcache.New(ttlcache.WithTTL[string, *server_structs.Advertisement](15 * time.Minute))

// Setup request and serve
setupRequest(c, r, jsonad, token, server_structs.CacheType)
r.ServeHTTP(w, c.Request)

// Validate response
assert.Equal(t, 200, w.Result().StatusCode, "Expected status code of 200")
assert.Equal(t, 200, w.Result().StatusCode)

// Validate serverAds cache entry
adEntry := serverAds.Get("https://data-url.org")
assert.NotNil(t, adEntry, "Cache failed to register at serverAds")

Expand All @@ -689,8 +684,8 @@ func TestDirectorRegistration(t *testing.T) {
}
}

assert.False(t, foundFooBar, "Namespace with path /foo/bar should not be registered")
assert.True(t, foundFooBaz, "Namespace with path /foo/baz should be registered")
assert.False(t, foundFooBar, "Prefix /foo/bar should not have been registered")
assert.True(t, foundFooBaz, "Prefix /foo/baz should have been registered")

teardown()
})
Expand Down
6 changes: 3 additions & 3 deletions director/registry_periodic_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
)

var (
// allowedPrefixesForCaches maps cache hostnames to a set of prefixes the caches are allowed to serve.
// allowedPrefixesForCaches maps cache hostnames to a set of prefixes the caches are allowed to serve
allowedPrefixesForCaches atomic.Pointer[map[string]map[string]struct{}]
// allowedPrefixesForCachesLastSetTimestamp tracks when allowedPrefixesForCaches was last explicitly set.
// allowedPrefixesForCachesLastSetTimestamp tracks when allowedPrefixesForCaches was last set
allowedPrefixesForCachesLastSetTimestamp atomic.Int64
)

Expand Down Expand Up @@ -150,7 +150,7 @@ func LaunchRegistryPeriodicQuery(ctx context.Context, egrp *errgroup.Group) {
allowedPrefixesForCachesLastSetTimestamp.Store(time.Now().Unix())
log.Debug("Allowed prefixes for caches data updated successfully")
case <-ctx.Done():
log.Debug("Periodic fetch terminated")
log.Debug("Periodic fetch fopr allowed prefixes for caches data terminated")
return nil
}
}
Expand Down
2 changes: 2 additions & 0 deletions director/registry_periodic_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/pelicanplatform/pelican/config"
)

// TestLaunchRegistryPeriodicQuery verifies if the director correctly maintains
// in its memory the allowed prefixes for caches data from the registry.
func TestLaunchRegistryPeriodicQuery(t *testing.T) {
config.ResetConfig()
defer config.ResetConfig()
Expand Down
8 changes: 5 additions & 3 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,18 +1157,20 @@ func checkStatusHandler(ctx *gin.Context) {
ctx.JSON(http.StatusOK, server_structs.CheckNamespaceCompleteRes{Results: results})
}

// getAllowedPrefixesForCachesHandler is the handler function for the
// /caches/allowedPrefixes endpoint.
func getAllowedPrefixesForCachesHandler(ctx *gin.Context) {
caches, err := getAllowedPrefixesForCaches()
allowedPrefixesForCachesData, err := getAllowedPrefixesForCaches()

if err != nil {
ctx.JSON(http.StatusInternalServerError, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: fmt.Sprintf("Error fetching prohibited caches: %s", err.Error()),
Msg: fmt.Sprintf("Error fetching allowed prefixes for caches data: %s", err.Error()),
})
return
}

ctx.JSON(http.StatusOK, caches)
ctx.JSON(http.StatusOK, allowedPrefixesForCachesData)
}

func RegisterRegistryAPI(router *gin.RouterGroup) {
Expand Down
6 changes: 5 additions & 1 deletion registry/registry_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ func getNamespaceByPrefix(prefix string) (*server_structs.Namespace, error) {
return &ns, nil
}

// getAllowedPrefixesForCaches queries the database to create a map of cache
// hostnames to a list of prefixes that each cache is allowed to serve.
// If a cache hostname key is not present in the resultant map, it implies the
// default behavior where the cache is allowed to serve all prefixes.
func getAllowedPrefixesForCaches() (map[string][]string, error) {
var namespaces []server_structs.Namespace

Expand All @@ -291,7 +295,7 @@ func getAllowedPrefixesForCaches() (map[string][]string, error) {
allowedPrefixesForCachesMap := make(map[string][]string)

for _, namespace := range namespaces {
// Remove "/caches" from the beginning of the Prefix
// Remove "/caches/" from the beginning of the Prefix
cacheHostname := strings.TrimPrefix(namespace.Prefix, "/caches/")

allowedPrefixesRaw, exists := namespace.CustomFields["AllowedPrefixes"]
Expand Down
3 changes: 3 additions & 0 deletions registry/registry_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,9 @@ func TestGetNamespacesByFilter(t *testing.T) {
})
}

// TestGetAllowedPrefixesForCaches verifies if the function
// getAllowedPrefixesForCaches correctly constructs the mapping
// from cache hostnames to allowed prefixes.
func TestGetAllowedPrefixesForCaches(t *testing.T) {
setupMockRegistryDB(t)
defer teardownMockNamespaceDB(t)
Expand Down

0 comments on commit cad799c

Please sign in to comment.