Skip to content

Commit

Permalink
#103 Enabled SaaS connections (#104)
Browse files Browse the repository at this point in the history
* #103 Enabled SaaS connections

* #103 Made mypy happy

* #103 Fixed the compatibility issue with the requests

* #103 Added future annotations

* Apply suggestions from code review

Applying suggestions from the review

Co-authored-by: Christoph Kuhnke <github@kuhnke.net>

---------

Co-authored-by: Christoph Kuhnke <github@kuhnke.net>
  • Loading branch information
ahsimb and ckunki authored May 23, 2024
1 parent 7e9c937 commit c37251d
Show file tree
Hide file tree
Showing 7 changed files with 1,189 additions and 823 deletions.
2 changes: 2 additions & 0 deletions doc/changes/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Changes

* [unreleased](unreleased.md)
* [0.2.9](changes_0.2.9.md)
* [0.2.8](changes_0.2.8.md)
* [0.2.7](changes_0.2.7.md)
Expand All @@ -18,6 +19,7 @@
---
hidden:
---
unreleased
changes_0.2.9
changes_0.2.8
changes_0.2.7
Expand Down
5 changes: 5 additions & 0 deletions doc/changes/unreleased.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Unreleased

## Changes

* #103 Enabled SaaS connections for both the database and the BucketFS.
10 changes: 10 additions & 0 deletions exasol/nb_connector/ai_lab_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,13 @@ class AILabConfig(Enum):
sme_aws_bucket = auto()
sme_aws_role = auto()
sme_aws_connection = auto()
saas_url = auto()
saas_token = auto()
saas_account_id = auto()
saas_database_id = auto()
saas_database_name = auto()


class StorageBackend(Enum):
    """Enumerates the backends a database or BucketFS connection can be opened against."""

    # On-Prem backend.
    onprem = auto()
    # SaaS backend.
    saas = auto()
