-
Notifications
You must be signed in to change notification settings - Fork 34
/
token.py
167 lines (134 loc) · 5.23 KB
/
token.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
__author__ = "Vanessa Sochat"
__copyright__ = "Copyright The ORAS Authors."
__license__ = "Apache-2.0"
import requests
import oras.auth.utils as auth_utils
from oras.logger import logger
from .base import AuthBackend
class TokenAuth(AuthBackend):
"""
Token (OAuth2) style auth.
"""
def __init__(self):
self.token = None
super().__init__()
def _logout(self):
self.token = None
def set_token_auth(self, token: str):
"""
Set token authentication.
:param token: the bearer token
:type token: str
"""
self.token = token
def get_auth_header(self):
return {"Authorization": "Bearer %s" % self.token}
def reset_basic_auth(self):
"""
Given we have basic auth, reset it.
"""
if "Authorization" in self.headers:
del self.headers["Authorization"]
if self._basic_auth:
self.set_header("Authorization", "Basic %s" % self._basic_auth)
def authenticate_request(
self, original: requests.Response, headers: dict, refresh=False
):
"""
Authenticate Request
Given a response, look for a Www-Authenticate header to parse.
We return True/False to indicate if the request should be retried.
:param original: original response to get the Www-Authenticate header
:type original: requests.Response
"""
if refresh:
self.token = None
authHeaderRaw = original.headers.get("Www-Authenticate")
if not authHeaderRaw:
logger.debug(
"Www-Authenticate not found in original response, cannot authenticate."
)
return headers, False
# If we have a token, set auth header (base64 encoded user/pass)
if self.token:
headers["Authorization"] = "Bearer %s" % self.token
return headers, True
h = auth_utils.parse_auth_header(authHeaderRaw)
# First try to request an anonymous token
logger.debug("No Authorization, requesting anonymous token")
anon_token = self.request_anonymous_token(h)
if anon_token:
logger.debug("Successfully obtained anonymous token!")
self.token = anon_token
headers["Authorization"] = "Bearer %s" % self.token
return headers, True
# Next try for logged in token
token = self.request_token(h)
if token:
self.token = token
headers["Authorization"] = "Bearer %s" % self.token
return headers, True
logger.error(
"This endpoint requires a token. Please use "
"basic auth with a username or password."
)
return headers, False
def request_token(self, h: auth_utils.authHeader) -> bool:
"""
Request an authenticated token and save for later.s
"""
params = {}
headers = {}
# Prepare request to retry
if h.service:
logger.debug(f"Service: {h.service}")
params["service"] = h.service
headers.update(
{
"Service": h.service,
"Accept": "application/json",
"User-Agent": "oras-py",
}
)
# Ensure the realm starts with http
if not h.realm.startswith("http"): # type: ignore
h.realm = f"{self.prefix}://{h.realm}"
# If the www-authenticate included a scope, honor it!
if h.scope:
logger.debug(f"Scope: {h.scope}")
params["scope"] = h.scope
# Set Basic Auth to receive token
headers["Authorization"] = "Basic %s" % self._basic_auth
authResponse = self.session.get(h.realm, headers=headers, params=params) # type: ignore
if authResponse.status_code != 200:
logger.debug(f"Auth response was not successful: {authResponse.text}")
return
# Request the token
info = authResponse.json()
return info.get("token") or info.get("access_token")
def request_anonymous_token(self, h: auth_utils.authHeader) -> bool:
"""
Given no basic auth, fall back to trying to request an anonymous token.
Returns: boolean if headers have been updated with token.
"""
if not h.realm:
logger.debug("Request anonymous token: no realm provided, exiting early")
return
params = {}
if h.service:
params["service"] = h.service
if h.scope:
params["scope"] = h.scope
logger.debug(f"Final params are {params}")
response = self.session.request("GET", h.realm, params=params)
if response.status_code != 200:
logger.debug(f"Response for anon token failed: {response.text}")
return
# From https://docs.docker.com/registry/spec/auth/token/ section
# We can get token OR access_token OR both (when both they are identical)
data = response.json()
token = data.get("token") or data.get("access_token")
# Update the headers but not self.token (expects Basic)
if token:
return token
logger.debug("Warning: no token or access_token present in response.")