0.10.0 The Edge of Glory
Major Changes
- A native scheduler with support for exactly-once, fault-tolerant, timezone-aware scheduling. A new Dagster daemon process has been added to manage your schedules and sensors with a reconciliation loop, ensuring that all runs are executed exactly once, even if the Dagster daemon experiences occasional failure. See the Migration Guide for instructions on moving from `SystemCronScheduler` or `K8sScheduler` to the new scheduler.
- First-class sensors, built on the new Dagster daemon, allow you to instigate runs based on changes in external state - for example, files on S3 or assets materialized by other Dagster pipelines. See the Sensors Overview for more information.
- Dagster now supports pipeline run queueing. You can apply instance-level run concurrency limits and prioritization rules by adding the `QueuedRunCoordinator` to your Dagster instance. See the Run Concurrency Overview for more information, and the example config after this list.
- The `IOManager` abstraction provides a new, streamlined primitive for granular control over where and how solid outputs are stored and loaded. It is intended to replace the (deprecated) intermediate/system storage abstractions. See the IO Manager Overview for more information, and the sketch after this list.
- A new Partitions page in Dagit lets you view your pipeline runs organized by partition. You can also launch backfills from Dagit and monitor them from this page.
- A new Instance Status page in Dagit lets you monitor the health of your Dagster instance, with repository location information, daemon statuses, instance-level schedule and sensor information, and linkable instance configuration.
- Resources can now declare their dependencies on other resources via the `required_resource_keys` parameter on `@resource`.
- Our support for deploying on Kubernetes is now mature and battle-tested. Our Helm chart is now easier to configure and deploy, and we’ve made big investments in observability and reliability. You can view Kubernetes interactions in the structured event log and use Dagit to help you understand what’s happening in your deployment. The defaults in the Helm chart will give you graceful degradation and failure recovery right out of the box.
- Experimental support for dynamic orchestration with the new `DynamicOutputDefinition` API. Dagster can now map downstream dependencies over a dynamic output at runtime.
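As promised above, here is a minimal `dagster.yaml` sketch that enables run queueing via the `QueuedRunCoordinator`. The module path and `max_concurrent_runs` key reflect our understanding of the instance config format; verify them against the Run Concurrency Overview:

```yaml
# dagster.yaml -- sketch only; check keys against the Run Concurrency Overview
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 5
```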
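And a minimal sketch of the `IOManager` primitive: a hypothetical IO manager that pickles each solid output to a local path. The class and paths are illustrative; only `IOManager`, `@io_manager`, `handle_output`, and `load_input` come from the release.

```python
import os
import pickle

from dagster import IOManager, io_manager


class LocalPickleIOManager(IOManager):
    """Sketch: persist each solid output as a pickle file keyed by step and output name."""

    def _path(self, context):
        # step_key and name identify the producing step and its output
        return os.path.join("/tmp/dagster_outputs", context.step_key, context.name)

    def handle_output(self, context, obj):
        path = self._path(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        # load_input receives an InputContext; upstream_output is the
        # OutputContext of the step that produced the value
        with open(self._path(context.upstream_output), "rb") as f:
            return pickle.load(f)


@io_manager
def local_pickle_io_manager(_init_context):
    return LocalPickleIOManager()
```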
Breaking Changes
Dropping Python 2 support
- We’ve dropped support for Python 2.7, based on community usage and enthusiasm for Python 3-native public APIs.
Removal of deprecated APIs
These APIs were marked for deprecation with warnings in the 0.9.0 release, and have been removed in the 0.10.0 release.
- The decorator `input_hydration_config` has been removed. Use the `dagster_type_loader` decorator instead.
- The decorator `output_materialization_config` has been removed. Use `dagster_type_materializer` instead.
- The system storage subsystem has been removed. This includes `SystemStorageDefinition`, `@system_storage`, and `default_system_storage_defs`. Use the new `IOManager` API instead. See the IO Manager Overview for more information.
- The `config_field` argument on decorators and definitions classes has been removed and replaced with `config_schema`. This is a drop-in rename.
- The argument `step_keys_to_execute` to the functions `reexecute_pipeline` and `reexecute_pipeline_iterator` has been removed. Use the `step_selection` argument to select subsets for execution instead.
- Repositories can no longer be loaded using the legacy `repository` key in your `workspace.yaml`; use `load_from` instead. See the Workspaces Overview for documentation about how to define a workspace.
Breaking API Changes
- `SolidExecutionResult.compute_output_event_dict` has been renamed to `SolidExecutionResult.compute_output_events_dict`. A solid execution result is returned from methods such as `result_for_solid`. Any call sites will need to be updated.
- The `.compute` suffix is no longer applied to step keys. Step keys that were previously named `my_solid.compute` will now be named `my_solid`. If you are using any API method that takes a `step_selection` argument, you will need to update the step keys accordingly.
- The `pipeline_def` property has been removed from the `InitResourceContext` passed to functions decorated with `@resource`.
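As a small illustration of the first rename (assuming a `pipeline_result` returned by `execute_pipeline`):

```python
# Before (0.9.x):
# events = pipeline_result.result_for_solid("my_solid").compute_output_event_dict
# After (0.10.0):
events = pipeline_result.result_for_solid("my_solid").compute_output_events_dict
```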
Helm Chart
- The schema for the `scheduler` values in the Helm chart has changed. Instead of a simple on/off toggle, we now require an explicit `scheduler.type` to specify usage of the `DagsterDaemonScheduler`, `K8sScheduler`, or otherwise. If your specified `scheduler.type` has required config, these fields must be specified under `scheduler.config`.
- `snake_case` fields have been changed to `camelCase`. Please update your `values.yaml` as follows:
  - `pipeline_run` → `pipelineRun`
  - `dagster_home` → `dagsterHome`
  - `env_secrets` → `envSecrets`
  - `env_config_maps` → `envConfigMaps`
- The Helm values `celery` and `k8sRunLauncher` have now been consolidated under the Helm value `runLauncher` for simplicity. Use the field `runLauncher.type` to specify usage of the `K8sRunLauncher`, `CeleryK8sRunLauncher`, or otherwise. By default, the `K8sRunLauncher` is enabled.
- All Celery message brokers (i.e. RabbitMQ and Redis) are disabled by default. If you are using the `CeleryK8sRunLauncher`, you should explicitly enable your message broker of choice.
- `userDeployments` are now enabled by default.
Core
- Event log messages streamed to `stdout` and `stderr` have been streamlined to be a single line per event.
- Experimental support for memoization and versioning lets you execute pipelines incrementally, selecting which solids need to be rerun based on runtime criteria and versioning their outputs with configurable identifiers that capture their upstream dependencies.

  To set up memoized step selection, users can provide a `MemoizableIOManager`, whose `has_output` function decides whether a given solid output needs to be computed or already exists. To execute a pipeline with memoized step selection, users can supply the `dagster/is_memoized_run` run tag to `execute_pipeline`.

  To set the version on a solid or resource, users can supply the `version` field on the definition. To access the derived version for a step output, users can access the `version` field on the `OutputContext` passed to the `handle_output` and `load_input` methods of `IOManager` and the `has_output` method of `MemoizableIOManager`. A sketch follows this list.
- Schedules that are executed using the new `DagsterDaemonScheduler` can now execute in any timezone by adding an `execution_timezone` parameter to the schedule (see the sketch after this list). Daylight Saving Time transitions are also supported. See the Schedules Overview for more information and examples.
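A hedged sketch of the memoization hooks described above. The import path is an assumption (the API is experimental in 0.10.0), and the path scheme is illustrative; `has_output`, `handle_output`, `load_input`, and the `version` field come from the release notes:

```python
import os
import pickle

# Import path is an assumption; MemoizableIOManager is experimental in 0.10.0.
from dagster.core.storage.memoizable_io_manager import MemoizableIOManager


class VersionedPickleIOManager(MemoizableIOManager):
    def _path(self, context):
        # context.version is the derived version for this step output
        return os.path.join("/tmp/dagster_memoized", context.step_key, str(context.version))

    def has_output(self, context):
        # Returning True lets memoized step selection skip recomputing this output.
        return os.path.exists(self._path(context))

    def handle_output(self, context, obj):
        path = self._path(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        with open(self._path(context.upstream_output), "rb") as f:
            return pickle.load(f)
```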
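And a minimal sketch of a timezone-aware schedule, assuming a pipeline named `my_pipeline` exists in the repository:

```python
from dagster import schedule


@schedule(
    cron_schedule="0 10 * * *",
    pipeline_name="my_pipeline",
    execution_timezone="US/Central",
)
def my_daily_10am_schedule(context):
    # Return the run config for each scheduled run.
    return {}
```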
Dagit
- Countdown and refresh buttons have been added for pages with regular polling queries (e.g. Runs, Schedules).
- Confirmation and progress dialogs are now presented when performing run terminations and deletions. Additionally, hanging/orphaned runs can now be forced to terminate by selecting "Force termination immediately" in the run termination dialog.
- The Runs page now shows counts for "Queued" and "In progress" tabs, and individual run pages show timing, tags, and configuration metadata.
- The backfill experience has been improved with means to view progress and terminate the entire backfill via the partition set page. Additionally, errors related to backfills are now surfaced more clearly.
- Shortcut hints are no longer displayed when attempting to use the screen capture command.
- The asset page has been revamped to include a table of events and enable organizing events by partition. Asset key escaping issues in other views have been fixed as well.
- Miscellaneous bug fixes, frontend performance tweaks, and other improvements are also included.
Kubernetes/Helm
- The Dagster Kubernetes documentation has been refreshed.
Helm
- We've added schema validation to our Helm chart. You can now check that your values YAML file is correct by running: `helm lint helm/dagster -f helm/dagster/values.yaml`
- Added support for resource annotations throughout our Helm chart.
- Added Helm deployment of the dagster daemon & daemon scheduler.
- Added Helm support for configuring a compute log manager in your dagster instance.
- User code deployments now include a user `ConfigMap` by default.
- Changed the default liveness probe for Dagit to use `httpGet "/dagit_info"` instead of `tcpSocket:80`.
Dagster-K8s [Kubernetes]
- Added support for user code deployments on Kubernetes.
- Added support for tagging pipeline executions.
- Fixes to support version 12.0.0 of the Python Kubernetes client.
- Improved implementation of Kubernetes+Dagster retries.
- Many logging improvements to surface debugging information and failures in the structured event log.
Dagster-Celery-K8s
- Improved interrupt/termination handling in Celery workers.
Integrations & Libraries
- Added a new `dagster-docker` library with a `DockerRunLauncher` that launches each run in its own Docker container. (See Deploying with Docker docs for an example.)
- Added support for AWS Athena. (Thanks @jmsanders!)
- Added mocks for AWS S3, Athena, and Cloudwatch in tests. (Thanks @jmsanders!)
- Allow setting of S3 endpoint through env variables. (Thanks @marksteve!)
- Various bug fixes and new features for the Azure, Databricks, and Dask integrations.
- Added a `create_databricks_job_solid` for creating solids that launch Databricks jobs.
Migrating to 0.10.0
Action Required: Run and event storage schema changes
```
# Run after migrating to 0.10.0
$ dagster instance migrate
```

This release includes several schema changes to the Dagster storages that improve performance and enable new features like sensors and run queueing. After upgrading to 0.10.0, run the `dagster instance migrate` command to migrate your instance storage to the latest schema. This will turn off any running schedules, so you will need to restart any previously running schedules after migrating the schema. Before turning them back on, you should follow the steps below to migrate to the `DagsterDaemonScheduler`.
New scheduler: `DagsterDaemonScheduler`
This release includes a new `DagsterDaemonScheduler` with improved fault tolerance and full support for timezones. We highly recommend upgrading to the new scheduler during this release. The existing schedulers, `SystemCronScheduler` and `K8sScheduler`, are deprecated and will be removed in a future release.
Steps to migrate
Instead of relying on system cron or K8s cron jobs, the `DagsterDaemonScheduler` uses the new `dagster-daemon` service to run schedules. This requires running the `dagster-daemon` service as a part of your deployment.
Refer to our deployment documentation for guides on how to set up and run the daemon process for local development, Docker, or Kubernetes deployments.
If you are currently using the `SystemCronScheduler` or `K8sScheduler`:

- Stop any currently running schedules, to prevent any dangling cron jobs from being left behind. You can do this through the Dagit UI, or using the following command:

  ```
  dagster schedule stop --location {repository_location_name} {schedule_name}
  ```

  If you do not stop running schedules before changing schedulers, Dagster will throw an exception on startup due to the misconfigured running schedules.

- In your `dagster.yaml` file, remove the `scheduler:` entry (see the sketch after these steps). If there is no `scheduler:` entry, the `DagsterDaemonScheduler` is automatically used as the default scheduler.

- Start the `dagster-daemon` process. Guides can be found in our deployment documentation.
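For reference, a removal sketch; the module and class shown are our recollection of a typical `SystemCronScheduler` entry, so match it against what your `dagster.yaml` actually contains:

```yaml
# dagster.yaml -- delete this entire block. With no scheduler entry,
# the DagsterDaemonScheduler is used by default.
scheduler:
  module: dagster_cron.cron_scheduler
  class: SystemCronScheduler
```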
See our schedules troubleshooting guide for help if you experience any problems with the new scheduler.
If you are not using a legacy scheduler:
No migration steps are needed, but make sure you run `dagster instance migrate` as a part of upgrading to 0.10.0.
Deprecation: Intermediate Storage
We have deprecated the intermediate storage machinery in favor of the new IO manager abstraction, which offers finer-grained control over how inputs and outputs are serialized and persisted. Check out the IO Managers Overview for more information.
Steps to migrate
- We have deprecated the top level `"storage"` and `"intermediate_storage"` fields on `run_config`. If you are currently executing pipelines as follows:

  ```python
  @pipeline
  def my_pipeline():
      ...

  execute_pipeline(
      my_pipeline,
      run_config={"intermediate_storage": {"filesystem": {"base_dir": ...}}},
  )

  execute_pipeline(
      my_pipeline,
      run_config={"storage": {"filesystem": {"base_dir": ...}}},
  )
  ```

  You should instead use the built-in IO manager `fs_io_manager`, which can be attached to your pipeline as a resource:

  ```python
  @pipeline(
      mode_defs=[
          ModeDefinition(resource_defs={"io_manager": fs_io_manager})
      ],
  )
  def my_pipeline():
      ...

  execute_pipeline(
      my_pipeline,
      run_config={"resources": {"io_manager": {"config": {"base_dir": ...}}}},
  )
  ```

  There are corresponding IO managers for other intermediate storages, such as the S3- and ADLS2-based storages.
- We have deprecated `IntermediateStorageDefinition` and `@intermediate_storage`.

  If you have written a custom intermediate storage, you should migrate to custom IO managers defined using the `@io_manager` API. We have provided a helper method, `io_manager_from_intermediate_storage`, to help migrate your existing custom intermediate storages to IO managers.

  ```python
  my_io_manager_def = io_manager_from_intermediate_storage(
      my_intermediate_storage_def
  )

  @pipeline(
      mode_defs=[
          ModeDefinition(
              resource_defs={"io_manager": my_io_manager_def}
          ),
      ],
  )
  def my_pipeline():
      ...
  ```
- We have deprecated the `intermediate_storage_defs` argument to `ModeDefinition`, in favor of the new IO managers, which should be attached using the `resource_defs` argument.
Removal: `input_hydration_config` and `output_materialization_config`
Use `dagster_type_loader` instead of `input_hydration_config` and `dagster_type_materializer` instead of `output_materialization_config`.
On `DagsterType` and type constructors in `dagster_pandas`, use the `loader` argument instead of `input_hydration_config` and the `materializer` argument instead of `output_materialization_config`.
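A minimal sketch of the renamed hooks; the loader body and type names here are illustrative:

```python
from dagster import DagsterType, dagster_type_loader


@dagster_type_loader(config_schema=str)
def my_loader(_context, file_path):
    # Build the runtime value from the path supplied in run config.
    with open(file_path) as f:
        return f.read()


MyFileContents = DagsterType(
    name="MyFileContents",
    type_check_fn=lambda _, value: isinstance(value, str),
    loader=my_loader,
)
```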
Removal: `repository` key in workspace YAML
We have removed the ability to specify a repository in your workspace using the `repository:` key. Use `load_from:` instead when specifying how to load the repositories in your workspace, as in the sketch below.
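A minimal sketch, with illustrative file and package names:

```yaml
# workspace.yaml
load_from:
  - python_file: my_repos.py
  - python_package: my_dagster_package
```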
Deprecated: `python_environment` key in workspace YAML
The `python_environment:` key is now deprecated and will be removed in a future release.
Previously, when you wanted to load a repository location in your workspace using a different Python environment from Dagit’s Python environment, you needed to use a `python_environment:` key under `load_from:` instead of the `python_file:` or `python_package:` keys. Now, you can simply customize the `executable_path` in your workspace entries without needing to change to the `python_environment:` key.
For example, the following workspace entry:
```yaml
- python_environment:
    executable_path: "/path/to/venvs/dagster-dev-3.7.6/bin/python"
    target:
      python_package:
        package_name: dagster_examples
        location_name: dagster_examples
```

should now be expressed as:

```yaml
- python_package:
    executable_path: "/path/to/venvs/dagster-dev-3.7.6/bin/python"
    package_name: dagster_examples
    location_name: dagster_examples
```
See our Workspaces Overview for more information and examples.
Removal: `config_field` property on definition classes
We have removed the property `config_field` on definition classes. Use `config_schema` instead.
Removal: System Storage
We have removed the system storage abstractions, i.e. `SystemStorageDefinition` and `@system_storage` (deprecated in 0.9.0).
Please note that the intermediate storage abstraction is also deprecated and will be removed in 0.11.0. Use IO managers instead.
- We have removed the `system_storage_defs` argument (deprecated in 0.9.0) to `ModeDefinition`, in favor of `intermediate_storage_defs`.
- We have removed the built-in system storages, e.g. `default_system_storage_defs` (deprecated in 0.9.0).
Removal: `step_keys_to_execute`
We have removed the `step_keys_to_execute` argument to `reexecute_pipeline` and `reexecute_pipeline_iterator`, in favor of `step_selection`. This argument accepts the Dagster selection syntax, so, for example, `*solid_a+` represents `solid_a`, all of its upstream steps, and its immediate downstream steps.
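A hedged usage sketch; the pipeline, parent run id, and instance are assumed to already exist:

```python
from dagster import reexecute_pipeline

result = reexecute_pipeline(
    my_pipeline,
    parent_run_id=parent_run_id,
    step_selection=["*solid_a+"],  # solid_a, its upstream steps, immediate downstream steps
    instance=instance,
)
```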
Breaking Change: `date_partition_range`
Starting in 0.10.0, Dagster uses the pendulum library to ensure that schedules and partitions behave correctly with respect to timezones. As part of this change, the `delta` parameter to `date_partition_range` (which determined the time difference between partitions and was a `datetime.timedelta`) has been replaced by a `delta_range` parameter (which must be a string that's a valid argument to the `pendulum.period` function, such as `"days"`, `"hours"`, or `"months"`).
For example, the following partition range for a monthly partition set:
```python
date_partition_range(
    start=datetime.datetime(2018, 1, 1),
    end=datetime.datetime(2019, 1, 1),
    delta=datetime.timedelta(months=1),
)
```

should now be expressed as:

```python
date_partition_range(
    start=datetime.datetime(2018, 1, 1),
    end=datetime.datetime(2019, 1, 1),
    delta_range="months",
)
```
Breaking Change: `PartitionSetDefinition.create_schedule_definition`
When you create a schedule from a partition set using `PartitionSetDefinition.create_schedule_definition`, you now must supply a `partition_selector` argument that tells the scheduler which partition to use for a given schedule time.
We have added two helper functions, `create_offset_partition_selector` and `identity_partition_selector`, that capture two common partition selectors (schedules that execute at a fixed offset from the partition times, e.g. a schedule that creates the previous day's partition each morning, and schedules that execute at the same time as the partition times).
The previous default partition selector was `last_partition`, which didn't always work as expected when using the default scheduler and has been removed in favor of the two helper partition selectors above.
For example, a schedule created from a daily partition set that fills in each partition the next day at 10AM would be created as follows:

```python
partition_set = PartitionSetDefinition(
    name='hello_world_partition_set',
    pipeline_name='hello_world_pipeline',
    partition_fn=date_partition_range(
        start=datetime.datetime(2021, 1, 1),
        delta_range="days",
        timezone="US/Central",
    ),
    run_config_fn_for_partition=my_run_config_fn,
)

schedule_definition = partition_set.create_schedule_definition(
    "daily_10am_schedule",
    "0 10 * * *",
    partition_selector=create_offset_partition_selector(
        lambda d: d.subtract(hours=10, days=1)
    ),
    execution_timezone="US/Central",
)
```
Renamed: Helm values
Following convention in the Helm docs, we now camel case all of our Helm values. To migrate to 0.10.0, you'll need to update your `values.yaml` with the following renames:

- `pipeline_run` → `pipelineRun`
- `dagster_home` → `dagsterHome`
- `env_secrets` → `envSecrets`
- `env_config_maps` → `envConfigMaps`
Restructured: `scheduler` in Helm values
When specifying the Dagster instance scheduler, rather than using a boolean field to switch between the current options of `K8sScheduler` and `DagsterDaemonScheduler`, we now require the scheduler type to be explicitly defined under `scheduler.type`. If the user-specified `scheduler.type` has required config, additional fields will need to be specified under `scheduler.config`.
`scheduler.type` and corresponding `scheduler.config` values are enforced via JSON Schema.
For example, if your Helm values previously were set like this to enable the `DagsterDaemonScheduler`:

```yaml
scheduler:
  k8sEnabled: false
```

You should instead have:

```yaml
scheduler:
  type: DagsterDaemonScheduler
```
Restructured: `celery` and `k8sRunLauncher` in Helm values
`celery` and `k8sRunLauncher` now live under `runLauncher.config.celeryK8sRunLauncher` and `runLauncher.config.k8sRunLauncher` respectively. Now, to enable Celery, `runLauncher.type` must equal `CeleryK8sRunLauncher`. To enable the vanilla K8s run launcher, `runLauncher.type` must equal `K8sRunLauncher`.
`runLauncher.type` and corresponding `runLauncher.config` values are enforced via JSON Schema.
For example, if your Helm values previously were set like this to enable the `K8sRunLauncher`:

```yaml
celery:
  enabled: false
k8sRunLauncher:
  enabled: true
  jobNamespace: ~
  loadInclusterConfig: true
  kubeconfigFile: ~
  envConfigMaps: []
  envSecrets: []
```

You should instead have:

```yaml
runLauncher:
  type: K8sRunLauncher
  config:
    k8sRunLauncher:
      jobNamespace: ~
      loadInclusterConfig: true
      kubeconfigFile: ~
      envConfigMaps: []
      envSecrets: []
```
New Helm defaults
By default, `userDeployments` is enabled and the `runLauncher` is set to the `K8sRunLauncher`. Along with the latter change, all message brokers (e.g. `rabbitmq` and `redis`) are now disabled by default.
If you were using the `CeleryK8sRunLauncher`, one of `rabbitmq` or `redis` must now be explicitly enabled in your Helm values, as in the sketch below.
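A minimal sketch re-enabling RabbitMQ alongside the `CeleryK8sRunLauncher`:

```yaml
runLauncher:
  type: CeleryK8sRunLauncher
rabbitmq:
  enabled: true
```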