Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auth] More easily add developers to developer namespaces #13180

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions auth/auth/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@

import aiohttp
import aiohttp_session
import kubernetes_asyncio.client
import kubernetes_asyncio.client.rest
import kubernetes_asyncio.config
import uvloop
from aiohttp import web
from prometheus_async.aio.web import server_stats # type: ignore

from gear import (
AuthClient,
Database,
K8sCache,
Transaction,
check_csrf_token,
create_session,
Expand Down Expand Up @@ -53,6 +57,9 @@

CLOUD = get_global_config()['cloud']
ORGANIZATION_DOMAIN = os.environ['HAIL_ORGANIZATION_DOMAIN']
DEFAULT_NAMESPACE = os.environ['HAIL_DEFAULT_NAMESPACE']

is_test_deployment = DEFAULT_NAMESPACE != 'default'

deploy_config = get_deploy_config()

Expand Down Expand Up @@ -124,7 +131,14 @@ async def check_valid_new_user(tx: Transaction, username, login_id, is_developer


async def insert_new_user(
db: Database, username: str, login_id: Optional[str], is_developer: bool, is_service_account: bool
db: Database,
username: str,
login_id: Optional[str],
is_developer: bool,
is_service_account: bool,
*,
hail_identity: Optional[str] = None,
hail_credentials_secret_name: Optional[str] = None,
) -> bool:
@transaction(db)
async def _insert(tx):
Expand All @@ -134,10 +148,18 @@ async def _insert(tx):

await tx.execute_insertone(
'''
INSERT INTO users (state, username, login_id, is_developer, is_service_account)
VALUES (%s, %s, %s, %s, %s);
INSERT INTO users (state, username, login_id, is_developer, is_service_account, hail_identity, hail_credentials_secret_name)
VALUES (%s, %s, %s, %s, %s, %s, %s);
''',
('creating', username, login_id, is_developer, is_service_account),
(
'creating',
username,
login_id,
is_developer,
is_service_account,
hail_identity,
hail_credentials_secret_name,
),
)

await _insert() # pylint: disable=no-value-for-parameter
Expand Down Expand Up @@ -367,8 +389,29 @@ async def create_user(request: web.Request, userdata): # pylint: disable=unused
is_developer = body['is_developer']
is_service_account = body['is_service_account']

hail_identity = body.get('hail_identity')
hail_credentials_secret_name = body.get('hail_credentials_secret_name')
if (hail_identity or hail_credentials_secret_name) and not is_test_deployment:
raise web.HTTPBadRequest(text='Cannot specify an existing hail identity for a new user')
if hail_credentials_secret_name:
try:
k8s_cache: K8sCache = request.app['k8s_cache']
await k8s_cache.read_secret(hail_credentials_secret_name, DEFAULT_NAMESPACE)
except kubernetes_asyncio.client.rest.ApiException as e:
raise web.HTTPBadRequest(
text=f'hail credentials secret name specified but was not found in namespace {DEFAULT_NAMESPACE}: {hail_credentials_secret_name}'
) from e

try:
await insert_new_user(db, username, login_id, is_developer, is_service_account)
await insert_new_user(
db,
username,
login_id,
is_developer,
is_service_account,
hail_identity=hail_identity,
hail_credentials_secret_name=hail_credentials_secret_name,
)
except AuthUserError as e:
raise e.http_response()

Expand Down Expand Up @@ -750,12 +793,20 @@ async def on_startup(app):
app['client_session'] = httpx.client_session()
app['flow_client'] = get_flow_client('/auth-oauth2-client-secret/client_secret.json')

kubernetes_asyncio.config.load_incluster_config()
app['k8s_client'] = kubernetes_asyncio.client.CoreV1Api()
app['k8s_cache'] = K8sCache(app['k8s_client'])


async def on_cleanup(app):
try:
await app['db'].async_close()
k8s_client: kubernetes_asyncio.client.CoreV1Api = app['k8s_client']
await k8s_client.api_client.rest_client.pool_manager.close()
finally:
await app['client_session'].close()
try:
await app['db'].async_close()
finally:
await app['client_session'].close()


class AuthAccessLogger(AccessLogger):
Expand Down
122 changes: 10 additions & 112 deletions auth/auth/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import logging
import os
import random
import secrets
from typing import Any, Awaitable, Callable, Dict, List, Optional
from typing import Any, Awaitable, Callable, Dict, List

import aiohttp
import kubernetes_asyncio.client
Expand All @@ -17,7 +16,6 @@
from gear.cloud_config import get_gcp_config, get_global_config
from hailtop import aiotools, httpx
from hailtop import batch_client as bc
from hailtop.auth.sql_config import SQLConfig, create_secret_data_from_config
from hailtop.utils import secret_alnum_string, time_msecs

log = logging.getLogger('auth.driver')
Expand All @@ -34,7 +32,7 @@ class DatabaseConflictError(Exception):


class EventHandler:
def __init__(self, handler, event=None, bump_secs=60.0, min_delay_secs=0.1):
def __init__(self, handler, event=None, bump_secs=5.0, min_delay_secs=0.1):
self.handler = handler
if event is None:
event = asyncio.Event()
Expand Down Expand Up @@ -234,86 +232,6 @@ async def delete(self):
self.app_obj_id = None


class DatabaseResource:
def __init__(self, db_instance, name=None):
self.db_instance = db_instance
self.name = name
self.password = None

async def create(self, name):
assert self.name is None

if is_test_deployment:
return

await self._delete(name)

self.password = secrets.token_urlsafe(16)
await self.db_instance.just_execute(
f'''
CREATE DATABASE `{name}`;

CREATE USER '{name}'@'%' IDENTIFIED BY '{self.password}';
GRANT ALL ON `{name}`.* TO '{name}'@'%';
'''
)
self.name = name

def secret_data(self):
with open('/database-server-config/sql-config.json', 'r', encoding='utf-8') as f:
server_config = SQLConfig.from_json(f.read())
with open('/database-server-config/server-ca.pem', 'r', encoding='utf-8') as f:
server_ca = f.read()
client_cert: Optional[str]
client_key: Optional[str]
if server_config.using_mtls():
with open('/database-server-config/client-cert.pem', 'r', encoding='utf-8') as f:
client_cert = f.read()
with open('/database-server-config/client-key.pem', 'r', encoding='utf-8') as f:
client_key = f.read()
else:
client_cert = None
client_key = None

if is_test_deployment:
return create_secret_data_from_config(server_config, server_ca, client_cert, client_key)

assert self.name is not None
assert self.password is not None

config = SQLConfig(
host=server_config.host,
port=server_config.port,
user=self.name,
password=self.password,
instance=server_config.instance,
connection_name=server_config.connection_name,
db=self.name,
ssl_ca='/sql-config/server-ca.pem',
ssl_cert='/sql-config/client-cert.pem' if client_cert is not None else None,
ssl_key='/sql-config/client-key.pem' if client_key is not None else None,
ssl_mode='VERIFY_CA',
)
return create_secret_data_from_config(config, server_ca, client_cert, client_key)

async def _delete(self, name):
if is_test_deployment:
return

# no DROP USER IF EXISTS in current db version
row = await self.db_instance.execute_and_fetchone('SELECT 1 FROM mysql.user WHERE User = %s;', (name,))
if row is not None:
await self.db_instance.just_execute(f"DROP USER '{name}';")

await self.db_instance.just_execute(f'DROP DATABASE IF EXISTS `{name}`;')

async def delete(self):
if self.name is None:
return
await self._delete(self.name)
self.name = None


class K8sNamespaceResource:
def __init__(self, k8s_client, name=None):
self.k8s_client = k8s_client
Expand Down Expand Up @@ -410,7 +328,6 @@ async def delete(self):


async def _create_user(app, user, skip_trial_bp, cleanup):
db_instance = app['db_instance']
db = app['db']
k8s_client = app['k8s_client']
identity_client = app['identity_client']
Expand Down Expand Up @@ -481,21 +398,14 @@ async def _create_user(app, user, skip_trial_bp, cleanup):
updates['hail_credentials_secret_name'] = hail_credentials_secret_name

namespace_name = user['namespace_name']
if namespace_name is None and user['is_developer'] == 1:
# auth services in test namespaces cannot/should not be creating and deleting namespaces
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a dev deployment, I think I would want a check here, using the k8s client, that the secret named by hail_credentials_secret_name actually exists in that namespace.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this check should be in the create_user function in auth.py.

if namespace_name is None and user['is_developer'] == 1 and not is_test_deployment:
Copy link
Contributor Author

@daniel-goldstein daniel-goldstein Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to this line of code, adding a developer to a dev namespace through the auth API would cause their namespace to be deleted. It feels generally correct that a dev/test auth should not be creating/deleting namespaces.

namespace_name = ident
namespace = K8sNamespaceResource(k8s_client)
cleanup.append(namespace.delete)
await namespace.create(namespace_name)
updates['namespace_name'] = namespace_name

db_resource = DatabaseResource(db_instance)
cleanup.append(db_resource.delete)
await db_resource.create(ident)

db_secret = K8sSecretResource(k8s_client)
cleanup.append(db_secret.delete)
await db_secret.create('database-server-config', namespace_name, db_resource.secret_data())

if not skip_trial_bp and user['is_service_account'] != 1:
trial_bp = user['trial_bp_name']
if trial_bp is None:
Expand Down Expand Up @@ -536,7 +446,6 @@ async def create_user(app, user, skip_trial_bp=False):


async def delete_user(app, user):
db_instance = app['db_instance']
db = app['db']
k8s_client = app['k8s_client']
identity_client = app['identity_client']
Expand Down Expand Up @@ -572,9 +481,6 @@ async def delete_user(app, user):
namespace = K8sNamespaceResource(k8s_client, namespace_name)
await namespace.delete()

db_resource = DatabaseResource(db_instance, user['username'])
await db_resource.delete()

trial_bp_name = user['trial_bp_name']
if trial_bp_name is not None:
batch_client = app['batch_client']
Expand Down Expand Up @@ -619,10 +525,6 @@ async def async_main():

app['client_session'] = httpx.client_session()

db_instance = Database()
await db_instance.async_init(maxsize=50, config_file='/database-server-config/sql-config.json')
app['db_instance'] = db_instance

kubernetes_asyncio.config.load_incluster_config()
app['k8s_client'] = kubernetes_asyncio.client.CoreV1Api()

Expand All @@ -647,18 +549,14 @@ async def users_changed_handler():
await app['db'].async_close()
finally:
try:
if 'db_instance_pool' in app:
await app['db_instance_pool'].async_close()
await app['client_session'].close()
finally:
try:
await app['client_session'].close()
if user_creation_loop is not None:
user_creation_loop.shutdown()
finally:
try:
if user_creation_loop is not None:
user_creation_loop.shutdown()
await app['identity_client'].close()
finally:
try:
await app['identity_client'].close()
finally:
k8s_client: kubernetes_asyncio.client.CoreV1Api = app['k8s_client']
await k8s_client.api_client.rest_client.pool_manager.close()
k8s_client: kubernetes_asyncio.client.CoreV1Api = app['k8s_client']
await k8s_client.api_client.rest_client.pool_manager.close()
3 changes: 1 addition & 2 deletions batch/batch/driver/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import aiohttp

from gear import Database
from gear import Database, K8sCache
from hailtop import httpx
from hailtop.aiotools import BackgroundTaskManager
from hailtop.utils import Notice, retry_transient_errors, time_msecs
Expand All @@ -21,7 +21,6 @@
from ..instance_config import QuantifiedResource
from ..spec_writer import SpecWriter
from .instance import Instance
from .k8s_cache import K8sCache

if TYPE_CHECKING:
from .instance_collection import InstanceCollectionManager # pylint: disable=cyclic-import
Expand Down
2 changes: 1 addition & 1 deletion batch/batch/driver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from gear import (
AuthClient,
Database,
K8sCache,
check_csrf_token,
json_request,
json_response,
Expand Down Expand Up @@ -59,7 +60,6 @@
from .driver import CloudDriver
from .instance_collection import InstanceCollectionManager, JobPrivateInstanceManager, Pool
from .job import mark_job_complete, mark_job_started
from .k8s_cache import K8sCache

uvloop.install()

Expand Down
Loading