Skip to content

Commit

Permalink
支持Oracle会话管理 (hhyo#1954)
Browse files Browse the repository at this point in the history
* 支持Oracle会话管理

* 删除非必要sql拼接
  • Loading branch information
nick2wang authored Nov 6, 2022
1 parent 027904f commit 287e74a
Show file tree
Hide file tree
Showing 4 changed files with 492 additions and 89 deletions.
14 changes: 13 additions & 1 deletion sql/db_diagnostic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

logger = logging.getLogger("default")


# 问题诊断--进程列表
@permission_required("sql.process_view", raise_exception=True)
def process(request):
Expand All @@ -45,6 +46,8 @@ def process(request):

elif instance.db_type == "mongo":
query_result = query_engine.current_op(command_type)
elif instance.db_type == "oracle":
query_result = query_engine.session_list(command_type)
else:
result = {
"status": 1,
Expand Down Expand Up @@ -90,6 +93,8 @@ def create_kill_session(request):
elif instance.db_type == "mongo":
kill_command = query_engine.get_kill_command(json.loads(thread_ids))
result["data"] = kill_command
elif instance.db_type == "oracle":
result["data"] = query_engine.get_kill_command(json.loads(thread_ids))
else:
result = {
"status": 1,
Expand Down Expand Up @@ -127,6 +132,8 @@ def kill_session(request):
r = engine.kill(json.loads(thread_ids))
elif instance.db_type == "mongo":
r = engine.kill_op(json.loads(thread_ids))
elif instance.db_type == "oracle":
r = engine.kill_session(json.loads(thread_ids))
else:
result = {
"status": 1,
Expand Down Expand Up @@ -165,6 +172,10 @@ def tablesapce(request):
query_result = query_engine.tablesapce(offset, limit)
r = query_engine.tablesapce_num()
total = r.rows[0][0]
elif instance.db_type == "oracle":
query_result = query_engine.tablespace(offset, limit)
r = query_engine.tablespace_count()
total = r.rows[0][0]
else:
result = {
"status": 1,
Expand Down Expand Up @@ -200,7 +211,8 @@ def trxandlocks(request):
query_engine = get_engine(instance=instance)
if instance.db_type == "mysql":
query_result = query_engine.trxandlocks()

elif instance.db_type == "oracle":
query_result = query_engine.lock_info()
else:
result = {
"status": 1,
Expand Down
137 changes: 137 additions & 0 deletions sql/engines/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,143 @@ def sqltuningadvisor(self, db_name=None, sql="", close_conn=True, **kwargs):
self.close()
return result_set

def execute(self, db_name=None, sql="", close_conn=True):
"""原生执行语句"""
result = ResultSet(full_sql=sql)
conn = self.get_connection(db_name=db_name)
try:
cursor = conn.cursor()
for statement in sqlparse.split(sql):
statement = statement.rstrip(";")
cursor.execute(statement)
except Exception as e:
logger.warning(f"Oracle语句执行报错,语句:{sql},错误信息{traceback.format_exc()}")
result.error = str(e)
if close_conn:
self.close()
return result

def session_list(self, command_type):
"""获取会话信息"""
base_sql = """select
s.sid,
s.serial#,
s.status,
s.username,
q.sql_text,
q.sql_fulltext,
s.machine,
s.sql_exec_start
from v$process p, v$session s, v$sqlarea q
where p.addr = s.paddr
and s.sql_hash_value = q.hash_value"""
if not command_type:
command_type = "Active"
if command_type == "All":
sql = base_sql + ";"
elif command_type == "Active":
sql = "{} and s.status = 'ACTIVE';".format(base_sql)
elif command_type == "Others":
sql = "{} and s.status != 'ACTIVE';".format(base_sql)
else:
sql = ""

return self.query(sql=sql)

def get_kill_command(self, thread_ids):
"""由传入的sid+serial#列表生成kill命令"""
# 校验传参,thread_ids格式:[[sid, serial#], [sid, serial#]]
if [
k
for k in [[j for j in i if not isinstance(j, int)] for i in thread_ids]
if k
]:
return None
sql = """select 'alter system kill session ' || '''' || s.sid || ',' || s.serial# || '''' || ' immediate' || ';'
from v$process p, v$session s, v$sqlarea q
where p.addr = s.paddr
and s.sql_hash_value = q.hash_value
and s.sid || ',' || s.serial# in ({});""".format(
",".join(f"'{str(tid[0])},{str(tid[1])}'" for tid in thread_ids)
)
all_kill_sql = self.query(sql=sql)
kill_sql = ""
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]

return kill_sql

def kill_session(self, thread_ids):
"""kill会话"""
# 校验传参,thread_ids格式:[[sid, serial#], [sid, serial#]]
if [
k
for k in [[j for j in i if not isinstance(j, int)] for i in thread_ids]
if k
]:
return ResultSet(full_sql="")
sql = """select 'alter system kill session ' || '''' || s.sid || ',' || s.serial# || '''' || ' immediate' || ';'
from v$process p, v$session s, v$sqlarea q
where p.addr = s.paddr
and s.sql_hash_value = q.hash_value
and s.sid || ',' || s.serial# in ({});""".format(
",".join(f"'{str(tid[0])},{str(tid[1])}'" for tid in thread_ids)
)
all_kill_sql = self.query(sql=sql)
kill_sql = ""
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
return self.execute(sql=kill_sql)

def tablespace(self, offset=0, row_count=14):
"""获取表空间信息"""
row_count = offset + row_count
sql = """
select f.* from (
select rownum rownumber, e.* from (
select a.tablespace_name,
d.contents tablespace_type,
d.status,
round(a.bytes/1024/1024,2) total_space,
round(b.bytes/1024/1024,2) used_space,
round((b.bytes * 100) / a.bytes,2) pct_used
from sys.sm$ts_avail a, sys.sm$ts_used b, sys.sm$ts_free c, dba_tablespaces d
where a.tablespace_name = b.tablespace_name
and a.tablespace_name = c.tablespace_name
and a.tablespace_name = d.tablespace_name
order by total_space desc ) e
where rownum <={}
) f where f.rownumber >={};""".format(
row_count, offset
)
return self.query(sql=sql)

def tablespace_count(self):
"""获取表空间数量"""
sql = """select count(*) from dba_tablespaces where contents != 'TEMPORARY'"""
return self.query(sql=sql)

def lock_info(self):
"""获取锁信息"""
sql = """
select c.username,
b.owner object_owner,
a.object_id,
b.object_name,
a.locked_mode,
c.sid related_sid,
c.serial# related_serial#,
c.machine,
d.sql_text related_sql,
d.sql_fulltext related_sql_full,
c.sql_exec_start related_sql_exec_start
from v$locked_object a,dba_objects b, v$session c, v$sqlarea d
where b.object_id = a.object_id
and a.session_id = c.sid
and c.sql_hash_value = d.hash_value;"""

return self.query(sql=sql)

def close(self):
if self.conn:
self.conn.close()
Expand Down
64 changes: 64 additions & 0 deletions sql/engines/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,70 @@ def test_execute_workflow_exception(self, _conn, _cursor, _execute):
execute_result.rows[0].__dict__.keys(), row.__dict__.keys()
)

@patch("cx_Oracle.connect.cursor.execute")
@patch("cx_Oracle.connect.cursor")
@patch("cx_Oracle.connect")
def test_execute(self, _connect, _cursor, _execute):
new_engine = OracleEngine(instance=self.ins)
sql = "update abc set count=1 where id=1;"
execute_result = new_engine.execute(sql)
self.assertIsInstance(execute_result, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_session_list(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value = ResultSet()
for command_type in ["All", "Active", "Others"]:
r = new_engine.session_list(command_type)
self.assertIsInstance(r, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_get_kill_command(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value.rows = (
("alter system kill session '12,123';",),
("alter system kill session '34,345';",),
)
r = new_engine.get_kill_command([[12, 123], [34, 345]])
self.assertEqual(
r, "alter system kill session '12,123';alter system kill session '34,345';"
)

@patch("sql.engines.oracle.OracleEngine.query")
@patch("cx_Oracle.connect.cursor.execute")
@patch("cx_Oracle.connect.cursor")
@patch("cx_Oracle.connect")
def test_kill_session(self, _query, _connect, _cursor, _execute):
new_engine = OracleEngine(instance=self.ins)
_query.return_value.rows = (
("alter system kill session '12,123';",),
("alter system kill session '34,345';",),
)
_execute.return_value = ResultSet()
r = new_engine.kill_session([[12, 123], [34, 345]])
self.assertIsInstance(r, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_tablespace(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value = ResultSet()
r = new_engine.tablespace()
self.assertIsInstance(r, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_tablespace_count(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value = ResultSet()
r = new_engine.tablespace_count()
self.assertIsInstance(r, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_lock_info(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value = ResultSet()
r = new_engine.lock_info()
self.assertIsInstance(r, ResultSet)


class MongoTest(TestCase):
def setUp(self) -> None:
Expand Down
Loading

0 comments on commit 287e74a

Please sign in to comment.