Trigger a DBT Cloud job run from Snowflake
- Acquire a DBT Cloud service token with sufficient privileges to run a DBT job within a project (see the DBT Cloud documentation on service tokens and the API definition)
- Create a Snowflake secret to store the DBT Cloud API token:
Show code
-- Store the DBT Cloud API token as a Snowflake secret and make it readable by MY_ROLE.
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;

-- The SECRET_STRING below is a placeholder; substitute the real token before running.
CREATE OR REPLACE SECRET secret_dbt_cloud_api_project_job
    TYPE          = GENERIC_STRING
    SECRET_STRING = '---dbt-cloud-api-token---'
    COMMENT       = 'DBT Cloud API Token to trigger a project jobs';

-- Grant MY_ROLE access to the containing database and schema, then to the secret itself.
GRANT USAGE ON DATABASE CONFIG TO ROLE MY_ROLE;
GRANT USAGE ON SCHEMA CONFIG.CONFIG TO ROLE MY_ROLE;
GRANT READ ON SECRET CONFIG.CONFIG.secret_dbt_cloud_api_project_job TO ROLE MY_ROLE;
- Create a network rule to allow access to the DBT Cloud API:
Show code
-- Define an egress network rule for the DBT Cloud host on port 443.
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;

-- The host in VALUE_LIST is a placeholder for your own DBT Cloud instance.
CREATE OR REPLACE NETWORK RULE NETWORK_RULE_HOST_PORT_DBT_CLOUD
    TYPE       = HOST_PORT
    VALUE_LIST = ('your.instance.dbt.com:443')
    MODE       = EGRESS
    COMMENT    = 'DBT Cloud';
- Create an External Access Integration:
Show code
-- Bundle the network rule and the API-token secret into one external access
-- integration, then allow MY_ROLE to use it.
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION EXTERNAL_ACCESS_DBT_CLOUD_API
    ALLOWED_NETWORK_RULES          = (NETWORK_RULE_HOST_PORT_DBT_CLOUD)
    ALLOWED_AUTHENTICATION_SECRETS = (secret_dbt_cloud_api_project_job)
    ENABLED                        = TRUE
    COMMENT                        = 'Access to DBT Cloud API';

GRANT USAGE ON INTEGRATION EXTERNAL_ACCESS_DBT_CLOUD_API TO ROLE MY_ROLE;
- Create a Python UDF to call the DBT Cloud API:
Show code
-- Python UDF that triggers a DBT Cloud job run through the DBT Cloud API (v2).
--   job_id : id of the DBT Cloud job to run
--   cause  : free-text reason, sent to the API as the run's "cause"
-- Returns the API response's status.user_message (NULL when the response has
-- none), or a string starting with 'Error triggering dbt Cloud job:' when the
-- request could not be completed.
USE DATABASE MY_DATABASE;
USE SCHEMA MY_SCHEMA;

CREATE OR REPLACE FUNCTION run_dbt_project_job(job_id STRING, cause STRING)
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.11'
    HANDLER = 'run_dbt_project_job'
    EXTERNAL_ACCESS_INTEGRATIONS = ( EXTERNAL_ACCESS_DBT_CLOUD_API )
    SECRETS = ( 'dbt_api_token' = config.config.secret_dbt_cloud_api_project_job )
    PACKAGES = ('requests')
AS
$$
from urllib.parse import quote

import _snowflake
import requests

# Placeholders: replace with the host and account id of your DBT Cloud instance.
DBT_CLOUD_HOST = 'your.instance.dbt.com'
DBT_CLOUD_ACCOUNT_ID = '---your-dbt-cloud-account-id---'

# Upper bound (seconds) on the HTTP call so a stalled connection cannot hang the query.
REQUEST_TIMEOUT_SECONDS = 30


def run_dbt_project_job(job_id, cause):
    """Trigger a DBT Cloud job run and return the API's user-facing status message.

    Args:
        job_id: Id of the job to run; NULL or blank is rejected with an error message.
        cause: Text sent as the "cause" field of the request payload.

    Returns:
        The "user_message" found under "status" in the JSON response, None when the
        response carries no such field, or an "Error triggering dbt Cloud job: ..."
        string when the request fails or the response is not valid JSON.
    """
    # A SQL NULL arrives as None; without this guard it would be sent as the literal "None".
    if job_id is None or not str(job_id).strip():
        return "Error triggering dbt Cloud job: job_id is required"

    api_key = _snowflake.get_generic_secret_string('dbt_api_token')

    # Percent-encode the caller-supplied id so it cannot alter the request path.
    job_path = quote(str(job_id).strip(), safe='')
    url = f"https://{DBT_CLOUD_HOST}/api/v2/accounts/{DBT_CLOUD_ACCOUNT_ID}/jobs/{job_path}/run/"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "cause": cause
    }

    try:
        # Send the POST request to dbt Cloud API
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
        response_json = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions that do not wrap it.
        return f"Error triggering dbt Cloud job: {e}"

    # Tolerate a missing, null or non-object "status" instead of raising.
    status = response_json.get("status") if isinstance(response_json, dict) else None
    if not isinstance(status, dict):
        return None
    return status.get("user_message")
$$;

GRANT USAGE ON FUNCTION MY_DATABASE.MY_SCHEMA.run_dbt_project_job(STRING,STRING) TO ROLE MY_ROLE;
- Use the function to trigger a job run in DBT Cloud:
Show code
-- Call the UDF to trigger a DBT Cloud job run; both arguments are placeholders.
USE DATABASE MY_DATABASE;
USE SCHEMA MY_SCHEMA;

SELECT
    run_dbt_project_job(
        '---your-project-job-id---',  -- job_id
        'comment ...'                 -- cause
    );
The function returns `Success!` when the job run is triggered successfully (it does not wait for the job to finish), and an error message otherwise.