Trigger a DBT Cloud job run from Snowflake

1 min read
  • Acquire a DBT Cloud Service token with privileges enough to run a DBT job within a project (DBT Cloud Service tokens, API def)
  • Create a Snowflake secret to keep DBT Cloud API token:
Show code
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;
            
CREATE OR REPLACE SECRET secret_dbt_cloud_api_project_job
    TYPE = GENERIC_STRING
    SECRET_STRING = '---dbt-cloud-api-token---'
    COMMENT = 'DBT Cloud API Token to trigger a project jobs';
         
GRANT READ ON SECRET CONFIG.CONFIG.secret_dbt_cloud_api_project_job TO ROLE MY_ROLE;
GRANT USAGE ON DATABASE CONFIG TO ROLE MY_ROLE;
GRANT USAGE ON SCHEMA CONFIG.CONFIG TO ROLE MY_ROLE;     
  • Create a network rule to access DBT Cloud API:
Show code
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;
 
CREATE OR REPLACE NETWORK RULE NETWORK_RULE_HOST_PORT_DBT_CLOUD
    TYPE = HOST_PORT
    VALUE_LIST = ( 'your.instance.dbt.com:443' )
    MODE = EGRESS
    COMMENT = 'DBT Cloud';  
  • Create an External Access Integration:
Show code
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;
 
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION EXTERNAL_ACCESS_DBT_CLOUD_API
    ALLOWED_NETWORK_RULES = ( NETWORK_RULE_HOST_PORT_DBT_CLOUD )
    ALLOWED_AUTHENTICATION_SECRETS = ( secret_dbt_cloud_api_project_job )
    ENABLED = TRUE
    COMMENT = 'Access to DBT Cloud API';
 
GRANT USAGE ON INTEGRATION EXTERNAL_ACCESS_DBT_CLOUD_API TO ROLE MY_ROLE;  
  • Create a Python UDF to access API
Show code
USE DATABASE MY_DATABASE;
USE SCHEMA MY_SCHEMA;
 
CREATE OR REPLACE FUNCTION run_dbt_project_job(job_id STRING, cause STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
HANDLER = 'run_dbt_project_job'
EXTERNAL_ACCESS_INTEGRATIONS = ( EXTERNAL_ACCESS_DBT_CLOUD_API )
SECRETS = ( 'dbt_api_token' = config.config.secret_dbt_cloud_api_project_job )
PACKAGES = ('requests')
AS
$$
 
import _snowflake
import requests
 
def run_dbt_project_job(job_id, cause):
    api_key = _snowflake.get_generic_secret_string('dbt_api_token')
    account_id = '---your-dbt-cloud-account-id---'
    url = f"https://your.instance.dbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
     
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
     
    payload = {
        "cause": cause
    }
     
    try:
        # Send the POST request to dbt Cloud API
        response = requests.post(url, headers=headers, json=payload)
        response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
        response_json = response.json()
        return response_json.get("status", {}).get("user_message", None)
        # return response.json()  # Return the response from dbt Cloud API
    except requests.exceptions.RequestException as e:
        return f"Error triggering dbt Cloud job: {e}"
$$;
 
GRANT USAGE ON FUNCTION MY_DATABASE.MY_SCHEMA.run_dbt_project_job(STRING,STRING) TO ROLE MY_ROLE;  
  • Use function to trigger a job run at DBT Cloud:
Show code
USE DATABASE MY_DATABASE;
USE SCHEMA MY_SCHEMA;
 
SELECT run_dbt_project_job('---your-project-job-id---', 'comment ...');
  

returns Success! for successful completion, an error message otherwise.

Ref. creating-using-external-network-access