Skip to content

Commit

Permalink
支持ClickHouse查询 (#1384)
Browse files Browse the repository at this point in the history
* 支持ClickHouse查询

支持ClickHouse查询 #1343

* 版本兼容性优化

版本兼容性优化
  • Loading branch information
nick2wang authored Feb 17, 2022
1 parent ca10a8f commit 8d4b4a6
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 0 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ httptools==0.1.1
uvicorn==0.12.2
pycryptodome==3.10.1
pyodps==0.10.7.1
clickhouse-driver==0.2.3
6 changes: 6 additions & 0 deletions sql/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,10 @@ def get_engine(instance=None): # pragma: no cover

elif instance.db_type == 'odps':
from .odps import ODPSEngine

return ODPSEngine(instance=instance)

elif instance.db_type == 'clickhouse':
from .clickhouse import ClickHouseEngine

return ClickHouseEngine(instance=instance)
176 changes: 176 additions & 0 deletions sql/engines/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# -*- coding: UTF-8 -*-
from clickhouse_driver import connect
from . import EngineBase
from .models import ResultSet
import sqlparse
import logging
import re

logger = logging.getLogger('default')


class ClickHouseEngine(EngineBase):
    """Read-only query engine for ClickHouse instances.

    Connections are opened with ``clickhouse_driver.connect`` and every query
    result is returned wrapped in a ``ResultSet``.
    """

    # Databases that get_all_databases() hides from callers.
    _HIDDEN_DATABASES = ('system', 'INFORMATION_SCHEMA', 'information_schema', 'datasets')
    # Per the original author's note, EXPLAIN is officially supported from 20.6.3 on.
    _EXPLAIN_MIN_VERSION = (20, 6, 3)

    # Trailing-LIMIT patterns recognised by filter_sql(), compiled once.
    _LIMIT_N = re.compile(r'limit\s+(\d+)\s*$', re.I)
    _LIMIT_OFFSET = re.compile(r'limit\s+(\d+)\s+offset\s+(\d+)\s*$', re.I)
    _OFFSET_COMMA_LIMIT = re.compile(r'limit\s+(\d+)\s*,\s*(\d+)\s*$', re.I)

    def __init__(self, instance=None):
        super(ClickHouseEngine, self).__init__(instance=instance)

    @staticmethod
    def _escape(value, quote):
        """Backslash-escape backslashes and ``quote`` inside ``value``.

        Used before embedding a name between ``quote`` characters in SQL text,
        so the name cannot terminate the quoted token early.
        """
        return str(value).replace('\\', '\\\\').replace(quote, '\\' + quote)

    def get_connection(self, db_name=None):
        """Return the cached connection, opening a new one when none is cached.

        NOTE: an already cached connection is returned as-is, even if
        ``db_name`` differs from the database it was opened with.
        """
        if self.conn:
            return self.conn
        conn_kwargs = dict(host=self.host, port=self.port, user=self.user,
                           password=self.password, connect_timeout=10)
        if db_name:
            conn_kwargs['database'] = db_name
        self.conn = connect(**conn_kwargs)
        return self.conn

    @property
    def name(self):
        return 'ClickHouse'

    @property
    def info(self):
        return 'ClickHouse engine'

    @property
    def auto_backup(self):
        """Whether backup is supported (it is not)."""
        return False

    @property
    def server_version(self):
        """Return the server version as a tuple of up to three ints.

        The version is read from ``system.build_options`` (``VERSION_FULL``,
        e.g. ``'ClickHouse 22.1.3.7'`` -> ``(22, 1, 3)``). An empty tuple is
        returned when the query fails or the value holds no dotted version;
        ``()`` compares lower than any real version tuple.
        """
        sql = "select value from system.build_options where name = 'VERSION_FULL';"
        result = self.query(sql=sql)
        try:
            version_text = str(result.rows[0][0])
        except (IndexError, TypeError):
            # query() reports failures through result.error and leaves no rows to index.
            logger.warning(f"Unable to read ClickHouse server version: {result.error}")
            return ()
        found = re.search(r'\d+(?:\.\d+)+', version_text)
        if found is None:
            logger.warning(f"Unrecognized ClickHouse version string: {version_text}")
            return ()
        return tuple(int(part) for part in found.group(0).split('.')[:3])

    def get_all_databases(self):
        """Return a ResultSet whose rows are the visible database names."""
        sql = "show databases"
        result = self.query(sql=sql)
        result.rows = [row[0] for row in result.rows
                       if row[0] not in self._HIDDEN_DATABASES]
        return result

    def get_all_tables(self, db_name, **kwargs):
        """Return a ResultSet whose rows are the table names of ``db_name``."""
        sql = "show tables"
        result = self.query(db_name=db_name, sql=sql)
        result.rows = [row[0] for row in result.rows]
        return result

    def get_all_columns_by_tb(self, db_name, tb_name, **kwargs):
        """Return a ResultSet whose rows are the column names of a table.

        ``db_name`` and ``tb_name`` are escaped before being embedded as
        string literals, so quotes in a name cannot alter the statement.
        """
        db_literal = self._escape(db_name, "'")
        tb_literal = self._escape(tb_name, "'")
        sql = (
            "select\nname,\ntype,\ncomment\nfrom\nsystem.columns\nwhere\n"
            f"database = '{db_literal}'\n"
            f"and table = '{tb_literal}';"
        )
        result = self.query(db_name=db_name, sql=sql)
        result.rows = [row[0] for row in result.rows]
        return result

    def describe_table(self, db_name, tb_name, **kwargs):
        """Return a ResultSet holding ``(tb_name, <formatted CREATE statement>)``.

        When the query yields no rows (for example because it failed) the
        ResultSet is returned unchanged so the caller can inspect its error.
        """
        quoted_name = self._escape(tb_name, '`')
        sql = f"show create table `{quoted_name}`;"
        result = self.query(db_name=db_name, sql=sql)
        if not result.rows:
            return result

        # Break the one-line DDL after '(' and ',' to make it readable.
        ddl = result.rows[0][0]
        result.rows[0] = (tb_name,) + (ddl.replace('(', '(\n ').replace(',', ',\n '),)
        return result

    def query(self, db_name=None, sql='', limit_num=0, close_conn=True, **kwargs):
        """Execute ``sql`` and return a ResultSet.

        :param db_name: database used when a new connection has to be opened
        :param sql: statement to execute
        :param limit_num: fetch at most this many rows when > 0, else fetch all
        :param close_conn: close the connection once the statement finished
        :return: ResultSet; on failure ``error`` holds the message, truncated
                 before any 'Stack trace' section
        """
        result_set = ResultSet(full_sql=sql)
        try:
            conn = self.get_connection(db_name=db_name)
            cursor = conn.cursor()
            cursor.execute(sql)
            max_rows = int(limit_num)
            if max_rows > 0:
                rows = cursor.fetchmany(size=max_rows)
            else:
                rows = cursor.fetchall()
            fields = cursor.description

            result_set.column_list = [i[0] for i in fields] if fields else []
            result_set.rows = rows
            result_set.affected_rows = len(rows)
        except Exception as e:
            # Errors are reported through the ResultSet instead of being raised.
            logger.warning(f"ClickHouse语句执行报错,语句:{sql},错误信息{e}")
            result_set.error = str(e).split('Stack trace')[0]
        finally:
            if close_conn:
                self.close()
        return result_set

    def query_check(self, db_name=None, sql=''):
        """Validate a query before execution.

        Comments are stripped and only the first statement is kept. Returns a
        dict with keys ``msg``, ``bad_query``, ``filtered_sql`` and
        ``has_star``. Only select/show/explain statements are accepted.
        """
        result = {'msg': '', 'bad_query': False, 'filtered_sql': sql, 'has_star': False}
        # Strip comments and keep the first effective statement.
        try:
            sql = sqlparse.format(sql, strip_comments=True)
            sql = sqlparse.split(sql)[0]
            result['filtered_sql'] = sql.strip()
        except IndexError:
            result['bad_query'] = True
            # NOTE: when the remaining text does not start with an accepted
            # keyword, the type check below replaces this message; the
            # existing tests rely on that outcome.
            result['msg'] = '没有有效的SQL语句'
        if re.match(r"^select|^show|^explain", sql, re.I) is None:
            result['bad_query'] = True
            result['msg'] = '不支持的查询语法类型!'
        if '*' in sql:
            result['has_star'] = True
            # Do not let the star notice hide the reason a statement was rejected.
            if not result['bad_query']:
                result['msg'] = 'SQL语句中含有 * '
        # The version is looked up lazily: only explain/select statements need it.
        if re.match(r"^explain", sql, re.I):
            if self.server_version < self._EXPLAIN_MIN_VERSION:
                result['bad_query'] = True
                result['msg'] = f"当前ClickHouse实例版本低于20.6.3,不支持explain!"
        elif re.match(r"^select", sql, re.I):
            # Run EXPLAIN first so syntax errors surface before the real query.
            if self.server_version >= self._EXPLAIN_MIN_VERSION:
                explain_result = self.query(db_name=db_name, sql=f"explain {sql}")
                if explain_result.error:
                    result['bad_query'] = True
                    result['msg'] = explain_result.error

        return result

    def filter_sql(self, sql='', limit_num=0):
        """Cap the row limit of a select statement and terminate it with ';'.

        A trailing ``limit n``, ``limit n offset m`` or ``limit m,n`` clause is
        rewritten so its row count is ``min(limit_num, n)``; a select without
        such a clause gets ``limit {limit_num}`` appended. Other statements
        only get the terminating semicolon.
        """
        sql = sql.rstrip(';').strip()
        if not re.match(r"^select", sql, re.I):
            return f'{sql};'

        # LIMIT N
        found = self._LIMIT_N.search(sql)
        if found:
            capped = min(int(limit_num), int(found.group(1)))
            return self._LIMIT_N.sub(f'limit {capped};', sql)

        # LIMIT N OFFSET M
        found = self._LIMIT_OFFSET.search(sql)
        if found:
            capped = min(int(limit_num), int(found.group(1)))
            return self._LIMIT_OFFSET.sub(f'limit {capped} offset {found.group(2)};', sql)

        # LIMIT M,N (offset first, row count second)
        found = self._OFFSET_COMMA_LIMIT.search(sql)
        if found:
            capped = min(int(limit_num), int(found.group(2)))
            return self._OFFSET_COMMA_LIMIT.sub(f'limit {found.group(1)},{capped};', sql)

        return f'{sql} limit {limit_num};'

    def close(self):
        """Close and forget the cached connection, if any."""
        if self.conn:
            self.conn.close()
            self.conn = None
141 changes: 141 additions & 0 deletions sql/engines/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sql.engines.pgsql import PgSQLEngine
from sql.engines.oracle import OracleEngine
from sql.engines.mongo import MongoEngine
from sql.engines.clickhouse import ClickHouseEngine
from sql.models import Instance, SqlWorkflow, SqlWorkflowContent

User = get_user_model()
Expand Down Expand Up @@ -1646,3 +1647,143 @@ def test_fill_query_columns(self):
{"_id": {"$oid": "7f10162029684728e70045ab"}, "author": "archery"}]
cols = self.engine.fill_query_columns(cursor, columns=columns)
self.assertEqual(cols, ["_id", "title", "tags", "likes", "text", "author"])


