1
+ import asyncio
1
2
import json
2
3
import logging
3
4
import ssl
5
+ from contextlib import suppress
4
6
from io import BytesIO
5
7
from pathlib import Path
6
8
from typing import Any , Optional
@@ -54,6 +56,7 @@ def __init__(
54
56
auth_cert_key_path : Optional [str ] = None ,
55
57
token : Optional [str ] = None ,
56
58
token_path : Optional [str ] = None ,
59
+ token_update_interval_s : int = 300 ,
57
60
conn_timeout_s : int = 300 ,
58
61
read_timeout_s : int = 100 ,
59
62
conn_pool_size : int = 100 ,
@@ -70,13 +73,15 @@ def __init__(
70
73
self ._auth_cert_key_path = auth_cert_key_path
71
74
self ._token = token
72
75
self ._token_path = token_path
76
+ self ._token_update_interval_s = token_update_interval_s
73
77
74
78
self ._conn_timeout_s = conn_timeout_s
75
79
self ._read_timeout_s = read_timeout_s
76
80
self ._conn_pool_size = conn_pool_size
77
81
self ._trace_configs = trace_configs
78
82
79
83
self ._client : Optional [aiohttp .ClientSession ] = None
84
+ self ._token_updater_task : Optional [asyncio .Task [None ]] = None
80
85
81
86
self ._dummy_secret_key = SECRET_DUMMY_KEY
82
87
@@ -98,34 +103,36 @@ def _create_ssl_context(self) -> Optional[ssl.SSLContext]:
98
103
return ssl_context
99
104
100
105
async def init (self ) -> None :
101
- self ._client = await self .create_http_client ()
102
-
103
- async def init_if_needed (self ) -> None :
104
- if not self ._client or self ._client .closed :
105
- await self .init ()
106
-
107
- async def create_http_client (self ) -> aiohttp .ClientSession :
108
106
connector = aiohttp .TCPConnector (
109
107
limit = self ._conn_pool_size , ssl = self ._create_ssl_context ()
110
108
)
111
- if self ._auth_type == KubeClientAuthType .TOKEN :
112
- token = self ._token
113
- if not token :
114
- assert self ._token_path is not None
115
- token = Path (self ._token_path ).read_text ()
116
- headers = {"Authorization" : "Bearer " + token }
117
- else :
118
- headers = {}
109
+ if self ._token_path :
110
+ self ._token = Path (self ._token_path ).read_text ()
111
+ self ._token_updater_task = asyncio .create_task (self ._start_token_updater ())
119
112
timeout = aiohttp .ClientTimeout (
120
113
connect = self ._conn_timeout_s , total = self ._read_timeout_s
121
114
)
122
- return aiohttp .ClientSession (
115
+ self . _client = aiohttp .ClientSession (
123
116
connector = connector ,
124
117
timeout = timeout ,
125
- headers = headers ,
126
118
trace_configs = self ._trace_configs ,
127
119
)
128
120
121
+ async def _start_token_updater (self ) -> None :
122
+ if not self ._token_path :
123
+ return
124
+ while True :
125
+ try :
126
+ token = Path (self ._token_path ).read_text ()
127
+ if token != self ._token :
128
+ self ._token = token
129
+ logger .info ("Kube token was refreshed" )
130
+ except asyncio .CancelledError :
131
+ raise
132
+ except Exception as exc :
133
+ logger .exception ("Failed to update kube token: %s" , exc )
134
+ await asyncio .sleep (self ._token_update_interval_s )
135
+
129
136
@property
130
137
def namespace (self ) -> str :
131
138
return self ._namespace
@@ -134,6 +141,11 @@ async def close(self) -> None:
134
141
if self ._client :
135
142
await self ._client .close ()
136
143
self ._client = None
144
+ if self ._token_updater_task :
145
+ self ._token_updater_task .cancel ()
146
+ with suppress (asyncio .CancelledError ):
147
+ await self ._token_updater_task
148
+ self ._token_updater_task = None
137
149
138
150
async def __aenter__ (self ) -> "KubeClient" :
139
151
await self .init ()
@@ -160,23 +172,22 @@ def _generate_secret_url(
160
172
all_secrets_url = self ._generate_all_secrets_url (namespace_name )
161
173
return f"{ all_secrets_url } /{ secret_name } "
162
174
175
+ def _create_headers (
176
+ self , headers : Optional [dict [str , Any ]] = None
177
+ ) -> dict [str , Any ]:
178
+ headers = dict (headers ) if headers else {}
179
+ if self ._auth_type == KubeClientAuthType .TOKEN and self ._token :
180
+ headers ["Authorization" ] = "Bearer " + self ._token
181
+ return headers
182
+
163
183
async def _request (self , * args : Any , ** kwargs : Any ) -> dict [str , Any ]:
164
- await self .init_if_needed ( )
184
+ headers = self ._create_headers ( kwargs . pop ( "headers" , None ) )
165
185
assert self ._client , "client is not initialized"
166
- doing_retry = kwargs .pop ("doing_retry" , False )
167
-
168
- async with self ._client .request (* args , ** kwargs ) as response :
186
+ async with self ._client .request (* args , headers = headers , ** kwargs ) as response :
169
187
payload = await response .json ()
170
- try :
188
+ logging . debug ( "k8s response payload: %s" , payload )
171
189
self ._raise_for_status (payload )
172
190
return payload
173
- except KubeClientUnauthorized :
174
- if doing_retry :
175
- raise
176
- # K8s SA's token might be stale, need to refresh it and retry
177
- await self ._reload_http_client ()
178
- kwargs ["doing_retry" ] = True
179
- return await self ._request (* args , ** kwargs )
180
191
181
192
def _raise_for_status (self , payload : dict [str , Any ]) -> None :
182
193
kind = payload ["kind" ]
@@ -196,11 +207,6 @@ def _raise_for_status(self, payload: dict[str, Any]) -> None:
196
207
raise ResourceConflict (payload ["message" ])
197
208
raise KubeClientException (payload ["message" ])
198
209
199
- async def _reload_http_client (self ) -> None :
200
- await self .close ()
201
- self ._token = None
202
- await self .init ()
203
-
204
210
async def create_secret (
205
211
self ,
206
212
secret_name : str ,
0 commit comments