-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(hive): Update CSV to Hive upload prefix #14255
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -791,9 +791,16 @@ class CeleryConfig: # pylint: disable=too-few-public-methods | |
CSV_TO_HIVE_UPLOAD_DIRECTORY = "EXTERNAL_HIVE_TABLES/" | ||
# Function that creates upload directory dynamically based on the | ||
# database used, user and schema provided. | ||
CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC: Callable[ | ||
["Database", "models.User", str], Optional[str] | ||
] = lambda database, user, schema: CSV_TO_HIVE_UPLOAD_DIRECTORY | ||
def CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC( | ||
database: "Database", | ||
user: "models.User", # pylint: disable=unused-argument | ||
schema: Optional[str], | ||
) -> str: | ||
# Note the final empty path enforces a trailing slash. | ||
return os.path.join( | ||
CSV_TO_HIVE_UPLOAD_DIRECTORY, str(database.id), schema or "", "" | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ktmud that was my original thinking as well, however these can have spaces etc. in them which could be problematic for S3 keys. I thought a database ID was the safest and also remains unchanged should someone rename a database (the name is purely for presentation purposes). |
||
|
||
|
||
# The namespace within hive where the tables created from | ||
# uploading CSVs will be stored. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -173,7 +173,7 @@ def test_create_table_from_csv_append() -> None: | |
|
||
@mock.patch( | ||
"superset.db_engine_specs.hive.config", | ||
{**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: True}, | ||
{**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: ""}, | ||
) | ||
@mock.patch("superset.db_engine_specs.hive.g", spec={}) | ||
@mock.patch("tableschema.Table") | ||
|
@@ -190,7 +190,7 @@ def test_create_table_from_csv_if_exists_fail(mock_table, mock_g): | |
|
||
@mock.patch( | ||
"superset.db_engine_specs.hive.config", | ||
{**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: True}, | ||
{**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: ""}, | ||
) | ||
@mock.patch("superset.db_engine_specs.hive.g", spec={}) | ||
@mock.patch("tableschema.Table") | ||
|
@@ -211,7 +211,7 @@ def test_create_table_from_csv_if_exists_fail_with_schema(mock_table, mock_g): | |
|
||
@mock.patch( | ||
"superset.db_engine_specs.hive.config", | ||
{**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: True}, | ||
{**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: ""}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
) | ||
@mock.patch("superset.db_engine_specs.hive.g", spec={}) | ||
@mock.patch("tableschema.Table") | ||
|
@@ -239,7 +239,7 @@ def test_create_table_from_csv_if_exists_replace(mock_upload_to_s3, mock_table, | |
|
||
@mock.patch( | ||
"superset.db_engine_specs.hive.config", | ||
{**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: True}, | ||
{**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: ""}, | ||
) | ||
@mock.patch("superset.db_engine_specs.hive.g", spec={}) | ||
@mock.patch("tableschema.Table") | ||
|
@@ -347,6 +347,22 @@ def is_readonly(sql: str) -> bool: | |
assert is_readonly("WITH (SELECT 1) bla SELECT * from bla") | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"schema,upload_prefix", | ||
[("foo", "EXTERNAL_HIVE_TABLES/1/foo/"), (None, "EXTERNAL_HIVE_TABLES/1/")], | ||
) | ||
def test_s3_upload_prefix(schema: str, upload_prefix: str) -> None: | ||
mock_database = mock.MagicMock() | ||
mock_database.id = 1 | ||
|
||
assert ( | ||
app.config["CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC"]( | ||
database=mock_database, user=mock.MagicMock(), schema=schema | ||
) | ||
== upload_prefix | ||
) | ||
|
||
|
||
def test_upload_to_s3_no_bucket_path(): | ||
with pytest.raises( | ||
Exception, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note the return type has changed from
Optional[str]
tostr
since a path must be specified.