🚨 The project is in BETA, use with caution (see TODOs).
The Jupyter Airflow Monitoring project is a solution designed to monitor Apache Airflow's DAGs directly from your Jupyter environment. Its primary objective is to monitor only the DAGs you are interested in, and do it in a non-invasive way to avoid adding unnecessary friction. This project is particularly valuable for data scientists and developers who frequently work with Jupyter notebooks and want to keep an eye on their Airflow workflows.
- Why Jupyter Airflow Monitoring?
- Diagram
- Installation
- /message Endpoint and Reverse Proxy
- Usage in Airflow
- Python API
- Using cURL/requests
- Screenshots
- Contributing
- TODOs
- License
While working with Airflow, it can be somewhat challenging to monitor the status of your DAGs, especially when you only need to track specific DAGs. This project proposes a solution for this problem by providing a way to specify tags for the DAGs you want to monitor and their corresponding severity levels if a DAG with the specified tag fails. This feature allows for customized and focused monitoring of your Airflow workflows.
```mermaid
sequenceDiagram
    box rgba(173, 216, 230, 0.7) Airflow
    participant A as Airflow Monitoring DAG
    end
    A-->>A: DagsMonitoringOperator executes
    participant B as Cache File
    box rgba(255, 165, 0, 0.7) Jupyter Notebook Instance
    participant E as API Endpoint (MessageHandler)
    participant D as Jupyter Notebook
    end
    participant F as User
    Note over A: The Airflow DAG contains a special<br>operator (DagsMonitoringOperator) that<br>monitors the status of the other DAGs<br>based on tags and severity levels.
    A->>B: Writes monitoring message
    Note over A,B: The DAG periodically<br>writes messages to the cache file<br>in the form of JSON data.
    F->>D: Log in
    Note over F,D: The User logs into Jupyter Notebook<br>(through JupyterHub or not), initiating<br>a new session.
    D->>E: Sends GET request to API endpoint
    Note over D,E: The Jupyter notebook makes periodic GET<br>requests (every 10s) to the API endpoint provided<br>by the server extension. The request<br>is authenticated using the User's Jupyter session token.
    E->>B: Reads monitoring message
    Note over E,B: The server extension reads the message<br>from the cache file.
    B->>E: Returns monitoring message
    Note over B,E: The cache file returns the latest message to the<br>API Endpoint.
    E->>D: Returns monitoring message
    Note over E,D: The API Endpoint (MessageHandler) sends the<br>message back to the Jupyter Notebook.
    D->>F: Displays monitoring message
    Note over D,F: The Jupyter Notebook then displays the<br>monitoring message to the user.
```
⚠️ CAUTION BETA: This is a beta version (it might not be secure!). Use it at your own risk.
KNOWN LIMITATIONS: The current version is limited to systems where Airflow and Jupyter run on the same host, and it is only compatible with standalone Airflow installations. Installing it on a dockerized Airflow with a shared volume should be possible in theory (you would need to install the module on both the host and the Airflow image), but I haven't tried it yet. I plan to add more flexibility and features in the future; see the TODOs list.
To install this project, there are two methods.

Method 1: clone the repo and run the `install.sh` script:

```bash
git clone https://github.com/EmileDqy/jupyter_airflow_monitoring.git
cd jupyter_airflow_monitoring
./install.sh
```

Method 2: clone the repo and install manually:

```bash
git clone https://github.com/EmileDqy/jupyter_airflow_monitoring.git
cd jupyter_airflow_monitoring
pip install .
```

Then use the Jupyter CLI to enable the extension:

```bash
jupyter nbextension install --py --symlink --sys-prefix jupyter_airflow_monitoring
jupyter nbextension enable jupyter_airflow_monitoring --py --sys-prefix
jupyter serverextension enable jupyter_airflow_monitoring --py
```

For your convenience, the `install.sh` script simply carries out the manual steps above.
After installation, you'll be able to interact with the `/message` endpoint, which allows setting and getting messages related to your monitored DAGs.

When operating the Jupyter server behind a reverse proxy, it's crucial to configure the proxy so that requests to the `/message` endpoint are forwarded to the Jupyter server; otherwise the frontend cannot reach the backend and the endpoint becomes inaccessible. Make sure your reverse proxy rules allow communication with `/message` so the monitoring system functions properly.
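Behind JupyterHub or a proxy with a path prefix, the endpoint lives under the server's base URL (e.g. `/user/<name>/message`) rather than at the root. A minimal sketch of building the full endpoint URL, using only the standard library; the hostnames and base paths below are hypothetical examples, and `message_endpoint` is an illustrative helper, not part of this project:

```python
from urllib.parse import urljoin

def message_endpoint(server_root, base_url="/"):
    """Build the full URL of the /message endpoint behind a reverse proxy.

    base_url is the Jupyter server's base path (e.g. "/user/alice/" under
    JupyterHub); the proxy must forward requests under this prefix.
    """
    # Ensure the base path ends with a slash so the prefix is preserved.
    if not base_url.endswith("/"):
        base_url += "/"
    return urljoin(server_root, base_url + "message")

# Example: a hypothetical JupyterHub user path.
print(message_endpoint("https://hub.example.com", "/user/alice"))
# https://hub.example.com/user/alice/message
```

This is only a sanity check for your proxy setup: if the URL printed here is not reachable through the proxy, the frontend polling will fail.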
Once the extension is installed, a `DagsMonitoringOperator` operator is available for building a monitoring DAG. Schedule this DAG at your preferred frequency (e.g., hourly) to monitor your other DAGs.
Here is an example:
```python
# Import the new Operator
from jupyter_airflow_monitoring.operator import DagsMonitoringOperator
from airflow import models
import pendulum

with models.DAG(
    dag_id="monitoring",
    schedule="@hourly",  # Check every hour
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    task = DagsMonitoringOperator(
        task_id="monitoring",
        monitored_tags_severity={
            "example": 1,     # All dags with tag 'example' will have severity=1 (red) when they fail
            "something": 2,   # Severity = 2 (orange)
            "example_ui": 3,  # Severity = 3 (yellow)
        },
    )
    task
```
In this example, the DAG with the id `monitoring` is scheduled to run hourly, starting from January 1, 2023. The `DagsMonitoringOperator` monitors the DAGs tagged "example", "something", and "example_ui", with corresponding severity levels of 1, 2, and 3.
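The severity levels correspond to display colors, as noted in the comments of the example above (1 = red, 2 = orange, 3 = yellow). A tiny illustrative helper showing that mapping; `color_for_severity` and the gray fallback are hypothetical, not part of the project's API:

```python
# Severity-to-color mapping, taken from the comments in the example above.
SEVERITY_COLORS = {1: "red", 2: "orange", 3: "yellow"}

def color_for_severity(severity):
    """Return the display color for a severity level (hypothetical helper)."""
    # Unknown severities fall back to gray in this sketch.
    return SEVERITY_COLORS.get(severity, "gray")

print(color_for_severity(1))  # red
```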
In addition to the operator, the project also exposes a Python API that allows setting and getting messages related to the monitored DAGs. This can be useful for more fine-grained control or custom integrations. Here is how you can use it:
```python
from jupyter_airflow_monitoring import set_message, get_message

# Set a message
set_message("DAG Failed", "MyDAG", "#FF0000")

# Get the message
message = get_message()
```
Caution: This may not be the most secure way to interact with the endpoint, since it requires you to manually retrieve and store the session token.
To do that, you will need the session token from the Jupyter server. It usually appears in the startup log like this, but you can also fetch it from the frontend once connected:
```
[I 10:36:26.674 NotebookApp] Serving notebooks from local directory: /path/to/folder/
[I 10:36:26.674 NotebookApp] Jupyter Notebook 6.5.4 is running at:
[I 10:36:26.674 NotebookApp] http://localhost:8888/?token=<token>
[I 10:36:26.674 NotebookApp] or http://127.0.0.1:8888/?token=<token>
[I 10:36:26.674 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
```
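If you copy one of those URLs, the token can be extracted programmatically instead of by hand. A small standard-library sketch; `token_from_url` is a hypothetical helper and the token value below is a placeholder, not a real credential:

```python
from urllib.parse import urlparse, parse_qs

def token_from_url(url):
    """Extract the ?token=... value from a Jupyter startup URL."""
    query = parse_qs(urlparse(url).query)
    values = query.get("token")
    return values[0] if values else None

# Placeholder token for illustration only.
print(token_from_url("http://localhost:8888/?token=abc123"))  # abc123
```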
Once you have the token, you can interact with the `/message` endpoint.

To get the current message (used by the frontend):

```bash
curl -H "Authorization: token <token>" http://localhost:8888/message
```

To set a new message (currently implemented but not yet used by the frontend):

```bash
curl -X POST -H "Authorization: token <token>" -H "Content-Type: application/json" -d '{"message": "Hello", "title": "Test", "color": "red"}' http://localhost:8888/message
```
```python
import requests

token = "<your_token>"
headers = {"Authorization": f"token {token}"}

# GET request
response = requests.get("http://localhost:8888/message", headers=headers)
print(response.json())

# POST request: json= serializes the body and sets Content-Type: application/json
data = {
    "message": "Hello",
    "title": "Test",
    "color": "red",
}
response = requests.post("http://localhost:8888/message", headers=headers, json=data)
print(response.json())
```
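The frontend polls the endpoint roughly every 10 seconds (see the diagram above); a client doing the same could look like the sketch below. The function name, interval handling, and error handling are illustrative choices, not part of this project; `fetch` would wrap one of the requests calls above:

```python
import time

def poll_messages(fetch, handle, interval=10.0, max_polls=None):
    """Call fetch() every `interval` seconds and pass each result to handle().

    `fetch` is any zero-argument callable returning the decoded /message
    payload (e.g. lambda: requests.get(url, headers=headers).json()).
    `max_polls` limits the number of iterations (None = poll forever).
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        try:
            handle(fetch())
        except Exception as exc:  # keep polling through transient errors
            print(f"poll failed: {exc}")
        polls += 1
        if max_polls is None or polls < max_polls:
            time.sleep(interval)

# Example with a stubbed fetch, so no server is needed:
seen = []
poll_messages(lambda: {"message": "Hello"}, seen.append, interval=0, max_polls=2)
print(seen)  # [{'message': 'Hello'}, {'message': 'Hello'}]
```

Swallowing exceptions keeps the poller alive across restarts of the Jupyter server; in real use you would likely also want backoff on repeated failures.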
Contributions to this project are welcome! Please feel free to submit a Pull Request or open an issue on GitHub if you encounter any problems or have suggestions for improvements. Please note that I'm fairly new to Jupyter extension development.
- Implement a more secure way of storing the /message data (the content fetched and sent back): it currently lives in a file under /tmp, which is not ideal. Also, the data is currently pre-rendered, which is neither good nor secure; sanitizing and processing the data is the next development step.
- Add support for Windows-based systems
- Add support for dockerized Airflow
- Add an option to use an Airflow connection so that the operator can communicate with a Jupyter server running on a different system
- Add monitoring by dag_id
- Add automatic monitoring: run when a tracked DAG finishes
This project is licensed under the terms of the Apache-2.0 license.
I hope that this project helps improve your workflow and productivity.