Skip to content

Commit

Permalink
fix: check sqlalchemy_uri (apache#23901)
Browse files Browse the repository at this point in the history
(cherry picked from commit e5f512e)
  • Loading branch information
dpgaspar authored and eschutho committed May 23, 2023
1 parent dcc9df8 commit 07eb82f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 23 deletions.
14 changes: 9 additions & 5 deletions superset/security/analytics_db_safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re

from flask_babel import lazy_gettext as _
from sqlalchemy.engine.url import URL
from sqlalchemy.exc import NoSuchModuleError
Expand All @@ -24,18 +26,20 @@
# list of unsafe SQLAlchemy dialects
BLOCKLIST = {
# sqlite creates a local DB, which allows mapping server's filesystem
"sqlite",
re.compile(r"sqlite(?:\+[^\s]*)?$"),
# shillelagh allows opening local files (eg, 'SELECT * FROM "csv:///etc/passwd"')
"shillelagh",
"shillelagh+apsw",
re.compile(r"shillelagh$"),
re.compile(r"shillelagh\+apsw$"),
}


def check_sqlalchemy_uri(uri: URL) -> None:
if uri.drivername in BLOCKLIST:
for blocklist_regex in BLOCKLIST:
if not re.match(blocklist_regex, uri.drivername):
continue
try:
dialect = uri.get_dialect().__name__
except NoSuchModuleError:
except (NoSuchModuleError, ValueError):
dialect = uri.drivername

raise SupersetSecurityException(
Expand Down
84 changes: 66 additions & 18 deletions tests/integration_tests/security/analytics_db_safety_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,78 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

import pytest
from sqlalchemy.engine.url import make_url

from superset.exceptions import SupersetSecurityException
from superset.security.analytics_db_safety import check_sqlalchemy_uri
from tests.integration_tests.base_tests import SupersetTestCase


class TestDBConnections(SupersetTestCase):
def test_check_sqlalchemy_uri_ok(self):
check_sqlalchemy_uri(make_url("postgres://user:password@test.com"))

def test_check_sqlalchemy_url_sqlite(self):
with pytest.raises(SupersetSecurityException) as excinfo:
check_sqlalchemy_uri(make_url("sqlite:///home/superset/bad.db"))
assert (
str(excinfo.value)
== "SQLiteDialect_pysqlite cannot be used as a data source for security reasons."
)

@pytest.mark.parametrize(
"sqlalchemy_uri, error, error_message",
[
("postgres://user:password@test.com", False, None),
(
"sqlite:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+pysqlite:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+aiosqlite:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+pysqlcipher:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+new+driver:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"sqlite+new+:///home/superset/bad.db",
True,
"SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
),
(
"shillelagh:///home/superset/bad.db",
True,
"shillelagh cannot be used as a data source for security reasons.",
),
(
"shillelagh+apsw:///home/superset/bad.db",
True,
"shillelagh cannot be used as a data source for security reasons.",
),
("shillelagh+:///home/superset/bad.db", False, None),
(
"shillelagh+something:///home/superset/bad.db",
False,
None,
),
],
)
def test_check_sqlalchemy_uri(
sqlalchemy_uri: str, error: bool, error_message: Optional[str]
):
if error:
with pytest.raises(SupersetSecurityException) as excinfo:
check_sqlalchemy_uri(make_url("shillelagh:///home/superset/bad.db"))
assert (
str(excinfo.value)
== "shillelagh cannot be used as a data source for security reasons."
)
check_sqlalchemy_uri(make_url(sqlalchemy_uri))
assert str(excinfo.value) == error_message
else:
check_sqlalchemy_uri(make_url(sqlalchemy_uri))

0 comments on commit 07eb82f

Please sign in to comment.