From 8d4b4a66f269e5e77d9c199a6b6a6c1c5b63076e Mon Sep 17 00:00:00 2001 From: Nick Wang <33473924+nick2wang@users.noreply.github.com> Date: Thu, 17 Feb 2022 09:00:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81ClickHouse=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=20(#1384)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 支持ClickHouse查询 支持ClickHouse查询 #1343 * 版本兼容性优化 版本兼容性优化 --- requirements.txt | 1 + sql/engines/__init__.py | 6 + sql/engines/clickhouse.py | 176 ++++++++++++++++++++++++++++++ sql/engines/tests.py | 141 ++++++++++++++++++++++++ sql/models.py | 1 + sql/templates/instance.html | 1 + sql/templates/queryapplylist.html | 4 + sql/templates/sqlquery.html | 17 +++ 8 files changed, 347 insertions(+) create mode 100644 sql/engines/clickhouse.py diff --git a/requirements.txt b/requirements.txt index 67531c2415..8d1359f51a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,4 @@ httptools==0.1.1 uvicorn==0.12.2 pycryptodome==3.10.1 pyodps==0.10.7.1 +clickhouse-driver==0.2.3 diff --git a/sql/engines/__init__.py b/sql/engines/__init__.py index 728a3429ca..00d91f3d91 100644 --- a/sql/engines/__init__.py +++ b/sql/engines/__init__.py @@ -181,4 +181,10 @@ def get_engine(instance=None): # pragma: no cover elif instance.db_type == 'odps': from .odps import ODPSEngine + return ODPSEngine(instance=instance) + + elif instance.db_type == 'clickhouse': + from .clickhouse import ClickHouseEngine + + return ClickHouseEngine(instance=instance) diff --git a/sql/engines/clickhouse.py b/sql/engines/clickhouse.py new file mode 100644 index 0000000000..e2d1caf25c --- /dev/null +++ b/sql/engines/clickhouse.py @@ -0,0 +1,176 @@ +# -*- coding: UTF-8 -*- +from clickhouse_driver import connect +from . import EngineBase +from .models import ResultSet +import sqlparse +import logging +import re + +logger = logging.getLogger('default') + + +class ClickHouseEngine(EngineBase): + + def __init__(self, instance=None): + super(ClickHouseEngine, self).__init__(instance=instance) + + def get_connection(self, db_name=None): + if self.conn: + return self.conn + if db_name: + self.conn = connect(host=self.host, port=self.port, user=self.user, password=self.password, + database=db_name, connect_timeout=10) + else: + self.conn = connect(host=self.host, port=self.port, user=self.user, password=self.password, + connect_timeout=10) + return self.conn + + @property + def name(self): + return 'ClickHouse' + + @property + def info(self): + return 'ClickHouse engine' + + @property + def auto_backup(self): + """是否支持备份""" + return False + + @property + def server_version(self): + sql = "select value from system.build_options where name = 'VERSION_FULL';" + result = self.query(sql=sql) + version = result.rows[0][0].split(' ')[1] + return tuple([int(n) for n in version.split('.')[:3]]) + + def get_all_databases(self): + """获取数据库列表, 返回一个ResultSet""" + sql = "show databases" + result = self.query(sql=sql) + db_list = [row[0] for row in result.rows + if row[0] not in ('system','INFORMATION_SCHEMA', 'information_schema', 'datasets')] + result.rows = db_list + return result + + def get_all_tables(self, db_name, **kwargs): + """获取table 列表, 返回一个ResultSet""" + sql = "show tables" + result = self.query(db_name=db_name, sql=sql) + tb_list = [row[0] for row in result.rows] + result.rows = tb_list + return result + + def get_all_columns_by_tb(self, db_name, tb_name, **kwargs): + """获取所有字段, 返回一个ResultSet""" + sql = f"""select + name, + type, + comment + from + system.columns + where + database = '{db_name}' + and table = '{tb_name}';""" + result = self.query(db_name=db_name, sql=sql) + column_list = [row[0] for row in result.rows] + result.rows = column_list + return result + + def describe_table(self, db_name, tb_name, **kwargs): + """return ResultSet 类似查询""" + sql = f"show create table `{tb_name}`;" + result = self.query(db_name=db_name, sql=sql) + + result.rows[0] = (tb_name,) + (result.rows[0][0].replace('(', '(\n ').replace(',', ',\n '),) + return result + + def query(self, db_name=None, sql='', limit_num=0, close_conn=True, **kwargs): + """返回 ResultSet """ + result_set = ResultSet(full_sql=sql) + try: + conn = self.get_connection(db_name=db_name) + cursor = conn.cursor() + cursor.execute(sql) + if int(limit_num) > 0: + rows = cursor.fetchmany(size=int(limit_num)) + else: + rows = cursor.fetchall() + fields = cursor.description + + result_set.column_list = [i[0] for i in fields] if fields else [] + result_set.rows = rows + result_set.affected_rows = len(rows) + except Exception as e: + logger.warning(f"ClickHouse语句执行报错,语句:{sql},错误信息{e}") + result_set.error = str(e).split('Stack trace')[0] + finally: + if close_conn: + self.close() + return result_set + + def query_check(self, db_name=None, sql=''): + # 查询语句的检查、注释去除、切分 + result = {'msg': '', 'bad_query': False, 'filtered_sql': sql, 'has_star': False} + # 删除注释语句,进行语法判断,执行第一条有效sql + try: + sql = sqlparse.format(sql, strip_comments=True) + sql = sqlparse.split(sql)[0] + result['filtered_sql'] = sql.strip() + except IndexError: + result['bad_query'] = True + result['msg'] = '没有有效的SQL语句' + if re.match(r"^select|^show|^explain", sql, re.I) is None: + result['bad_query'] = True + result['msg'] = '不支持的查询语法类型!' + if '*' in sql: + result['has_star'] = True + result['msg'] = 'SQL语句中含有 * ' + # clickhouse 20.6.3版本开始正式支持explain语法 + if re.match(r"^explain", sql, re.I) and self.server_version < (20, 6, 3): + result['bad_query'] = True + result['msg'] = f"当前ClickHouse实例版本低于20.6.3,不支持explain!" + # select语句先使用Explain判断语法是否正确 + if re.match(r"^select", sql, re.I) and self.server_version >= (20, 6, 3): + explain_result = self.query(db_name=db_name, sql=f"explain {sql}") + if explain_result.error: + result['bad_query'] = True + result['msg'] = explain_result.error + + return result + + def filter_sql(self, sql='', limit_num=0): + # 对查询sql增加limit限制,limit n 或 limit n,n 或 limit n offset n统一改写成limit n + sql = sql.rstrip(';').strip() + if re.match(r"^select", sql, re.I): + # LIMIT N + limit_n = re.compile(r'limit\s+(\d+)\s*$', re.I) + # LIMIT M OFFSET N + limit_offset = re.compile(r'limit\s+(\d+)\s+offset\s+(\d+)\s*$', re.I) + # LIMIT M,N + offset_comma_limit = re.compile(r'limit\s+(\d+)\s*,\s*(\d+)\s*$', re.I) + if limit_n.search(sql): + sql_limit = limit_n.search(sql).group(1) + limit_num = min(int(limit_num), int(sql_limit)) + sql = limit_n.sub(f'limit {limit_num};', sql) + elif limit_offset.search(sql): + sql_limit = limit_offset.search(sql).group(1) + sql_offset = limit_offset.search(sql).group(2) + limit_num = min(int(limit_num), int(sql_limit)) + sql = limit_offset.sub(f'limit {limit_num} offset {sql_offset};', sql) + elif offset_comma_limit.search(sql): + sql_offset = offset_comma_limit.search(sql).group(1) + sql_limit = offset_comma_limit.search(sql).group(2) + limit_num = min(int(limit_num), int(sql_limit)) + sql = offset_comma_limit.sub(f'limit {sql_offset},{limit_num};', sql) + else: + sql = f'{sql} limit {limit_num};' + else: + sql = f'{sql};' + return sql + + def close(self): + if self.conn: + self.conn.close() + self.conn = None diff --git a/sql/engines/tests.py b/sql/engines/tests.py index 6124c856a7..c5413fd208 100644 --- a/sql/engines/tests.py +++ b/sql/engines/tests.py @@ -17,6 +17,7 @@ from sql.engines.pgsql import PgSQLEngine from sql.engines.oracle import OracleEngine from sql.engines.mongo import MongoEngine +from sql.engines.clickhouse import ClickHouseEngine from sql.models import Instance, SqlWorkflow, SqlWorkflowContent User = get_user_model() @@ -1646,3 +1647,143 @@ def test_fill_query_columns(self): {"_id": {"$oid": "7f10162029684728e70045ab"}, "author": "archery"}] cols = self.engine.fill_query_columns(cursor, columns=columns) self.assertEqual(cols, ["_id", "title", "tags", "likes", "text", "author"]) + + +class TestClickHouse(TestCase): + + def setUp(self): + self.ins1 = Instance(instance_name='some_ins', type='slave', db_type='clickhouse', host='some_host', + port=9000, user='ins_user', password='some_str') + self.ins1.save() + self.sys_config = SysConfig() + + def tearDown(self): + self.ins1.delete() + self.sys_config.purge() + + @patch.object(ClickHouseEngine, 'query') + def test_server_version(self, mock_query): + result = ResultSet() + result.rows = [('ClickHouse 22.1.3.7',)] + mock_query.return_value = result + new_engine = ClickHouseEngine(instance=self.ins1) + server_version = new_engine.server_version + self.assertTupleEqual(server_version, (22, 1, 3)) + + @patch('clickhouse_driver.connect') + def test_engine_base_info(self, _conn): + new_engine = ClickHouseEngine(instance=self.ins1) + self.assertEqual(new_engine.name, 'ClickHouse') + self.assertEqual(new_engine.info, 'ClickHouse engine') + + @patch.object(ClickHouseEngine, 'get_connection') + def testGetConnection(self, connect): + new_engine = ClickHouseEngine(instance=self.ins1) + new_engine.get_connection() + connect.assert_called_once() + + @patch.object(ClickHouseEngine, 'query') + def testQuery(self, mock_query): + result = ResultSet() + result.rows = [('v1', 'v2'), ] + mock_query.return_value = result + new_engine = ClickHouseEngine(instance=self.ins1) + query_result = new_engine.query(sql='some_sql', limit_num=100) + self.assertListEqual(query_result.rows, [('v1', 'v2'), ]) + + @patch.object(ClickHouseEngine, 'query') + def testAllDb(self, mock_query): + db_result = ResultSet() + db_result.rows = [('db_1',), ('db_2',)] + mock_query.return_value = db_result + new_engine = ClickHouseEngine(instance=self.ins1) + dbs = new_engine.get_all_databases() + self.assertEqual(dbs.rows, ['db_1', 'db_2']) + + @patch.object(ClickHouseEngine, 'query') + def testAllTables(self, mock_query): + table_result = ResultSet() + table_result.rows = [('tb_1', 'some_des'), ('tb_2', 'some_des')] + mock_query.return_value = table_result + new_engine = ClickHouseEngine(instance=self.ins1) + tables = new_engine.get_all_tables('some_db') + mock_query.assert_called_once_with(db_name='some_db', sql=ANY) + self.assertEqual(tables.rows, ['tb_1', 'tb_2']) + + @patch.object(ClickHouseEngine, 'query') + def testAllColumns(self, mock_query): + db_result = ResultSet() + db_result.rows = [('col_1', 'type'), ('col_2', 'type2')] + mock_query.return_value = db_result + new_engine = ClickHouseEngine(instance=self.ins1) + dbs = new_engine.get_all_columns_by_tb('some_db', 'some_tb') + self.assertEqual(dbs.rows, ['col_1', 'col_2']) + + @patch.object(ClickHouseEngine, 'query') + def testDescribe(self, mock_query): + new_engine = ClickHouseEngine(instance=self.ins1) + new_engine.describe_table('some_db', 'some_db') + mock_query.assert_called_once() + + def test_query_check_wrong_sql(self): + new_engine = ClickHouseEngine(instance=self.ins1) + wrong_sql = '-- 测试' + check_result = new_engine.query_check(db_name='some_db', sql=wrong_sql) + self.assertDictEqual(check_result, + {'msg': '不支持的查询语法类型!', 'bad_query': True, 'filtered_sql': '-- 测试', 'has_star': False}) + + def test_query_check_update_sql(self): + new_engine = ClickHouseEngine(instance=self.ins1) + update_sql = 'update user set id=0' + check_result = new_engine.query_check(db_name='some_db', sql=update_sql) + self.assertDictEqual(check_result, + {'msg': '不支持的查询语法类型!', 'bad_query': True, 'filtered_sql': 'update user set id=0', + 'has_star': False}) + + def test_filter_sql_with_delimiter(self): + new_engine = ClickHouseEngine(instance=self.ins1) + sql_without_limit = 'select user from usertable;' + check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=100) + self.assertEqual(check_result, 'select user from usertable limit 100;') + + def test_filter_sql_without_delimiter(self): + new_engine = ClickHouseEngine(instance=self.ins1) + sql_without_limit = 'select user from usertable' + check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=100) + self.assertEqual(check_result, 'select user from usertable limit 100;') + + def test_filter_sql_with_limit(self): + new_engine = ClickHouseEngine(instance=self.ins1) + sql_without_limit = 'select user from usertable limit 10' + check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1) + self.assertEqual(check_result, 'select user from usertable limit 1;') + + def test_filter_sql_with_limit_min(self): + new_engine = ClickHouseEngine(instance=self.ins1) + sql_without_limit = 'select user from usertable limit 10' + check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=100) + self.assertEqual(check_result, 'select user from usertable limit 10;') + + def test_filter_sql_with_limit_offset(self): + new_engine = ClickHouseEngine(instance=self.ins1) + sql_without_limit = 'select user from usertable limit 10 offset 100' + check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1) + self.assertEqual(check_result, 'select user from usertable limit 1 offset 100;') + + def test_filter_sql_with_limit_nn(self): + new_engine = ClickHouseEngine(instance=self.ins1) + sql_without_limit = 'select user from usertable limit 10, 100' + check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1) + self.assertEqual(check_result, 'select user from usertable limit 10,1;') + + def test_filter_sql_upper(self): + new_engine = ClickHouseEngine(instance=self.ins1) + sql_without_limit = 'SELECT USER FROM usertable LIMIT 10, 100' + check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1) + self.assertEqual(check_result, 'SELECT USER FROM usertable limit 10,1;') + + def test_filter_sql_not_select(self): + new_engine = ClickHouseEngine(instance=self.ins1) + sql_without_limit = 'show create table usertable;' + check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1) + self.assertEqual(check_result, 'show create table usertable;') diff --git a/sql/models.py b/sql/models.py index 4d6a2c0f16..883557d9c1 100755 --- a/sql/models.py +++ b/sql/models.py @@ -87,6 +87,7 @@ class Meta: ('mongo', 'Mongo'), ('phoenix', 'Phoenix'), ('odps', 'ODPS'), + ('clickhouse', 'ClickHouse'), ('goinception', 'goInception')) diff --git a/sql/templates/instance.html b/sql/templates/instance.html index 805e426c05..1a1940b6f0 100644 --- a/sql/templates/instance.html +++ b/sql/templates/instance.html @@ -21,6 +21,7 @@ +