Skip to content

Commit

Permalink
BUG#37013057: mysql-connector-python Parameterized query SQL injection
Browse files Browse the repository at this point in the history
Malicious strings can be injected when utilizing
dictionary-based query parameterization via the
`cursor.execute()` API command and the C-based
implementation of the connector.

This patch fixes the injection issue.

Change-Id: I077b5846d3e7a05b3425207026096370911daf2e
  • Loading branch information
oscpache committed Sep 6, 2024
1 parent 8ca2932 commit e6b927a
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ v9.1.0
- WL#16341: OpenID Connect (Oauth2 - JWT) Authentication Support
- WL#16307: Remove Python 3.8 support
- WL#16306: Add support for Python 3.13
- BUG#37013057: mysql-connector-python Parameterized query SQL injection

v9.0.0
======
Expand Down
8 changes: 4 additions & 4 deletions mysql-connector-python/lib/mysql/connector/connection_cext.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,15 +855,15 @@ def more_results(self) -> bool:

def prepare_for_mysql(
self, params: ParamsSequenceOrDictType
) -> Union[Sequence[bytes], Dict[str, bytes]]:
) -> Union[Sequence[bytes], Dict[bytes, bytes]]:
"""Prepare parameters for statements
This method is use by cursors to prepared parameters found in the
list (or tuple) params.
Returns dict.
"""
result: Union[List[Any], Dict[str, Any]] = []
result: Union[List[bytes], Dict[bytes, bytes]] = []
if isinstance(params, (list, tuple)):
if self.converter:
result = [
Expand All @@ -880,14 +880,14 @@ def prepare_for_mysql(
result = {}
if self.converter:
for key, value in params.items():
result[key] = self.converter.quote(
result[key.encode()] = self.converter.quote(
self.converter.escape(
self.converter.to_mysql(value), self._sql_mode
)
)
else:
for key, value in params.items():
result[key] = self._cmysql.convert_to_mysql(value)[0]
result[key.encode()] = self._cmysql.convert_to_mysql(value)[0]
else:
raise ProgrammingError(
f"Could not process parameters: {type(params).__name__}({params}),"
Expand Down
9 changes: 3 additions & 6 deletions mysql-connector-python/lib/mysql/connector/cursor_cext.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
RE_SQL_ON_DUPLICATE,
RE_SQL_PYTHON_CAPTURE_PARAM_NAME,
RE_SQL_PYTHON_REPLACE_PARAM,
_bytestr_format_dict,
is_eol_comment,
parse_multi_statement_query,
)
Expand Down Expand Up @@ -343,8 +344,7 @@ def execute(
if params:
prepared = self._connection.prepare_for_mysql(params)
if isinstance(prepared, dict):
for key, value in prepared.items():
stmt = stmt.replace(f"%({key})s".encode(), value)
stmt = _bytestr_format_dict(stmt, prepared)
elif isinstance(prepared, (list, tuple)):
psub = _ParamSubstitutor(prepared)
stmt = RE_PY_PARAM.sub(psub, stmt)
Expand Down Expand Up @@ -411,10 +411,7 @@ def remove_comments(match: re.Match) -> str:
tmp = fmt
prepared = self._connection.prepare_for_mysql(params)
if isinstance(prepared, dict):
for key, value in prepared.items():
tmp = tmp.replace(
f"%({key})s".encode(), value # type: ignore[arg-type]
)
tmp = _bytestr_format_dict(cast(bytes, tmp), prepared)
elif isinstance(prepared, (list, tuple)):
psub = _ParamSubstitutor(prepared)
tmp = RE_PY_PARAM.sub(psub, tmp) # type: ignore[call-overload]
Expand Down
149 changes: 149 additions & 0 deletions mysql-connector-python/tests/test_bugs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8636,3 +8636,152 @@ async def test_buffered_raw_cursor(self) -> None:
],
await cur.fetchall(),
)


class BugOra37013057(tests.MySQLConnectorTests):
"""BUG#37013057: mysql-connector-python Parameterized query SQL injection
Malicious strings can be injected when utilizing
dictionary-based query parameterization via the
`cursor.execute()` API command and the C-based
implementation of the connector.
This patch fixes the injection issue.
"""

table_name = "BugOra37013057"

cur_flavors = [
{},
{"prepared": True},
{"raw": True},
{"buffered": True},
{"dictionary": True},
]

sql_dict = f"INSERT INTO {table_name}(username, password, city) VALUES (%(username)s, %(password)s, %(city)s)"
sql_tuple = (
f"INSERT INTO {table_name}(username, password, city) VALUES (%s, %s, %s)"
)

cases = [
{
"values_dict": {
"username": "%(password)s",
"password": ", sleep(10));--",
"city": "Montevideo",
},
"values_tuple": ("%(password)s", ", sleep(10));--", "Montevideo"),
},
{
"values_dict": {
"username": "%(password)s",
"password": ", curdate());",
"city": "Rio",
},
"values_tuple": ("%(password)s", ", curdate());", "Rio"),
},
{
"values_dict": {
"username": "%(password)s",
"password": "%(city)s",
"city": ", database());",
},
"values_tuple": ("%(password)s", "%(city)s", ", database());"),
},
]

def setUp(self):
with mysql.connector.connect(**tests.get_mysql_config()) as cnx:
with cnx.cursor() as cur:
cur.execute(
f"CREATE TABLE {self.table_name}"
"(username varchar(50), password varchar(50), city varchar(50))"
)

def tearDown(self):
with mysql.connector.connect(**tests.get_mysql_config()) as cnx:
with cnx.cursor() as cur:
cur.execute(f"DROP TABLE IF EXISTS {self.table_name}")

def _prepare_cur_dict_res(self, case):
if isinstance(case["values"], dict):
return case["values_dict"]

def _run_execute(self, dict_based=True, cur_config={}):
sql = self.sql_dict if dict_based else self.sql_tuple
for case in self.cases:
if dict_based:
values = case["values_dict"]
else:
values = case["values_tuple"]

if "dictionary" in cur_config:
exp_res = case["values_dict"].copy()
elif "raw" in cur_config:
exp_res = tuple([x.encode() for x in case["values_tuple"]])
else:
exp_res = case["values_tuple"]

with self.cnx.cursor(**cur_config) as cur:
cur.execute(sql, values)
cur.execute(f"select * from {self.table_name}")
res = cur.fetchone()
cur.execute(f"TRUNCATE TABLE {self.table_name}")
self.assertEqual(res, exp_res)

@foreach_cnx()
def test_execute_dict_based_injection(self):
for cur_config in self.cur_flavors:
self._run_execute(dict_based=True, cur_config=cur_config)

@foreach_cnx()
def test_execute_tuple_based_injection(self):
for cur_config in self.cur_flavors:
self._run_execute(dict_based=False, cur_config=cur_config)


class BugOra37013057_async(tests.MySQLConnectorAioTestCase):
"""BUG#37013057: mysql-connector-python Parameterized query SQL injection
For a description see `test_bugs.BugOra37013057`.
"""

def setUp(self) -> None:
self.bug_37013057 = BugOra37013057()
self.bug_37013057.setUp()

def tearDown(self) -> None:
self.bug_37013057.tearDown()

async def _run_execute(self, dict_based=True, cur_config={}):
sql = self.bug_37013057.sql_dict if dict_based else self.bug_37013057.sql_tuple
for case in self.bug_37013057.cases:
if dict_based:
values = case["values_dict"]
else:
values = case["values_tuple"]

if "dictionary" in cur_config:
exp_res = case["values_dict"].copy()
elif "raw" in cur_config:
exp_res = tuple([x.encode() for x in case["values_tuple"]])
else:
exp_res = case["values_tuple"]

async with await self.cnx.cursor(**cur_config) as cur:
await cur.execute(sql, values)
await cur.execute(f"select * from {self.bug_37013057.table_name}")
res = await cur.fetchone()
await cur.execute(f"TRUNCATE TABLE {self.bug_37013057.table_name}")
self.assertEqual(res, exp_res)

@foreach_cnx_aio()
async def test_execute_dict_based_injection(self):
for cur_config in self.bug_37013057.cur_flavors:
await self._run_execute(dict_based=True, cur_config=cur_config)

@foreach_cnx_aio()
async def test_execute_tuple_based_injection(self):
for cur_config in self.bug_37013057.cur_flavors:
await self._run_execute(dict_based=False, cur_config=cur_config)

0 comments on commit e6b927a

Please sign in to comment.