Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持Oracle会话管理 #1954

Merged
merged 2 commits into from
Nov 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion sql/db_diagnostic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

logger = logging.getLogger("default")


# 问题诊断--进程列表
@permission_required("sql.process_view", raise_exception=True)
def process(request):
Expand All @@ -45,6 +46,8 @@ def process(request):

elif instance.db_type == "mongo":
query_result = query_engine.current_op(command_type)
elif instance.db_type == "oracle":
query_result = query_engine.session_list(command_type)
else:
result = {
"status": 1,
Expand Down Expand Up @@ -90,6 +93,8 @@ def create_kill_session(request):
elif instance.db_type == "mongo":
kill_command = query_engine.get_kill_command(json.loads(thread_ids))
result["data"] = kill_command
elif instance.db_type == "oracle":
result["data"] = query_engine.get_kill_command(json.loads(thread_ids))
else:
result = {
"status": 1,
Expand Down Expand Up @@ -127,6 +132,8 @@ def kill_session(request):
r = engine.kill(json.loads(thread_ids))
elif instance.db_type == "mongo":
r = engine.kill_op(json.loads(thread_ids))
elif instance.db_type == "oracle":
r = engine.kill_session(json.loads(thread_ids))
else:
result = {
"status": 1,
Expand Down Expand Up @@ -165,6 +172,10 @@ def tablesapce(request):
query_result = query_engine.tablesapce(offset, limit)
r = query_engine.tablesapce_num()
total = r.rows[0][0]
elif instance.db_type == "oracle":
query_result = query_engine.tablespace(offset, limit)
r = query_engine.tablespace_count()
total = r.rows[0][0]
else:
result = {
"status": 1,
Expand Down Expand Up @@ -200,7 +211,8 @@ def trxandlocks(request):
query_engine = get_engine(instance=instance)
if instance.db_type == "mysql":
query_result = query_engine.trxandlocks()

elif instance.db_type == "oracle":
query_result = query_engine.lock_info()
else:
result = {
"status": 1,
Expand Down
137 changes: 137 additions & 0 deletions sql/engines/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,143 @@ def sqltuningadvisor(self, db_name=None, sql="", close_conn=True, **kwargs):
self.close()
return result_set

def execute(self, db_name=None, sql="", close_conn=True):
"""原生执行语句"""
result = ResultSet(full_sql=sql)
conn = self.get_connection(db_name=db_name)
try:
cursor = conn.cursor()
for statement in sqlparse.split(sql):
statement = statement.rstrip(";")
cursor.execute(statement)
except Exception as e:
logger.warning(f"Oracle语句执行报错,语句:{sql},错误信息{traceback.format_exc()}")
result.error = str(e)
if close_conn:
self.close()
return result

def session_list(self, command_type):
"""获取会话信息"""
base_sql = """select
s.sid,
s.serial#,
s.status,
s.username,
q.sql_text,
q.sql_fulltext,
s.machine,
s.sql_exec_start
from v$process p, v$session s, v$sqlarea q
where p.addr = s.paddr
and s.sql_hash_value = q.hash_value"""
if not command_type:
command_type = "Active"
if command_type == "All":
sql = base_sql + ";"
elif command_type == "Active":
sql = "{} and s.status = 'ACTIVE';".format(base_sql)
elif command_type == "Others":
sql = "{} and s.status != 'ACTIVE';".format(base_sql)
else:
sql = ""

return self.query(sql=sql)

def get_kill_command(self, thread_ids):
"""由传入的sid+serial#列表生成kill命令"""
# 校验传参,thread_ids格式:[[sid, serial#], [sid, serial#]]
if [
k
for k in [[j for j in i if not isinstance(j, int)] for i in thread_ids]
if k
]:
return None
sql = """select 'alter system kill session ' || '''' || s.sid || ',' || s.serial# || '''' || ' immediate' || ';'
from v$process p, v$session s, v$sqlarea q
where p.addr = s.paddr
and s.sql_hash_value = q.hash_value
and s.sid || ',' || s.serial# in ({});""".format(
",".join(f"'{str(tid[0])},{str(tid[1])}'" for tid in thread_ids)
)
all_kill_sql = self.query(sql=sql)
kill_sql = ""
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]

return kill_sql

def kill_session(self, thread_ids):
"""kill会话"""
# 校验传参,thread_ids格式:[[sid, serial#], [sid, serial#]]
if [
k
for k in [[j for j in i if not isinstance(j, int)] for i in thread_ids]
if k
]:
return ResultSet(full_sql="")
sql = """select 'alter system kill session ' || '''' || s.sid || ',' || s.serial# || '''' || ' immediate' || ';'
from v$process p, v$session s, v$sqlarea q
where p.addr = s.paddr
and s.sql_hash_value = q.hash_value
and s.sid || ',' || s.serial# in ({});""".format(
",".join(f"'{str(tid[0])},{str(tid[1])}'" for tid in thread_ids)
)
all_kill_sql = self.query(sql=sql)
kill_sql = ""
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
return self.execute(sql=kill_sql)

def tablespace(self, offset=0, row_count=14):
"""获取表空间信息"""
row_count = offset + row_count
sql = """
select f.* from (
select rownum rownumber, e.* from (
select a.tablespace_name,
d.contents tablespace_type,
d.status,
round(a.bytes/1024/1024,2) total_space,
round(b.bytes/1024/1024,2) used_space,
round((b.bytes * 100) / a.bytes,2) pct_used
from sys.sm$ts_avail a, sys.sm$ts_used b, sys.sm$ts_free c, dba_tablespaces d
where a.tablespace_name = b.tablespace_name
and a.tablespace_name = c.tablespace_name
and a.tablespace_name = d.tablespace_name
order by total_space desc ) e
where rownum <={}
) f where f.rownumber >={};""".format(
row_count, offset
)
return self.query(sql=sql)

def tablespace_count(self):
"""获取表空间数量"""
sql = """select count(*) from dba_tablespaces where contents != 'TEMPORARY'"""
return self.query(sql=sql)

def lock_info(self):
"""获取锁信息"""
sql = """
select c.username,
b.owner object_owner,
a.object_id,
b.object_name,
a.locked_mode,
c.sid related_sid,
c.serial# related_serial#,
c.machine,
d.sql_text related_sql,
d.sql_fulltext related_sql_full,
c.sql_exec_start related_sql_exec_start
from v$locked_object a,dba_objects b, v$session c, v$sqlarea d
where b.object_id = a.object_id
and a.session_id = c.sid
and c.sql_hash_value = d.hash_value;"""

return self.query(sql=sql)

def close(self):
if self.conn:
self.conn.close()
Expand Down
64 changes: 64 additions & 0 deletions sql/engines/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,70 @@ def test_execute_workflow_exception(self, _conn, _cursor, _execute):
execute_result.rows[0].__dict__.keys(), row.__dict__.keys()
)

@patch("cx_Oracle.connect.cursor.execute")
@patch("cx_Oracle.connect.cursor")
@patch("cx_Oracle.connect")
def test_execute(self, _connect, _cursor, _execute):
new_engine = OracleEngine(instance=self.ins)
sql = "update abc set count=1 where id=1;"
execute_result = new_engine.execute(sql)
self.assertIsInstance(execute_result, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_session_list(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value = ResultSet()
for command_type in ["All", "Active", "Others"]:
r = new_engine.session_list(command_type)
self.assertIsInstance(r, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_get_kill_command(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value.rows = (
("alter system kill session '12,123';",),
("alter system kill session '34,345';",),
)
r = new_engine.get_kill_command([[12, 123], [34, 345]])
self.assertEqual(
r, "alter system kill session '12,123';alter system kill session '34,345';"
)

@patch("sql.engines.oracle.OracleEngine.query")
@patch("cx_Oracle.connect.cursor.execute")
@patch("cx_Oracle.connect.cursor")
@patch("cx_Oracle.connect")
def test_kill_session(self, _query, _connect, _cursor, _execute):
new_engine = OracleEngine(instance=self.ins)
_query.return_value.rows = (
("alter system kill session '12,123';",),
("alter system kill session '34,345';",),
)
_execute.return_value = ResultSet()
r = new_engine.kill_session([[12, 123], [34, 345]])
self.assertIsInstance(r, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_tablespace(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value = ResultSet()
r = new_engine.tablespace()
self.assertIsInstance(r, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_tablespace_count(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value = ResultSet()
r = new_engine.tablespace_count()
self.assertIsInstance(r, ResultSet)

@patch("sql.engines.oracle.OracleEngine.query")
def test_lock_info(self, _query):
new_engine = OracleEngine(instance=self.ins)
_query.return_value = ResultSet()
r = new_engine.lock_info()
self.assertIsInstance(r, ResultSet)


class MongoTest(TestCase):
def setUp(self) -> None:
Expand Down
Loading