Jupyter Airflow Monitoring

🚨 The project is in BETA, use with caution (see TODOs).

Demo Gif

The Jupyter Airflow Monitoring project is a solution designed to monitor Apache Airflow's DAGs directly from your Jupyter environment. Its primary objective is to monitor only the DAGs you are interested in, and to do so in a non-invasive way that avoids unnecessary friction. This project is particularly valuable for data scientists and developers who frequently work with Jupyter notebooks and want to keep an eye on their Airflow workflows.

Table of contents

  1. Why Jupyter Airflow Monitoring?
  2. Diagram
  3. Installation
    1. Method 1
    2. Method 2
  4. /message Endpoint and Reverse Proxy
    1. The /message Endpoint
    2. You have a Reverse Proxy?
  5. Usage in Airflow
  6. Python API
  7. Using cURL/requests
    1. Using cURL
      1. HTTP GET
      2. HTTP POST
    2. Using the python requests module
  8. Screenshots
  9. Contributing
  10. TODOs
  11. License

Why Jupyter Airflow Monitoring?

While working with Airflow, it can be challenging to monitor the status of your DAGs, especially when you only care about a specific subset of them. This project addresses that problem by letting you specify tags for the DAGs you want to monitor, along with a severity level to report when a DAG carrying that tag fails. This allows for customized and focused monitoring of your Airflow workflows.

Diagram

sequenceDiagram
    box rgba(173, 216, 230, 0.7) Airflow
        participant A as Airflow Monitoring DAG
    end
    A-->>A: DagsMonitoringOperator executes
    participant B as Cache File
    box rgb(255, 165, 0, 0.7) Jupyter Notebook Instance
        participant E as API Endpoint (MessageHandler)
        participant D as Jupyter Notebook
    end
    participant F as User

    Note over A: The Airflow DAG contains a special<br>operator (DagsMonitoringOperator) that<br>monitors the status of the other DAGs<br>based on tags and severity levels.

    A->>B: Writes monitoring message
    Note over A,B: The DAG periodically<br>writes messages to the cache file<br>in the form of JSON data.

    F->>D: Log in
    Note over F,D: The User logs into Jupyter Notebook<br>(through JupyterHub or not), initiating<br>a new session.

    D->>E: Sends GET request to API endpoint
    Note over D,E: The Jupyter notebook makes periodic GET<br>requests (every 10s) to the API endpoint provided<br>by the server extension. The request<br>is authenticated using the User's Jupyter session token.

    E->>B: Reads monitoring message
    Note over E,B: The server extension reads the message<br>from the cache file.

    B->>E: Returns monitoring message
    Note over B,E: The cache file returns the latest message to the<br>API Endpoint.

    E->>D: Returns monitoring message
    Note over E,D: The API Endpoint (MessageHandler) sends the<br>message back to the Jupyter Notebook.

    D->>F: Displays monitoring message
    Note over D,F: The Jupyter Notebook then displays the<br>monitoring message to the user.


Installation

⚠️ CAUTION BETA: This is a beta version (might not be secure!). Use it at your own risk.

KNOWN LIMITATIONS: Please note that the current version of this project is limited to systems where both Airflow and Jupyter are running on the same host. Additionally, it is only compatible with Airflow standalone installations. It is theoretically possible to install it on a dockerized Airflow with a shared volume, but I haven't tried that yet (you would need to install the module on both the host and the Airflow image). I plan on adding more flexibility/features in the future. See the TODOs list.

To install this project, there are two methods:

Method 1

  1. Clone the repo
git clone https://github.com/EmileDqy/jupyter_airflow_monitoring.git
cd jupyter_airflow_monitoring
  2. Install using the install.sh script
./install.sh

Method 2

  1. Clone the repo
git clone https://github.com/EmileDqy/jupyter_airflow_monitoring.git
cd jupyter_airflow_monitoring
  2. Install using pip:
pip install .
  3. Use the Jupyter CLI to enable the extension:
jupyter nbextension install --py --symlink --sys-prefix jupyter_airflow_monitoring
jupyter nbextension enable jupyter_airflow_monitoring --py --sys-prefix
jupyter serverextension enable jupyter_airflow_monitoring --py

For your convenience, the install.sh script from Method 1 simply carries out the steps listed above.

/message Endpoint and Reverse Proxy

The /message Endpoint

After installation, you'll be able to interact with the /message endpoint. This endpoint allows setting and getting messages related to your monitored DAGs.

You have a Reverse Proxy?

When operating the Jupyter server behind a reverse proxy, it's crucial to configure the proxy correctly so that the frontend can reach the /message endpoint. Without the proper configuration, the frontend may fail to connect to the backend, rendering the endpoint inaccessible and the monitoring system non-functional. Make sure your reverse proxy rules forward requests to the /message endpoint.
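
One quick way to check the proxy configuration is to request the endpoint through the proxied URL. A minimal sketch, assuming a hypothetical proxied base URL and a valid session token (see the cURL section below for how to obtain one):

import requests

# Hypothetical proxied base URL; replace with your own host/prefix.
BASE_URL = "https://jupyter.example.com"
TOKEN = "<your_token>"

response = requests.get(
    f"{BASE_URL}/message",
    headers={"Authorization": f"token {TOKEN}"},
)

# A 200 status means the proxy forwards /message correctly;
# a 404 usually means the route is missing from the proxy rules.
print(response.status_code)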

Usage in Airflow

Once the extension is installed, a DagsMonitoringOperator operator is made available, which can be used to create a monitoring DAG. This DAG is then scheduled to run at your preferred frequency (e.g., hourly) to monitor your DAGs.

Here is an example:

# Import the new Operator
from jupyter_airflow_monitoring.operator import DagsMonitoringOperator
from airflow import models
import pendulum

with models.DAG(
    dag_id="monitoring",
    schedule="@hourly", # Check every hour
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    task = DagsMonitoringOperator(
        task_id="monitoring",
        monitored_tags_severity={
            "example": 1,  # All dags with tag 'example' will have severity=1 (red) when they fail
            "something": 2,  # Severity = 2 (orange)
            "example_ui": 3  # Severity = 3 (yellow)
        }
    )

In this example, the DAG with the id monitoring is scheduled to run hourly, starting from January 1, 2023. The DagsMonitoringOperator monitors the DAGs with the tags "example", "something", and "example_ui", with corresponding severity levels of 1, 2, and 3.

Python API

In addition to the operator, the project also exposes a Python API that allows setting and getting messages related to the monitored DAGs. This can be useful for more fine-grained control or custom integrations. Here is how you can use it:

from jupyter_airflow_monitoring import set_message, get_message

# Set a message (message text, title, color)
set_message("DAG Failed", "MyDAG", "#FF0000")

# Get the current message
message = get_message()
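
The same API can also be called from your own Airflow code, for example to push an alert the moment a DAG fails rather than waiting for the monitoring DAG's next run. A minimal sketch, assuming Airflow and Jupyter share the same host (see Known Limitations) and using a hypothetical callback name:

from jupyter_airflow_monitoring import set_message

# Hypothetical on_failure_callback: Airflow passes a context dict
# describing the failed run.
def notify_jupyter_on_failure(context):
    dag_id = context["dag"].dag_id
    set_message(f"DAG {dag_id} failed", "Airflow alert", "#FF0000")

# Attach it when defining a DAG, e.g.:
# with models.DAG(..., on_failure_callback=notify_jupyter_on_failure) as dag:
#     ...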

Using cURL/requests

Caution: This might not be the most secure way to interact with the endpoint, because it requires you to manually obtain the session token and store it.

To do so, you will need to get the session token from the Jupyter server.

It usually looks something like this in the logs, but you can also fetch it directly from the frontend once connected:

[I 10:36:26.674 NotebookApp] Serving notebooks from local directory: /path/to/folder/
[I 10:36:26.674 NotebookApp] Jupyter Notebook 6.5.4 is running at:
[I 10:36:26.674 NotebookApp] http://localhost:8888/?token=<token>
[I 10:36:26.674 NotebookApp]  or http://127.0.0.1:8888/?token=<token>
[I 10:36:26.674 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
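
Alternatively, running servers and their tokens can be listed programmatically. A minimal sketch, assuming the classic Jupyter Notebook server shown in the log above (jupyter_server exposes an equivalent list_running_servers for newer setups):

from notebook.notebookapp import list_running_servers

# Each entry is a dict describing one running server,
# including its base URL and session token.
for server in list_running_servers():
    print(server["url"], server["token"])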

Once you have the token, you can interact with the /message endpoint:

Using cURL

HTTP GET

To get the current message (used by the frontend):

curl -H "Authorization: token <token>" http://localhost:8888/message

HTTP POST

To set a new message (implemented, but not currently used by the extension):

curl -X POST -H "Authorization: token <token>" -H "Content-Type: application/json" -d '{"message": "Hello", "title": "Test", "color": "red"}' http://localhost:8888/message

Using the python requests module

import requests

token = "<your_token>"
headers = {
    "Authorization": f"token {token}"
}

# GET request
response = requests.get("http://localhost:8888/message", headers=headers)
print(response.json())

# POST request: passing the payload via json= also sets the
# Content-Type: application/json header automatically
data = {
    "message": "Hello",
    "title": "Test",
    "color": "red"
}
response = requests.post("http://localhost:8888/message", headers=headers, json=data)
print(response.json())

Screenshots

Demo Gif

When a DAG with a severity 1 fails:

Default view: Mouse not Hovering

On mouse hover: Mouse Hovering

On click: Modal

Contributing

Contributions to this project are welcome! Please feel free to submit a Pull Request or open an issue on GitHub if you encounter any problems or have suggestions for improvements. Please note that I'm fairly new to Jupyter extension development.

TODOs

  1. Implement a more secure way of storing the /message data (the content fetched and sent back): currently using a file in /tmp, which is not ideal. Also, the data is currently pre-rendered, which is neither good nor secure. Sanitizing and processing the data is the next development step.
  2. Add support for Windows-based systems
  3. Add support for dockerized Airflow
  4. Add an option to use a connection so that the Airflow operator can communicate with the Jupyter server (Airflow on a different system than the Jupyter server)
  5. Add monitoring by dag_id
  6. Add automatic monitoring: run when a tracked DAG finishes

License

This project is licensed under the terms of the Apache-2.0 license.

I hope that this project helps improve your workflow and productivity.
