Skip to content

Commit

Permalink
Improve setUp/tearDown in PrestoToGCSSystemTest (apache#7860)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj authored Mar 26, 2020
1 parent e3920f1 commit a32a127
Showing 1 changed file with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import os
from contextlib import closing, suppress

import pytest

Expand Down Expand Up @@ -58,7 +59,7 @@
z_ipaddress_v6 IPADDRESS,
-- UUID
z_uuid UUID
);
)
"""

LOAD_QUERY = """
Expand Down Expand Up @@ -92,9 +93,9 @@
IPADDRESS '2001:db8::1', -- z_ipaddress_v6 IPADDRESS,
-- UUID
UUID '12151fd2-7586-11e9-8f9e-2a86e4085a59' -- z_uuid UUID
);
)
"""
DELETE_QUERY = "DROP TABLE memory.default.test_multiple_types;"
DELETE_QUERY = "DROP TABLE memory.default.test_multiple_types"


@pytest.mark.integration("presto")
Expand All @@ -112,21 +113,34 @@ def init_connection():
@staticmethod
def init_db():
hook = PrestoHook()
hook.run(CREATE_QUERY)
hook.run(LOAD_QUERY)
with hook.get_conn() as conn:
with closing(conn.cursor()) as cur:
cur.execute(CREATE_QUERY)
# Presto does not execute queries until the result is fetched. :-(
cur.fetchone()
cur.execute(LOAD_QUERY)
cur.fetchone()

@staticmethod
def drop_db():
hook = PrestoHook()
hook.run(DELETE_QUERY)
with hook.get_conn() as conn:
with closing(conn.cursor()) as cur:
cur.execute(DELETE_QUERY)
# Presto does not execute queries until the result is fetched. :-(
cur.fetchone()

@provide_gcp_context(GCP_GCS_KEY)
def setUp(self):
super().setUp()
self.init_connection()
self.create_gcs_bucket(GCS_BUCKET)
with suppress(Exception):
self.drop_db()
self.init_db()
self.execute_with_ctx(["bq", "rm", f"{self._project_id()}:{DATASET_NAME}"], key=GCP_BIGQUERY_KEY)
self.execute_with_ctx([
"bq", "rm", "--recursive", "--force", f"{self._project_id()}:{DATASET_NAME}"
], key=GCP_BIGQUERY_KEY)

@provide_gcp_context(GCP_BIGQUERY_KEY)
def test_run_example_dag(self):
Expand All @@ -136,5 +150,7 @@ def test_run_example_dag(self):
def tearDown(self):
self.delete_gcs_bucket(GCS_BUCKET)
self.drop_db()
self.execute_with_ctx(["bq", "rm", f"{self._project_id()}:{DATASET_NAME}"], key=GCP_BIGQUERY_KEY)
self.execute_with_ctx([
"bq", "rm", "--recursive", "--force", f"{self._project_id()}:{DATASET_NAME}"
], key=GCP_BIGQUERY_KEY)
super().tearDown()

0 comments on commit a32a127

Please sign in to comment.