In this tutorial, we show the complete "Hello, World!" example. You will learn how to run the simplest BigFlow workflow on a local machine and how to deploy it on Cloud Composer.
This tutorial is based on the Docs project located in the BigFlow repository.
Start by cloning this repository:
git clone https://github.com/allegro/bigflow.git
cd bigflow/docs
Then, install the BigFlow PIP package in a fresh virtual environment in the docs project directory.
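A minimal setup could look like this (the PyPI package name bigflow and the .venv directory name are assumptions; use any recent Python 3 version supported by your Composer environment):
python3 -m venv .venv          # create a fresh virtual environment
source .venv/bin/activate      # activate it
pip install bigflow            # install the BigFlow PIP package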
Now that you have installed the BigFlow PIP package, you can use the BigFlow CLI. Test it:
bigflow -h
In BigFlow, your project consists of workflows. You can run them directly from your local machine or deploy them to Cloud Composer as Airflow DAGs.
Our "Hello, World!" workflow is the simple Python code. It consists of two jobs. The first one says Hello, and the second one says Goodbye:
import bigflow


class HelloWorldJob(bigflow.Job):
    id = 'hello_world'

    def execute(self, context: bigflow.JobContext):
        print(f'Hello world on {context.runtime}!')


class SayGoodbyeJob(bigflow.Job):
    id = 'say_goodbye'

    def execute(self, context: bigflow.JobContext):
        print('Goodbye!')


hello_world_workflow = bigflow.Workflow(
    workflow_id='hello_world_workflow',
    definition=[
        HelloWorldJob(),
        SayGoodbyeJob(),
    ],
)
The bigflow run command lets you run this workflow directly from sources on your local machine (without building and deploying it to Composer).
Run the whole workflow:
bigflow run --workflow hello_world_workflow
Output:
Hello world on 2020-09-10 12:17:52!
Goodbye!
Run a single job:
bigflow run --job hello_world_workflow.say_goodbye
Output:
Goodbye!
Run the workflow with a concrete runtime:
bigflow run --workflow hello_world_workflow --runtime '2020-08-01 10:00:00'
Output:
Hello world on 2020-08-01 10:00:00!
Goodbye!
Executing workflows locally is great for development, but eventually you want them to be executed periodically by Google Cloud Composer.
One of the key features of BigFlow is the full automation of the build and deployment process.
Before you start, you have to set up a GCP environment, which consists of two services (see the sketch after this list):
- a Cloud Composer instance
- a Docker Registry
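If you don't have them yet, a minimal provisioning sketch could look like this (the environment name my-composer-environment, the europe-west1 location, and the use of Container Registry as the Docker Registry are assumptions; adjust them to your project):
gcloud composer environments create my-composer-environment --location europe-west1   # create a Cloud Composer instance
gcloud services enable containerregistry.googleapis.com                               # enable Container Registry, so eu.gcr.io can host your images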
Then, add the configuration of your environment to the deployment_config.py file.
For the purpose of this example, it's enough to set these two properties: gcp_project_id and dags_bucket:
from bigflow import Config

deployment_config = Config(
    name='dev',
    properties={
        'gcp_project_id': 'my_gcp_project_id',
        'docker_repository': 'eu.gcr.io/{gcp_project_id}/docs-project',
        'dags_bucket': 'my_composer_dags_bucket',
    },
)
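If you also deploy to a production environment, the same deployment_config object can describe it as well. Here is a hypothetical sketch using add_configuration (the prod project ID and bucket name are placeholders, and the properties are repeated explicitly for each environment):
from bigflow import Config

# 'dev' is the base environment, a hypothetical 'prod' is added on top of it
deployment_config = Config(
    name='dev',
    properties={
        'gcp_project_id': 'my_dev_gcp_project_id',
        'docker_repository': 'eu.gcr.io/{gcp_project_id}/docs-project',
        'dags_bucket': 'my_dev_composer_dags_bucket',
    },
).add_configuration(
    name='prod',
    properties={
        'gcp_project_id': 'my_prod_gcp_project_id',
        'docker_repository': 'eu.gcr.io/{gcp_project_id}/docs-project',
        'dags_bucket': 'my_prod_composer_dags_bucket',
    },
)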
Read more about deployment_config.py.
There are two deployment artifacts, which are built from your BigFlow project:
- an Airflow DAG file with the workflow definition,
- a Docker image with the workflow code.
Build both artifacts with a single command (we recommend focusing on a single workflow here; you can build all workflows in your project by skipping the workflow parameter):
bigflow build --workflow hello_world_workflow
List the freshly generated deployment artifacts:
ls .dags .image
Output:
dags:
hello_world_workflow__v0_1_0SNAPSHOT341dbf7c__2020_09_21_10_00_00_dag.py
image:
deployment_config.py image-0.1.0.tar
Read more about the bigflow build command.
Your final task is to deploy the artifacts on Cloud Composer.
When deploying from a local machine, we recommend using the local authentication method. It relies on your personal GCP account, via the gcloud tool.
Check if you are authenticated:
gcloud info
If not, set the default project:
gcloud config set project <your-project>
And then, log in:
gcloud auth application-default login
Deploy your workflow to Cloud Composer:
bigflow deploy
Wait a while until Airflow picks up the new DAG file, then check your Airflow UI. If you can see the hello_world_workflow DAG there, you nailed it!
Read more about the bigflow deploy command.
In BigFlow, project environments are configured by bigflow.Config objects.
Here we show how to create a workflow which prints a different message for each environment.
import bigflow

config = bigflow.Config(
    name='dev',
    properties={
        'message_to_print': 'Message to print on DEV',
    },
).add_configuration(
    name='prod',
    properties={
        'message_to_print': 'Message to print on PROD',
    },
)


class HelloConfigJob(bigflow.Job):
    id = 'hello_config_job'

    def __init__(self, message_to_print):
        self.message_to_print = message_to_print

    def execute(self, context):
        print(self.message_to_print)


hello_world_workflow = bigflow.Workflow(
    workflow_id='hello_config_workflow',
    definition=[
        HelloConfigJob(config.resolve_property('message_to_print')),
    ],
)
Run the workflow with the dev config:
bigflow run --workflow hello_config_workflow --config dev
Output:
bf_env is : dev
Message to print on DEV
Run the workflow with the prod config:
bigflow run --workflow hello_config_workflow --config prod
Output:
bf_env is : prod
Message to print on PROD
Run the workflow with the default config, which happens to be the dev config:
bigflow run --workflow hello_config_workflow
Output:
bf_env is : dev
Message to print on DEV
Read more about the bigflow run command.