From this tutorial, you will learn how to run a simple BigFlow workflow on your local machine and how to deploy it on Cloud Composer.
You might want to familiarize yourself with BigFlow core concepts before going through this tutorial.
This tutorial is based on examples located in the BigFlow repository.
Start by cloning the BigFlow repository:
git clone https://github.com/allegro/bigflow.git
cd bigflow/examples_project
Then, install the BigFlow PIP package in a new virtual environment in the examples_project directory.
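A minimal sketch of that setup, assuming Python 3 with the venv module and a POSIX shell (the environment name .venv and the PyPI package name bigflow are assumptions here):
# create and activate a fresh virtual environment, then install BigFlow
python -m venv .venv
source .venv/bin/activate
pip install bigflow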
Next, compile project dependencies and install them:
bigflow build-requirements
pip install -r resources/requirements.txt
Now that you have installed the BigFlow PIP package, you can use the BigFlow CLI. Test it:
bigflow -h
In BigFlow, your project consists of workflows. You can run them directly on your local machine or deploy them to Cloud Composer, automatically compiled to Airflow DAGs.
The "Hello World" workflow below consists of two jobs. The first one says "Hello", and the second one says Goodbye:
import bigflow


class HelloWorldJob(bigflow.Job):
    id = 'hello_world'

    def execute(self, context: bigflow.JobContext):
        print(f'Hello world on {context.runtime}!')


class SayGoodbyeJob(bigflow.Job):
    id = 'say_goodbye'

    def execute(self, context: bigflow.JobContext):
        print('Goodbye!')


hello_world_workflow = bigflow.Workflow(
    workflow_id='hello_world_workflow',
    definition=[
        HelloWorldJob(),
        SayGoodbyeJob(),
    ],
)
The bigflow run command lets you run this workflow directly from sources on your local machine (without building and deploying it to Composer).
Run the whole workflow:
bigflow run --workflow hello_world_workflow
Output:
Hello world on 2020-09-10 12:17:52!
Goodbye!
Run a single job:
bigflow run --job hello_world_workflow.say_goodbye
Output:
Goodbye!
Run a workflow with a concrete runtime:
bigflow run --workflow hello_world_workflow --runtime '2020-08-01 10:00:00'
Output:
Hello world on 2020-08-01 10:00:00!
Goodbye!
Executing workflows locally is great for development, but eventually you want them to be executed periodically by Cloud Composer.
One of the key features of BigFlow is the full automation of the build and deployment process. First, let us talk about the build process and artifacts.
There are two deployment artifacts built from your BigFlow project:
- Airflow DAG files with your workflow definitions
- a Docker image with your workflows' computation code
Build both artifacts with a single command (we recommend focusing on a single workflow here; you can build all workflows in your project by skipping the workflow parameter):
bigflow build --workflow hello_world_workflow
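For example, to build all workflows in your project, simply omit the parameter:
bigflow build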
List newly generated deployment artifacts:
ls .dags .image
Output (versions may vary):
dags:
hello_world_workflow__v0_1_0SNAPSHOT341dbf7c__2020_09_21_10_00_00_dag.py
image:
deployment_config.py image-0.1.0.tar
Read more about the bigflow build command.
Now it's time to make use of your artifacts.
Before you start, you have to set up a GCP environment, which consists of two services (a provisioning sketch follows the list):
- a Cloud Composer instance
- a Docker Registry
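If you don't have these services yet, both can be created with gcloud. This is only a hedged sketch: the environment name, repository name, and region below (my-composer, docs-project, europe-west1) are example values, not anything BigFlow requires, and creating a Composer environment can take a while:
# create a Cloud Composer environment (example name and region)
gcloud composer environments create my-composer --location europe-west1
# create a Docker repository in Artifact Registry (example name and region)
gcloud artifacts repositories create docs-project --repository-format=docker --location=europe-west1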
Then, add the deployment configuration of your environment to the deployment_config.py file. For the purpose of this example, it's enough to set these two properties: gcp_project_id and dags_bucket:
from bigflow import Config
deployment_config = Config(
    name='dev',
    properties={
        'gcp_project_id': 'my_gcp_project_id',
        'docker_repository': 'europe-west1-docker.pkg.dev/{gcp_project_id}/docs-project',
        'dags_bucket': 'my_composer_dags_bucket',
    },
)
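If you also deploy to a second environment, for example production, the same deployment_config object can describe it via add_configuration. A minimal sketch with placeholder names (my_prod_gcp_project_id and my_prod_composer_dags_bucket are hypothetical, and properties you don't override are assumed to be shared with the 'dev' configuration):
from bigflow import Config

deployment_config = Config(
    name='dev',
    properties={
        'gcp_project_id': 'my_gcp_project_id',
        'docker_repository': 'europe-west1-docker.pkg.dev/{gcp_project_id}/docs-project',
        'dags_bucket': 'my_composer_dags_bucket',
    },
).add_configuration(
    name='prod',
    properties={
        # hypothetical production values; unlisted properties are assumed to be inherited from 'dev'
        'gcp_project_id': 'my_prod_gcp_project_id',
        'dags_bucket': 'my_prod_composer_dags_bucket',
    },
)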
You can read more about the deployment_config.py file here.
Now, it's time to deploy artifacts on Cloud Composer.
When deploying from a local machine, we recommend using the local authentication method.
It relies on your personal GCP account, through the gcloud tool.
Check if you are authenticated:
gcloud info
If not, set the default project:
gcloud config set project <your-project>
And then, log in:
gcloud auth application-default login
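Depending on your registry setup, Docker may also need to push images with your gcloud credentials. A hedged example for the Artifact Registry host used above (adjust the hostname to match your docker_repository):
gcloud auth configure-docker europe-west1-docker.pkg.dev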
Deploy your workflow to Cloud Composer:
bigflow deploy
Wait a while until Airflow reads the new DAG file, then check your Airflow UI. If the new DAG shows up there, you nailed it!
Read more about the bigflow deploy command.
In BigFlow, project environments are configured by bigflow.Config objects.
Here we show how to create a workflow that prints a different message for each environment.
import bigflow

config = bigflow.Config(
    name='dev',
    properties={
        'message_to_print': 'Message to print on DEV',
    },
).add_configuration(
    name='prod',
    properties={
        'message_to_print': 'Message to print on PROD',
    },
)


class HelloConfigJob(bigflow.Job):
    id = 'hello_config_job'

    def __init__(self, message_to_print):
        self.message_to_print = message_to_print

    def execute(self, context):
        print(self.message_to_print)


hello_config_workflow = bigflow.Workflow(
    workflow_id='hello_config_workflow',
    definition=[
        HelloConfigJob(config.resolve_property('message_to_print')),
    ],
)
Run the workflow with the dev config:
bigflow run --workflow hello_config_workflow --config dev
Output:
bf_env is : dev
Message to print on DEV
Run the workflow with the prod config:
bigflow run --workflow hello_config_workflow --config prod
Output:
bf_env is : prod
Message to print on PROD
Run the workflow with the default config, which happens to be the dev config:
bigflow run --workflow hello_config_workflow
Output:
bf_env is : dev
Message to print on DEV
Read more about the bigflow run command.