release(v0.0.4): adds task builder through YAML configuration
* adds task builder using YAML configuration
* adds simple S3 to Mongo task
* adds documentation using `mdbook` and Github Pages
stav121 committed Feb 4, 2024
1 parent 6040305 commit 492a255
Showing 47 changed files with 1,028 additions and 62 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/build_wheel.yaml
@@ -18,6 +18,11 @@ jobs:
        with:
          python-version: 3.11

      - name: Format code using yapf
        run: |
          pip install --upgrade yapf
          yapf airgoodies/ -r -i
      - name: Build wheel and install
        run: |
          pip install --upgrade setuptools
33 changes: 33 additions & 0 deletions .github/workflows/mdbook.yaml
@@ -0,0 +1,33 @@
on:
  push:
    branches: [ "feat/task_builder" ]

jobs:
  deploy:
    runs-on: ubuntu-latest
    permissions:
      contents: write
      pages: write
      id-token: write
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Install latest mdbook
        run: |
          tag=$(curl 'https://api.github.com/repos/rust-lang/mdbook/releases/latest' | jq -r '.tag_name')
          url="https://github.com/rust-lang/mdbook/releases/download/${tag}/mdbook-${tag}-x86_64-unknown-linux-gnu.tar.gz"
          mkdir mdbook
          curl -sSL $url | tar -xz --directory=./mdbook
          echo `pwd`/mdbook >> $GITHUB_PATH
      - name: Build Book
        run: cd docs && mdbook build
      - name: Setup Pages
        uses: actions/configure-pages@v4
      - name: Upload artifact
        uses: actions/upload-pages-artifact@v2
        with:
          path: 'docs/book'
      - name: Deploy to GitHub Pages
        id: deployment
        uses: actions/deploy-pages@v3
1 change: 1 addition & 0 deletions .gitignore
@@ -159,3 +159,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
*.iml
other/
25 changes: 16 additions & 9 deletions README.md
@@ -1,6 +1,7 @@
### Airgoodies

[![.github/workflows/build_wheel.yaml](https://github.com/stav121/apache-airflow-goodies/actions/workflows/build_wheel.yaml/badge.svg?branch=main)](https://github.com/stav121/apache-airflow-goodies/actions/workflows/build_wheel.yaml)
![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/stav121/apache-airflow-goodies/build_wheel.yaml?branch=feat%2Ftask_builder&style=flat&label=build)
![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/stav121/apache-airflow-goodies/mdbook.yaml?branch=feat%2Ftask_builder&label=docs)
![PyPI - Version](https://img.shields.io/pypi/v/airgoodies)
![GitHub License](https://img.shields.io/github/license/stav121/apache-airflow-goodies)
![PyPI - Downloads](https://img.shields.io/pypi/dm/goodies)
@@ -13,27 +14,33 @@ Current version matrix:

| Airgoodies Version | Apache Airflow Version | Python Version | Project tag |
|--------------------------------------------------------------------------------------------|------------------------|----------------|---------------------------------------------------------------------------------------------|
| [0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) | 2.7.2 | 3.11 | [v0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4) |
| [0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) | 2.7.2 | 3.11 | [v0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3) |
| [0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) | 2.7.2 | 3.11 | [v0.0.2](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.2) |
| [0.0.1-alpha](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.1-alpha) | 2.7.2 | 3.11 | [v0.0.1-alpha](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.1-alpha) |

Provided goodies for version [0.0.3](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.3):
Provided goodies for version [0.0.4](https://github.com/stav121/apache-airflow-goodies/releases/tag/v0.0.4):

| Module | Description | Dependency Versions |
|------------------|-----------------------------------------|----------------------------------------------------------|
| airgoodies.aws | API for reasy interaction with AWS | pandas==2.1.1<br>apache-airflow-providers-amazon===8.7.1 |
| airgoodies.mongo | API for easy interaction with MongoDB | pymongo==4.5.0 |
| airgoodies.xcom | API for managing variables through XCom | *None* |
| Module | Description | Dependency Versions |
|--------------------|-------------------------------------------------|----------------------------------------------------------|
| airgoodies.command | API for dynamic task configuration through YAML | pyyaml==6.0.1 |
| airgoodies.aws | API for easy interaction with AWS | pandas==2.1.1<br>apache-airflow-providers-amazon===8.7.1 |
| airgoodies.mongo | API for easy interaction with MongoDB | pymongo==4.5.0 |
| airgoodies.xcom | API for managing variables through XCom | *None* |

### Usage
### Installation

Add the following requirement to your `requirements.txt`:

```
# requirements.txt
airgoodies=0.0.3
airgoodies==0.0.4
```

### Example usage

For the official documentation, see [here](https://stav121.github.io/apache-airflow-goodies)

For an example of how to use this project, see [here](https://github.com/stav121/apache-airflow-goodies-examples)

### Building the project
4 changes: 2 additions & 2 deletions airgoodies/aws/s3/airgoodies.aws.s3.variables.json
@@ -1,4 +1,4 @@
{
"airgoodies-aws-s3-connection-name": "",
"airgoodies-aws-s3-default-bucket": ""
"${dag_id}.airgoodies-aws-s3-connection-name": "",
"${dag_id}.airgoodies-aws-s3-default-bucket": ""
}
64 changes: 48 additions & 16 deletions airgoodies/aws/s3/wrapper.py
@@ -11,17 +11,23 @@ class S3Wrapper:
Contains utilities such as, load CSV to pandas, load Excel etc.
"""
from logging import Logger, getLogger
from airflow.models import TaskInstance
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from pandas import DataFrame
from typing import Callable
from airgoodies.mongo.connection import MongoConnection
from airgoodies.util.annotation import provide_dag_id

_logger: Logger = getLogger('airflow.task')
_conn_name: str
_s3_hook: S3Hook
_default_bucket: str = None

def __init__(self, connection_name: str | None = None) -> None:
@provide_dag_id
def __init__(self,
dag_id: str = None,
task_instance: TaskInstance = None,
connection_name: str | None = None) -> None:
"""
Initialize the connection to S3 with either the provided connection_name or
the pre-configured from the variable:
@@ -38,19 +44,26 @@ def __init__(self, connection_name: str | None = None) -> None:
from airflow.models import Variable
from airgoodies.common.exception import ConfigNotFoundException
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airgoodies.common.variables import AWSVariables
from airgoodies.common.variables import AWSVariables, Common

if connection_name is None:
# Load from variable
self._conn_name = Variable.get(key=AWSVariables.S3_CONNECTION_NAME)
self._conn_name = Variable.get(
key=AWSVariables.S3_CONNECTION_NAME.replace(
Common.DAG_ID_VARIABLE, dag_id))
if self._conn_name is None:
raise ConfigNotFoundException(AWSVariables.S3_CONNECTION_NAME)
raise ConfigNotFoundException(
AWSVariables.S3_CONNECTION_NAME.replace(
Common.DAG_ID_VARIABLE, dag_id))
else:
# Load from the provided name
self._conn_name = connection_name

if Variable.get(key=AWSVariables.S3_DEFAULT_BUCKET) is not None:
self._default_bucket = Variable.get(key=AWSVariables.S3_DEFAULT_BUCKET)
if Variable.get(key=AWSVariables.S3_DEFAULT_BUCKET.replace(
Common.DAG_ID_VARIABLE, dag_id)) is not None:
self._default_bucket = Variable.get(
key=AWSVariables.S3_DEFAULT_BUCKET.replace(
Common.DAG_ID_VARIABLE, dag_id))

self._s3_hook: S3Hook = S3Hook(aws_conn_id=self._conn_name)

@@ -60,21 +73,32 @@ def get_s3_hook(self) -> S3Hook:
"""
return self._s3_hook

def load_file(self, key: str, bucket_name: str | None = None) -> str | None:
def load_file(self,
key: str,
bucket_name: str | None = None) -> str | None:
"""
Load the provided key from the provided or default bucket.
:param key: the fully qualified key of the file
:param bucket_name: alternative bucket name otherwise it will use the default
"""

if bucket_name is None:
bucket_name = self._default_bucket

file: str = self._s3_hook.read_key(key=key, bucket_name=bucket_name)
if key.lower().endswith(('.xls', '.xlsx')):
file: str = self._s3_hook.get_key(
key=key, bucket_name=bucket_name).get()["Body"].read()
else:
file: str = self._s3_hook.read_key(key=key,
bucket_name=bucket_name)

return file

def load_as_dataframe(self, key: str, bucket_name: str | None = None, sep: str = ',') -> DataFrame:
def load_as_dataframe(self,
key: str,
bucket_name: str | None = None,
sep: str = ',') -> DataFrame:
"""
Load the provided file from S3 into a pandas DataFrame.
@@ -87,15 +111,15 @@ def load_as_dataframe(self, key: str, bucket_name: str | None = None, sep: str =
from io import StringIO
from airgoodies.common.exception import FileNotFoundException, UnsupportedFileFormatException

file: StringIO = StringIO(self.load_file(key=key, bucket_name=bucket_name))
file: str = self.load_file(key=key, bucket_name=bucket_name)

if file is None:
raise FileNotFoundException(filename=key)

if key.lower().endswith('.csv'):
return read_csv(filepath_or_buffer=file, sep=sep)
return read_csv(filepath_or_buffer=StringIO(file), sep=sep)
elif key.lower().endswith(('.xls', '.xlsx')):
return read_excel(io=file)
return read_excel(io=file, header=None)
else:
raise UnsupportedFileFormatException()

@@ -116,7 +140,9 @@ def load_and_transform(self,
"""
from pandas import DataFrame

result: DataFrame = self.load_as_dataframe(key=key, bucket_name=bucket_name, sep=sep)
result: DataFrame = self.load_as_dataframe(key=key,
bucket_name=bucket_name,
sep=sep)

if transform_method is None:
return result
@@ -149,10 +175,16 @@ def load_to_mongo(self,
data: DataFrame

if transform_method is None:
data = self.load_as_dataframe(key=key, bucket_name=bucket_name, sep=sep)
data = self.load_as_dataframe(key=key,
bucket_name=bucket_name,
sep=sep)
else:
data = self.load_and_transform(key=key, bucket_name=bucket_name, transform_method=transform_method, sep=sep)
data = self.load_and_transform(key=key,
bucket_name=bucket_name,
transform_method=transform_method,
sep=sep)

connection.get_db().get_collection(name=load_table_name).insert_many(loads(data.to_json(orient='records')))
connection.get_db().get_collection(name=load_table_name).insert_many(
loads(data.to_json(orient='records')))

return load_table_name
Empty file added airgoodies/command/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions airgoodies/command/airgoodies.command.parser.variable.json
@@ -0,0 +1,3 @@
{
"${dag_id}.airgoodies-dag-config-file-key": ""
}
40 changes: 40 additions & 0 deletions airgoodies/command/command.py
@@ -0,0 +1,40 @@
"""
@author: Stavros Grigoriou
@since: 0.0.4
"""


class AirgoodiesCommand:
    """
    Airgoodies Command class, contains the metadata and callable for an Airgoodies compatible command.
    """
    from airflow.operators.python import PythonOperator
    from airflow import DAG

    _task_id: str
    _python_callable: callable
    _provide_context: bool = True

    def __init__(self,
                 task_id: str,
                 python_callable: callable,
                 provide_context: bool = True):
        """
        Command initializer.
        """
        self._task_id = task_id
        self._python_callable = python_callable
        self._provide_context = provide_context

    def to_operator(self, dag: DAG) -> PythonOperator:
        """
        Convert the command into a PythonOperator usable by Apache Airflow.
        :param dag: the DAG the operator is attached to
        :return: A configured PythonOperator.
        """
        from airflow.operators.python import PythonOperator

        return PythonOperator(task_id=self._task_id,
                              python_callable=self._python_callable,
                              provide_context=self._provide_context,
                              dag=dag)
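
`AirgoodiesCommand` stores task metadata and only creates the operator once a DAG is available. The sketch below illustrates that deferred-construction pattern without requiring Airflow: `StubOperator` is a stand-in for `PythonOperator`, a plain string stands in for the `DAG` object, and `Command` and `greet` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class StubOperator:
    """Stand-in for Airflow's PythonOperator (assumption, not the real class)."""
    task_id: str
    python_callable: Callable[..., Any]
    dag: str

    def execute(self, **context: Any) -> Any:
        # Call the wrapped function with whatever context is supplied.
        return self.python_callable(**context)


@dataclass(frozen=True)
class Command:
    """Holds task metadata; builds the operator only when asked."""
    task_id: str
    python_callable: Callable[..., Any]

    def to_operator(self, dag: str) -> StubOperator:
        return StubOperator(task_id=self.task_id,
                            python_callable=self.python_callable,
                            dag=dag)


def greet(name: str = "world", **_: Any) -> str:
    return f"hello {name}"


# Commands can be collected first and bound to a DAG later.
commands = [Command("greet_task", greet)]
operators = [command.to_operator(dag="example_dag") for command in commands]
result = operators[0].execute(name="airgoodies")
print(operators[0].task_id, result)
```

Keeping the command separate from the operator means a list of commands can be assembled (for example from configuration) before any DAG exists. In Airflow 2.x the `provide_context` argument is deprecated and the context is passed to the callable automatically, so the stand-in omits it.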