232 changes: 193 additions & 39 deletions exasol/nb_connector/connections.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import ssl
from pathlib import Path
from typing import (
Expand All @@ -10,9 +11,10 @@
from sqlalchemy.engine.url import URL # type: ignore

import exasol.bucketfs as bfs # type: ignore
import exasol.saas.client.api_access as saas_api # type: ignore
from exasol.nb_connector.secret_store import Secrets
from exasol.nb_connector.utils import optional_str_to_bool
from exasol.nb_connector.ai_lab_config import AILabConfig as CKey
from exasol.nb_connector.ai_lab_config import AILabConfig as CKey, StorageBackend


def _optional_encryption(conf: Secrets, key: CKey = CKey.db_encryption) -> Optional[bool]:
Expand Down Expand Up @@ -60,6 +62,88 @@ def _extract_ssl_options(conf: Secrets) -> dict:
return sslopt


def infer_backend(conf: Secrets,
                  backend_requirements: dict[StorageBackend, list[list[CKey]]]) -> StorageBackend:
    """
    Determines which backend the provided configuration data is able to address.

    Parameters:
        conf:
            The secret store.
        backend_requirements:
            Maps every candidate backend to its required configuration items,
            given as a list of key groups. A backend qualifies when every key
            of the first group has a truthy value in the configuration and each
            of the remaining groups has at least one key with a truthy value.

    Returns the first backend, in the iteration order of backend_requirements,
    whose requirements are met.
    Raises a ValueError naming the expected keys of all backends if the
    configuration does not satisfy any of them.
    """

    def is_set(key) -> bool:
        return bool(conf.get(key))

    def satisfied(groups) -> bool:
        for position, group in enumerate(groups):
            # Group 0 is mandatory as a whole; later groups are alternatives.
            quantifier = any if position else all
            if not quantifier(map(is_set, group)):
                return False
        return True

    def describe(groups) -> str:
        parts = []
        for position, group in enumerate(groups):
            joiner = ' or ' if position else ', '
            parts.append(joiner.join(key.name for key in group))
        return ', '.join(parts)

    for candidate, groups in backend_requirements.items():
        if satisfied(groups):
            return candidate

    alternatives = [f'[{describe(groups)}] for {candidate.name} database'
                    for candidate, groups in backend_requirements.items()]
    raise ValueError('Incomplete parameter list. Please provide the parameters '
                     + ' or '.join(alternatives))


def infer_db_backend(conf: Secrets) -> StorageBackend:
    """
    Works out which backend a database connection should use.

    On-Prem needs db_host_name, db_port, db_user and db_password.
    SaaS needs saas_url, saas_token and saas_account_id, plus either
    saas_database_id or saas_database_name.
    On-Prem is checked first. A ValueError is raised by infer_backend if
    neither set of parameters is complete.
    """
    onprem_groups = [
        [CKey.db_host_name, CKey.db_port, CKey.db_user, CKey.db_password],
    ]
    saas_groups = [
        [CKey.saas_url, CKey.saas_token, CKey.saas_account_id],
        [CKey.saas_database_id, CKey.saas_database_name],
    ]
    requirements = {
        StorageBackend.onprem: onprem_groups,
        StorageBackend.saas: saas_groups,
    }
    return infer_backend(conf, requirements)


def infer_bfs_backend(conf: Secrets) -> StorageBackend:
    """
    Works out which backend a BucketFS connection should use.

    On-Prem needs bfs_port, bfs_user, bfs_password and bfs_bucket, plus either
    bfs_host_name or db_host_name.
    SaaS needs saas_url, saas_token and saas_account_id, plus either
    saas_database_id or saas_database_name.
    On-Prem is checked first. A ValueError is raised by infer_backend if
    neither set of parameters is complete.
    """
    onprem_groups = [
        [CKey.bfs_port, CKey.bfs_user, CKey.bfs_password, CKey.bfs_bucket],
        [CKey.bfs_host_name, CKey.db_host_name],
    ]
    saas_groups = [
        [CKey.saas_url, CKey.saas_token, CKey.saas_account_id],
        [CKey.saas_database_id, CKey.saas_database_name],
    ]
    requirements = {
        StorageBackend.onprem: onprem_groups,
        StorageBackend.saas: saas_groups,
    }
    return infer_backend(conf, requirements)


def get_external_host(conf: Secrets) -> str:
    """Builds the "host:port" part of a DB URL from the db_host_name and db_port configuration values."""
    host_name = conf.get(CKey.db_host_name)
    port = conf.get(CKey.db_port)
    return f"{host_name}:{port}"
Expand All @@ -76,26 +160,43 @@ def get_udf_bucket_path(conf: Secrets) -> str:
def open_pyexasol_connection(conf: Secrets, **kwargs) -> pyexasol.ExaConnection:
"""
Opens a pyexasol connection using provided configuration parameters.
Supports both On-Prem and Saas backends.
Does NOT set the default schema, even if it is defined in the configuration.
Any additional parameters can be passed to pyexasol via the kwargs.
Parameters in kwargs override the corresponding values in the configuration.
The configuration should provide the following parameters:
- Server address and port (db_host_name, db_port),
- Client security credentials (db_user, db_password).
On-Prem:
- Server address and port (db_host_name, db_port),
- Client security credentials (db_user, db_password).
Saas:
- SaaS service url (saas_url),
- SaaS account id (saas_account_id),
- Database id or name (saas_database_id or saas_database_name),
- Client security credentials (saas_token).
Optional parameters include:
- Secured comm flag (db_encryption),
- Some of the SSL options (cert_vld, trusted_ca, client_cert).
If the schema is not provided then it should be set explicitly in every SQL statement.
For other optional parameters the default settings are as per the pyexasol interface.
"""

conn_params: dict[str, Any] = {
"dsn": get_external_host(conf),
"user": conf.get(CKey.db_user),
"password": conf.get(CKey.db_password),
}
if infer_db_backend(conf) == StorageBackend.onprem:
conn_params: dict[str, Any] = {
"dsn": get_external_host(conf),
"user": conf.get(CKey.db_user),
"password": conf.get(CKey.db_password),
}
else:
conn_params = saas_api.get_connection_params(
host=conf.get(CKey.saas_url),
account_id=conf.get(CKey.saas_account_id),
pat=conf.get(CKey.saas_token),
database_id=conf.get(CKey.saas_database_id),
database_name=conf.get(CKey.saas_database_name)
)

encryption = _optional_encryption(conf)
if encryption is not None:
Expand All @@ -112,14 +213,22 @@ def open_pyexasol_connection(conf: Secrets, **kwargs) -> pyexasol.ExaConnection:
def open_sqlalchemy_connection(conf: Secrets):
"""
Creates an Exasol SQLAlchemy websocket engine using provided configuration parameters.
Supports both On-Prem and Saas backends.
Sets the default schema if it is defined in the configuration.
The configuration should provide the following parameters:
- Server address and port (db_host_name, db_port),
- Client security credentials (db_user, db_password).
On-Prem:
- Server address and port (db_host_name, db_port),
- Client security credentials (db_user, db_password).
Saas:
- SaaS service url (saas_url),
- SaaS account id (saas_account_id),
- Database id or name (saas_database_id or saas_database_name),
- Client security credentials (saas_token).
Optional parameters include:
- Secured comm flag (db_encryption).
- Validation of the server's TLS/SSL certificate by the client (cert_vld).
If the schema is not provided then it should be set explicitly in every SQL statement.
For other optional parameters the default settings are as per the Exasol SQLAlchemy interface.
Currently, it's not possible to use a bundle of trusted CAs other than the default. Neither
Expand All @@ -134,48 +243,93 @@ def open_sqlalchemy_connection(conf: Secrets):
if (certificate_validation is not None) and (not certificate_validation):
query_params['SSLCertificate'] = 'SSL_VERIFY_NONE'

if infer_db_backend(conf) == StorageBackend.onprem:
conn_params: dict[str, Any] = {
"host": conf.get(CKey.db_host_name),
"port": int(conf.get(CKey.db_port)), # type: ignore
"username": conf.get(CKey.db_user),
"password": conf.get(CKey.db_password),
}
else:
conn_params = saas_api.get_connection_params(
host=conf.get(CKey.saas_url),
account_id=conf.get(CKey.saas_account_id),
pat=conf.get(CKey.saas_token),
database_id=conf.get(CKey.saas_database_id),
database_name=conf.get(CKey.saas_database_name)
)
host, port = str(conn_params['dsn']).split(':')
conn_params = {
"host": host,
"port": int(port),
"username": conn_params['user'],
"password": conn_params['password']
}

websocket_url = URL.create('exa+websocket',
username=conf.get(CKey.db_user),
password=conf.get(CKey.db_password),
host=conf.get(CKey.db_host_name),
port=int(getattr(conf, CKey.db_port.name)),
**conn_params,
database=conf.get(CKey.db_schema),
query=query_params
)

return sqlalchemy.create_engine(websocket_url)


def open_bucketfs_connection(conf: Secrets) -> bfs.BucketLike:
    """
    Connects to a BucketFS service using provided configuration parameters.
    Returns the BucketLike object for the bucket selected in the configuration.
    Supports both On-Prem and SaaS backends.
    The configuration should provide the following parameters:
    On-Prem:
        - Host name and port of the BucketFS service (bfs_host_name or db_host_name, bfs_port),
        - Client security credentials (bfs_user, bfs_password).
        - Bucket name (bfs_bucket)
        Optional parameters include:
        - Secured comm flag (bfs_encryption), defaults to False.
        - Some of the SSL options (cert_vld, trusted_ca).
    SaaS:
        - SaaS service url (saas_url),
        - SaaS account id (saas_account_id),
        - Database id or name (saas_database_id or saas_database_name),
        - Client security credentials (saas_token).
    Raises a ValueError if the configuration satisfies neither backend.
    """

    # The backend must be inferred from the BucketFS requirements, not the
    # database ones: a configuration holding only the bfs_* parameters is a
    # valid On-Prem BucketFS configuration even without db_port/db_user/db_password.
    if infer_bfs_backend(conf) == StorageBackend.onprem:
        # Set up the connection parameters.
        buckfs_url_prefix = "https" if _optional_encryption(conf, CKey.bfs_encryption) else "http"
        buckfs_host = conf.get(CKey.bfs_host_name, conf.get(CKey.db_host_name))
        buckfs_url = f"{buckfs_url_prefix}://{buckfs_host}:{conf.get(CKey.bfs_port)}"

        # Verification is either a boolean flag or, when one is configured,
        # the location of the trusted CA file/directory.
        sslopt = _extract_ssl_options(conf)
        verify = sslopt.get("cert_reqs") == ssl.CERT_REQUIRED
        verify = sslopt.get("ca_certs") or sslopt.get("ca_cert_path") or verify

        buckfs_credentials = {
            conf.get(CKey.bfs_bucket): {
                "username": conf.get(CKey.bfs_user),
                "password": conf.get(CKey.bfs_password),
            }
        }

        # Connect to the BucketFS service and navigate to the bucket of choice.
        bucketfs = bfs.Service(buckfs_url, buckfs_credentials, verify)
        return bucketfs[conf.get(CKey.bfs_bucket)]

    saas_url = conf.get(CKey.saas_url)
    saas_token = conf.get(CKey.saas_token)
    saas_account_id = conf.get(CKey.saas_account_id)
    saas_database_id = conf.get(CKey.saas_database_id)
    if not saas_database_id:
        # Only the database name is configured; resolve it to an id via the SaaS API.
        saas_database_id = saas_api.get_database_id(
            host=saas_url,
            account_id=saas_account_id,
            pat=saas_token,
            database_name=conf.get(CKey.saas_database_name)
        )
    return bfs.SaaSBucket(url=saas_url,
                          account_id=saas_account_id,
                          database_id=saas_database_id,
                          pat=saas_token)
Loading

0 comments on commit c37251d

Please sign in to comment.