
DRAFT PROPOSAL: Add Pipelines Framework to Parsons #980

Open · wants to merge 7 commits into main
Conversation

@Jason94 (Collaborator) commented Jan 26, 2024

Overview

This PR implements a draft of the pipelines API that we discussed at the January contributors meeting. The goal of the pipelines API is to add an extra level of abstraction on top of the existing Parsons connectors and the Parsons Table to make it easier to write and manage ETL pipelines.

Justification

The contributors recently discussed what we can do to take Parsons to the next level and make it even easier to build ETL scripts. Another big goal, one that would benefit TMC and the whole space significantly, is to make it easier for new data staff who aren't yet comfortable with control flow and data structures to assemble ETL scripts.

In my opinion, Parsons connectors already make it very easy to extract and load data. The two areas where Parsons does not do much to help the user are:

  • The transform step. We offer very little abstraction over the PETL data structure. Transformation code is still relatively low level compared to the kind of no-code solutions that are getting popular. You still need to be proficient with functions, control flow, and basic Python data structures like dictionaries and arrays to be able to write ETL scripts with a non-trivial transform step.
  • Parsons doesn't do anything to manage your pipelines for you. Data orchestration shouldn't be a goal of Parsons, IMO, but I think we could do a lot more than we do. Logging, error handling, and pipeline visibility are all repetitive tasks that everyone writing ETL scripts needs to manage somehow. Parsons could help with this.

During the call we discussed adding a Pipelines system to Parsons. This system would exist on top of the connectors and Parsons Table, and possibly would hide those details from a new user completely. The idea is this:

  • The community defines a discrete set of extract, transform, and load Pipes that perform higher-level operations on data than the current Table functions. This could be anything from loading data from a VAN endpoint, to performing a complex transformation, to transforming a data model from one particular source into an intermediate representation.
  • These pipes can be combined into a Pipeline, which is a composable data structure that knows how to run pipes to perform a complete ETL job. Combined, pipes and pipelines also provide features for validation, logging strategies, error handling strategies, etc.
  • A Runner, which is capable of running pipelines in parallel to make use of CPU cores or cloud scaling capabilities.
  • A Dashboard, which gives the user visibility into their running pipelines.

Update - Revision 2

Based on contributor feedback, we identified two distinct user groups with different sets of concerns about the original proposal.

New Engineers/Analysts

There was concern that the syntax was still too complex for analysts/engineers (new users) and would prove too big a barrier for them to use the framework successfully. To accommodate those concerns, the following changes were made in revision 2:

Removal of lambda syntax.

The original syntax leaned heavily on lambda functions, a difficult concept for newer users.

  • Lambda functions were replaced with the much simpler PETL expression strings:
        filter_rows("{Year} > 1975"),
  • A CompoundPipe constructor was introduced to replace the use of lambda to build up a more complex pipe from more basic pipes:
    clean_year = CompoundPipe(
        filter_rows("{Year} is not None"),
        convert("Year", int)
    )
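For reference, these expression strings are standard petl syntax that the Parsons Table already understands, so the pipes themselves stay thin. Here is a minimal sketch of what the two pipes used above could look like; the @define_pipe wiring comes from this proposal, while the bodies are ordinary Parsons code and are illustrative rather than the PR's exact implementation:

    from parsons import Table

    @define_pipe("filter_rows")
    def filter_rows(data: Table, expression: str) -> Table:
        # Table.select_rows accepts petl-style expression strings,
        # e.g. "{Year} is not None" or "{Year} > 1975".
        return data.select_rows(expression)

    @define_pipe("convert")
    def convert(data: Table, column: str, func) -> Table:
        # Table.convert_column applies func to every value in the column.
        data.convert_column(column, func)
        return data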

Simplification of Pipeline syntax

The previous pipe-chain syntax in the Pipeline constructor was a chain of repeated function calls, which would have been confusing to newer users:

    load_after_1975 = Pipeline(
        load_from_csv("deniro.csv")
        (
            print_data("Raw Data")
        )(
            clean_year()
        )(
            filter_rows({
                "Year": lambda year: year > 1975
            })
        )(
            print_data("After 1975")
        )(
            write_csv("after_1975.csv")
        )
    )

This has been replaced with a much simpler syntax where the pipes are just listed in the Pipeline constructor:

    load_after_1975 = Pipeline(
        "Load after 1975",
        load_from_csv("deniro.csv"),
        clean_year(),
        filter_rows("{Year} > 1975"),
        write_csv("after_1975.csv")
    )
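Whichever revision's syntax is used, running a pipeline is a single call, and the result can be captured as a Parsons Table (see also the revision 1 examples below). A small hypothetical usage example:

    # Hypothetical usage: run the pipeline and keep the resulting Table.
    after_1975_table = load_after_1975.run()
    print(after_1975_table.num_rows)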

Power Users

The main concern for experienced engineers/analysts with a high degree of Python skill (power users) was that the framework didn't offer them enough value to justify the cost of building it. In particular, data orchestration was the missing piece.

[My organization] started to build an orchestration platform to meet the needs of semi-technical members and resolve some of the Civis pain points. Long story short, maintaining software is a lot of work and we're hoping to deprecate that eventually...
My opinion is that there are really good, well-funded, and active open source projects to do those things, and nothing super specific about our use case vs. private enterprise.

Prefect integration

To meet that need, this revision of Pipelines incorporates Prefect under the hood, with no additional complexity for anyone using the pipeline framework, whether they are writing pipes or just assembling pipelines.

Each pipe is defined exactly the same as before, but the framework constructs a Prefect task for that pipe behind the scenes.

# This pipe is transformed into a Prefect task named "write_csv", but is written like normal Parsons code.

@define_pipe("write_csv")
def write_csv(data: Table, csv_name: str) -> Table:
    data.to_csv(csv_name)
    return data
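Here is a rough sketch of how a decorator like define_pipe can hand the work to Prefect without changing how pipe authors write their functions. This is my reading of the approach rather than the PR's exact code; prefect.task is the real Prefect 2.x API, everything else is illustrative:

    import functools

    from prefect import task

    def define_pipe(name):
        # Illustrative: wrap a plain Parsons function in a Prefect task and
        # return a factory, so pipes are configured now and executed later.
        def decorator(func):
            prefect_task = task(func, name=name)

            @functools.wraps(func)
            def make_pipe(*args, **kwargs):
                def run_pipe(data):
                    # The pipeline supplies the upstream Table when it runs;
                    # source pipes (input_type=PipeResult.Unit) would skip it.
                    return prefect_task(data, *args, **kwargs)

                return run_pipe

            return make_pipe

        return decorator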

Each Pipeline is transformed into a Prefect Flow and is logged in Prefect Cloud when run:

    load_after_1975 = Pipeline(
        "Load after 1975",
        load_from_csv("deniro.csv"),
        clean_year(),
        filter_rows("{Year} > 1975"),
        write_csv("after_1975.csv")
    )
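Conceptually, Pipeline.run then only has to wrap its pipe chain in a Prefect flow so that every run is tracked under the pipeline's name. A minimal sketch of that mapping, using the real prefect.flow decorator but otherwise illustrative names:

    from prefect import flow

    def run_pipeline(pipeline):
        # Illustrative: one Prefect flow per Pipeline, one task per pipe.
        @flow(name=pipeline.name)
        def pipeline_flow():
            data = None
            for pipe in pipeline.pipes:
                data = pipe(data)  # each call executes one Prefect task
            return data

        return pipeline_flow()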

In addition to providing modern data orchestration out of the box, the Prefect integration will make it possible for us to integrate the extensive Prefect Integrations library with Pipelines.

Revision 2 Demo

This code sets up and runs the same series of pipelines as before:

    clean_year = CompoundPipe(
        filter_rows("{Year} is not None"),
        convert("Year", int)
    )

    load_after_1975 = Pipeline(
        "Load after 1975",
        load_from_csv("deniro.csv"),
        clean_year(),
        filter_rows("{Year} > 1975"),
        write_csv("after_1975.csv")
    )
    split_on_1980 = Pipeline(
        "Split on 1980",
        load_from_csv("deniro.csv"),
        clean_year(),
        split_data("'gte_1980' if {Year} >= 1980 else 'lt_1980'"),
        for_streams({
            "lt_1980": write_csv("before_1980.csv"),
            "gte_1980": write_csv("after_1979.csv")
        })
    )

    save_lotr_books = Pipeline(
        "Save LOTR Books",
        load_lotr_books_from_api(),
        write_csv("lotr_books.csv")
    )

    after_1990_and_all_time = Pipeline(
        "Copy into streams test",
        load_from_csv("deniro.csv"),
        clean_year(),
        copy_data_into_streams("0", "1"),
        for_streams({
            "0": CompoundPipe(
                filter_rows("{Year} > 1990"),
                write_csv("after_1990.csv")
            )(),
            "1": write_csv("all_years.csv")
        })
    )

    dashboard = Dashboard(
        load_after_1975,
        split_on_1980,
        save_lotr_books,
        after_1990_and_all_time,
    )
    dashboard.run()
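The split/stream pipes are the least self-explanatory part of the demo, so here is my reading of their semantics as a sketch: split_data evaluates the expression for each row to pick a stream name, and for_streams applies a different pipe to each named stream. The code below is illustrative, not the PR's implementation:

    import petl

    from parsons import Table

    def split_data_sketch(data: Table, expression: str) -> dict:
        # petl.expr compiles "{Year} >= 1980"-style strings into a row function.
        key_func = petl.expr(expression)
        streams = {}
        for row in petl.dicts(data.table):
            streams.setdefault(key_func(row), []).append(row)
        return {name: Table(rows) for name, rows in streams.items()}

    def for_streams_sketch(streams: dict, pipes_by_stream: dict) -> dict:
        # e.g. {"lt_1980": write_csv("before_1980.csv"), "gte_1980": ...}
        return {name: pipes_by_stream[name](table) for name, table in streams.items()}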

Here are the runs on my Prefect Cloud:
[screenshot: pipeline runs in Prefect Cloud]

The declarative Pipeline syntax is easy to comprehend, and alongside it the Prefect viewer makes it easy to see the data flow:
[screenshot: Prefect viewer showing the pipeline's data flow]

Prefect error handling shows you which pipe failed, what the error was, and even what time it failed.
[screenshot: Prefect error details for a failed pipe]


Original Draft - Revision 1

Prototype

This prototype is in the /pipelines folder in the PR branch. All of the code is contained in the parsons_pipelines.py file, which is a runnable file that executes three pipelines based on a stored CSV file and the open-source Lord of the Rings API.

Here are some highlights:

Declarative pipeline syntax, built by composing pipes.

    load_after_1975 = Pipeline(
        load_from_csv("deniro.csv")
        (
            print_data("Raw Data")
        )(
            clean_year()
        )(
            filter_rows({
                "Year": lambda year: year > 1975
            })
        )(
            print_data("After 1975")
        )(
            write_csv("after_1975.csv")
        )
    )
    split_on_1980 = Pipeline(
        load_from_csv("deniro.csv")
        (
            print_data("Raw Data")
        )(
            clean_year()
        )(
            split_data(lambda row: 1 if row["Year"] >= 1980 else 0)
        )(
            all_streams(print_data("Split Data"))
        )(
            for_streams({
                0: write_csv("before_1980.csv"),
                1: write_csv("after_1979.csv")
            })
        )
    )

Pipe definitions are normal Parsons code

@define_pipe("convert")
def convert(data: Table, *args, **kwargs) -> Table:
    return data.convert_column(*args, **kwargs)


@define_pipe("write_csv")
def write_csv(data: Table, csv_name: str) -> Table:
    data.to_csv(csv_name)
    return data
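For readers puzzling over the pipe(...)(next_pipe) chaining shown above, here is one way a decorator like define_pipe could produce chainable pipe objects. This is an illustrative reconstruction, not the code in the /pipelines folder:

    class ChainablePipe:
        # Illustrative: a configured pipe that composes with the next pipe
        # via __call__, producing the chains shown above.
        def __init__(self, func, args, kwargs, is_source=False):
            self.func, self.args, self.kwargs = func, args, kwargs
            self.is_source = is_source
            self.upstream = None

        def __call__(self, next_pipe):
            # Attach self upstream of the head of next_pipe's chain, so
            # pre-composed chains (like clean_year()) keep their own order.
            head = next_pipe
            while head.upstream is not None:
                head = head.upstream
            head.upstream = self
            return next_pipe

        def run(self, data=None):
            if self.upstream is not None:
                data = self.upstream.run()
            if self.is_source:
                # Source pipes (e.g. load_from_csv) take no upstream Table.
                return self.func(*self.args, **self.kwargs)
            return self.func(data, *self.args, **self.kwargs)

    def define_pipe(name, input_type=None):
        def decorator(func):
            def make_pipe(*args, **kwargs):
                return ChainablePipe(func, args, kwargs, is_source=input_type is not None)
            return make_pipe
        return decorator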

Load data in via pipes, using either Parsons or requests

@define_pipe("load_from_csv", input_type=PipeResult.Unit)
def load_from_csv(filename: str, **kwargs) -> Table:
    return Table.from_csv(filename, **kwargs)

@define_pipe("load_lotr_books", input_type=PipeResult.Unit)
def load_lotr_books_from_api() -> Table:
    # Set up the endpoint and headers
    url = "https://the-one-api.dev/v2/book"
    headers = {}

    # Make the request to the API
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # Raises an HTTPError if the response was an error

    # Convert the JSON response into a Parsons Table
    books_json = response.json().get("docs", [])
    books_table = Table(books_json)

    return books_table

Trivially compose & name commonly-combined pipes

    clean_year = lambda: (
        filter_rows({
            "Year": lambda year: year is not None
        })
    )(
        convert(
            "Year",
            lambda year_str: int(year_str)
        )
    )

Group pipelines together in a Dashboard to facilitate logging, reporting, etc.

    dashboard = Dashboard(
        load_after_1975,
        split_on_1980,
        save_lotr_books
    )
    dashboard.run()

Call Dashboard with a logger to get free logging of the output of every step in every pipeline

    dashboard = Dashboard(
        load_after_1975,
        split_on_1980,
        save_lotr_books,
        logger=CsvLogger()
    )
    dashboard.run()
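For context on what logger=CsvLogger() could mean in practice, here is a hypothetical sketch of the kind of hook the dashboard might call after each pipe runs. The CsvLogger name comes from the example above; the interface and file layout are my assumptions:

    import csv

    class CsvLoggerSketch:
        # Hypothetical logger: append one CSV row per pipe execution.
        def __init__(self, path="pipeline_log.csv"):
            self.path = path

        def log(self, pipeline_name, pipe_name, table):
            # Called by the dashboard after each pipe finishes; a fuller
            # version might also write a sample of the table's rows.
            with open(self.path, "a", newline="") as f:
                csv.writer(f).writerow([pipeline_name, pipe_name, table.num_rows])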

[screenshot: CSV log of each pipeline step's output]

Generate an HTML report with your pipelines' results

    dashboard.run()
    dashboard.generate_report("report.html")

[screenshot: generated HTML pipeline report]

Individual pipelines can be run on their own, and their data can be captured as a Parsons Table

    save_lotr_books_data: Table = Pipeline(
        load_lotr_books_from_api()
        (
            write_csv("lotr_books.csv")
        )
    ).run()
    save_lotr_books_data.to_bigquery(...)

Next Steps

If the Parsons contributors decide to move forward with the proposal, I believe these features are necessary to implement an MVP of the pipelines framework:

  • Error handling features. Since the pipelines framework will manage the control flow of the pipelines, it will need to handle errors gracefully.
  • Connector pipes. It's not practical to write a pipe for every method on every possible connector. We'll need to create a few pipes that compose well with the existing connector library to make pipelines useful (see the sketch after this list).
  • Transformation pipes. Unlike connectors, I do believe that we should completely hide the details of Table from the surface of the pipelines API. This serves the purpose of targeting less technically experienced developers/analysts. If someone needs to drop down to the level of Table manipulations, they can write their own pipe. With that said, we should expose all of the necessary table transformations as pipes and write a set of higher-level pipes that can perform more complex transformations than the relatively simple Table API.
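As a sketch of the connector pipes idea above: rather than writing one pipe per connector method, a small generic wrapper could expose any connector method that returns a Parsons Table as a source pipe. The pipe below is hypothetical; define_pipe and PipeResult come from this proposal:

    from parsons import Table

    @define_pipe("load_from_connector", input_type=PipeResult.Unit)
    def load_from_connector(connector, method_name: str, *args, **kwargs) -> Table:
        # Call any connector method that returns a Table, e.g. a SQL query
        # against a warehouse connector or a fetch from an API connector.
        return getattr(connector, method_name)(*args, **kwargs)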

These features would be nice to implement, but I don't believe they need to be completed to justify releasing the framework:

  • Data validation, to fail the pipeline in some controlled fashion if some conditions are not met.
  • Better logging. For larger datasets it would be nice to log a sample of the data, not the full data. It would also be nice to target cloud platforms like AWS or GCS for logging records.
  • Alert mechanisms. Enable a dashboard to send an email, Slack message, or other alert when a pipeline fails.
  • Better reporting. In-interface viewing of logs, color-coded error statuses for pipes, and a real-time view as production pipelines are running.

@austinweisgrau (Collaborator) commented:

My 2 cents on transformations - in general I think folks should be discouraged from doing transformations on data with Python. Doing transformations in a database is going to be preferred for maintainability, clarity, and performance most of the time.

I could be wrong here, but my expectation would also be that most Parsons users will be more familiar with SQL than with Python. Especially if the transformations need to be formulated as some kind of lambda function map, as in these examples, I imagine SQL is going to be significantly more accessible.

@shaunagm (Collaborator) commented:

Finally taking a look (sorry for the delay). We can chat about these questions at the contributor meeting:

  • I'm not sure I understand the distinction between a pipe, a pipeline, and a compound pipeline, and how they can be used.
  • Can we/should we tie ourselves to Prefect like this? Is there a way to generalize across the different workflow tools orgs use? (I've never used Prefect, so I may need a better understanding of how it's used.)
  • Re: Austin's point about doing transformations in the database - that seems to fit with an approach that uses dbt, which Austin added a bit of functionality to Parsons to better integrate with. I don't want to go too far down a rabbit hole, but it's worth talking through the larger debate to help frame what we're encouraging people to do with Parsons.
