Commit 42459bb

Merge branch 'main' into pin_sphinx_auto_api
dimberman authored Jan 17, 2023
2 parents 4c4daba + 6864dec commit 42459bb
Showing 12 changed files with 68 additions and 22 deletions.
python-sdk/docs/development/RELEASE.md (9 additions, 1 deletion)
@@ -2,7 +2,15 @@

## Updating the changelog

-The first step to creating a release is identifying what changed from the previous release and adding a description to the [CHANGELOG.md](../CHANGELOG.md)
+The first step to creating a release is identifying what changed from the previous release and adding a description to the [CHANGELOG.md](../CHANGELOG.md).
+
+One way to identify what changed from the previous release is to use a comparison URL between the previous release branch and the main branch.
+
+Example:
+[https://github.com/astronomer/astro-sdk/compare/1.3.3...main](https://github.com/astronomer/astro-sdk/compare/1.3.3...main)
+
+This will show all the commits that are in the main branch but not in the release branch, from which we can note every change significant enough to be mentioned in the CHANGELOG.
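(The same list can be produced from a local clone with `git log 1.3.3..main --oneline`, assuming the `1.3.3` tag is available locally.)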


## Handling patch releases

Expand Down
python-sdk/example_dags/example_load_file.py (1 addition, 1 deletion)
@@ -163,7 +163,7 @@
     # [START load_file_example_12]
     aql.load_file(
         input_file=File(
-            "gs://astro-sdk/workspace/sample_pattern.csv",
+            "gs://astro-sdk/workspace/sample_pattern",
             conn_id="bigquery",
             filetype=FileType.CSV,
         ),
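The dropped `.csv` extension is the point of this fix: in this pattern-loading example the path acts as a prefix that can match several objects, and the explicit `filetype=FileType.CSV` already tells the SDK how to parse them.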
python-sdk/src/astro/databases/snowflake.py (5 additions, 1 deletion)
@@ -618,9 +618,13 @@ def load_file_to_table_natively(
"""
native_support_kwargs = native_support_kwargs or {}
storage_integration = native_support_kwargs.get("storage_integration")
if not self.load_options:
self.load_options = SnowflakeLoadOptions()

storage_integration = native_support_kwargs.get("storage_integration", None)
if storage_integration is None:
storage_integration = self.load_options.storage_integration

stage = self.create_stage(file=source_file, storage_integration=storage_integration)

rows = self._copy_into_table_from_stage(
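The net effect of this hunk is a small precedence rule: a `storage_integration` passed in `native_support_kwargs` wins, and the value configured on `SnowflakeLoadOptions` is the fallback. A minimal standalone sketch of that resolution order (a hypothetical helper, not the SDK's own code):

```python
from typing import Optional


def resolve_storage_integration(
    native_support_kwargs: Optional[dict],
    load_options,  # any object exposing a .storage_integration attribute, or None
) -> Optional[str]:
    """Mirror the lookup order above: explicit kwarg first, then load options."""
    integration = (native_support_kwargs or {}).get("storage_integration")
    if integration is None and load_options is not None:
        integration = load_options.storage_integration
    return integration
```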
python-sdk/src/astro/options.py (1 addition, 0 deletions)
@@ -61,6 +61,7 @@ class SnowflakeLoadOptions(LoadOptions):

     file_options: dict = attr.field(init=True, factory=dict)
     copy_options: dict = attr.field(init=True, factory=dict)
+    storage_integration: str = attr.field(default=None)

     def empty(self):
         return not self.file_options and not self.copy_options
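With the new field, a storage integration can be configured once on the options object instead of being passed on every call. A hedged usage sketch (the integration name is a placeholder):

```python
from astro.options import SnowflakeLoadOptions

# "my_gcs_integration" stands in for a real Snowflake STORAGE INTEGRATION name.
options = SnowflakeLoadOptions(storage_integration="my_gcs_integration")

assert options.storage_integration == "my_gcs_integration"
assert options.empty()  # empty() still only inspects file_options and copy_options
```

Note that `empty()` ignores `storage_integration`, so an options object carrying only an integration name still reports itself as empty.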
python-sdk/src/astro/sql/__init__.py (4 additions, 8 deletions)
@@ -4,14 +4,6 @@

 from astro.sql.operators.append import AppendOperator, append
 from astro.sql.operators.cleanup import CleanupOperator, cleanup
-from astro.sql.operators.data_validations.ColumnCheckOperator import ( # skipcq: PY-W2000
-    ColumnCheckOperator,
-    column_check,
-)
-from astro.sql.operators.data_validations.SQLCheckOperator import ( # skipcq: PY-W2000
-    SQLCheckOperator,
-    sql_check,
-)
 from astro.sql.operators.dataframe import DataframeOperator, dataframe
 from astro.sql.operators.drop import DropTableOperator, drop_table
 from astro.sql.operators.export_file import ExportFileOperator, export_file
@@ -49,6 +41,10 @@
"TransformOperator",
"transform_file",
"transform",
"ColumnCheckOperator",
"check_column",
"SQLCheckOperator",
"check_table",
]


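With the imports above removed, the validation helpers are now exported under their new names only. A short sketch of the renamed API, mirroring the tests further down (the DataFrame and the DAG context are illustrative):

```python
import pandas
import astro.sql as aql

df = pandas.DataFrame({"name": ["Alice", None, "Carol"]})

# In a real pipeline this call runs inside a DAG context, e.g. `with dag:`.
aql.check_column(
    dataset=df,
    column_mapping={"name": {"null_check": {"geq_to": 0, "leq_to": 1}}},
)

# check_table is the table-level counterpart, e.g. (hypothetical checks dict):
# aql.check_table(dataset=my_table, checks={"row_count": {"check_statement": "COUNT(*) > 0"}})
```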
python-sdk/src/astro/sql/operators/data_validations/ColumnCheckOperator.py
@@ -75,7 +75,7 @@ def __init__(
             column_mapping=self.column_mapping,
             partition_clause=self.partition_clause,
             conn_id=dataset_conn_id,
-            task_id=task_id if task_id is not None else get_unique_task_id("column_check"),
+            task_id=task_id if task_id is not None else get_unique_task_id("check_column"),
         )

     def get_db_hook(self) -> DbApiHook:
@@ -165,7 +165,7 @@ def get_checks_string(check, col):
     return f"\tCheck: {check},\n\tCheck Values:"


-def column_check(
+def check_column(
     dataset: Union[BaseTable, pandas.DataFrame],
     column_mapping: Dict[str, Dict[str, Any]],
     partition_clause: Optional[str] = None,
python-sdk/src/astro/sql/operators/data_validations/SQLCheckOperator.py
@@ -51,7 +51,7 @@ def __init__(
             checks=checks,
             partition_clause=partition_clause,
             conn_id=dataset.conn_id,
-            task_id=task_id or get_unique_task_id("sql_check"),
+            task_id=task_id or get_unique_task_id("check_table"),
         )

     def get_db_hook(self) -> DbApiHook:
@@ -66,7 +66,7 @@ def get_db_hook(self) -> DbApiHook:
         return super().get_db_hook()


-def sql_check(
+def check_table(
     dataset: BaseTable,
     checks: Dict[str, Dict[str, Any]],
     partition_clause: Optional[str] = None,
python-sdk/tests/databases/test_snowflake.py (42 additions, 0 deletions)
@@ -165,3 +165,45 @@ def test_snowflake_load_options_empty():
     assert not load_options.empty()
     load_options.file_options = {}
     assert load_options.empty()
+
+
+def test_storage_integrations_params_in_native_support_kwargs():
+    """
+    Test passing of storage_integrations in native_support_kwargs
+    """
+    table = Table(conn_id="fake-conn")
+    file = File(path="azure://data/homes_main.ndjson")
+    database = SnowflakeDatabase(conn_id=table.conn_id, load_options=SnowflakeLoadOptions())
+
+    with mock.patch("astro.databases.snowflake.SnowflakeDatabase.create_stage") as create_stage, mock.patch(
+        "astro.databases.snowflake.SnowflakeDatabase._copy_into_table_from_stage"
+    ) as _copy_into_table_from_stage, mock.patch(
+        "astro.databases.snowflake.SnowflakeDatabase.evaluate_results"
+    ):
+        database.load_file_to_table_natively(
+            source_file=file,
+            target_table=table,
+            native_support_kwargs={"storage_integration": "some_integrations"},
+        )
+    assert create_stage.call_args.kwargs["storage_integration"] == "some_integrations"
+
+
+def test_storage_integrations_params_in_load_options():
+    """
+    Test passing of storage_integrations in SnowflakeLoadOptions
+    """
+    table = Table(conn_id="fake-conn")
+    file = File(path="azure://data/homes_main.ndjson")
+    database = SnowflakeDatabase(
+        conn_id=table.conn_id,
+        load_options=SnowflakeLoadOptions(storage_integration="some_integrations"),
+    )
+
+    with mock.patch("astro.databases.snowflake.SnowflakeDatabase.create_stage") as create_stage, mock.patch(
+        "astro.databases.snowflake.SnowflakeDatabase._copy_into_table_from_stage"
+    ) as _copy_into_table_from_stage, mock.patch(
+        "astro.databases.snowflake.SnowflakeDatabase.evaluate_results"
+    ):
+        database.load_file_to_table_natively(source_file=file, target_table=table)
+
+    assert create_stage.call_args.kwargs["storage_integration"] == "some_integrations"
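Read together, the two tests pin down the lookup order from the `snowflake.py` hunk: a `storage_integration` passed through `native_support_kwargs` is used as given, while the value on `SnowflakeLoadOptions` only applies when no kwarg is supplied.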
(tests for the column-check operator; file name not shown)
@@ -24,7 +24,7 @@ def test_column_check_operator_with_null_checks(sample_dag):
     Test column_check_operator for null_check case
     """
     with sample_dag:
-        aql.column_check(
+        aql.check_column(
             dataset=df,
             column_mapping={
                 "name": {"null_check": {"geq_to": 0, "leq_to": 1}},
@@ -49,7 +49,7 @@ def test_failure_of_column_check_operator_with_null_checks__equal_to(sample_dag):
     Test that failure column_check_operator for null_check
     """
     with sample_dag, pytest.raises(AirflowException) as e:
-        aql.column_check(
+        aql.check_column(
             dataset=df,
             column_mapping={
                 "city": {
(Delta/autoloader load_file integration tests; file name not shown)
@@ -32,7 +32,6 @@ def test_autoloader_load_file_local(database_table_fixture):
     assert not df.empty
     assert len(df) == 3
     assert df.columns.to_list() == ["id", "name"]
-    database.drop_table(table)


@pytest.mark.integration
@@ -54,7 +53,6 @@ def test_autoloader_load_file_s3(database_table_fixture):
         output_table=table,
     )
     assert database.table_exists(table)
-    database.drop_table(table)


@pytest.mark.integration
@@ -81,7 +79,6 @@ def test_delta_load_file_gcs(database_table_fixture):
         output_table=table,
     )
     assert database.table_exists(table)
-    database.drop_table(table)


@pytest.mark.integration
@@ -108,7 +105,6 @@ def test_delta_load_file_gcs_autoloader(database_table_fixture):
         output_table=table,
     )
     assert database.table_exists(table)
-    database.drop_table(table)


@pytest.mark.integration
@@ -135,4 +131,3 @@ def test_delta_load_file_gcs_default_connection(database_table_fixture):
         output_table=table,
     )
     assert database.table_exists(table)
-    database.drop_table(table)
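All five deletions remove the same trailing cleanup call, presumably because table teardown is now handled by the `database_table_fixture` itself rather than by each test.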
