SQLMesh is a next-generation data transformation and modeling framework that is backwards compatible with dbt. It aims to be easy to use, correct, and efficient.

SQLMesh enables data practitioners to efficiently run and deploy data transformations written in SQL or Python.

Although SQLMesh will make your dbt projects more efficient, reliable, and maintainable, it is more than just a dbt alternative.
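
For example, a Python model can be defined with the model decorator exported by this package. The following is only a rough sketch: the model name, columns, and returned rows are placeholders rather than parts of a real project.

import typing as t
from datetime import datetime

import pandas as pd

from sqlmesh import ExecutionContext, model


@model(
    "docs_example.basic_model",  # hypothetical model name
    columns={"id": "int", "name": "text"},
)
def execute(
    context: ExecutionContext,
    start: datetime,
    end: datetime,
    execution_time: datetime,
    **kwargs: t.Any,
) -> pd.DataFrame:
    # A real model would typically read upstream data via the context;
    # this sketch just returns static rows for illustration.
    return pd.DataFrame([{"id": 1, "name": "example"}])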

Select Features

For more information, check out the website and documentation.

Getting Started

Install SQLMesh from PyPI by running:

pip install sqlmesh

Follow the tutorial to learn how to use SQLMesh.
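
If you prefer to drive SQLMesh from Python rather than the command line, a rough quick-start sketch using the Context class exported by this package looks like the following; the project path and environment name are placeholders.

from sqlmesh import Context

# Point the context at an existing SQLMesh project directory (placeholder path).
context = Context(paths="path/to/sqlmesh/project")

# Build a plan for a target environment and apply it.
plan = context.plan("dev")
context.apply(plan)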

Join our community

We'd love to join you on your data journey. Connect with us in the following ways:

Contribution

Contributions in the form of issues or pull requests are greatly appreciated. Read more about how to develop for SQLMesh.

  1"""
  2.. include:: ../README.md
  3"""
  4
  5from __future__ import annotations
  6
  7import glob
  8import logging
  9import os
 10import sys
 11import typing as t
 12from datetime import datetime
 13from enum import Enum
 14from pathlib import Path
 15
 16from sqlmesh.core.dialect import extend_sqlglot
 17
 18extend_sqlglot()
 19
 20from sqlmesh.core import constants as c
 21from sqlmesh.core.config import Config
 22from sqlmesh.core.context import Context, ExecutionContext
 23from sqlmesh.core.engine_adapter import EngineAdapter
 24from sqlmesh.core.macros import macro
 25from sqlmesh.core.model import Model, model
 26from sqlmesh.core.snapshot import Snapshot
 27from sqlmesh.utils import debug_mode_enabled, enable_debug_mode
 28
 29try:
 30    from sqlmesh._version import __version__, __version_tuple__  # type: ignore
 31except ImportError:
 32    pass
 33
 34
 35class RuntimeEnv(str, Enum):
 36    """Enum defining what environment SQLMesh is running in."""
 37
 38    TERMINAL = "terminal"
 39    DATABRICKS = "databricks"
 40    GOOGLE_COLAB = "google_colab"  # Not currently officially supported
 41    JUPYTER = "jupyter"
 42    DEBUGGER = "debugger"
 43
 44    @classmethod
 45    def get(cls) -> RuntimeEnv:
        """Get the environment that the code is running in, which determines the console class to use.

        Reference implementation: https://github.com/noklam/rich/blob/d3a1ae61a77d934844563514370084971bc3e143/rich/console.py#L511-L528

        Unlike the rich implementation, we try to split out by notebook type instead of treating it all as Jupyter.
        """
        try:
            shell = get_ipython()  # type: ignore
            if os.getenv("DATABRICKS_RUNTIME_VERSION"):
                return RuntimeEnv.DATABRICKS
            if "google.colab" in str(shell.__class__):  # type: ignore
                return RuntimeEnv.GOOGLE_COLAB
            if shell.__class__.__name__ == "ZMQInteractiveShell":  # type: ignore
                return RuntimeEnv.JUPYTER
        except NameError:
            pass

        if debug_mode_enabled():
            return RuntimeEnv.DEBUGGER
        return RuntimeEnv.TERMINAL

    @property
    def is_terminal(self) -> bool:
        return self == RuntimeEnv.TERMINAL

    @property
    def is_databricks(self) -> bool:
        return self == RuntimeEnv.DATABRICKS

    @property
    def is_jupyter(self) -> bool:
        return self == RuntimeEnv.JUPYTER

    @property
    def is_google_colab(self) -> bool:
        return self == RuntimeEnv.GOOGLE_COLAB

    @property
    def is_notebook(self) -> bool:
        return not self.is_terminal


if RuntimeEnv.get().is_notebook:
    try:
        from sqlmesh.magics import register_magics

        register_magics()
    except ImportError:
        pass


LOG_FORMAT = "%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
LOG_FILENAME_PREFIX = "sqlmesh_"


# SO: https://stackoverflow.com/questions/384076/how-can-i-color-python-logging-output
class CustomFormatter(logging.Formatter):
    """Custom logging formatter."""

    grey = "\x1b[38;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"

    FORMATS = {
        logging.DEBUG: grey + LOG_FORMAT + reset,
        logging.INFO: grey + LOG_FORMAT + reset,
        logging.WARNING: yellow + LOG_FORMAT + reset,
        logging.ERROR: red + LOG_FORMAT + reset,
        logging.CRITICAL: bold_red + LOG_FORMAT + reset,
    }

    def format(self, record: logging.LogRecord) -> str:
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)


def configure_logging(
    force_debug: bool = False,
    ignore_warnings: bool = False,
    write_to_stdout: bool = False,
    write_to_file: bool = True,
    log_limit: int = c.DEFAULT_LOG_LIMIT,
    log_file_dir: t.Optional[t.Union[str, Path]] = None,
) -> None:
    logger = logging.getLogger()
    debug = force_debug or debug_mode_enabled()

    # base logger needs to be the lowest level that we plan to log
    level = logging.DEBUG if debug else logging.INFO
    logger.setLevel(level)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(CustomFormatter())
    stdout_handler.setLevel(
        level if write_to_stdout else (logging.ERROR if ignore_warnings else logging.WARNING)
    )
    logger.addHandler(stdout_handler)

    log_file_dir = log_file_dir or c.DEFAULT_LOG_FILE_DIR
    log_path_prefix = Path(log_file_dir) / LOG_FILENAME_PREFIX
    if write_to_file:
        os.makedirs(str(log_file_dir), exist_ok=True)
        filename = f"{log_path_prefix}{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.log"
        file_handler = logging.FileHandler(filename, mode="w", encoding="utf-8")
        # the log files should always log at least info so that users will always have
        # minimal info for debugging even if they specify "ignore_warnings"
        file_handler.setLevel(level)
        file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
        logger.addHandler(file_handler)

    if log_limit > 0:
        for path in list(sorted(glob.glob(f"{log_path_prefix}*.log"), reverse=True))[log_limit:]:
            os.remove(path)

    if debug:
        import faulthandler
        import signal

        enable_debug_mode()

        # Enable thread dumps.
        faulthandler.enable()
        # Windows doesn't support faulthandler.register, so we check for it here.
        if hasattr(faulthandler, "register"):
            faulthandler.register(signal.SIGUSR1.value)
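
For reference (not part of the module source), the logging helper above can be called directly from application code. A minimal sketch, with a placeholder log directory and arbitrary option values:

import logging

from sqlmesh import configure_logging

# Emit DEBUG output to stdout, keep at most 5 log files, and write them to "logs" (placeholder directory).
configure_logging(force_debug=True, write_to_stdout=True, log_limit=5, log_file_dir="logs")

logging.getLogger("sqlmesh").info("logging configured")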
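
Similarly, the runtime environment detection above can be queried directly, for example to decide whether notebook-oriented output makes sense. A small sketch:

from sqlmesh import RuntimeEnv

env = RuntimeEnv.get()
if env.is_notebook:
    # Jupyter, Databricks, or Google Colab; notebook magics are registered at import time when available.
    print(f"notebook environment detected: {env.value}")
else:
    print(f"non-notebook environment: {env.value}")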