Dynamic Pipeline #2627

Open · 1 task
noklam opened this issue Jun 1, 2023 · 31 comments

Comments

@noklam (Contributor) commented Jun 1, 2023

Introduction

We had a discussion about dynamic pipelines in #1993, also partly related to #1963. This issue summarises that discussion and lays out the work that we need to do.

Related Issues:


Background

Dynamic pipelines have been one of the most frequently asked-about topics. There are various solutions, but they are often case-by-case; as a result the solutions come in all fashions, and users have asked whether Kedro could provide a feature for this.

What is a "Dynamic Pipeline"?

When people refer to "Dynamic Pipeline", they are often not talking about the same thing. We need to make a clear distinction between the different meanings before we start building a solution.

We can roughly categorise them into two buckets:

  1. Dynamic construction of Pipeline
  2. Dynamic behavior at runtime

Dynamic construction of Pipeline (easier)

Examples of these are:

  1. Time series forecasting - a pipeline makes a prediction for day 1; the next pipeline requires the day-1 prediction as input.
  2. Hyperparameter tuning
  3. Combining a variable number of features - feature engineering combines N features into one DataFrame.
  4. A list of countries - each needs to be saved as a catalog entry; the data are then combined in a pipeline for further processing (see the sketch after this list).
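
All of these can be built with ordinary Python before the run starts. Below is a minimal, hypothetical sketch of bucket 1; COUNTRIES, process_country and the dataset names are made up for illustration, and in a real project the list would come from config.

# pipeline.py - build one namespaced pipeline per country in a plain for loop
from kedro.pipeline import Pipeline, node, pipeline

COUNTRIES = ["de", "fr", "es"]  # hypothetical; in practice read from config


def process_country(raw_data):  # placeholder node function
    ...


def create_pipeline(**kwargs) -> Pipeline:
    per_country = [
        pipeline(
            [node(process_country, inputs="raw_data", outputs="processed_data")],
            namespace=country,  # datasets become "de.raw_data", "de.processed_data", ...
        )
        for country in COUNTRIES
    ]
    return sum(per_country, Pipeline([]))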

Dynamic behavior at runtime (harder)

Examples of these are:

  • 2nd-order pipelines - pipelines generated from some node's output
    • I have a scenario where I would like to run model training and model evaluation based on labels in a dataset. Each label
      would trigger an individual pipeline.

    • A pipeline that makes a prediction for one user - fetch a list of N users, then run the pipeline on each of them.

  • Running a node conditionally - run A if B does not exist, otherwise run C

Why is it challenging for Kedro Users?

It requires experience with Kedro: you often need to combine advanced features, e.g. TemplatedConfigLoader + Jinja2 in the catalog + a for loop in your pipeline.py.

In addition, each of these use cases needs a different solution, while part of Kedro's value proposition is standardisation. There is no well-known pattern for these solutions, and they are hard to reason about and debug with Jinja.

What's not in scope

  • Non-DAG pipeline - i.e. Github Action, CircleCI type of pipeline.
  • Skipping nodes - i.e. if A exist, don't run B and C (a workaround with hooks is possible)
  • Dynamic node generation during a run

These types of pipelines are fundamentally different from the "data-centric" approach of Kedro.

What are some possible solutions?

Follow-up

Reference

@noklam noklam added the Type: Technical DR 💾 Decision Records (technical decisions made) label Jun 1, 2023
@stichbury (Contributor)

Thanks for this @noklam -- I was looking back at the technical discussion recently and wanted to flag it again, so I'm pleased you did. Would you be comfortable writing this blog post? Maybe we can fit it into one of the next couple of sprints?

@noklam noklam changed the title Dynamic Pipeline [In Draft] - Dynamic Pipeline Jun 6, 2023
@noklam (Contributor, Author) commented Jun 6, 2023

@stichbury Yes! I am happy to write something about this.

@desmond-dsouza

The Miro board linked above seems not to be viewable by the public?

@noklam (Contributor, Author) commented Jun 14, 2023

@desmond-dsouza Sorry about that! You are right, the link is private, as this is still in draft and there is some internal discussion on the Miro board. I will try to post some screenshots here once we have discussed it.

Do you have any views on this? The board is mainly examples; I tried to go through all the Slack & Discord questions to summarise the user problems.

@astrojuanlu (Member) commented Jun 21, 2023

Another question on dynamic pipelines https://www.linen.dev/s/kedro/t/12647827/hello-thank-you-for-this-great-library-i-am-a-ds-working-in-#d170c710-8e6a-4c56-a623-058c3ec33da7

[etape 1] > [etape 2] > [if score_etape2 < X ] > [etape4]
                      > [if score_etape2 >= X ] > [etape5]

@noklam (Contributor, Author) commented Jun 22, 2023

@astrojuanlu This sort of runtime-generated DAG is not supported currently. There are a few possible ways to work around it:

  1. Embed the if-else condition inside a node
  2. Use hooks or dynamic injection

We usually advise avoiding these kinds of conditional DAGs as much as possible, because they get very complicated once you have multiple switches and they are difficult to debug. Having a conditional DAG is not much different from having an if-else node, i.e.

def conditional_node(score):
    # do_a / do_b stand in for the two branches' logic
    if score >= 0.5:
        return do_a()
    return do_b()

The challenge here is that the returned datasets may be different or not compatible at all.

I haven't done this before, but it should be more flexible to do it with the Python API. The code may look like this:

# deployment_script.py (sketch)
from kedro.framework.session import KedroSession

# Each KedroSession can only be run once, so create a new session per run.
with KedroSession.create() as session:
    result = session.run(tags=["some_pipeline"])

with KedroSession.create() as session:
    if result["score"] >= 0.5:
        session.run(pipeline_name="deploy")
    else:
        session.run(pipeline_name="retrain")

It does mean that you may lose certain features and cannot use the Kedro CLI to run this, so use it sparingly.

@datajoely (Contributor)

For me, if we're going to do this properly we need some combination of:

  • Conditional node
  • Some pre-flight checks to validate things
  • To push forward with the session store so that the reproducibility concerns can be managed by good logging/time-machine stuff
  • [not entirely sure] A way of accessing the catalog in the pipeline registry

@noklam (Contributor, Author) commented Aug 24, 2023

Cross-posting a solution some of our users are using:

  1. Create an after_context_created / after_catalog_created hook.
  2. Replace the register_pipelines() function with a custom register_dynamic_pipelines(catalog: DataCatalog) function.
  3. If you can pass the catalog when creating pipelines, you can access datasets and parameters to dynamically build your pipeline:
     use create_pipeline(catalog: DataCatalog) functions to create your pipelines.

The same hook works as above, but instead of creating pipelines by changing the parameter and dataset names directly, we create namespaced pipelines, combined with dataset factories: https://docs.kedro.org/en/stable/data/data_catalog.html#example-3-generalise-datasets-using-namespaces-into-one-dataset-factory - have a look at the catalog.yml.
This way, not only is the pipeline dynamic but so is your catalog!

A detailed solution can be found here; a rough sketch of the idea follows below.
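
The sketch below is a hypothetical illustration of step 3, not the code from the linked solution: create_pipeline receives the catalog and reads a parameter to decide how many namespaced pipelines to build. The parameter name params:model_variants and the node function train_model are made up for illustration.

# pipeline.py - hypothetical sketch of a create_pipeline that takes the catalog
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node, pipeline


def train_model(model_input_table):  # placeholder node function
    ...


def create_pipeline(catalog: DataCatalog) -> Pipeline:
    # Read a list of variants from parameters already loaded into the catalog
    variants = catalog.load("params:model_variants")  # hypothetical parameter
    per_variant = [
        pipeline(
            [node(train_model, inputs="model_input_table", outputs="model")],
            namespace=variant,  # outputs become e.g. "variant_a.model", "variant_b.model", ...
        )
        for variant in variants
    ]
    return sum(per_variant, Pipeline([]))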

@Lodewic commented Aug 24, 2023

@noklam To pitch in here - thank you for sharing my solution on this issue tracker! You asked for an example repository; you can find it here: https://github.com/Lodewic/kedro-dynamic-pipeline-hook-example

@inigohidalgo (Contributor) commented Aug 25, 2023

  • [not entirely sure] A way of accessing the catalog in the pipeline registry

@datajoely we are currently looking for a workaround to be able to access parameters from the pipeline registry. In one of our projects (not sure exactly which Kedro version was current when it was implemented, definitely <0.16) we had this as a workaround:

from typing import Dict

from kedro.framework.context import KedroContext
from kedro.pipeline import Pipeline

from my_project.pipelines import create_pipelines  # project-specific (hypothetical import path)


class ProjectContext(KedroContext):
    """Users can override the remaining methods from the parent class here,
    or create new ones (e.g. as required by plugins); indicate in settings.py.
    """

    def _get_pipelines(self) -> Dict[str, Pipeline]:
        params = {} if self._extra_params is None else self._extra_params
        return create_pipelines(**params)

We are looking for a current "kedroic" way of handling this. I am looking into @Lodewic's implementation to see how well it fits our use case; it looks very promising, but a more out-of-the-box solution might be preferable, as it requires a certain amount of knowledge about Kedro's inner workings which we probably shouldn't expect from general users.

@datajoely (Contributor)

This is neat @inigohidalgo !

@noklam (Contributor, Author) commented Aug 29, 2023

This looks like the 0.16.x or 0.17.x style of creating pipelines; I actually don't know what happened and why we moved away from it. It used to be possible to access parameters in create_pipeline, which is why the template is still create_pipeline(**kwargs).

@astrojuanlu (Member)

Relevant: https://getindata.com/blog/kedro-dynamic-pipelines/ by @marrrcin

In other news, found this on the Dagster Slack:

Seems that the cost of repetitive tedious logic around i/o is much less than trying to piece together the dagster abstraction, given the examples/docs. Feels like every time I try to use frameworks like airflow, kedro, dagster, they all let me down as soon as I start doing anything dynamic.

https://discuss.dagster.io/t/13882234/hello-world-for-dynamic-partition-etl-is-a-nightmare-the-ina#78fea568-9198-48a0-a0ba-42a141f440be

@astrojuanlu (Member)

After sharing the blog post on #1606, I was thinking that we should find a more descriptive name for the use case addressed in it. "Dynamic pipelines" seems to imply that the pipelines themselves have some data-dependent runtime behavior or data-dependent structure (the 2 buckets originally devised by @noklam), but taking a pipeline and reusing it with different configurations is hardly "dynamic". We should call this "pipeline reuse" or investigate how other projects (DVC Pipelines, MLFlow recipes, Airflow DAGs) call this concept.

@datajoely (Contributor) commented Oct 23, 2023

In the CI/CD world this sort of thing is often called a matrix job; in our examples we want to run something like an "experiment array".

We should note that Hydra calls this a multi-run, but it also integrates with "sweepers", which is closer to what we're doing here. The next question raised is how we can make something like Optuna work for our purposes.

@inigohidalgo (Contributor)

The next question raised is how we can make something like Optuna work for our purposes.

When I've used Optuna within Kedro I've defined the search space from config, but the output was basically just a pickle of the finished Optuna run alongside a JSON with the optimal hyperparameters. Which of Optuna's features would you see as useful for Kedro?

@datajoely (Contributor)

@inigohidalgo it's more that Hydra counts Optuna, Ax and Nevergrad in the 'sweeper' category

@astrojuanlu (Member) commented Oct 26, 2023

Today @datajoely recommended @marrrcin's approach as an alternative to Ray Tune for parameter sweeps: https://linen-slack.kedro.org/t/16014653/hello-very-much-new-to-the-ml-world-i-m-trying-to-setup-a-fr#e111a9d2-188c-4cb3-8a64-37f938ad21ff

Are we confident that the DX offered by this approach can compete with this?

from ray import tune

# `objective` is the trainable function defined elsewhere in the Ray example
search_space = {
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective, param_space=search_space)

@datajoely (Contributor)

No, but it does provide a budget version of it - this is what I was saying about the lack of integration with dedicated "sweepers" in this comment.

@astrojuanlu (Member)

I'm now convinced that we should cleanly separate the "dynamic pipelines" conversation as originally stated by @noklam from the parameter sweeping/experimentation/multi-run use case, which is conceptually way simpler and has very clear boundaries and expectations. I propose we continue in #1606

@datajoely (Contributor) commented Oct 26, 2023

Yup - I think there are two categories, as Nok says at the top:

  1. "Deterministic pipeline structure generation" (sweeps fall into this, but selection may be the latter)

  2. "Runtime dynamic pipeline structure" (conditional logic introduces combinatorial complexity and possibly makes Kedro Turing-complete)

    There is significant user validation in terms of demand, and competitor validation, since we see other tools in the space offering this functionality.

@inigohidalgo (Contributor)

I've been a bit outside this discussion, although I'm super interested in the topic. To make sure I understand the two options, I have the following use case:

I have a pipeline which predicts total demand for a product on a given day, with the day specified as an input parameter to the pipeline.

Some days, due to data issues, the prediction will fail, but once the issues are resolved we would like to see how the model would have performed. To do this, we have a backfill pipeline set up which loads the predictions dataset, checks for gaps, and launches a pipeline for each missing day. This pipeline, as I've described it, is more of an example of the second (harder) bucket, right? Since the structure of the final pipeline depends on the state of a dataset.

But if, on the other hand, I simply wanted to define a pipeline which loops through the last 10 days and runs the pipeline for each of them, regardless of the status of the predictions dataset, would that be an example of 1, where I am just defining a pipeline in a for loop, potentially constructing it from today's date and a configurable number of days (10 in the example) to go back? Something like the sketch below.
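
A minimal sketch of what that option-1 reading could look like; all names are hypothetical, the number of days would come from config, and the loop runs when the pipeline is created, before the run starts, so the structure is fixed for a given run.

# pipeline.py - hypothetical sketch: build one namespaced pipeline per day
from datetime import date, timedelta

from kedro.pipeline import Pipeline, node, pipeline


def predict_for_day(model_input):  # placeholder node function
    ...


def create_pipeline(n_backfill_days: int = 10) -> Pipeline:
    days = [date.today() - timedelta(days=i) for i in range(1, n_backfill_days + 1)]
    per_day = [
        pipeline(
            [node(predict_for_day, inputs="model_input", outputs="prediction")],
            namespace=f"day_{d.isoformat()}",
            # inputs/outputs are prefixed by the namespace, e.g.
            # "day_2023-11-01.model_input" -> "day_2023-11-01.prediction",
            # which a dataset factory pattern in catalog.yml can resolve.
        )
        for d in days
    ]
    return sum(per_day, Pipeline([]))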


@stichbury (Contributor)

Docs/content update:

@inigohidalgo (Contributor)

Another use case which I'm not sure where it would fall:

I have a time-series problem where I compute a lot of lags, rolling statistics, etc. When designing my training pipeline, I have a target number of days I want my master table to include.

Due to the way lags are computed in pandas, we need to pad our initial queries by the maximum lag length, as otherwise we would get nulls at the start. This maximum would then be an input to some initial nodes which filter SQL tables.

Technically there is no "data" dependency, since it would be based purely on prespecified parameters, but there is a point where a "max" or something needs to be calculated, along the lines of the sketch below.
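
For example, a minimal hypothetical sketch of the kind of computation involved; the parameter and function names are made up, and the result would feed the SQL-filtering nodes.

# nodes.py - derive the query padding from prespecified lag parameters
from datetime import date, timedelta


def compute_query_start(target_start: date, lag_windows_days: list[int]) -> date:
    # Pad the query window by the maximum lag so the earliest rows still have
    # complete lag/rolling features (no leading nulls).
    return target_start - timedelta(days=max(lag_windows_days))


# e.g. compute_query_start(date(2023, 11, 1), [7, 14, 28]) -> date(2023, 10, 4)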

@datajoely (Contributor)

On this last point @inigohidalgo: a lot of users ask whether they can run something like kedro run --params target_date:2023-11-01, and whilst it's technically possible, it's not nice to feed runtime arguments into catalog definitions to dynamically change load behaviour.

@gitgud5000 commented May 14, 2024

Hi everyone,

I wanted to share a somewhat hacky method we implemented for creating a dynamic pipeline. Our pipeline required reprocessing a dataset for previous dates based on the runtime parameter run_date.
I'll describe what we ended up doing below. I would appreciate any feedback or recommendations you might have.

Modular Pipelines and Namespaces

First, we leveraged modular pipelines and namespaces to create a dynamic reprocessing capability. The goal was to reprocess datasets for previous dates without rerunning certain parts of the pipeline (specifically the feature engineering boxes, labeled as FE1, FE2, and FE3).

The Reprocess pipelines were instantiated as follows:

from kedro.pipeline import pipeline

# check_requirements, shape_target and shape_master_table are project pipelines
pipes = []
for i in range(1, 6):
    t_version = pipeline(
        pipe=check_requirements + shape_target + shape_master_table,
        namespace=f"t-{i}",
        tags=["delta_t"],
    )
    pipes.append(t_version)
t_n_pipelines = sum(pipes)

In this setup, each reprocessing pipeline (t-1 to t-5) is created with a unique namespace. This allows us to isolate the processing for different time periods. Notably, the feature engineering steps (FE1, FE2, FE3) do not run in the reprocess parts of the pipeline, as they are only relevant for the initial processing (t=0).

Next, we created these entries in the catalog.yml for the Δ_t versions of the dataset:

# A type of SQLQueryDataset used in "Some ETL" box
## t=0 / Original Version
EX_DATASET:
  type: "${_datasets.sqlscript}"
  credentials: oracle_credentials
  filepath: queries/EX_DATASET.sql
  query_args:
      run_date: ${runtime_params:run_date}
      use_case: ${runtime_params:use_case}
## t=Δ Version
"{namespace}.EX_DATASET":
  type: "${_datasets.sqlscript}"
  credentials: oracle_credentials
  filepath: queries/EX_DATASET.sql
  query_args:
      run_date: "{namespace}"
      use_case: ${runtime_params:use_case}

# The same for other types like D_n or resulting GenMT
## t=0 / Original Version
D1:
  type: "${_datasets.parquet}"
  filepath: "${_azure_base_path}/04_feature/${runtime_params:use_case}/${runtime_params:run_date}/D1.parquet"
  credentials: azure_credentials
## t=Δ Version
"{namespace}.D1":
  type: "${_datasets.parquet}"
  filepath: "${_azure_base_path}/04_feature/${runtime_params:use_case}/{namespace}/D1.parquet"
  credentials: azure_credentials

We initially thought this approach would suffice if we could somehow perform a nested interpolation of the namespace to its value. However, the resolution of the config happens when the config is loaded before a session is run. The dataset factory placeholders are resolved later when the pipeline is being executed (see Kedro issue 3086).

So Hooks🪝...

Since hooks are stateful objects (see Kedro issue 2690), we created a DatesDeltaToContextHook to handle the dynamic aspects. Here's what it does:

  1. after_context_created: Creates and stores the namespaced run_date parameters.
     def after_context_created(self, context) -> None:
         """
         Create t-0 -> t-5 of rundate and add to context to generate catalog.
         """
         run_date = context.params.get("run_date", None)
         self.delta_pattern = r"t-\d+"
         if run_date:
             # Calculate t-0 to t-5 of run_date to use in catalog generation
             self.formatted_dates = self._gen_time_delta(run_date)
             context.config_loader["parameters"] = {
                 **context.config_loader["parameters"],
                 **self.formatted_dates,
             }
  2. after_catalog_created: Modifies the dataset instances that match the namespace pattern.
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        """
        Modify dataset filepaths/sql in the catalog based on the delta run_dates from the parameters.
        """
        _pipelines: Dict[str, Pipeline] = dict(pipelines)
    
        LOGGER.info("Enforcing data set pattern discovery...")
        # Capture all data set names from all pipelines
        data_set_names = {
            data_set_name
            for pipeline in _pipelines.values()
            for data_set_name in pipeline.datasets()
        }
        # filter to names matching the pattern t-<number> (e.g. t-1, t-2, t-3, ...), excluding the ones with `params:`
        data_set_to_alter = {
            data_set_name
            for data_set_name in data_set_names
            if re.search(self.delta_pattern, data_set_name)
            and "params:" not in data_set_name
        }
    
        for data_set_name in data_set_to_alter:
            try:
                t_delta, _ = data_set_name.split(".")
                # Enforce data set pattern discovery
                dataset = catalog._get_dataset(data_set_name)  # pylint: disable=protected-access
                run_date = self.formatted_dates.get(f"{t_delta}.run_date")
                match dataset:
                    case SQLScriptDataset():
                        self._update_dataset_sql_query(dataset, run_date)
                    case MemoryDataset():
                        pass
                    case _:
                        self._update_dataset_filepath(dataset, run_date)
            except DatasetNotFoundError:
                continue

Feel free to provide any feedback or suggestions. Thank you!

@datajoely (Contributor)

Thank you for such a clear write-up @gitgud5000 - I'm keen to make this use case ergonomic and this is really helpful.

@inigohidalgo (Contributor)

This is a really cool use of filtering and namespaces, thanks for sharing @gitgud5000.

This is just a thought, not at all a practical change in your case, as it only addresses a subset of the behaviour you are building, but:

one of the major "needs" you're solving with the after_catalog_created hook "forcing dataset pattern discovery" is providing the dynamically-generated run_date to the parametrized SQL queries, right?
A more ergonomic implementation of this could go through the newly released Ibis TableDataset, but this only helps with whatever parametrized filtering you're doing on the SQL query side, not with the directory name for the Parquet files.
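
For the SQL side, a hedged sketch of that idea: with kedro-datasets' ibis.TableDataset the table is loaded as a lazy Ibis expression, so the parametrized filtering can move into a node. The names orders, order_date and run_date are hypothetical, and the catalog wiring is omitted.

# nodes.py - hypothetical sketch: parametrized filtering done in a node via Ibis
from ibis.expr.types import Table


def filter_by_run_date(orders: Table, run_date: str) -> Table:
    # The expression stays lazy; the backend executes it when the
    # downstream dataset is saved or materialised.
    return orders.filter(orders.order_date == run_date)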

@astrojuanlu astrojuanlu removed the Type: Technical DR 💾 Decision Records (technical decisions made) label Jul 22, 2024