class TestClickHouse(TestCase):
    """Unit tests for ClickHouseEngine; no real ClickHouse server is contacted."""

    def setUp(self):
        self.ins1 = Instance(
            instance_name='some_ins',
            type='slave',
            db_type='clickhouse',
            host='some_host',
            port=9000,
            user='ins_user',
            password='some_str',
        )
        self.ins1.save()
        self.sys_config = SysConfig()

    def tearDown(self):
        self.ins1.delete()
        self.sys_config.purge()

    def _make_engine(self):
        """Build a fresh engine bound to the fixture instance."""
        return ClickHouseEngine(instance=self.ins1)

    @patch.object(ClickHouseEngine, 'query')
    def test_server_version(self, mock_query):
        """The version string is reduced to its first three numeric parts."""
        version_rs = ResultSet()
        version_rs.rows = [('ClickHouse 22.1.3.7',)]
        mock_query.return_value = version_rs
        engine = self._make_engine()
        self.assertTupleEqual(engine.server_version, (22, 1, 3))

    @patch('clickhouse_driver.connect')
    def test_engine_base_info(self, _conn):
        """name and info expose the fixed engine labels."""
        engine = self._make_engine()
        self.assertEqual(engine.name, 'ClickHouse')
        self.assertEqual(engine.info, 'ClickHouse engine')

    @patch.object(ClickHouseEngine, 'get_connection')
    def testGetConnection(self, connect):
        """get_connection is invoked exactly once."""
        engine = self._make_engine()
        engine.get_connection()
        connect.assert_called_once()

    @patch.object(ClickHouseEngine, 'query')
    def testQuery(self, mock_query):
        """query hands back the rows of the (patched) result."""
        canned = ResultSet()
        canned.rows = [('v1', 'v2'), ]
        mock_query.return_value = canned
        engine = self._make_engine()
        outcome = engine.query(sql='some_sql', limit_num=100)
        self.assertListEqual(outcome.rows, [('v1', 'v2'), ])

    @patch.object(ClickHouseEngine, 'query')
    def testAllDb(self, mock_query):
        """Database rows are flattened to a list of names."""
        canned = ResultSet()
        canned.rows = [('db_1',), ('db_2',)]
        mock_query.return_value = canned
        engine = self._make_engine()
        databases = engine.get_all_databases()
        self.assertEqual(databases.rows, ['db_1', 'db_2'])

    @patch.object(ClickHouseEngine, 'query')
    def testAllTables(self, mock_query):
        """Table rows are flattened and the query targets the given database."""
        canned = ResultSet()
        canned.rows = [('tb_1', 'some_des'), ('tb_2', 'some_des')]
        mock_query.return_value = canned
        engine = self._make_engine()
        tables = engine.get_all_tables('some_db')
        mock_query.assert_called_once_with(db_name='some_db', sql=ANY)
        self.assertEqual(tables.rows, ['tb_1', 'tb_2'])

    @patch.object(ClickHouseEngine, 'query')
    def testAllColumns(self, mock_query):
        """Column rows are flattened to a list of column names."""
        canned = ResultSet()
        canned.rows = [('col_1', 'type'), ('col_2', 'type2')]
        mock_query.return_value = canned
        engine = self._make_engine()
        columns = engine.get_all_columns_by_tb('some_db', 'some_tb')
        self.assertEqual(columns.rows, ['col_1', 'col_2'])

    @patch.object(ClickHouseEngine, 'query')
    def testDescribe(self, mock_query):
        """describe_table issues exactly one query."""
        engine = self._make_engine()
        engine.describe_table('some_db', 'some_db')
        mock_query.assert_called_once()

    def test_query_check_wrong_sql(self):
        """A comment-only statement is rejected as an unsupported query type."""
        engine = self._make_engine()
        comment_only = '-- 测试'
        verdict = engine.query_check(db_name='some_db', sql=comment_only)
        expected = {'msg': '不支持的查询语法类型!', 'bad_query': True, 'filtered_sql': '-- 测试', 'has_star': False}
        self.assertDictEqual(verdict, expected)

    def test_query_check_update_sql(self):
        """An update statement is rejected as an unsupported query type."""
        engine = self._make_engine()
        statement = 'update user set id=0'
        verdict = engine.query_check(db_name='some_db', sql=statement)
        expected = {'msg': '不支持的查询语法类型!', 'bad_query': True, 'filtered_sql': 'update user set id=0',
                    'has_star': False}
        self.assertDictEqual(verdict, expected)

    def test_filter_sql_with_delimiter(self):
        """A select ending in ';' gets the limit appended before the delimiter."""
        engine = self._make_engine()
        statement = 'select user from usertable;'
        rewritten = engine.filter_sql(sql=statement, limit_num=100)
        self.assertEqual(rewritten, 'select user from usertable limit 100;')

    def test_filter_sql_without_delimiter(self):
        """A select without ';' gets both the limit and the delimiter."""
        engine = self._make_engine()
        statement = 'select user from usertable'
        rewritten = engine.filter_sql(sql=statement, limit_num=100)
        self.assertEqual(rewritten, 'select user from usertable limit 100;')

    def test_filter_sql_with_limit(self):
        """An existing limit above limit_num is lowered to limit_num."""
        engine = self._make_engine()
        statement = 'select user from usertable limit 10'
        rewritten = engine.filter_sql(sql=statement, limit_num=1)
        self.assertEqual(rewritten, 'select user from usertable limit 1;')

    def test_filter_sql_with_limit_min(self):
        """An existing limit below limit_num is kept."""
        engine = self._make_engine()
        statement = 'select user from usertable limit 10'
        rewritten = engine.filter_sql(sql=statement, limit_num=100)
        self.assertEqual(rewritten, 'select user from usertable limit 10;')

    def test_filter_sql_with_limit_offset(self):
        """'limit n offset m' keeps the offset and caps the row count."""
        engine = self._make_engine()
        statement = 'select user from usertable limit 10 offset 100'
        rewritten = engine.filter_sql(sql=statement, limit_num=1)
        self.assertEqual(rewritten, 'select user from usertable limit 1 offset 100;')

    def test_filter_sql_with_limit_nn(self):
        """'limit m, n' keeps the offset and caps the row count."""
        engine = self._make_engine()
        statement = 'select user from usertable limit 10, 100'
        rewritten = engine.filter_sql(sql=statement, limit_num=1)
        self.assertEqual(rewritten, 'select user from usertable limit 10,1;')

    def test_filter_sql_upper(self):
        """Upper-case keywords are recognised; the rewritten clause is lower-case."""
        engine = self._make_engine()
        statement = 'SELECT USER FROM usertable LIMIT 10, 100'
        rewritten = engine.filter_sql(sql=statement, limit_num=1)
        self.assertEqual(rewritten, 'SELECT USER FROM usertable limit 10,1;')

    def test_filter_sql_not_select(self):
        """Non-select statements are returned without any limit clause."""
        engine = self._make_engine()
        statement = 'show create table usertable;'
        rewritten = engine.filter_sql(sql=statement, limit_num=1)
        self.assertEqual(rewritten, 'show create table usertable;')
