# Prefect Workflow
The example workflow is another implementation of the example RADPS Workflow Decomposition, composed of simple placeholder "workloads" (mostly sleeps and trivial computation) defined in Python functions extended with decorators to become tasks and flows, the basic building blocks of the Prefect framework. Following that document, the example pipeline is divided into various stages, each of which is represented by a file in the RADPS/prefect_workflow source tree. Each stage is represented by a flow, composed of additional subflows and tasks.

The example pipeline is implemented as a single `prefect.flow` which calls multiple subflows (some of which are themselves nested flows) and tasks to demonstrate the behavior of the framework when running an example sequence. Loops were implemented at the locations in the reference document corresponding to the tasks with Iterative Solvers. Conditionals were implemented at both the inter- and intra-flow level. There is also a file `core.py` containing general functions reused across the example pipeline.
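As a minimal sketch of this pattern (the function names, durations, and structure below are illustrative, not the repo's actual code):

```python
from time import sleep

from prefect import flow, task

@task
def placeholder_workload(duration: float = 1.0) -> float:
    """Stand-in workload: sleep, then do a trivial computation."""
    sleep(duration)
    return duration * 2

@flow
def example_stage() -> float:
    """A stage flow composed of tasks (and, in the real code, subflows)."""
    return placeholder_workload(0.5)

@flow
def example_pipeline():
    """Parent flow calling stage subflows in sequence."""
    example_stage()

if __name__ == "__main__":
    example_pipeline()
```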
It is possible to run most individual stages independently (and idempotently), but there is some dependence implicit in the structure of the workflow (e.g., Calibrate Target and Find Continuum require the output of a prior Data Import and Prep stage in order to run). In addition to the "parent" flow defined in `pipeline.py`, there is also a file `example_calibration_pipeline.py` that demonstrates the capability of executing a subset of the stages, especially emphasizing certain subsets of functionality implemented in the calibration flows.
The complete demo pipeline is represented in temporal dependency view by the following Gantt chart:
Each of the larger boxes (underlined by the blue state indicators) in that image represents an individual stage, each of which is called from its own flow definition file. The green boxes represent flows and tasks that simulate the workloads comprising the stage logic, defined by decorated Python functions inside their corresponding stage file. The three general categories of stage implemented during Sprint 1 were roughly "calibration", "external service query and findcont", and "self-calibration and cube imaging", more detailed images of which are captured here. Note that these are screenshots taken from a live view of the interactive Prefect dashboard during tests running a local deployment:





Following the approach of the context domain object in the existing pipeline, a Context object was introduced to the demo pipeline to track state across tasks. In the Prefect Workflow code, the context is defined in `core.py`.

The context is designed as a thin wrapper (currently a class) around a Python dict which stores information relevant to future stages. Each stage loads the context from disk, adds or modifies information in it, and the updated context is then pickled and saved to disk at the end of the stage. The high-level interface to the context object consists of two methods:
- `load_context` checks for an existing context file on disk, returning its contents as a dictionary (if one already exists) or creating a default following some template (if one doesn't).
- `add_to_context` updates an existing context file to contain a new key:value pair, labeled by the stage where the change originated.
All interaction with the context is done via Python dicts. The presence of certain keys ("data", "qa_scores", "datashape") is handled by the task and flow logic of individual stages, a design choice currently shared by the default input and return object types. This approach has the advantages of extensibility and ready serializability (e.g., to formats like zarr, which might eventually be explored as a backing store format for the context). Potential drawbacks include bloat (e.g., duplicate keys) or inconsistency across context interactions by different stages in the absence of enforced standards/specifications.
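A minimal sketch of how these two methods could behave, assuming a JSON backing file, a default template, and a per-stage labeling scheme (all assumptions; the actual definitions, which pickle the context, live in `core.py`):

```python
import json
from pathlib import Path

# Hypothetical backing file; the repo pickles its context instead.
CONTEXT_FILE = Path("context.json")

def load_context() -> dict:
    """Return the existing context, or a default template if none exists yet."""
    if CONTEXT_FILE.exists():
        return json.loads(CONTEXT_FILE.read_text())
    return {"data": None, "qa_scores": {}, "datashape": None}

def add_to_context(key: str, value, stage: str) -> None:
    """Record a key:value pair in the context, labeled by the originating stage."""
    context = load_context()
    context.setdefault(stage, {})[key] = value
    CONTEXT_FILE.write_text(json.dumps(context))
```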
While this design implements a shared context between all the pipeline stages in this demonstration Prefect workflow, aspects of it are unrealistic for a future functional pipeline. Possible future improvements to the context include:
- Storing context information in a database
- Exploring other cloud-friendly storage options
See: https://pipe-docs.readthedocs.io/en/latest/notebooks/context.html for more information.
The example Prefect workflow mocks up failures and recoveries in a few different ways:
- The calibrator import and export-to-archive stages were written to randomly pass/fail to represent I/O issues. Prefect has the ability to retry a failed task or flow a specified number of times using the `retries` argument; `retries` is set to 3 for these stages. If the number of retries is exceeded, the task and its parent flow will fail (see the sketch after this list):
- In a concurrent run of tasks using `.submit()`, the task runner creates a `PrefectFuture` object for accessing the state and result of each submitted task. The `.result()` method can be used to explicitly access the result; when a submitted task fails, it raises the task's exception by default, but this default behavior can be suppressed to support different failure handling. In the uv-continuum subtraction of the cube imaging stage, one of the spws in the concurrent `uvcontsub` runs is mocked to fail, yet the stage continues to run to the end, demonstrating such alternative failure handling (also in the sketch below).
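A minimal sketch of both mechanisms; the task names, spw count, and failure probabilities are illustrative assumptions, while `retries`, `.submit()`, and `result(raise_on_failure=False)` are stock Prefect:

```python
import random

from prefect import flow, task

@task(retries=3)
def export_to_archive():
    """Randomly fail to represent I/O issues; Prefect retries up to 3 times."""
    if random.random() < 0.5:
        raise IOError("simulated archive failure")

@task
def uvcontsub(spw: int) -> int:
    """Mock uv-continuum subtraction that always fails for one spw."""
    if spw == 2:
        raise RuntimeError(f"simulated failure for spw {spw}")
    return spw

@flow
def cube_imaging_stage():
    futures = [uvcontsub.submit(spw) for spw in range(4)]
    for future in futures:
        # raise_on_failure=False suppresses the default re-raise, so the
        # stage runs to the end even though one spw failed.
        outcome = future.result(raise_on_failure=False)
    export_to_archive()
```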
Each stage generates a Prefect artifact with a (fake) QA score to demonstrate the artifact functionality of Prefect. These are displayed as "clickable" entries in the main workflow interface on the Prefect dashboard, and are also added to a running list of generated artifacts. Three artifact formats are available: markdown, tables, and images.
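As an illustration, a markdown QA-score artifact could be created with Prefect's `create_markdown_artifact`; the key naming scheme and markdown body below are assumptions:

```python
from prefect import flow
from prefect.artifacts import create_markdown_artifact

@flow
def report_qa(stage: str, score: float):
    """Publish a (fake) QA score as a markdown artifact."""
    create_markdown_artifact(
        key=f"{stage}-qa-score",  # hypothetical key scheme; keys must be lowercase/dashes
        markdown=f"## QA score\n\nStage `{stage}` scored **{score:.2f}**.",
        description=f"Fake QA score for the {stage} stage.",
    )
```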
Randomly generated QA scores are used to demonstrate conditional emission of Prefect Events and Prefect's capability to pause the flow and await user input. The example pipeline emits stage-specific alerts if any of the (randomly generated) QA scores is less than 0.67. Similar to the QA score markdown artifacts, these are displayed both as clickable 'dots' on the main dashboard interface and as entries in a running list of events. The list of events can be filtered by event type: for example, all QA-related events, or just import QA failures.
Additionally, when a low QA score is detected, the associated subflow is paused using `pause_subflow()` and can be resumed using the "Resume" button for the paused subflow on the dashboard.
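The repo's `pause_subflow()` helper presumably wraps Prefect primitives along the lines of the sketch below; the event name, resource id, and timeout are assumptions, while `emit_event` and `pause_flow_run` are stock Prefect:

```python
from prefect import flow, pause_flow_run
from prefect.events import emit_event

QA_THRESHOLD = 0.67

@flow
def qa_gate(stage: str, qa_score: float):
    """Emit an alert event and pause this flow when the QA score is low."""
    if qa_score < QA_THRESHOLD:
        emit_event(
            event=f"radps.qa-failure.{stage}",  # hypothetical event name
            resource={"prefect.resource.id": f"radps.stage.{stage}"},
        )
        # Blocks until "Resume" is clicked on the dashboard (or the timeout expires).
        pause_flow_run(timeout=3600)
```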

As another demonstration of the use of `pause_subflow` and `emit_event`, a process mimicking interactive clean is implemented in the `solve` flow. The `int_clean` mode can be triggered in `stage_image_cube` and `stage_perspw_cont`, and a user can interact in these stages to change the iteration number, stop clean, or continue with further user interaction when `pause_subflow` is triggered, as shown below. Since this utilizes the Prefect dashboard UI, the flexibility of the controls and their appearance is fairly limited. A future exploration is to try connecting with casagui.
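A sketch of how such an interaction could be built on Prefect's stock `wait_for_input` mechanism; the `CleanControl` fields are hypothetical, and the repo's actual `solve` flow and `pause_subflow` helper may differ:

```python
from prefect import flow, pause_flow_run
from prefect.input import RunInput

class CleanControl(RunInput):
    """Hypothetical user input for the mock interactive clean."""
    niter: int = 100
    action: str = "continue"  # "continue", "stop", or "interact"

@flow
def solve(niter: int = 100):
    while True:
        # ... run one (mock) clean iteration here ...
        # Pause until the user submits a CleanControl form on the dashboard.
        control = pause_flow_run(wait_for_input=CleanControl)
        if control.action == "stop":
            break
        niter = control.niter  # apply the user-adjusted iteration count
```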
First, follow the setup instructions to install Prefect and set up a local Prefect server using the first two steps of Prefect Tutorial Notes - Installation and Quickstart.
Then, clone the RADPS repo:

```
git clone git@github.com:casangi/RADPS.git
```

The prefect workflow is in the `prefect_workflow` directory, so:

```
cd prefect_workflow
```

Then, to start the full example pipeline, run:

```
python3 pipeline.py
```

which runs the pipeline with cube imaging.
There are options to run different modes of imaging by providing an argument (continuum imaging = `'cont'`, continuum imaging with self-calibration = `'contselfcal'`) and a second argument for interactive clean (`True` or `False`).
E.g., to run the continuum imaging pipeline with interactive clean:

```
python3 pipeline.py 'cont' True
```
Note: in order to run the import and export stages in parallel for each calibrator, the following must be run in the background before running `python3 pipeline.py`:

```
python3 deploy.py &
```
The flow will begin to run and can be interacted with on your local Prefect server at http://127.0.0.1:4200/dashboard (the default location).
Any individual stage of the prefect workflow can be run on its own in the same manner. For example:

```
python3 stage_image_cube.py
```
The simulated intermittent failures and the ability to pause the flow for low QA scores are currently on for `example_calibration_pipeline` and off for the overall `pipeline`; they are off by default. They can be turned on by passing `failures=True` to each pipeline stage for which you would like to enable failures. A more elegant solution may be added in upcoming ticket #17; in the meantime, this per-stage argument is the way to enable them.
As the example "task failure emitting pause event" behavior shows, interaction with the pipeline is possible, but it does require the user to know where to look. In other words, there is no simple way to resume components of an individual stage from the Prefect dashboard summary of the top-level pipeline flow; you have to click through to the sub-flow where the pause event is blocking and resume from there. The Prefect dashboard makes this relatively intuitive and simple to do, but it is also easy to imagine some future case where many flows and sub-flows are visible from the dashboard at one time, potentially making such an interaction confusing. Careful design should help align the workloads requiring manual intervention with the views presented of the processing framework (context, artifacts, flow mechanics, etc.) to make interaction as simple and intuitive as possible. This is likely to improve iteratively, especially as the no-op stages start to be populated with workloads that more realistically model the characteristics of RADPS data processing use cases.
The inbuilt mechanisms for artifact definition from within the Prefect API seem like a useful tool for generating packaged metadata summarizing data reductions or other operations performed by tasks inside a stage. Tables, images, and generic markdown are supported and should be sufficient to cover many use cases for preserving information derived from flow runs for interactive inspection. There do appear to be some limitations, for instance no straightforward way to link to locally hosted files, so further exploration will be needed.
While tasks can be run in parallel directly using `.submit()`, subflows within the same parent flow cannot be run this way. There is a ticket on the Prefect repo where this feature has been requested; ticket commenters note that this functionality does exist in Airflow. There are, however, several available ways to achieve it, including deploying the subflow and then calling it using `run_deployment`, as sketched below.
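A sketch of that workaround; the deployment name and parameters are assumptions, and the subflow must already be deployed (e.g., via `deploy.py`) with a Prefect server and worker running:

```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def import_all_calibrators():
    """Launch one deployed subflow run per calibrator without blocking."""
    runs = [
        run_deployment(
            name="import-calibrator/default",  # hypothetical "flow-name/deployment-name"
            parameters={"calibrator": cal},
            timeout=0,  # return immediately rather than waiting for completion
        )
        for cal in ["bandpass", "phase", "flux"]
    ]
    return runs
```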
- Define, measure, and improve the reliability and fault tolerance of this workflow #17
- Experiment with deployments targeting some shared infrastructure, including running multiple pipelines concurrently.
- Improve design consistency
  - Update all flows to use a single context throughout the whole workflow
  - Merge more-or-less duplicated stages for import, etc.
  - Update the calibration pipeline stages to include the iterative solver loops used
  - Settle on a consistent "fake data" format (or formats) that all stages of the pipeline will support