Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor registry namespace check to be compatible with OSDF topology #1038

Merged
merged 10 commits into from
Apr 5, 2024
4 changes: 2 additions & 2 deletions director/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,8 @@ func registerServeAd(engineCtx context.Context, ctx *gin.Context, sType server_s
ctx.JSON(http.StatusForbidden, gin.H{"approval_error": true, "error": fmt.Sprintf("The namespace %q was not approved by an administrator", namespace.Path)})
return
} else {
log.Warningln("Failed to verify token:", err)
ctx.JSON(http.StatusForbidden, gin.H{"error": fmt.Sprintf("Authorization token verification failed %v", err)})
log.Warningln("Failed to verify token: ", err)
ctx.JSON(http.StatusForbidden, gin.H{"error": fmt.Sprintf("Authorization token verification failed: %v", err)})
return
}
}
Expand Down
7 changes: 6 additions & 1 deletion director/origin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ func checkNamespaceStatus(prefix string, registryWebUrlStr string) (bool, error)
log.Warningf("Request %q hit 404, either it's an OSDF registry or Pelican registry <= 7.4.0. Fallback to return true for approval status check", reqUrl.String())
return true, nil
} else {
return false, errors.New(fmt.Sprintf("Server error with status code %d", res.StatusCode))
body, err := io.ReadAll(res.Body)
if err != nil {
return false, errors.New(fmt.Sprintf("Registry returns error when checkNamespaceStatus %d and can't get the response body %v", res.StatusCode, err))
} else {
return false, errors.New(fmt.Sprintf("Registry returns error when checkNamespaceStatus %d with body %s", res.StatusCode, string(body)))
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion launcher_utils/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func doAdvertise(ctx context.Context, servers []server_structs.XRootDServer) {

// Launch periodic advertise of xrootd servers (origin and cache) to the director, in the errgroup
func LaunchPeriodicAdvertise(ctx context.Context, egrp *errgroup.Group, servers []server_structs.XRootDServer) error {
metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusWarning, "First attempt to advertise to the director...")
doAdvertise(ctx, servers)

ticker := time.NewTicker(1 * time.Minute)
Expand All @@ -73,7 +74,7 @@ func LaunchPeriodicAdvertise(ctx context.Context, egrp *errgroup.Group, servers
err := Advertise(ctx, servers)
if err != nil {
log.Warningln("XRootD server advertise failed:", err)
metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusCritical, fmt.Sprintf("XRootD server advertise failed: %v", err))
metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusCritical, fmt.Sprintf("XRootD server failed to advertise to the director: %v", err))
} else {
metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusOK, "")
}
Expand Down
4 changes: 3 additions & 1 deletion launcher_utils/register_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,14 @@ func registerNamespacePrep(prefix string) (key jwk.Key, registrationEndpointURL

// registerNamespaceImpl registers prefix at the registry endpoint
// registrationEndpointURL using key, passing an empty access token.
//
// The registry component health status is updated to reflect the outcome:
// StatusOK on success, StatusCritical (with the failure reason) otherwise.
// On failure the underlying error is returned wrapped with the prefix.
func registerNamespaceImpl(key jwk.Key, prefix string, registrationEndpointURL string) error {
	regErr := registry.NamespaceRegister(key, registrationEndpointURL, "", prefix)
	if regErr == nil {
		metrics.SetComponentHealthStatus(metrics.OriginCache_Registry, metrics.StatusOK, "")
		return nil
	}

	// Surface the failure on the health endpoint before handing the error back.
	reason := fmt.Sprintf("XRootD server failed to register its namespace %s at the registry: %v", prefix, regErr)
	metrics.SetComponentHealthStatus(metrics.OriginCache_Registry, metrics.StatusCritical, reason)
	return errors.Wrapf(regErr, "Failed to register prefix %s", prefix)
}

func RegisterNamespaceWithRetry(ctx context.Context, egrp *errgroup.Group, prefix string) error {
metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusCritical, "Origin not registered with federation")
retryInterval := param.Server_RegistrationRetryInterval.GetDuration()
if retryInterval == 0 {
log.Warning("Server.RegistrationRetryInterval is 0. Fall back to 10s")
Expand All @@ -268,6 +269,7 @@ func RegisterNamespaceWithRetry(ctx context.Context, egrp *errgroup.Group, prefi
return err
}
if isRegistered {
metrics.SetComponentHealthStatus(metrics.OriginCache_Registry, metrics.StatusOK, "")
log.Debugf("Origin already has prefix %v registered\n", prefix)
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions launchers/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pelicanplatform/pelican/daemon"
"github.com/pelicanplatform/pelican/launcher_utils"
"github.com/pelicanplatform/pelican/lotman"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
Expand Down Expand Up @@ -115,5 +116,6 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m

// CacheServeFinish completes the configuration of the cache server.
//
// It marks the registry health component as StatusWarning while registration
// is in progress, then registers the cache's namespace, "/caches/" followed by
// the configured Xrootd sitename, via RegisterNamespaceWithRetry and returns
// that call's error.
func CacheServeFinish(ctx context.Context, egrp *errgroup.Group) error {
	metrics.SetComponentHealthStatus(metrics.OriginCache_Registry, metrics.StatusWarning, "Start to register namespaces for the cache server")

	cachePrefix := "/caches/" + param.Xrootd_Sitename.GetString()
	return launcher_utils.RegisterNamespaceWithRetry(ctx, egrp, cachePrefix)
}
4 changes: 2 additions & 2 deletions launchers/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (servers []se

// Origin needs to advertise once before the cache starts
if modules.IsEnabled(config.CacheType) && modules.IsEnabled(config.OriginType) {
log.Debug("Advertise Origin")
log.Debug("Advertise Origin and Cache to the Director")
if err = launcher_utils.Advertise(ctx, servers); err != nil {
return
}
Expand Down Expand Up @@ -330,7 +330,7 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (servers []se
}

if modules.IsEnabled(config.OriginType) || modules.IsEnabled(config.CacheType) {
log.Debug("Launching periodic advertise")
log.Debug("Launching periodic advertise of origin/cache server to the director")
if err = launcher_utils.LaunchPeriodicAdvertise(ctx, egrp, servers); err != nil {
return
}
Expand Down
2 changes: 2 additions & 0 deletions launchers/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/launcher_utils"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/oa4mp"
"github.com/pelicanplatform/pelican/origin"
"github.com/pelicanplatform/pelican/param"
Expand Down Expand Up @@ -112,6 +113,7 @@ func OriginServeFinish(ctx context.Context, egrp *errgroup.Group) error {
return err
}

metrics.SetComponentHealthStatus(metrics.OriginCache_Registry, metrics.StatusWarning, "Start to register namespaces for the origin server")
for _, export := range *originExports {
if err := launcher_utils.RegisterNamespaceWithRetry(ctx, egrp, export.FederationPrefix); err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion metrics/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ const (
OriginCache_XRootD HealthStatusComponent = "xrootd"
OriginCache_CMSD HealthStatusComponent = "cmsd"
OriginCache_Federation HealthStatusComponent = "federation" // Advertise to the director
OriginCache_Director HealthStatusComponent = "director" // File transfer with director
OriginCache_Director HealthStatusComponent = "director" // File transfer tests with director
OriginCache_Registry HealthStatusComponent = "registry" // Register namespace at the registry
DirectorRegistry_Topology HealthStatusComponent = "topology" // Fetch data from OSDF topology
Server_WebUI HealthStatusComponent = "web-ui"
)
Expand Down
2 changes: 1 addition & 1 deletion origin/origin.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func RegisterOriginAPI(router *gin.Engine, ctx context.Context, egrp *errgroup.G
return errors.New("Origin configuration passed a nil pointer")
}

metrics.SetComponentHealthStatus(metrics.OriginCache_Director, metrics.StatusWarning, "Initializing origin, unknown status for director")
metrics.SetComponentHealthStatus(metrics.OriginCache_Director, metrics.StatusWarning, "Initializing the server, unknown status from the director file transfer test")
// start the timer for the director test report timeout
server_utils.LaunchPeriodicDirectorTimeout(ctx, egrp, notificationChan)

Expand Down
26 changes: 13 additions & 13 deletions registry/client_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ func NamespaceRegisterWithIdentity(privateKey jwk.Key, namespaceRegistryEndpoint
func NamespaceRegister(privateKey jwk.Key, namespaceRegistryEndpoint string, accessToken string, prefix string) error {
publicKey, err := privateKey.PublicKey()
if err != nil {
return errors.Wrapf(err, "Failed to generate public key for namespace registration")
return errors.Wrapf(err, "failed to generate public key for namespace registration")
}
err = jwk.AssignKeyID(publicKey)
if err != nil {
return errors.Wrap(err, "Failed to assign key ID to public key")
return errors.Wrap(err, "failed to assign key ID to public key")
}
if err = publicKey.Set("alg", "ES256"); err != nil {
return errors.Wrap(err, "Failed to assign signature algorithm to public key")
return errors.Wrap(err, "failed to assign signature algorithm to public key")
}
keySet := jwk.NewSet()
if err = keySet.AddKey(publicKey); err != nil {
return errors.Wrap(err, "Failed to add public key to new JWKS")
return errors.Wrap(err, "failed to add public key to new JWKS")
}

if log.IsLevelEnabled(log.DebugLevel) {
Expand All @@ -129,7 +129,7 @@ func NamespaceRegister(privateKey jwk.Key, namespaceRegistryEndpoint string, acc

clientNonce, err := generateNonce()
if err != nil {
return errors.Wrap(err, "Failed to generate client nonce")
return errors.Wrap(err, "failed to generate client nonce")
}

data := map[string]interface{}{
Expand All @@ -143,14 +143,14 @@ func NamespaceRegister(privateKey jwk.Key, namespaceRegistryEndpoint string, acc
// Handle case where there was an error encoded in the body
if err != nil {
if unmarshalErr := json.Unmarshal(resp, &respData); unmarshalErr == nil { // Error creating json
return errors.Wrapf(err, "Failed to make request (server message is '%v')", respData.Error)
return errors.Wrapf(err, "failed to make request (server message is '%v')", respData.Error)
}
return errors.Wrap(err, "Failed to make request")
return errors.Wrap(err, "failed to make request at "+namespaceRegistryEndpoint)
}

// No error
if err = json.Unmarshal(resp, &respData); err != nil {
return errors.Wrap(err, "Failure when parsing JSON response from client")
return errors.Wrap(err, "failure when parsing JSON response from client")
}

// Create client payload by concatenating client_nonce and server_nonce
Expand All @@ -159,11 +159,11 @@ func NamespaceRegister(privateKey jwk.Key, namespaceRegistryEndpoint string, acc
// Sign the payload
privateKeyRaw := &ecdsa.PrivateKey{}
if err = privateKey.Raw(privateKeyRaw); err != nil {
return errors.Wrap(err, "Failed to get an ECDSA private key")
return errors.Wrap(err, "failed to get an ECDSA private key")
}
signature, err := signPayload([]byte(clientPayload), privateKeyRaw)
if err != nil {
return errors.Wrap(err, "Failed to sign payload")
return errors.Wrap(err, "failed to sign payload")
}

// // Create data for the second POST request
Expand All @@ -186,14 +186,14 @@ func NamespaceRegister(privateKey jwk.Key, namespaceRegistryEndpoint string, acc
// Handle case where there was an error encoded in the body
if unmarshalErr := json.Unmarshal(resp, &respData); unmarshalErr == nil {
if err != nil {
return errors.Wrapf(err, "Failed to make request: %v", respData.Error)
return errors.Wrapf(err, "failed to register the namespace: %v", respData.Error)
}
fmt.Println(respData.Message)
} else {
if err != nil {
return errors.Wrapf(err, "Failed to make request: %s", resp)
return errors.Wrapf(err, "failed to register the namespace: %s", string(resp))
}
return errors.Wrapf(unmarshalErr, "Failed to unmarshall request response: %v", respData.Error)
return errors.Wrapf(unmarshalErr, "failed to decode the response of the request to register the namespace: %v", respData.Error)
}

return nil
Expand Down
13 changes: 8 additions & 5 deletions registry/client_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,15 @@ func TestRegistryKeyChainingOSDF(t *testing.T) {
err = NamespaceRegister(privKey, registrySvr.URL+"/api/v1.0/registry", "", "/foo/bar/test")
require.NoError(t, err)

// For now, we simply don't allow further super/sub spacing of namespaces from topo, because how
// can we validate via a key if there is none?
// If the namespace is a subspace from the topology and is registered without the identity
// we deny it
err = NamespaceRegister(privKey, registrySvr.URL+"/api/v1.0/registry", "", "/topo/foo/bar")
require.Error(t, err)
assert.Contains(t, err.Error(), "A superspace or subspace of this namespace /topo/foo/bar already exists in the OSDF topology: /topo/foo. To register a Pelican equivalence, you need to present your identity.")

err = NamespaceRegister(privKey, registrySvr.URL+"/api/v1.0/registry", "", "/topo")
err = NamespaceRegister(privKey, registrySvr.URL+"/api/v1.0/registry", "", "/topo/foo")
require.Error(t, err)
assert.Contains(t, err.Error(), "A superspace or subspace of this namespace /topo/foo already exists in the OSDF topology: /topo/foo. To register a Pelican equivalence, you need to present your identity.")

// Now we create a new key and try to use it to register a super/sub space. These shouldn't succeed
viper.Set("IssuerKey", t.TempDir()+"/keychaining")
Expand Down Expand Up @@ -209,9 +211,10 @@ func TestRegistryKeyChainingOSDF(t *testing.T) {
err = NamespaceRegister(privKey, registrySvr.URL+"/api/v1.0/registry", "", "/foo")
require.NoError(t, err)

// Finally, test with one value for topo
// However, topology check should be independent of key chaining check
err = NamespaceRegister(privKey, registrySvr.URL+"/api/v1.0/registry", "", "/topo")
require.NoError(t, err)
require.Error(t, err)
assert.Contains(t, err.Error(), "A superspace or subspace of this namespace /topo already exists in the OSDF topology: /topo/foo. To register a Pelican equivalence, you need to present your identity.")

_, err = config.SetPreferredPrefix("pelican")
assert.NoError(t, err)
Expand Down
56 changes: 47 additions & 9 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func keySignChallengeCommit(ctx *gin.Context, data *registrationData) (bool, map
if err != nil {
return false, nil, badRequestError{Message: err.Error()}
}
registryUrl := param.Federation_RegistryUrl.GetString()

var rawkey interface{} // This is the raw key, like *rsa.PrivateKey or *ecdsa.PrivateKey
if err := key.Raw(&rawkey); err != nil {
Expand Down Expand Up @@ -288,7 +289,7 @@ func keySignChallengeCommit(ctx *gin.Context, data *registrationData) (bool, map
}
data.Prefix = reqPrefix

valErr, sysErr := validateKeyChaining(reqPrefix, key)
inTopo, topoNss, valErr, sysErr := validateKeyChaining(reqPrefix, key)
if valErr != nil {
log.Errorln(err)
return false, nil, permissionDeniedError{Message: valErr.Error()}
Expand All @@ -307,7 +308,6 @@ func keySignChallengeCommit(ctx *gin.Context, data *registrationData) (bool, map
}
ns.Pubkey = string(pubkeyData)
ns.Identity = data.Identity
ns.Topology = false

if data.Identity != "" {
idMap := map[string]interface{}{}
Expand All @@ -323,17 +323,43 @@ func keySignChallengeCommit(ctx *gin.Context, data *registrationData) (bool, map
ns.AdminMetadata.UserID = val
}
}
if inTopo {
topoNssStr := GetTopoPrefixString(topoNss)
ns.AdminMetadata.Description = fmt.Sprintf("[ Attention: A superspace or subspace of this prefix exists in OSDF topology: %s ] ", topoNssStr)
}
userName, ok := idMap["name"]
if ok {
val, ok := userName.(string)
if ok {
ns.AdminMetadata.Description += "User name: " + val + " "
}
}
email, ok := idMap["email"]
if ok {
val, ok := email.(string)
if ok {
ns.AdminMetadata.Description = "User email: " + val + " This is a namespace registration from Pelican CLI with OIDC authentication. Certain fields may not be populated"
ns.AdminMetadata.Description += "User email: " + val + " This is a namespace registration from Pelican CLI with OIDC authentication. Certain fields may not be populated"
}
}
} else {
// This is either a registration from CLI without --with-identity flag or
// an automated registration from origin or cache
ns.AdminMetadata.Description = "This is a namespace registration from Pelican CLI or an automated registration. Certain fields may not be populated"

// If the namespace is in the topology, we require identity information to register a Pelican namespace
// for verification purposes
if inTopo {
return false,
nil,
permissionDeniedError{Message: fmt.Sprintf("A superspace or subspace of this namespace %s already exists in the OSDF topology: %s. "+
"To register a Pelican equivalence, you need to present your identity. "+
"If you are registering through Pelican CLI, try again with the flag '--with-identity' enabled. "+
"If this is an auto-registration from a Pelican origin or cache server, "+
"register your namespace or server through the Pelican registry website at %s instead.",
ns.Prefix,
haoming29 marked this conversation as resolved.
Show resolved Hide resolved
GetTopoPrefixString(topoNss),
registryUrl)}
}
}

// Since CLI/auto-registered namespace did not have site name as part of their registration
Expand All @@ -347,8 +373,12 @@ func keySignChallengeCommit(ctx *gin.Context, data *registrationData) (bool, map
if err != nil {
return false, nil, errors.Wrapf(err, "Failed to add the prefix %q to the database", ns.Prefix)
} else {
msg := fmt.Sprintf("Prefix %s successfully registered", ns.Prefix)
if inTopo {
msg = fmt.Sprintf("Prefix %s successfully registered. Note that there is an existing superspace or subspace of the namespace in the OSDF topology: %s. The registry admin will review your request and approve your namespace if this is expected.", ns.Prefix, GetTopoPrefixString(topoNss))
}
return true, map[string]interface{}{
"message": "Prefix successfully registered",
"message": msg,
}, nil
}
} else {
Expand Down Expand Up @@ -855,15 +885,23 @@ func checkNamespaceStatusHandler(ctx *gin.Context) {
ctx.JSON(http.StatusBadRequest, gin.H{"error": "prefix is required"})
return
}
ns, err := getNamespaceByPrefix(req.Prefix)
if ns.Topology {
res := server_structs.CheckNamespaceStatusRes{Approved: true}
ctx.JSON(http.StatusOK, res)
exists, err := namespaceExistsByPrefix(req.Prefix)
if err != nil {
log.Errorf("Error in namespaceExistsByPrefix with prefix %s. %v", req.Prefix, err)
ctx.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("Error checking if namespace %s already exists", req.Prefix)})
return
}
// Return 400 if the namespace doesn't exist to spare 404 for the legacy OSDF registry endpoint, which doesn't have this route
// and we rely on 404 to check for backward compatibility
if !exists {
ctx.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("The namespace %s does not exist in the registry", req.Prefix)})
return
}

ns, err := getNamespaceByPrefix(req.Prefix)
if err != nil || ns == nil {
log.Errorf("Error in getNamespaceByPrefix with prefix %s. %v", req.Prefix, err)
ctx.JSON(http.StatusInternalServerError, gin.H{"error": "Error getting namespace"})
ctx.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("Error getting namespace %s: %s", req.Prefix, err.Error())})
return
}
emptyMetadata := AdminMetadata{}
Expand Down
Loading
Loading