1 change: 1 addition & 0 deletions sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class Meta:
('mongo', 'Mongo'),
('phoenix', 'Phoenix'),
('odps', 'ODPS'),
('clickhouse', 'ClickHouse'),
('goinception', 'goInception'))


Expand Down
1 change: 1 addition & 0 deletions sql/templates/instance.html
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<option value="mongo">Mongo</option>
<option value="phoenix">Phoenix</option>
<option value="odps">ODPS</option>
<option value="clickhouse">ClickHouse</option>
</select>
</div>
<div class="form-group">
Expand Down
4 changes: 4 additions & 0 deletions sql/templates/queryapplylist.html
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ <h4 class="modal-title" id="myModalLabel">申请数据库查询权限</h4>
<optgroup id="optgroup-mongo" label="Mongo"></optgroup>
<optgroup id="optgroup-phoenix" label="Phoenix"></optgroup>
<optgroup id="optgroup-odps" label="ODPS"></optgroup>
<optgroup id="optgroup-clickhouse" label="ClickHouse"></optgroup>
</select>
</div>
<div class="form-group">
Expand Down Expand Up @@ -165,6 +166,7 @@ <h4 class="modal-title text-danger">工单日志</h4>
$("#optgroup-mongo").empty();
$("#optgroup-phoenix").empty();
$("#optgroup-odps").empty();
$("#optgroup-clickhouse").empty();
for (var i = 0; i < result.length; i++) {
var instance = "<option value=\"" + result[i]['instance_name'] + "\">" + result[i]['instance_name'] + "</option>";
if (result[i]['db_type'] === 'mysql') {
Expand All @@ -183,6 +185,8 @@ <h4 class="modal-title text-danger">工单日志</h4>
$("#optgroup-phoenix").append(instance);
} else if (result[i]['db_type'] === 'odps') {
$("#optgroup-odps").append(instance);
} else if (result[i]['db_type'] === 'clickhouse') {
$("#optgroup-clickhouse").append(instance);
}
}
$('#instance_name').selectpicker('render');
Expand Down
Loading

0 comments on commit 8d4b4a6

Please sign in to comment.