sqlmesh.cli.example_project
1import typing as t 2from enum import Enum 3from pathlib import Path 4 5import click 6from sqlglot import Dialect 7 8 9class ProjectTemplate(Enum): 10 AIRFLOW = "airflow" 11 DBT = "dbt" 12 DEFAULT = "default" 13 EMPTY = "empty" 14 15 16def _gen_config(dialect: t.Optional[str], template: ProjectTemplate) -> str: 17 default_configs = { 18 ProjectTemplate.DEFAULT: f"""gateways: 19 local: 20 connection: 21 type: duckdb 22 database: db.db 23 24default_gateway: local 25 26model_defaults: 27 dialect: {dialect} 28""", 29 ProjectTemplate.AIRFLOW: f"""gateways: 30 local: 31 connection: 32 type: duckdb 33 database: db.db 34 35default_gateway: local 36 37default_scheduler: 38 type: airflow 39 airflow_url: http://localhost:8080/ 40 username: airflow 41 password: airflow 42 43model_defaults: 44 dialect: {dialect} 45""", 46 ProjectTemplate.DBT: """from pathlib import Path 47 48from sqlmesh.dbt.loader import sqlmesh_config 49 50config = sqlmesh_config(Path(__file__).parent) 51""", 52 } 53 54 default_configs[ProjectTemplate.EMPTY] = default_configs[ProjectTemplate.DEFAULT] 55 return default_configs[template] 56 57 58EXAMPLE_SCHEMA_NAME = "sqlmesh_example" 59EXAMPLE_FULL_MODEL_NAME = f"{EXAMPLE_SCHEMA_NAME}.full_model" 60EXAMPLE_INCREMENTAL_MODEL_NAME = f"{EXAMPLE_SCHEMA_NAME}.incremental_model" 61EXAMPLE_SEED_MODEL_NAME = f"{EXAMPLE_SCHEMA_NAME}.seed_model" 62 63EXAMPLE_FULL_MODEL_DEF = f"""MODEL ( 64 name {EXAMPLE_FULL_MODEL_NAME}, 65 kind FULL, 66 cron '@daily', 67 grain item_id, 68 audits (assert_positive_order_ids), 69); 70 71SELECT 72 item_id, 73 COUNT(DISTINCT id) AS num_orders, 74FROM 75 {EXAMPLE_INCREMENTAL_MODEL_NAME} 76GROUP BY item_id 77""" 78 79EXAMPLE_INCREMENTAL_MODEL_DEF = f"""MODEL ( 80 name {EXAMPLE_INCREMENTAL_MODEL_NAME}, 81 kind INCREMENTAL_BY_TIME_RANGE ( 82 time_column event_date 83 ), 84 start '2020-01-01', 85 cron '@daily', 86 grain (id, event_date) 87); 88 89SELECT 90 id, 91 item_id, 92 event_date, 93FROM 94 {EXAMPLE_SEED_MODEL_NAME} 95WHERE 96 event_date BETWEEN @start_date AND @end_date 97""" 98 99EXAMPLE_SEED_MODEL_DEF = f"""MODEL ( 100 name {EXAMPLE_SEED_MODEL_NAME}, 101 kind SEED ( 102 path '../seeds/seed_data.csv' 103 ), 104 columns ( 105 id INTEGER, 106 item_id INTEGER, 107 event_date DATE 108 ), 109 grain (id, event_date) 110); 111""" 112 113EXAMPLE_AUDIT = """AUDIT ( 114 name assert_positive_order_ids, 115); 116 117SELECT * 118FROM @this_model 119WHERE 120 item_id < 0 121""" 122 123EXAMPLE_SEED_DATA = """id,item_id,event_date 1241,2,2020-01-01 1252,1,2020-01-01 1263,3,2020-01-03 1274,1,2020-01-04 1285,1,2020-01-05 1296,1,2020-01-06 1307,1,2020-01-07 131""" 132 133EXAMPLE_TEST = f"""test_example_full_model: 134 model: {EXAMPLE_FULL_MODEL_NAME} 135 inputs: 136 {EXAMPLE_INCREMENTAL_MODEL_NAME}: 137 rows: 138 - id: 1 139 item_id: 1 140 event_date: '2020-01-01' 141 - id: 2 142 item_id: 1 143 event_date: '2020-01-02' 144 - id: 3 145 item_id: 2 146 event_date: '2020-01-03' 147 outputs: 148 query: 149 rows: 150 - item_id: 1 151 num_orders: 2 152 - item_id: 2 153 num_orders: 1 154""" 155 156 157def init_example_project( 158 path: t.Union[str, Path], 159 dialect: t.Optional[str], 160 template: ProjectTemplate = ProjectTemplate.DEFAULT, 161) -> None: 162 root_path = Path(path) 163 config_extension = "py" if template == ProjectTemplate.DBT else "yaml" 164 config_path = root_path / f"config.{config_extension}" 165 audits_path = root_path / "audits" 166 macros_path = root_path / "macros" 167 models_path = root_path / "models" 168 seeds_path = root_path / "seeds" 169 tests_path = root_path / "tests" 170 171 if config_path.exists(): 172 raise click.ClickException(f"Found an existing config in '{config_path}'") 173 174 if not dialect and template != ProjectTemplate.DBT: 175 raise click.ClickException( 176 "Default SQL dialect is a required argument for SQLMesh projects" 177 ) 178 179 _create_config(config_path, dialect, template) 180 if template == ProjectTemplate.DBT: 181 return 182 183 _create_folders([audits_path, macros_path, models_path, seeds_path, tests_path]) 184 185 if template != ProjectTemplate.EMPTY: 186 _create_macros(macros_path) 187 _create_audits(audits_path) 188 _create_models(models_path) 189 _create_seeds(seeds_path) 190 _create_tests(tests_path) 191 192 193def _create_folders(target_folders: t.Sequence[Path]) -> None: 194 for folder_path in target_folders: 195 folder_path.mkdir(exist_ok=True) 196 (folder_path / ".gitkeep").touch() 197 198 199def _create_config(config_path: Path, dialect: t.Optional[str], template: ProjectTemplate) -> None: 200 if dialect: 201 Dialect.get_or_raise(dialect) 202 203 project_config = _gen_config(dialect, template) 204 205 _write_file( 206 config_path, 207 project_config, 208 ) 209 210 211def _create_macros(macros_path: Path) -> None: 212 (macros_path / "__init__.py").touch() 213 214 215def _create_audits(audits_path: Path) -> None: 216 _write_file(audits_path / "assert_positive_order_ids.sql", EXAMPLE_AUDIT) 217 218 219def _create_models(models_path: Path) -> None: 220 for model_name, model_def in [ 221 (EXAMPLE_FULL_MODEL_NAME, EXAMPLE_FULL_MODEL_DEF), 222 (EXAMPLE_INCREMENTAL_MODEL_NAME, EXAMPLE_INCREMENTAL_MODEL_DEF), 223 (EXAMPLE_SEED_MODEL_NAME, EXAMPLE_SEED_MODEL_DEF), 224 ]: 225 _write_file(models_path / f"{model_name.split('.')[-1]}.sql", model_def) 226 227 228def _create_seeds(seeds_path: Path) -> None: 229 _write_file(seeds_path / "seed_data.csv", EXAMPLE_SEED_DATA) 230 231 232def _create_tests(tests_path: Path) -> None: 233 _write_file(tests_path / "test_full_model.yaml", EXAMPLE_TEST) 234 235 236def _write_file(path: Path, payload: str) -> None: 237 with open(path, "w", encoding="utf-8") as fd: 238 fd.write(payload)
class
ProjectTemplate(enum.Enum):
10class ProjectTemplate(Enum): 11 AIRFLOW = "airflow" 12 DBT = "dbt" 13 DEFAULT = "default" 14 EMPTY = "empty"
An enumeration.
AIRFLOW =
<ProjectTemplate.AIRFLOW: 'airflow'>
DBT =
<ProjectTemplate.DBT: 'dbt'>
DEFAULT =
<ProjectTemplate.DEFAULT: 'default'>
EMPTY =
<ProjectTemplate.EMPTY: 'empty'>
Inherited Members
- enum.Enum
- name
- value
def
init_example_project( path: Union[str, pathlib.Path], dialect: Union[str, NoneType], template: sqlmesh.cli.example_project.ProjectTemplate = <ProjectTemplate.DEFAULT: 'default'>) -> None:
158def init_example_project( 159 path: t.Union[str, Path], 160 dialect: t.Optional[str], 161 template: ProjectTemplate = ProjectTemplate.DEFAULT, 162) -> None: 163 root_path = Path(path) 164 config_extension = "py" if template == ProjectTemplate.DBT else "yaml" 165 config_path = root_path / f"config.{config_extension}" 166 audits_path = root_path / "audits" 167 macros_path = root_path / "macros" 168 models_path = root_path / "models" 169 seeds_path = root_path / "seeds" 170 tests_path = root_path / "tests" 171 172 if config_path.exists(): 173 raise click.ClickException(f"Found an existing config in '{config_path}'") 174 175 if not dialect and template != ProjectTemplate.DBT: 176 raise click.ClickException( 177 "Default SQL dialect is a required argument for SQLMesh projects" 178 ) 179 180 _create_config(config_path, dialect, template) 181 if template == ProjectTemplate.DBT: 182 return 183 184 _create_folders([audits_path, macros_path, models_path, seeds_path, tests_path]) 185 186 if template != ProjectTemplate.EMPTY: 187 _create_macros(macros_path) 188 _create_audits(audits_path) 189 _create_models(models_path) 190 _create_seeds(seeds_path) 191 _create_tests(tests_path)