Skip to content

Commit

Permalink
Refactor session params, promote to beta (ClickHouse#12)
Browse files Browse the repository at this point in the history
* Refactor settings validation and passthrough, promote to Beta status

* Tiny cleanup

* Quiet linter

* Fix non-local test code
  • Loading branch information
genzgd authored Jun 8, 2022
1 parent 8637d1b commit 2096cc1
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 25 deletions.
16 changes: 7 additions & 9 deletions clickhouse_connect/driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ def _apply_settings(self, settings: Dict[str, Any] = None):
def _validate_settings(self, settings: Dict[str, Any]):
validated = {}
for key, value in settings.items():
setting_def = self.server_settings.get(key)
if setting_def is None:
logger.warning('Setting %s is not valid, ignoring', key)
continue
if setting_def.readonly:
logger.warning('Setting %s is read only, ignoring', key)
continue
if 'session' not in key:
setting_def = self.server_settings.get(key)
if setting_def is None or setting_def.readonly:
logger.warning('Setting %s is not valid or read only, ignoring', key)
continue
validated[key] = value
return validated

Expand Down Expand Up @@ -105,7 +103,7 @@ def exec_query(self, query: str, settings: Optional[Dict] = None, use_none: bool
:return: QueryResult of data and metadata returned by ClickHouse
"""

def command(self, cmd: str, parameters=None, use_database:bool = True, settings: Dict[str, str] = None) \
def command(self, cmd: str, parameters=None, use_database: bool = True, settings: Dict[str, str] = None) \
-> Union[str, int, Sequence[str]]:
"""
Client method that returns a single value instead of a result set
Expand All @@ -123,7 +121,7 @@ def command(self, cmd: str, parameters=None, use_database:bool = True, settings:
return self.exec_command(cmd, use_database, settings)

@abstractmethod
def exec_command(self, cmd, use_database: bool = True, settings:Dict[str, str] = None) -> Union[
def exec_command(self, cmd, use_database: bool = True, settings: Dict[str, str] = None) -> Union[
str, int, Sequence[str]]:
"""
Subclass implementation of the client query function
Expand Down
16 changes: 6 additions & 10 deletions clickhouse_connect/driver/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,15 @@ def __init__(self, interface: str, host: str, port: int, username: str, password
self.session = session
self.connect_timeout = connect_timeout
self.read_timeout = send_receive_timeout
self.common_settings = {}
super().__init__(database=database, query_limit=query_limit, uri=self.url, settings=kwargs)

def _apply_settings(self, settings: Dict[str, Any] = None):
valid_settings = self._validate_settings(settings)
for key, value in valid_settings.items():
if isinstance(value, bool):
self.common_settings[key] = '1' if value else '0'
self.session.params[key] = '1' if value else '0'
else:
self.common_settings[key] = str(value)
self.session.params[key] = str(value)

def _format_query(self, query: str) -> str:
query = query.strip()
Expand All @@ -127,8 +126,7 @@ def exec_query(self, query: str, settings: Optional[Dict] = None, use_none: bool
See BaseClient doc_string for this method
"""
headers = {'Content-Type': 'text/plain'}
params = self.common_settings.copy()
params['database'] = self.database
params = {'database': self.database }
if settings:
params.update(settings)
if columns_only_re.search(query):
Expand Down Expand Up @@ -160,9 +158,8 @@ def data_insert(self, table: str, column_names: Sequence[str], data: Sequence[Se
See BaseClient doc_string for this method
"""
headers = {'Content-Type': 'application/octet-stream'}
params = self.common_settings.copy()
params['query'] = f"INSERT INTO {table} ({', '.join(column_names)}) FORMAT {self.write_format}"
params['database'] = self.database
params = {'query': f"INSERT INTO {table} ({', '.join(column_names)}) FORMAT {self.write_format}",
'database': self.database}
if settings:
params.update(settings)
insert_block = self.build_insert(data, column_types=column_types, column_names=column_names, column_oriented=column_oriented)
Expand All @@ -174,8 +171,7 @@ def exec_command(self, cmd, use_database: bool = True, settings: Optional[Dict]
See BaseClient doc_string for this method
"""
headers = {'Content-Type': 'text/plain'}
params = self.common_settings.copy()
params['query'] = cmd
params = {'query': cmd}
if use_database:
params['database'] = self.database
if settings:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def run_setup(try_c: bool = True):
'superset.db_engine_specs': ['clickhousedb=clickhouse_connect.cc_superset.engine:ClickHouseEngineSpec']
},
classifiers=[
'Development Status :: 3 - Alpha',
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3.7',
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ def pytest_addoption(parser):
parser.addoption('--test-db', help='Test database, will not be cleaned up')
parser.addoption('--tls', default=False, action='store_true')
parser.addoption('--no-tls', dest='tls', action='store_false')
parser.addoption('--local', default=False, action='store_true')
12 changes: 7 additions & 5 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class TestConfig(NamedTuple):
use_docker: bool
test_database: str
cleanup: bool
local: bool

@property
def cloud(self):
Expand All @@ -43,12 +44,13 @@ def test_config_fixture(request) -> Iterator[TestConfig]:
username = request.config.getoption('username')
password = request.config.getoption('password')
cleanup = request.config.getoption('cleanup')
local = request.config.getoption('local')
test_database = request.config.getoption('test_db', None)
if test_database:
cleanup = False
else:
test_database = 'cc_test'
yield TestConfig(interface, host, port, username, password, use_docker, test_database, cleanup)
yield TestConfig(interface, host, port, username, password, use_docker, test_database, cleanup, local)


@fixture(scope='session', name='test_db')
Expand All @@ -75,7 +77,7 @@ def test_client_fixture(test_config: TestConfig, test_db: str) -> Iterator[Clien
while True:
tries += 1
try:
driver = create_client(interface=test_config.interface,
client = create_client(interface=test_config.interface,
host=test_config.host,
port=test_config.port,
username=test_config.username,
Expand All @@ -86,9 +88,9 @@ def test_client_fixture(test_config: TestConfig, test_db: str) -> Iterator[Clien
raise Exception('Failed to connect to ClickHouse server after 30 seconds') from ex
sleep(1)
if test_db != 'default':
driver.command(f'CREATE DATABASE IF NOT EXISTS {test_db}', use_database=False)
driver.database = test_db
yield driver
client.command(f'CREATE DATABASE IF NOT EXISTS {test_db}', use_database=False)
client.database = test_db
yield client
if test_config.use_docker:
down_result = run_cmd(['docker-compose', '-f', compose_file, 'down', '-v'])
if down_result[0]:
Expand Down
25 changes: 25 additions & 0 deletions tests/integration_tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from decimal import Decimal
from time import sleep

from clickhouse_connect import create_client
from clickhouse_connect.driver.client import Client
from clickhouse_connect.driver.options import HAS_NUMPY, HAS_PANDAS
from clickhouse_connect.driver.query import QueryResult
from tests.integration_tests.conftest import TestConfig


def test_query(test_client: Client):
Expand Down Expand Up @@ -32,6 +35,28 @@ def test_decimal_conv(test_client: Client, test_table_engine: str):
assert result == [(5, -182, 55.2), (57238478234, 77, -29.5773)]


def test_session_params(test_config: TestConfig):
client = create_client(interface=test_config.interface,
host=test_config.host,
port=test_config.port,
username=test_config.username,
password=test_config.password,
session_id='TEST_SESSION_ID')
result = client.exec_query('SELECT number FROM system.numbers LIMIT 5',
settings={'query_id': 'test_session_params'}).result_set
assert len(result) == 5
if test_config.local:
sleep(10) # Allow the log entries to flush to tables
result = client.exec_query(
"SELECT session_id, user FROM system.session_log WHERE session_id = 'TEST_SESSION_ID' AND " +
'event_time > now() - 30').result_set
assert result[0] == ('TEST_SESSION_ID', test_config.username)
result = client.exec_query(
"SELECT query_id, user FROM system.query_log WHERE query_id = 'test_session_params' AND " +
'event_time > now() - 30').result_set
assert result[0] == ('test_session_params', test_config.username)


def test_numpy(test_client: Client):
if HAS_NUMPY:
np_array = test_client.query_np('SELECT * FROM system.tables')
Expand Down

0 comments on commit 2096cc1

Please sign in to comment.