Scheduling Coalesce Pipelines in Snowflake

One easy way to schedule Coalesce pipelines and jobs without a third-party scheduler is to use Snowflake stored procedures, Tasks, and functions to make the necessary API calls to the Coalesce endpoints at a custom-defined cadence.

This article walks through setting up these procedures and Tasks in Snowflake, along with the authentication required to trigger Coalesce jobs.

Before You Begin

To run the setup DDL commands for these stored procedures, you need the SYSADMIN role or a role with higher privileges in Snowflake.

You will need to set up Snowflake KeyPair Authentication or Snowflake OAuth so that Coalesce can authenticate to Snowflake when a job is started through the API. Although username/password authentication is possible, it is less secure than KeyPair or OAuth.

If using KeyPair, save your private key and its decryption password (if the key is encrypted) in a safe place, as you will need to enter them into the Snowflake secrets used by the stored procedures.

Enable Authentication On Your Environment

You can skip this step if you have already enabled KeyPair or OAuth previously.

  1. Go to the Deploy Tab.
  2. Choose the environment you want to deploy to, then click Configure Deployment.
  3. If using KeyPair, go to User Credentials and select KeyPair as the Authentication Type.
  4. If using OAuth, go to OAuth Settings and toggle OAuth on. Enter the Client ID and Client Secret.
    1. Go to User Credentials and select Snowflake OAuth.

Create Network Rule & Token Secret

You will need to add a network rule to Snowflake so that it can reach the Coalesce API. You will also need to create a secret holding the Coalesce API token. Replace every <> placeholder in the examples below with the values for your Snowflake roles and Coalesce tokens.


-- Create a network rule for Snowflake to reach Coalesce API

CREATE OR REPLACE NETWORK RULE COALESCE_API_RULE
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('APP.COALESCESOFTWARE.IO');

-- Create a Secret containing Token from Coalesce

CREATE OR REPLACE SECRET COALESCE_API_TOKEN 
TYPE = GENERIC_STRING 
SECRET_STRING='<your Coalesce token>';
GRANT USAGE ON SECRET COALESCE_API_TOKEN TO <role>;

Stored Procedure Setup

We recommend creating these procedures in a separate database and schema so that they are not lost with your Data Warehouse nodes. A database and schema such as COALESCE.UTILITY is a good place to store them. The procedures you create depend on the authentication scheme you chose; each set is outlined below.

📘

Find and Replace COMPUTE_WH and SYSADMIN

The Warehouse and Role used to execute jobs are hard coded in the procedures and can be modified to suit your requirements. Search for COMPUTE_WH and SYSADMIN in the setup scripts below and replace them with your own values.

KeyPair Auth Procedures

Depending on whether your private key is encrypted, follow either the Non-Encrypted Private Key or the Encrypted Private Key set of procedures to create the necessary integrations, secrets, and Snowflake procedures.

Non-Encrypted Private Key

🚧

Newlines

Newlines must be encoded as "\n" within the string.
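
As a hedged illustration of that encoding, the following Python sketch turns the contents of a PEM key file into a single-line string that can be pasted into SECRET_STRING. The function name `escape_pem_newlines` and the sample key text are assumptions for illustration and are not part of the Coalesce or Snowflake tooling.

```python
def escape_pem_newlines(pem_text: str) -> str:
    """Join the lines of a PEM key with the two-character sequence backslash + n."""
    # splitlines() handles both \n and \r\n line endings; strip() drops
    # leading/trailing blank lines so the result has no trailing separator.
    return "\\n".join(pem_text.strip().splitlines())


# Placeholder key text, not a real key.
sample_pem = (
    "-----BEGIN PRIVATE KEY-----\n"
    "<base64 line 1>\n"
    "<base64 line 2>\n"
    "-----END PRIVATE KEY-----\n"
)
one_line = escape_pem_newlines(sample_pem)
print(one_line)
```

The printed value contains no real line breaks, so it can be placed between the single quotes of the SECRET_STRING clause as one line.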

-- Create a Secret containing the Snowflake private key.
-- Newlines must be encoded as "\n" within the string!

CREATE OR REPLACE SECRET SNOWFLAKE_PRIVATE_KEY
TYPE = GENERIC_STRING
SECRET_STRING = '<your snowflake private key>';
GRANT USAGE ON SECRET SNOWFLAKE_PRIVATE_KEY TO <role>;


-- Create Integration to access Coalesce API 

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION COALESCE_API_INTEGRATION
ALLOWED_NETWORK_RULES = (COALESCE_API_RULE)
ALLOWED_AUTHENTICATION_SECRETS = (COALESCE_API_TOKEN, SNOWFLAKE_PRIVATE_KEY)
ENABLED=TRUE;

-- COALESCE_API_RUN_JOB
-- Pass in JobID and EnvironmentID from Coalesce
-- Calls the startRun API to release a job 
-- Returns the runCounter of that job instance
-- eg CALL COALESCE_API_RUN_JOB(5, 3)

CREATE OR REPLACE PROCEDURE COALESCE_API_RUN_JOB(JobID NUMBER, EnvironmentID NUMBER)
RETURNS STRING
language python
runtime_version=3.8
handler = 'run_job'
external_access_integrations=(COALESCE_API_INTEGRATION)
packages = ('snowflake-snowpark-python','requests')
secrets = ('token' = COALESCE_API_TOKEN, 'creds' = SNOWFLAKE_PRIVATE_KEY)
as
$$
import _snowflake
import requests
import json

def run_job(session, JobID, EnvironmentID):
    token = _snowflake.get_generic_secret_string('token')
    sf_private_key = _snowflake.get_generic_secret_string('creds')
    
    url = "https://app.coalescesoftware.io/scheduler/startRun"
    headers = {
        "accept": "application/json",
        "authorization": "Bearer " + token
    }
    payload = {
    "runDetails": {
        "parallelism": 16,
        "environmentID": str(EnvironmentID),
        "jobID": str(JobID)
        },
    "userCredentials": {
        "snowflakeAuthType": "KeyPair",
        "snowflakeKeyPairKey": sf_private_key,
        "snowflakeWarehouse": "COMPUTE_WH",
        "snowflakeRole": "SYSADMIN"
        }
    }
    
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code == 200:
        if "runCounter" in response.json():
            return response.json()['runCounter']
        else:
            return response.json()
    else:
        return "Error, response: " + str(response.status_code)

$$;
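
The procedure above returns only the HTTP status code when the call fails. If more detail would be useful in the task history, the response handling could be factored into a helper along these lines. This is a minimal sketch under stated assumptions: `summarize_response` is a hypothetical name, and the only response field relied on is `runCounter`, as used in the procedure above.

```python
import json


def summarize_response(status_code: int, body_text: str) -> str:
    """Turn a startRun HTTP response into the string the procedure returns."""
    if status_code != 200:
        # Include a truncated body so the failure reason shows up in the result.
        return f"Error, response: {status_code} {body_text[:200]}"
    try:
        body = json.loads(body_text)
    except json.JSONDecodeError:
        return "Error, non-JSON response: " + body_text[:200]
    if isinstance(body, dict) and "runCounter" in body:
        # Cast to str because the procedure is declared RETURNS STRING.
        return str(body["runCounter"])
    return json.dumps(body)


print(summarize_response(200, '{"runCounter": 42}'))
print(summarize_response(401, "Unauthorized"))
```

Inside `run_job`, this could be called as `summarize_response(response.status_code, response.text)` in place of the final if/else block.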

Encrypted Private Key

🚧

Must Have Encrypted Private Key

USE ONLY IF YOUR PRIVATE KEY IS ENCRYPTED

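-- USE ONLY IF YOUR PRIVATE KEY IS ENCRYPTED!
-- Assumes the SNOWFLAKE_PRIVATE_KEY secret from the Non-Encrypted Private Key
-- section has also been created, holding the encrypted key.

-- Create a Secret containing the private key decryption password.

CREATE OR REPLACE SECRET SNOWFLAKE_KEY_PAIR_PASS
TYPE = GENERIC_STRING
SECRET_STRING = '<your private key decryption password>';
GRANT USAGE ON SECRET SNOWFLAKE_KEY_PAIR_PASS TO <role>;

-- Create Integration to access Coalesce API (w/ ENCRYPTED PRIVATE KEY)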


CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION COALESCE_API_INTEGRATION
ALLOWED_NETWORK_RULES = (COALESCE_API_RULE)
ALLOWED_AUTHENTICATION_SECRETS = (COALESCE_API_TOKEN, SNOWFLAKE_PRIVATE_KEY, SNOWFLAKE_KEY_PAIR_PASS)
ENABLED=TRUE;


-- COALESCE_API_RUN_JOB_ENC
-- Pass in JobID and EnvironmentID from Coalesce
-- Calls the startRun API to release a job
-- Returns the runCounter of that job instance
-- eg CALL COALESCE_API_RUN_JOB_ENC(5, 3)

CREATE OR REPLACE PROCEDURE COALESCE_API_RUN_JOB_ENC(JobID NUMBER, EnvironmentID NUMBER)
RETURNS STRING
language python
runtime_version=3.8
handler = 'run_job'
external_access_integrations=(COALESCE_API_INTEGRATION)
packages = ('snowflake-snowpark-python','requests')
secrets = ('token' = COALESCE_API_TOKEN, 'creds' = SNOWFLAKE_PRIVATE_KEY, 'keyPass' = SNOWFLAKE_KEY_PAIR_PASS)
as
$$
import _snowflake
import requests
import json

def run_job(session, JobID, EnvironmentID):
    token = _snowflake.get_generic_secret_string('token')
    sf_private_key = _snowflake.get_generic_secret_string('creds')
    keyPairPass = _snowflake.get_generic_secret_string('keyPass')
    
    url = "https://app.coalescesoftware.io/scheduler/startRun"
    headers = {
        "accept": "application/json",
        "authorization": "Bearer " + token
    }
    payload = {
    "runDetails": {
        "parallelism": 16,
        "environmentID": str(EnvironmentID),
        "jobID": str(JobID)
        },
    "userCredentials": {
        "snowflakeAuthType": "KeyPair",
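        "snowflakeKeyPairKey": sf_private_key,
        # Password that decrypts the private key; confirm the field name
        # against the Coalesce startRun API reference for your version.
        "snowflakeKeyPairPass": keyPairPass,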
        "snowflakeWarehouse": "COMPUTE_WH",
        "snowflakeRole": "SYSADMIN"
        }
    }
    
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code == 200:
        if "runCounter" in response.json():
            return response.json()['runCounter']
        else:
            return response.json()
    else:
        return "Error, response: " + str(response.status_code)

$$;

OAuth Auth Procedures

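If authentication to Snowflake was set up using OAuth for the environment being refreshed, use the procedure below. It relies on the COALESCE_API_INTEGRATION external access integration; if you skipped the KeyPair sections, create that integration first with only COALESCE_API_TOKEN in ALLOWED_AUTHENTICATION_SECRETS.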

-- COALESCE_API_RUN_JOB
-- Pass in JobID and EnvironmentID from Coalesce
-- Calls the startRun API to release a job 
-- Returns the runCounter of that job instance
-- eg CALL COALESCE_API_RUN_JOB(5, 3)

CREATE OR REPLACE PROCEDURE COALESCE_API_RUN_JOB(JobID NUMBER, EnvironmentID NUMBER)
RETURNS STRING
language python
runtime_version=3.8
handler = 'run_job'
external_access_integrations=(COALESCE_API_INTEGRATION)
packages = ('snowflake-snowpark-python','requests')
secrets = ('token' = COALESCE_API_TOKEN)
as
$$
import _snowflake
import requests
import json

def run_job(session, JobID, EnvironmentID):
    token = _snowflake.get_generic_secret_string('token')
    
    url = "https://app.coalescesoftware.io/scheduler/startRun"
    headers = {
        "accept": "application/json",
        "authorization": "Bearer " + token
    }
    payload = {
    "runDetails": {
        "parallelism": 16,
        "environmentID": str(EnvironmentID),
        "jobID": str(JobID)
        },
    "userCredentials": {
        "snowflakeAuthType": "OAuth",
        "snowflakeWarehouse": "COMPUTE_WH",
        "snowflakeRole": "SYSADMIN"
        }
    }
    
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code == 200:
        if "runCounter" in response.json():
            return response.json()['runCounter']
        else:
            return response.json()
    else:
        return "Error, response: " + str(response.status_code)

$$;
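
The three procedures above differ only in the userCredentials part of the request body. As a hedged sketch of that difference, the payload construction could be expressed as one plain Python function that can be exercised outside Snowflake. The function name `build_start_run_payload` and its parameters are hypothetical, and `snowflakeKeyPairPass` is an assumed field name for the decryption password that should be checked against the Coalesce API reference.

```python
from typing import Optional


def build_start_run_payload(
    job_id: int,
    environment_id: int,
    auth_type: str,
    private_key: Optional[str] = None,
    key_pass: Optional[str] = None,
    warehouse: str = "COMPUTE_WH",
    role: str = "SYSADMIN",
) -> dict:
    """Build the JSON body sent to the startRun endpoint."""
    if auth_type not in ("KeyPair", "OAuth"):
        raise ValueError("auth_type must be 'KeyPair' or 'OAuth'")

    credentials = {
        "snowflakeAuthType": auth_type,
        "snowflakeWarehouse": warehouse,
        "snowflakeRole": role,
    }
    if auth_type == "KeyPair":
        if private_key is None:
            raise ValueError("KeyPair authentication needs a private key")
        credentials["snowflakeKeyPairKey"] = private_key
        if key_pass is not None:
            # Assumed field name for the password of an encrypted key.
            credentials["snowflakeKeyPairPass"] = key_pass

    return {
        "runDetails": {
            "parallelism": 16,
            "environmentID": str(environment_id),
            "jobID": str(job_id),
        },
        "userCredentials": credentials,
    }


oauth_payload = build_start_run_payload(5, 3, "OAuth")
keypair_payload = build_start_run_payload(5, 3, "KeyPair", private_key="<private key>")
print(oauth_payload["userCredentials"])
```

Keeping the payload in a pure function like this makes it possible to check the request shape locally before embedding it in a stored procedure.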

Email Notifications

If you want to set up email alerts on the refresh status of your pipelines, create a notification integration and follow the instructions in Snowflake's Sending Email Notifications.


ALTER USER <user> SET EMAIL = '<email address>';

CREATE NOTIFICATION INTEGRATION COALESCE_EMAIL
    TYPE=email
    ENABLED=true
    ALLOWED_RECIPIENTS=('<email address1>', '<email address2>');
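
With the integration in place, an email can be sent from a Python stored procedure by calling Snowflake's `SYSTEM$SEND_EMAIL`. The sketch below shows one hedged way to do that through the Snowpark `session.call` method. The function name `send_status_email`, the subject and body wording, and the `FakeSession` stand-in used to exercise it locally are all assumptions for illustration.

```python
def send_status_email(session, recipients, job_id, run_result,
                      integration="COALESCE_EMAIL"):
    """Send a short status email through a Snowflake notification integration."""
    subject = f"Coalesce job {job_id} status"
    body = f"Job {job_id} returned: {run_result}"
    # SYSTEM$SEND_EMAIL expects the recipients as one comma-separated string.
    session.call("SYSTEM$SEND_EMAIL", integration, ", ".join(recipients), subject, body)
    return subject


class FakeSession:
    """Local stand-in that records calls instead of contacting Snowflake."""

    def __init__(self):
        self.calls = []

    def call(self, name, *args):
        self.calls.append((name, args))


fake = FakeSession()
send_status_email(fake, ["<email address1>", "<email address2>"], 5, "42")
print(fake.calls[0])
```

In a real procedure the `session` argument would be the Snowpark session passed to the handler, and the recipients would need to be listed in the integration's ALLOWED_RECIPIENTS.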

Scheduling

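Once your stored procedures are created, they can be called on a schedule using Snowflake Tasks. The examples below call wrapper procedures named RUN_JOB_SEND_EMAIL and COALESCE_API_RUN_JOB_COMPLETE, which are not defined in this article; if you have not created them, substitute a procedure you created above, such as COALESCE_API_RUN_JOB.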

Example: Run a Job on a Schedule

-- Run a job on a schedule
CREATE TASK TASK_RUN_JOB_PROCESS_DIMS 
SCHEDULE = 'USING CRON 0 8-17 * * 1-5 America/Los_Angeles'
WAREHOUSE = COMPUTE_WH 
AS CALL RUN_JOB_SEND_EMAIL(5, 3, '<email address>');
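
To make the cron expression in this example concrete: `0 8-17 * * 1-5` fires at minute 0 of every hour from 08:00 through 17:00, Monday through Friday, in the named time zone. The following sketch mirrors only this one expression in Python and treats the datetime as already being in America/Los_Angeles local time. `matches_schedule` is a hypothetical helper, not a general cron parser.

```python
from datetime import datetime


def matches_schedule(local_time: datetime) -> bool:
    """Return True if local_time is a firing time of '0 8-17 * * 1-5'."""
    on_the_hour = local_time.minute == 0
    in_hours = 8 <= local_time.hour <= 17
    # Cron weekdays 1-5 are Monday-Friday; Python's weekday() numbers them 0-4.
    on_weekday = local_time.weekday() <= 4
    return on_the_hour and in_hours and on_weekday


print(matches_schedule(datetime(2024, 1, 1, 8, 0)))    # Monday 08:00
print(matches_schedule(datetime(2024, 1, 6, 8, 0)))    # Saturday 08:00
print(matches_schedule(datetime(2024, 1, 1, 17, 30)))  # Monday 17:30
```

Only the first call matches, so the task in this example runs ten times per weekday and never on weekends.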

Example: Run a Job When Data Changes Based on a Stream

-- Run a job when data changes based on a STREAM
CREATE TASK TASK_RUN_JOB_PROCESS_DIMS
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 minute'
WHEN SYSTEM$STREAM_HAS_DATA('DEV_STG.STR_ORDERS')
AS CALL COALESCE_API_RUN_JOB_COMPLETE(5, 3);

Example: Run a Secondary Job on Completion of a Prior Job

-- Run a secondary job on completion of a prior job
CREATE TASK TASK_RUN_JOB_PROCESS_FACTS
AFTER TASK_RUN_JOB_PROCESS_DIMS
AS CALL COALESCE_API_RUN_JOB_COMPLETE(5, 3);

-- Resume the child task first, then the root task
ALTER TASK TASK_RUN_JOB_PROCESS_FACTS RESUME;
ALTER TASK TASK_RUN_JOB_PROCESS_DIMS RESUME;

Video: Schedule Your Coalesce Jobs Using Snowflake