From 6e226672c60184cd43b6532f5a910acbf9d064ea Mon Sep 17 00:00:00 2001 From: tarilabs Date: Sat, 6 Jul 2024 22:14:51 +0200 Subject: [PATCH] core: if 401 on 2nd attempt, avoid anon tokens in the first flow using auth backend for token: 1. try do_request with no auths at all 2. the attempt to gain an anon token is success, but then the request fails with 401 3. at this point, in the third attempt, give chance to the flow to request a token but avoid any anon tokens. Please note: this happens effectively only on the first run of the flow. Subsequent do_request flow invocations should just succeed now on the 1st request by re-using the token --simplified behaviour introduced with this proposal Signed-off-by: tarilabs --- oras/auth/token.py | 19 +++++++++++-------- oras/provider.py | 14 ++++++++++++++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/oras/auth/token.py b/oras/auth/token.py index d35f734..c9c2c1f 100644 --- a/oras/auth/token.py +++ b/oras/auth/token.py @@ -44,7 +44,7 @@ def reset_basic_auth(self): self.set_header("Authorization", "Basic %s" % self._basic_auth) def authenticate_request( - self, original: requests.Response, headers: dict, refresh=False + self, original: requests.Response, headers: dict, refresh=False, skipAnonToken=False ): """ Authenticate Request @@ -73,17 +73,20 @@ def authenticate_request( h = auth_utils.parse_auth_header(authHeaderRaw) # First try to request an anonymous token - logger.debug("No Authorization, requesting anonymous token") - anon_token = self.request_anonymous_token(h) - if anon_token: - logger.debug("Successfully obtained anonymous token!") - self.token = anon_token - headers["Authorization"] = "Bearer %s" % self.token - return headers, True + if not skipAnonToken: + logger.debug("No Authorization, requesting anonymous token") + anon_token = self.request_anonymous_token(h) + if anon_token: + logger.debug("Successfully obtained anonymous token!") + self.token = anon_token + headers["Authorization"] = "Bearer %s" % self.token + return headers, True # Next try for logged in token + logger.debug("requesting auth token") token = self.request_token(h) if token: + logger.debug("Successfully obtained auth token!") self.token = token headers["Authorization"] = "Bearer %s" % self.token return headers, True diff --git a/oras/provider.py b/oras/provider.py index 308c878..4e45088 100644 --- a/oras/provider.py +++ b/oras/provider.py @@ -972,5 +972,19 @@ def do_request( stream=stream, verify=self._tls_verify, ) + # ...or attempt exchange anon token for auth token if 401 + if response.status_code == 401: + headers, changed = self.auth.authenticate_request( + response, headers, refresh=True, skipAnonToken=True + ) + response = self.session.request( + method, + url, + data=data, + json=json, + headers=headers, + stream=stream, + verify=self._tls_verify, + ) return response