Schedule Coalesce Jobs with Apache Airflow

Coalesce's code-first, GUI-driven approach makes it easier to build, test, and deploy data pipelines. Compared with creating directed acyclic graphs (DAGs) purely in code, this considerably improves the data pipeline development workflow.

Coalesce relies on third-party services and orchestrators to schedule and run the data pipelines built in Coalesce, which are organized into Jobs.

Apache Airflow is a common orchestrator used with Coalesce.

Airflow is a widely adopted open-source platform for authoring, scheduling, and monitoring workflows. Written in Python and backed by an active community, it has become an essential tool for data engineers who build, schedule, and orchestrate pipelines.

Airflow can schedule and execute Coalesce Jobs directly, or run them as part of a larger organizational data pipeline or stack defined in Airflow DAGs.

This article describes how to configure tasks (nodes) in an Airflow DAG that use the Coalesce API to trigger Coalesce Jobs on a schedule.

Before You Begin

The steps detailed in this article assume the following is already in place:

Configure Airflow to Execute Coalesce Jobs on a Schedule

The steps below describe how to configure Airflow to execute Coalesce Jobs on a schedule.

Configure Airflow Variables to Support Authentication to Snowflake

In Airflow, create variables to store your Coalesce access token and the Snowflake credentials for your selected Snowflake authentication method.

📘

KeyPair Authentication

This article uses the KeyPair Authentication method so that no actual username/password credentials are exposed in the DAG; the instructions and screen captures reflect that method. If you use a different authentication method, you will need to modify the steps accordingly.


  1. Within Airflow, go to the Admin tab and click Variables.

  2. Create two variables:

    • AIRFLOW_VAR_COALESCE_TOKEN: To hold your Coalesce Access Token.
    • AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY: To hold the private key you generated for Snowflake.

    🚧

    Formatting the Private Key

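    The private key you provide to Airflow must contain a literal \n (a backslash followed by the letter n) at the end of every line except the last; these \n sequences are how the key's line breaks survive being placed inside the JSON body of the API request. Do not just copy and paste the private key into the AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY box; you will have to add the \n characters manually.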

    For example, if your key looks like this:

    -----BEGIN PRIVATE KEY-----
    MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCv2xVhFpaM6hhf
    ce4U5GRfdArGkoqkL2EBRs0zGMn1YYfQ8+zDuN9YkMTNC1pNxQptGn921teGk0wv
    cMP+I83P390jqXh56TlQtwn2reRXH7OlLdELttof4VGYb4I6KpdBhDaid8bys2FE
    f0r948EXM81Euh9FgmMbc4KzeF1tBDyU0sqAcAJCQXOl95jUR6Wqdp04LXJVoGmI
    -----END PRIVATE KEY-----

    it must be input as:

    -----BEGIN PRIVATE KEY-----\n
    MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCv2xVhFpaM6hhf\n
    ce4U5GRfdArGkoqkL2EBRs0zGMn1YYfQ8+zDuN9YkMTNC1pNxQptGn921teGk0wv\n
    cMP+I83P390jqXh56TlQtwn2reRXH7OlLdELttof4VGYb4I6KpdBhDaid8bys2FE\n
    f0r948EXM81Euh9FgmMbc4KzeF1tBDyU0sqAcAJCQXOl95jUR6Wqdp04LXJVoGmI\n
    -----END PRIVATE KEY-----
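Rather than adding the \n sequences by hand, you could generate the value with a short script. The following is a minimal sketch, not a Coalesce or Airflow tool: the function name pem_to_airflow_value is hypothetical, and it assumes your key is a standard PEM text block. It produces a single-line string in which the key's lines are joined by a literal \n, and it checks that those escapes decode back to the original lines when read as a JSON string.

```python
import json


def pem_to_airflow_value(pem_text: str) -> str:
    """Return a PEM key as a single line with a literal backslash-n between lines."""
    # Strip each line and skip blank ones, so stray spaces or empty lines
    # copied along with the key do not end up in the value.
    lines = [line.strip() for line in pem_text.strip().splitlines() if line.strip()]
    # "\\n" in Python source is the two characters "\" and "n", not a newline.
    return "\\n".join(lines)


# Shortened sample key for illustration only; read your real key file instead,
# for example with open(path, encoding="utf-8").read().
sample = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASC
-----END PRIVATE KEY-----
"""
value = pem_to_airflow_value(sample)
print(value)  # -----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASC\n-----END PRIVATE KEY-----

# Inside a JSON string, each \n escape decodes back to a real line break.
assert json.loads(f'"{value}"').splitlines() == sample.strip().splitlines()
```

Paste the printed value into the AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY variable, and avoid leaving copies of the key in shell history or temporary files.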
    

Create and Configure Your Airflow DAG to Run a Coalesce Job

In the folder where Airflow reads your DAG files (the dags_folder setting, by default $AIRFLOW_HOME/dags):

  1. Create a new Python file for the DAG.
  2. Paste in the following code:
from datetime import datetime, timedelta

from airflow import DAG
# Airflow 2 import path; the older airflow.operators.bash_operator module is deprecated.
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    # Python 3 does not allow leading zeros in integer literals, so use 1 rather than 01.
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

dag = DAG(
    'example_coalesce_trigger_w_status_check_parametrized_token',
    default_args=default_args,
    description='An example Airflow DAG',
    schedule_interval=timedelta(days=1),
    # Do not backfill one run per day since start_date when the DAG is first enabled.
    catchup=False,
)

print_hello_command = 'echo "Hello, World"'

# Airflow renders the {{ var.value.* }} expressions when the task runs, so the
# Variables are read at run time instead of every time the DAG file is parsed.
execute_api_call_command = """
# Fail the task if curl or the JSON parsing below fails.
set -eo pipefail
response=$(curl --location 'https://app.coalescesoftware.io/scheduler/startRun' \
--header 'accept: application/json' \
--header 'content-type: application/json' \
--header 'Authorization: Bearer {{ var.value.AIRFLOW_VAR_COALESCE_TOKEN }}' \
--data-raw '{
"runDetails": {
"parallelism": 16,
"environmentID": "3",
"jobID": "3"
},
"userCredentials": {
"snowflakeAuthType": "KeyPair",
"snowflakeKeyPairKey": "{{ var.value.AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY }}"
}
}')

# Extract runCounter from the JSON response using Python and remove the prefix.
# The last line printed is pushed to XCom for the check_status task.
runCounter=$(echo "$response" | python -c "import sys, json; print(json.load(sys.stdin)['runCounter'])" | sed 's/Run Counter: //')
echo "$runCounter"
"""

check_status_command = """
curl --request GET \
--url 'https://app.coalescesoftware.io/scheduler/runStatus?runCounter={{ task_instance.xcom_pull(task_ids="execute_api_call") }}' \
--header 'Authorization: Bearer {{ var.value.AIRFLOW_VAR_COALESCE_TOKEN }}' \
--header 'accept: application/json'
"""

# Define tasks
task_print_hello = BashOperator(
    task_id='print_hello',
    bash_command=print_hello_command,
    dag=dag,
)

task_execute_api_call = BashOperator(
    task_id='execute_api_call',
    bash_command=execute_api_call_command,
    dag=dag,
)

task_check_status = BashOperator(
    task_id='check_status',
    bash_command=check_status_command,
    dag=dag,
)

# Set task dependencies
task_print_hello >> task_execute_api_call >> task_check_status

📘

Check Your Environment and Job IDs

Replace the environmentID and jobID values in the code with the IDs of the Environment and Job in your Coalesce organization. If you don't want to run a specific Job and instead want to run the entire Coalesce pipeline, remove the jobID field from the runDetails payload in the DAG, along with the trailing comma after the environmentID value.

The environmentID can be found in the Deploy interface of the Coalesce App. The jobID can be found in a Workspace that contains your Job.

The environment ID

The job ID
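To see how the request body changes when you include or omit a Job, it can help to build it in Python instead of editing JSON inside a bash string. This is a sketch under stated assumptions: it reuses the field names from the example DAG (parallelism, environmentID, jobID, snowflakeAuthType, snowflakeKeyPairKey), the helper name build_start_run_body is hypothetical, and the ID "3" is a placeholder. Because json.dumps escapes line breaks itself, a key read directly from a file is serialized with \n escapes automatically. This is an alternative way to produce the body, not a change to how the example DAG works.

```python
import json
from typing import Optional


def build_start_run_body(
    environment_id: str,
    private_key: str,
    job_id: Optional[str] = None,
    parallelism: int = 16,
) -> str:
    """Build a startRun request body (hypothetical helper mirroring the DAG's JSON)."""
    run_details = {"parallelism": parallelism, "environmentID": environment_id}
    # Omitting jobID runs the entire pipeline in the Environment instead of one Job.
    if job_id is not None:
        run_details["jobID"] = job_id
    body = {
        "runDetails": run_details,
        "userCredentials": {
            "snowflakeAuthType": "KeyPair",
            # json.dumps escapes real line breaks in the key as \n automatically.
            "snowflakeKeyPairKey": private_key,
        },
    }
    return json.dumps(body)


# Placeholder IDs and key; substitute the values from your own Coalesce org.
key = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
print(build_start_run_body("3", key, job_id="3"))  # runs Job 3 in Environment 3
print(build_start_run_body("3", key))              # runs the whole pipeline in Environment 3
```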

  3. Edit the values for attributes like start_date, retries, retry_delay, schedule_interval, etc. to reflect your requirements.
  4. Change the name of the DAG to reflect its purpose.

Once the DAG is configured and triggered, either manually or on its schedule, the corresponding Job runs and their details appear in the Activity Feed in the Coalesce App. You can also retrieve run results and details through the API.
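The check_status task in the example DAG calls runStatus once, right after the run starts, so it may report a run that is still in progress. If you need a task that waits for the run to finish, one option is to poll the runStatus endpoint. The following is a minimal sketch using only the Python standard library; it assumes the runStatus URL and headers from the example DAG, the function names are hypothetical, and the completion check is left to you because this article does not document the fields of the runStatus response.

```python
import json
import time
import urllib.parse
import urllib.request
from typing import Callable

# Endpoint taken from the example DAG's check_status command.
STATUS_URL = "https://app.coalescesoftware.io/scheduler/runStatus"


def fetch_run_status(run_counter: str, token: str) -> dict:
    """GET runStatus for one run and return the parsed JSON (hypothetical helper)."""
    query = urllib.parse.urlencode({"runCounter": run_counter})
    request = urllib.request.Request(
        f"{STATUS_URL}?{query}",
        headers={"Authorization": f"Bearer {token}", "accept": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def wait_for_run(
    fetch: Callable[[], dict],
    is_finished: Callable[[dict], bool],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Call fetch() until is_finished(status) is true, or raise TimeoutError."""
    deadline = clock() + timeout_seconds
    while True:
        status = fetch()
        if is_finished(status):
            return status
        if clock() >= deadline:
            raise TimeoutError("Coalesce run did not finish before the timeout")
        sleep(poll_seconds)


# Example wiring (not executed here). The "runStatus" field and its values are
# assumptions; inspect a real response from your Coalesce org before relying on them.
# final = wait_for_run(
#     fetch=lambda: fetch_run_status(run_counter, token),
#     is_finished=lambda s: s.get("runStatus") in {"completed", "failed"},
# )
```

The sleep and clock parameters are injectable so the polling loop can be exercised without waiting or calling the API.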

Running Multiple Coalesce Jobs Using Airflow

There are several ways to run or schedule multiple Coalesce Jobs with Airflow:

  1. Create additional tasks within the same Airflow DAG, each of which triggers a different Job. The tasks run sequentially in the order set by the task dependencies at the bottom of the DAG code:
refresh_job_1 >> refresh_job_2 >> refresh_job_3 >> ...

where each refresh_job_X is the name of a task that executes the API call to trigger a refresh of an environment or job.

  2. Create multiple Airflow DAGs that you schedule independently.

Copy the example code into a new DAG file in your /dags folder in Airflow, edit the execute_api_call_command string with the desired environmentID and jobID values, modify the schedule, and give the DAG a new name in the DAG definition:

dag = DAG(
    'example_coalesce_trigger_w_status_check_parametrized_token',
    default_args=default_args,
    description='An example Airflow DAG',
    schedule_interval=timedelta(days=1),
)
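For either approach, the part that changes from one task or DAG to the next is the startRun request. As a hedged sketch, the hypothetical helper below returns one trigger command per Job ID, reusing the endpoint and headers from the example DAG and the Airflow Jinja expressions {{ var.value.AIRFLOW_VAR_COALESCE_TOKEN }} and {{ var.value.AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY }} to read the variables created earlier. It covers only the trigger step; the Environment and Job IDs shown are placeholders.

```python
import json

# Endpoint taken from the example DAG's execute_api_call command.
START_RUN_URL = "https://app.coalescesoftware.io/scheduler/startRun"


def make_start_run_command(environment_id: str, job_id: str) -> str:
    """Return a bash command that triggers one Coalesce Job (hypothetical helper).

    The {{ ... }} expressions are left in place for Airflow's Jinja templating
    to fill in when the task runs.
    """
    body = json.dumps({
        "runDetails": {"parallelism": 16, "environmentID": environment_id, "jobID": job_id},
        "userCredentials": {
            "snowflakeAuthType": "KeyPair",
            "snowflakeKeyPairKey": "{{ var.value.AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY }}",
        },
    })
    return (
        f"curl --location '{START_RUN_URL}' "
        "--header 'accept: application/json' "
        "--header 'content-type: application/json' "
        "--header 'Authorization: Bearer {{ var.value.AIRFLOW_VAR_COALESCE_TOKEN }}' "
        f"--data-raw '{body}'"
    )


# Placeholder Environment and Job IDs.
for job_id in ["3", "4"]:
    print(make_start_run_command("3", job_id))
```

In a DAG file, each returned string could be passed as the bash_command of its own BashOperator, and the resulting tasks chained with >> in the order you want the Jobs to run. Add the runCounter extraction from the example DAG if a task also needs to check run status.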

📘

Best Practices

As a best practice, give the DAG's .py file the same name as the DAG ID defined within it.

Video: How to Use Airflow to Schedule Coalesce Jobs

Watch a demo on using Airflow for Coalesce Jobs.