From b77c6b642d94974d79751ce8c8fa0217fd7def5e Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Wed, 27 Sep 2023 05:39:38 -0400 Subject: [PATCH] Describe External Assets; rename create_unexecutable_observable_assets_def to external_assets_from_specs (#16754) ## Summary & Motivation This PR renames `create_unexecutable_observable_assets_def` to `external_assets_from_specs`. I also want this PR to serve as the final discussion on naming this feature and to document this capability. The wording in the docblock: ``` Create external assets definitions from a sequence of asset specs. An external asset is an asset that is not materialized by Dagster, but is tracked in the asset graph and asset catalog. A common use case for external assets is modeling data produced by a process not under Dagster's control, for example a daily drop of a file from a third party into S3. In most systems these are described as sources. That includes Dagster, whose :py:class:`SourceAsset` will be supplanted by external assets in the near future, as external assets are a superset of the functionality of source assets. External assets can act as sources, but that is not their only use. In particular, external assets themselves have lineage, specified through the ``deps`` argument of :py:class:`AssetSpec`, and can depend on other external assets. External assets are not allowed to depend on non-external assets. The user can emit `AssetMaterialization`, `AssetObservation`, and `AssetCheckEvaluation` events attached to external assets, and Dagster now supports "runless" events, which enable many use cases that were previously not possible. Runless events are events generated outside the context of a particular run (for example, in a sensor or by a script), allowing for greater flexibility in event generation. This can be done in a few ways: Note to reviewers: this is an in-progress docblock, and the items below will have links and examples. 1) DagsterInstance exposes `report_runless_event`, which can be used to generate events for external assets directly on an instance. See docs. 2) Sensors can build these events and return them using :py:class:`SensorResult`. A use case for this is using a sensor to continuously monitor the metadata exhaust from an external system and insert events that reflect that exhaust. See docs. 3) Dagster Cloud exposes a REST API for ingesting runless events. Users can copy and paste a curl command into their external computations (such as an Airflow operator) to register metadata associated with those computations. See docs. 4) Dagster ops can generate these events directly, either by yielding them or by calling ``log_event`` on :py:class:`OpExecutionContext`. Use cases for this include querying metadata in an external system when doing so in a sensor would be too expensive, or adapting pure op-based Dagster code to take advantage of asset-oriented lineage, observability, and data quality features without having to port it wholesale to `@asset`- and `@multi_asset`-based code. This feature set allows users to use Dagster as an observability, lineage, and data quality tool for assets that are not materialized by Dagster. In addition to traditional use cases like sources, it can model entire lineage graphs of assets that are scheduled and materialized by other tools and workflow engines, letting teams adopt Dagster as a cross-cutting observability tool without migrating their entire data platform to a single orchestration engine.
External assets do not have all the features of normal assets: they cannot be materialized ad hoc by Dagster (this is disabled in the UI); they cannot be backfilled; they cannot be scheduled using auto-materialize policies; and they opt out of other features around direct materialization, both now and in the future. External assets also provide fewer guarantees around the correctness of their information in the asset catalog. In other words, in exchange for this flexibility Dagster provides fewer guardrails for external assets than for assets it materializes, and there is an increased chance that they will insert nonsensical information into the asset catalog, potentially eroding trust. ``` Suggesting alternative language in this docblock is the best way to talk about an alternative name IMO. ## How I Tested These Changes --- .../__snapshots__/test_all_snapshot_ids.ambr | 116 ++++++------- .../graphql/__snapshots__/test_solids.ambr | 32 ++-- .../dagster_graphql_tests/graphql/repo.py | 4 +- .../_core/definitions/external_asset.py | 158 ++++++++++++++++++ .../_core/definitions/observable_asset.py | 86 ---------- .../test_external_data.py | 8 +- ...able_assets.py => test_external_assets.py} | 65 +++---- 7 files changed, 271 insertions(+), 198 deletions(-) create mode 100644 python_modules/dagster/dagster/_core/definitions/external_asset.py delete mode 100644 python_modules/dagster/dagster/_core/definitions/observable_asset.py rename python_modules/dagster/dagster_tests/definitions_tests/{test_observable_assets.py => test_external_assets.py} (77%) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_all_snapshot_ids.ambr b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_all_snapshot_ids.ambr index 12b1a570a3f6d..dbfc01cb978f1 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_all_snapshot_ids.ambr +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_all_snapshot_ids.ambr @@ -48266,56 +48266,6 @@ "scalar_kind": null, "type_param_keys": null }, - "Shape.006812248eeed5f01a7a8b7c8aa6d541d96b375a": { - "__class__": "ConfigTypeSnap", - "description": null, - "enum_values": null, - "fields": [ - { - "__class__": "ConfigFieldSnap", - "default_provided": true, - "default_value_as_json_str": "{\"config\": {\"retries\": {\"enabled\": {}}}}", - "description": "Configure how steps are executed within a run.", - "is_required": false, - "name": "execution", - "type_key": "Shape.09d73f0755bf4752d3f121837669c8660dcf451e" - }, - { - "__class__": "ConfigFieldSnap", - "default_provided": true, - "default_value_as_json_str": "{}", - "description": "Configure how loggers emit messages within a run.", - "is_required": false, - "name": "loggers", - "type_key": "Shape.e895d95ee6d0eff1b884c76f44a2ab7089f0c49b" - }, - { - "__class__": "ConfigFieldSnap", - "default_provided": true, - "default_value_as_json_str": "{\"an_asset\": {\"config\": {}}, \"executable_asset\": {}}", - "description": "Configure runtime parameters for ops or assets.", - "is_required": false, - "name": "ops", - "type_key": "Shape.b8d5bc3385ac93557bbf436b3886579c7e541df6" - }, - { - "__class__": "ConfigFieldSnap", - "default_provided": true, - "default_value_as_json_str": "{\"io_manager\": {}}", - "description": "Configure how shared resources are implemented within a run.", - "is_required": false, - "name": "resources", - "type_key": "Shape.1578133c1c71e8e3c9cf3ad46c216eb51b48c778" - } - ], - 
"given_name": null, - "key": "Shape.006812248eeed5f01a7a8b7c8aa6d541d96b375a", - "kind": { - "__enum__": "ConfigTypeKind.STRICT_SHAPE" - }, - "scalar_kind": null, - "type_param_keys": null - }, "Shape.081354663b9d4b8fbfd1cb8e358763912953913f": { "__class__": "ConfigTypeSnap", "description": null, @@ -48518,7 +48468,7 @@ "scalar_kind": null, "type_param_keys": null }, - "Shape.b8d5bc3385ac93557bbf436b3886579c7e541df6": { + "Shape.b31f9017806ff4b8385cd662d325a47a819a2815": { "__class__": "ConfigTypeSnap", "description": null, "enum_values": null, @@ -48529,7 +48479,7 @@ "default_value_as_json_str": "{\"config\": {}}", "description": null, "is_required": false, - "name": "an_asset", + "name": "_external_assets_def", "type_key": "Shape.62edccaf30696e25335ae92685bdc41e204e30e6" }, { @@ -48543,7 +48493,57 @@ } ], "given_name": null, - "key": "Shape.b8d5bc3385ac93557bbf436b3886579c7e541df6", + "key": "Shape.b31f9017806ff4b8385cd662d325a47a819a2815", + "kind": { + "__enum__": "ConfigTypeKind.STRICT_SHAPE" + }, + "scalar_kind": null, + "type_param_keys": null + }, + "Shape.b7f072a80a1fdee0bfc5e256ea343cd7e3a8818b": { + "__class__": "ConfigTypeSnap", + "description": null, + "enum_values": null, + "fields": [ + { + "__class__": "ConfigFieldSnap", + "default_provided": true, + "default_value_as_json_str": "{\"config\": {\"retries\": {\"enabled\": {}}}}", + "description": "Configure how steps are executed within a run.", + "is_required": false, + "name": "execution", + "type_key": "Shape.09d73f0755bf4752d3f121837669c8660dcf451e" + }, + { + "__class__": "ConfigFieldSnap", + "default_provided": true, + "default_value_as_json_str": "{}", + "description": "Configure how loggers emit messages within a run.", + "is_required": false, + "name": "loggers", + "type_key": "Shape.e895d95ee6d0eff1b884c76f44a2ab7089f0c49b" + }, + { + "__class__": "ConfigFieldSnap", + "default_provided": true, + "default_value_as_json_str": "{\"_external_assets_def\": {\"config\": {}}, \"executable_asset\": {}}", + "description": "Configure runtime parameters for ops or assets.", + "is_required": false, + "name": "ops", + "type_key": "Shape.b31f9017806ff4b8385cd662d325a47a819a2815" + }, + { + "__class__": "ConfigFieldSnap", + "default_provided": true, + "default_value_as_json_str": "{\"io_manager\": {}}", + "description": "Configure how shared resources are implemented within a run.", + "is_required": false, + "name": "resources", + "type_key": "Shape.1578133c1c71e8e3c9cf3ad46c216eb51b48c778" + } + ], + "given_name": null, + "key": "Shape.b7f072a80a1fdee0bfc5e256ea343cd7e3a8818b", "kind": { "__enum__": "ConfigTypeKind.STRICT_SHAPE" }, @@ -48699,8 +48699,8 @@ "__class__": "SolidInvocationSnap", "input_dep_snaps": [], "is_dynamic_mapped": false, - "solid_def_name": "an_asset", - "solid_name": "an_asset", + "solid_def_name": "_external_assets_def", + "solid_name": "_external_assets_def", "tags": {} }, { @@ -48753,7 +48753,7 @@ "name": "io_manager" } ], - "root_config_key": "Shape.006812248eeed5f01a7a8b7c8aa6d541d96b375a" + "root_config_key": "Shape.b7f072a80a1fdee0bfc5e256ea343cd7e3a8818b" } ], "name": "executable_test_job", @@ -48774,7 +48774,7 @@ }, "description": null, "input_def_snaps": [], - "name": "an_asset", + "name": "_external_assets_def", "output_def_snaps": [ { "__class__": "OutputDefSnap", @@ -48822,7 +48822,7 @@ ''' # --- # name: test_all_snapshot_ids[47] - '43ee457f7b442944a111b7d729f827ab9618a4ae' + '7f7ef891f97ea4a6a6c2222533b79da418717189' # --- # name: test_all_snapshot_ids[48] ''' diff --git 
a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_solids.ambr b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_solids.ambr index aff3a79f34df3..8d0c6b0b3a99b 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_solids.ambr +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_solids.ambr @@ -2,6 +2,22 @@ dict({ 'repositoryOrError': dict({ 'usedSolids': list([ + dict({ + '__typename': 'UsedSolid', + 'definition': dict({ + 'name': '_external_assets_def', + }), + 'invocations': list([ + dict({ + 'pipeline': dict({ + 'name': 'executable_test_job', + }), + 'solidHandle': dict({ + 'handleID': '_external_assets_def', + }), + }), + ]), + }), dict({ '__typename': 'UsedSolid', 'definition': dict({ @@ -130,22 +146,6 @@ }), ]), }), - dict({ - '__typename': 'UsedSolid', - 'definition': dict({ - 'name': 'an_asset', - }), - 'invocations': list([ - dict({ - 'pipeline': dict({ - 'name': 'executable_test_job', - }), - 'solidHandle': dict({ - 'handleID': 'an_asset', - }), - }), - ]), - }), dict({ '__typename': 'UsedSolid', 'definition': dict({ diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index 60c96fa563777..8e2ee291b4521 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -85,10 +85,10 @@ from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.events import Failure from dagster._core.definitions.executor_definition import in_process_executor +from dagster._core.definitions.external_asset import external_assets_from_specs from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.metadata import MetadataValue from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition -from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def from dagster._core.definitions.partition import PartitionedConfig from dagster._core.definitions.reconstruct import ReconstructableRepository from dagster._core.definitions.sensor_definition import RunRequest, SkipReason @@ -1383,7 +1383,7 @@ def executable_asset() -> None: pass -unexecutable_asset = create_unexecutable_observable_assets_def([AssetSpec("unexecutable_asset")]) +unexecutable_asset = next(iter(external_assets_from_specs([AssetSpec("unexecutable_asset")]))) executable_test_job = build_assets_job( name="executable_test_job", assets=[executable_asset, unexecutable_asset] diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py new file mode 100644 index 0000000000000..03891ea5f69a7 --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -0,0 +1,158 @@ +from typing import List, Sequence + +from dagster import _check as check +from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + AssetExecutionType, + AssetSpec, +) +from dagster._core.definitions.assets import AssetsDefinition +from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset +from dagster._core.definitions.source_asset import SourceAsset +from dagster._core.errors import DagsterInvariantViolationError +from 
dagster._core.execution.context.compute import AssetExecutionContext + + +def external_assets_from_specs(specs: Sequence[AssetSpec]) -> List[AssetsDefinition]: + """Create external assets definitions from a sequence of asset specs. + + An external asset is an asset that is not materialized by Dagster, but is tracked in the + asset graph and asset catalog. + + A common use case for external assets is modeling data produced by a process not + under Dagster's control, for example a daily drop of a file from a third party into S3. + + In most systems these are described as sources. That includes Dagster, whose + :py:class:`SourceAsset` will be supplanted by external assets in the near future, + as external assets are a superset of the functionality of source assets. + + External assets can act as sources, but that is not their only use. + + In particular, external assets themselves have lineage, specified through the + ``deps`` argument of :py:class:`AssetSpec`, and can depend on other external assets. + External assets are not allowed to depend on non-external assets. + + The user can emit `AssetMaterialization`, `AssetObservation`, and `AssetCheckEvaluation` + events attached to external assets, and Dagster now supports "runless" events, + which enable many use cases that were previously not possible. Runless events + are events generated outside the context of a particular run (for example, in a + sensor or by a script), allowing for greater flexibility in event generation. + This can be done in a few ways: + + Note to reviewers: this is an in-progress docblock, and the items below will have links and examples. + + 1) DagsterInstance exposes `report_runless_event`, which can be used to generate events for + external assets directly on an instance. See docs. + 2) Sensors can build these events and return them using :py:class:`SensorResult`. A use + case for this is using a sensor to continuously monitor the metadata exhaust from + an external system and insert events that reflect that exhaust. See docs. + 3) Dagster Cloud exposes a REST API for ingesting runless events. Users can copy and + paste a curl command into their external computations (such as an Airflow operator) + to register metadata associated with those computations. See docs. + 4) Dagster ops can generate these events directly, either by yielding them or by calling + ``log_event`` on :py:class:`OpExecutionContext`. Use cases for this include + querying metadata in an external system when doing so in a sensor would be too expensive, + or adapting pure op-based Dagster code to take advantage of asset-oriented lineage, + observability, and data quality features without having to port it wholesale + to `@asset`- and `@multi_asset`-based code. + + This feature set allows users to use Dagster as an observability, lineage, and + data quality tool for assets that are not materialized by Dagster. In addition to + traditional use cases like sources, it can model entire lineage graphs of + assets that are scheduled and materialized by other tools and workflow engines, + letting teams adopt Dagster as a cross-cutting observability tool without migrating + their entire data platform to a single orchestration engine. 
+ + External assets do not have all the features of normal assets: they cannot be + materialized ad hoc by Dagster (this is disabled in the UI); they cannot be backfilled; they cannot + be scheduled using auto-materialize policies; and they opt out of other features around + direct materialization, both now and in the future. External assets also provide fewer + guarantees around the correctness of their information in the asset + catalog. In other words, in exchange for this flexibility Dagster provides fewer guardrails + for external assets than for assets it materializes, and there is an increased + chance that they will insert nonsensical information into the asset catalog, potentially + eroding trust. + + Args: + specs (Sequence[AssetSpec]): The specs for the assets. + """ + assets_defs = [] + for spec in specs: + check.invariant( + spec.auto_materialize_policy is None, + "auto_materialize_policy must be None since it is ignored", + ) + check.invariant(spec.code_version is None, "code_version must be None since it is ignored") + check.invariant( + spec.freshness_policy is None, "freshness_policy must be None since it is ignored" + ) + check.invariant( + spec.skippable is False, + "skippable must be False since it is ignored and False is the default", + ) + + @multi_asset( + specs=[ + AssetSpec( + key=spec.key, + description=spec.description, + group_name=spec.group_name, + metadata={ + **(spec.metadata or {}), + **{ + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: ( + AssetExecutionType.UNEXECUTABLE.value + ) + }, + }, + deps=spec.deps, + ) + ] + ) + def _external_assets_def(context: AssetExecutionContext) -> None: + raise DagsterInvariantViolationError( + "You have attempted to execute an unexecutable asset" + f" {context.asset_key.to_user_string()}." 
+ ) + + assets_defs.append(_external_assets_def) + + return assets_defs + + +def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition: + check.invariant( + source_asset.observe_fn is None, + "Observable source assets not supported yet: observe_fn should be None", + ) + check.invariant( + source_asset.auto_observe_interval_minutes is None, + "Observable source assets not supported yet: auto_observe_interval_minutes should be None", + ) + + kwargs = { + "key": source_asset.key, + "metadata": { + **source_asset.metadata, + **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, + }, + "group_name": source_asset.group_name, + "description": source_asset.description, + "partitions_def": source_asset.partitions_def, + } + + if source_asset.io_manager_def: + kwargs["io_manager_def"] = source_asset.io_manager_def + elif source_asset.io_manager_key: + kwargs["io_manager_key"] = source_asset.io_manager_key + + @asset(**kwargs) + def _external_assets_def() -> None: + raise NotImplementedError(f"Asset {source_asset.key} is not executable") + + check.invariant(isinstance(_external_assets_def, AssetsDefinition)) + assert isinstance(_external_assets_def, AssetsDefinition) # appese pyright + + return _external_assets_def diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py deleted file mode 100644 index 664c3d6beb5ef..0000000000000 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ /dev/null @@ -1,86 +0,0 @@ -from typing import Sequence - -from dagster import _check as check -from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, - AssetExecutionType, - AssetSpec, -) -from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset -from dagster._core.definitions.source_asset import SourceAsset -from dagster._core.errors import DagsterInvariantViolationError - - -def create_unexecutable_observable_assets_def(specs: Sequence[AssetSpec]): - new_specs = [] - for spec in specs: - check.invariant( - spec.auto_materialize_policy is None, - "auto_materialize_policy must be None since it is ignored", - ) - check.invariant(spec.code_version is None, "code_version must be None since it is ignored") - check.invariant( - spec.freshness_policy is None, "freshness_policy must be None since it is ignored" - ) - check.invariant( - spec.skippable is False, - "skippable must be False since it is ignored and False is the default", - ) - - new_specs.append( - AssetSpec( - key=spec.key, - description=spec.description, - group_name=spec.group_name, - metadata={ - **(spec.metadata or {}), - **{ - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: ( - AssetExecutionType.UNEXECUTABLE.value - ) - }, - }, - deps=spec.deps, - ) - ) - - @multi_asset(specs=new_specs) - def an_asset() -> None: - raise DagsterInvariantViolationError( - f"You have attempted to execute an unexecutable asset {[spec.key for spec in specs]}" - ) - - return an_asset - - -def create_unexecutable_observable_assets_def_from_source_asset(source_asset: SourceAsset): - check.invariant( - source_asset.observe_fn is None, - "Observable source assets not supported yet: observe_fn should be None", - ) - check.invariant( - source_asset.auto_observe_interval_minutes is None, - "Observable source assets not supported yet: auto_observe_interval_minutes should be None", - ) - - kwargs = { - "key": source_asset.key, - "metadata": { - 
**source_asset.metadata, - **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, - }, - "group_name": source_asset.group_name, - "description": source_asset.description, - "partitions_def": source_asset.partitions_def, - } - - if source_asset.io_manager_def: - kwargs["io_manager_def"] = source_asset.io_manager_def - elif source_asset.io_manager_key: - kwargs["io_manager_key"] = source_asset.io_manager_key - - @asset(**kwargs) - def shim_asset() -> None: - raise NotImplementedError(f"Asset {source_asset.key} is not executable") - - return shim_asset diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index 3d4a97e836ad6..96377a011ec6f 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -22,9 +22,9 @@ from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.backfill_policy import BackfillPolicy +from dagster._core.definitions.external_asset import external_assets_from_specs from dagster._core.definitions.metadata import MetadataValue, TextMetadataValue, normalize_metadata from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition -from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def from dagster._core.definitions.partition import ScheduleType from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition from dagster._core.definitions.utils import DEFAULT_GROUP_NAME @@ -1187,8 +1187,8 @@ def test_external_time_window_valid_partition_key(): ) -def test_unexecutable_external_asset_node() -> None: - asset_one = create_unexecutable_observable_assets_def([AssetSpec("asset_one")]) +def test_external_assets_def_to_external_asset_graph() -> None: + asset_one = next(iter(external_assets_from_specs([AssetSpec("asset_one")]))) assets_job = build_assets_job("assets_job", [asset_one]) external_asset_nodes = external_asset_graph_from_defs([assets_job], source_assets_by_key={}) @@ -1197,7 +1197,7 @@ def test_unexecutable_external_asset_node() -> None: assert next(iter(external_asset_nodes)).is_executable is False -def test_historical_external_asset_node() -> None: +def test_historical_external_asset_node_that_models_underlying_external_assets_def() -> None: assert not ExternalAssetNode( asset_key=AssetKey("asset_one"), dependencies=[], diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py similarity index 77% rename from python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py rename to python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index 65d5c99ebc4f3..389969e4c382e 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -15,38 +15,41 @@ asset, ) from dagster._core.definitions.asset_spec import AssetSpec -from dagster._core.definitions.freshness_policy import FreshnessPolicy -from dagster._core.definitions.observable_asset import ( - 
create_unexecutable_observable_assets_def, - create_unexecutable_observable_assets_def_from_source_asset, +from dagster._core.definitions.external_asset import ( + create_external_asset_from_source_asset, + external_assets_from_specs, ) +from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition -def test_observable_asset_basic_creation() -> None: - assets_def = create_unexecutable_observable_assets_def( - specs=[ - AssetSpec( - key="observable_asset_one", - # multi-asset does not support description lol - # description="desc", - metadata={"user_metadata": "value"}, - group_name="a_group", - ) - ] +def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition: + return next(iter(external_assets_from_specs(specs=[spec]))) + + +def test_external_asset_basic_creation() -> None: + assets_def = external_asset_from_spec( + AssetSpec( + key="external_asset_one", + # will work once https://github.com/dagster-io/dagster/pull/16755 merges + # description="desc", + metadata={"user_metadata": "value"}, + group_name="a_group", + ) ) assert isinstance(assets_def, AssetsDefinition) - expected_key = AssetKey(["observable_asset_one"]) + expected_key = AssetKey(["external_asset_one"]) assert assets_def.key == expected_key + # will work once https://github.com/dagster-io/dagster/pull/16755 merges # assert assets_def.descriptions_by_key[expected_key] == "desc" assert assets_def.metadata_by_key[expected_key]["user_metadata"] == "value" assert assets_def.group_names_by_key[expected_key] == "a_group" assert assets_def.is_asset_executable(expected_key) is False -def test_invalid_observable_asset_creation() -> None: +def test_invalid_external_asset_creation() -> None: invalid_specs = [ AssetSpec("invalid_asset1", auto_materialize_policy=AutoMaterializePolicy.eager()), AssetSpec("invalid_asset2", code_version="ksjdfljs"), @@ -56,7 +59,7 @@ def test_invalid_observable_asset_creation() -> None: for invalid_spec in invalid_specs: with pytest.raises(check.CheckError): - create_unexecutable_observable_assets_def(specs=[invalid_spec]) + external_assets_from_specs(specs=[invalid_spec]) def test_normal_asset_materializeable() -> None: @@ -66,23 +69,21 @@ def an_asset() -> None: ... 
assert an_asset.is_asset_executable(AssetKey(["an_asset"])) is True -def test_observable_asset_creation_with_deps() -> None: - asset_two = AssetSpec("observable_asset_two") - assets_def = create_unexecutable_observable_assets_def( - specs=[ - AssetSpec( - "observable_asset_one", - deps=[asset_two.key], # todo remove key when asset deps accepts it - ) - ] +def test_external_asset_creation_with_deps() -> None: + asset_two = AssetSpec("external_asset_two") + assets_def = external_asset_from_spec( + AssetSpec( + "external_asset_one", + deps=[asset_two.key], # todo remove key when asset deps accepts it + ) ) assert isinstance(assets_def, AssetsDefinition) - expected_key = AssetKey(["observable_asset_one"]) + expected_key = AssetKey(["external_asset_one"]) assert assets_def.key == expected_key assert assets_def.asset_deps[expected_key] == { - AssetKey(["observable_asset_two"]), + AssetKey(["external_asset_two"]), } @@ -112,7 +113,7 @@ def an_asset(source_asset: str) -> str: assert result_one.output_for_node("an_asset") == "hardcoded-computed" defs_with_shim = Definitions( - assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset] + assets=[create_external_asset_from_source_asset(source_asset), an_asset] ) assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) @@ -175,9 +176,9 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str: assert result_one.success assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02" - shimmed_source_asset = create_unexecutable_observable_assets_def_from_source_asset(source_asset) + shimmed_source_asset = create_external_asset_from_source_asset(source_asset) defs_with_shim = Definitions( - assets=[create_unexecutable_observable_assets_def_from_source_asset(source_asset), an_asset] + assets=[create_external_asset_from_source_asset(source_asset), an_asset] ) assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)
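For reviewers who want to see the renamed API in context, here is a minimal sketch of how `external_assets_from_specs` and the sensor-based runless-event path described in the docblock might be used together. The asset keys, metadata values, and sensor name are hypothetical, and the sketch assumes `SensorResult` accepts runless asset events via its `asset_events` argument as described above; it is an illustration, not code from this PR.

```python
from dagster import AssetMaterialization, Definitions, SensorResult, sensor
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.external_asset import external_assets_from_specs

# Two hypothetical external assets produced by a third party; Dagster tracks them
# in the asset graph and catalog but never executes them.
raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=["raw_logs"])


@sensor()
def observe_third_party_drop():
    # Hypothetical: poll the external system's metadata exhaust, then report a
    # runless materialization event for the external asset.
    return SensorResult(
        asset_events=[
            AssetMaterialization(asset_key="raw_logs", metadata={"source": "s3 drop"})
        ]
    )


defs = Definitions(
    assets=external_assets_from_specs([raw_logs, processed_logs]),
    sensors=[observe_third_party_drop],
)
```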
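Similarly, a rough sketch of `create_external_asset_from_source_asset`, which the renamed tests exercise: it converts an existing non-observable `SourceAsset` into an external `AssetsDefinition` so it can sit in the asset graph alongside executable assets. The asset names here are hypothetical and chosen only for illustration.

```python
from dagster import Definitions, SourceAsset, asset
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset

# A plain SourceAsset with no observe_fn, as required by the conversion helper.
legacy_source = SourceAsset("legacy_source")


@asset(deps=["legacy_source"])
def downstream_asset() -> None:
    # Hypothetical downstream computation that reads the externally produced data.
    ...


defs = Definitions(
    assets=[create_external_asset_from_source_asset(legacy_source), downstream_asset],
)
```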