-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathexample_DbtDag_databricks.py
More file actions
67 lines (58 loc) · 2.23 KB
/
example_DbtDag_databricks.py
File metadata and controls
67 lines (58 loc) · 2.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
This dag runs the jaffle_shop dbt project on databricks using
the `DbtDag` class from Cosmos.
"""
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles.databricks import DatabricksTokenProfileMapping
import os
from pendulum import datetime
# You need to set this Airflow connection, for an example see the .env_example file in the root of this repository
DATABRICKS_CONN_ID = os.getenv("DATABRICKS_CONN_ID", "databricks_default")
# Use your own values for the catalog, schema, compute type and compute names
CATALOG_NAME = os.getenv("DATABRICKS_CATALOG", "dev_catalog")
SCHEMA_NAME = os.getenv("DATABRICKS_SCHEMA", "dev_schema")
COMPUTE_NAME = os.getenv("DATABRICKS_COMPUTE", "shared_compute")
COMPUTE_TYPE = os.getenv("DATABRICKS_COMPUTE_TYPE", "serverless")
# Resolve AIRFLOW_HOME once so both dbt paths below are derived consistently.
# Previously one path used os.environ[...] (KeyError when unset) and the other
# os.getenv(...) (silently yields the literal string "None" in the path when
# unset); a single lookup with an explicit default avoids both failure modes.
# "/usr/local/airflow" is the conventional containerized-Airflow home — adjust
# the default if your deployment differs.
AIRFLOW_HOME = os.getenv("AIRFLOW_HOME", "/usr/local/airflow")
# Adjust this to your own project name, the path to the dbt project and
# the path to the dbt executable if you are using one
DBT_PROJECT_PATH = f"{AIRFLOW_HOME}/include/dbt/jaffle_shop"
DBT_EXECUTABLE_PATH = f"{AIRFLOW_HOME}/dbt_venv_databricks/bin/dbt"
# Where the dbt project lives inside the Airflow deployment.
_project_config = ProjectConfig(dbt_project_path=DBT_PROJECT_PATH)

# Connection details are rendered into a dbt profile at runtime by Cosmos'
# profile mapping, so no profiles.yml needs to be shipped with the project.
_databricks_profile_args = {
    "catalog": CATALOG_NAME,
    "schema": SCHEMA_NAME,
    "compute": {"type": COMPUTE_TYPE, "compute_name": COMPUTE_NAME},
}
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=DatabricksTokenProfileMapping(
        conn_id=DATABRICKS_CONN_ID,
        profile_args=_databricks_profile_args,
    ),
)

# Points Cosmos at a dedicated dbt virtualenv. Only needed if you can't
# install dbt-databricks in the requirements.txt file.
_execution_config = ExecutionConfig(dbt_executable_path=DBT_EXECUTABLE_PATH)

# Airflow default_args applied to every task generated from the dbt project.
_default_args = {"retries": 2}
# Instantiating DbtDag registers an Airflow DAG whose tasks are rendered
# from the dbt project described by the configs above.
example_DbtDag_databricks = DbtDag(
    # Mandatory DAG parameter.
    dag_id="example_DbtDag_databricks",
    # Mandatory Cosmos parameters: what to run and how to connect.
    project_config=_project_config,
    profile_config=_profile_config,
    # Optional Cosmos parameter: which dbt executable to invoke.
    execution_config=_execution_config,
    # Regular (optional) Airflow DAG parameters.
    schedule="@daily",
    start_date=datetime(2025, 10, 1),
    default_args=_default_args,
    tags=["basic", "databricks", "jaffle_shop"],
)