Edit on GitHub

SQLMesh logo

SQLMesh is a data transformation framework that brings the benefits of DevOps to data teams. It enables data scientists, analysts, and engineers to efficiently run and deploy data transformations written in SQL or Python.

For more information, check out the website and documentation.

Getting Started

Install SQLMesh from PyPI by running:

pip install sqlmesh

Follow the tutorial to learn how to use SQLMesh.

Join our community

We'd love to join you on your data journey. Connect with us in the following ways:

Contribution

Contributions in the form of issues or pull requests are greatly appreciated. Read more about how to develop for SQLMesh.

  1"""
  2.. include:: ../README.md
  3"""
  4from __future__ import annotations
  5
  6import logging
  7import os
  8import sys
  9import typing as t
 10from enum import Enum
 11
 12from sqlmesh.core.dialect import extend_sqlglot
 13
 14extend_sqlglot()
 15
 16from sqlmesh.core.config import Config
 17from sqlmesh.core.context import Context, ExecutionContext
 18from sqlmesh.core.engine_adapter import EngineAdapter
 19from sqlmesh.core.macros import macro
 20from sqlmesh.core.model import Model, model
 21from sqlmesh.core.snapshot import Snapshot
 22from sqlmesh.utils import debug_mode_enabled
 23
 24try:
 25    from sqlmesh._version import __version__, __version_tuple__  # type: ignore
 26except ImportError:
 27    pass
 28
 29
 30class RuntimeEnv(str, Enum):
 31    """Enum defining what environment SQLMesh is running in."""
 32
 33    TERMINAL = "terminal"
 34    DATABRICKS = "databricks"
 35    GOOGLE_COLAB = "google_colab"  # Not currently officially supported
 36    JUPYTER = "jupyter"
 37
 38    @classmethod
 39    def get(cls) -> RuntimeEnv:
 40        """Get the console class to use based on the environment that the code is running in
 41        Reference implementation: https://github.com/noklam/rich/blob/d3a1ae61a77d934844563514370084971bc3e143/rich/console.py#L511-L528
 42
 43        Unlike the rich implementation we try to split out by notebook type instead of treating it all as Jupyter.
 44        """
 45        try:
 46            shell = get_ipython()  # type: ignore
 47            if os.getenv("DATABRICKS_RUNTIME_VERSION"):
 48                return RuntimeEnv.DATABRICKS
 49            if "google.colab" in str(shell.__class__):  # type: ignore
 50                return RuntimeEnv.GOOGLE_COLAB
 51            if shell.__class__.__name__ == "ZMQInteractiveShell":  # type: ignore
 52                return RuntimeEnv.JUPYTER
 53        except NameError:
 54            pass
 55
 56        return RuntimeEnv.TERMINAL
 57
 58    @property
 59    def is_terminal(self) -> bool:
 60        return self == RuntimeEnv.TERMINAL
 61
 62    @property
 63    def is_databricks(self) -> bool:
 64        return self == RuntimeEnv.DATABRICKS
 65
 66    @property
 67    def is_jupyter(self) -> bool:
 68        return self == RuntimeEnv.JUPYTER
 69
 70    @property
 71    def is_google_colab(self) -> bool:
 72        return self == RuntimeEnv.GOOGLE_COLAB
 73
 74    @property
 75    def is_notebook(self) -> bool:
 76        return not self.is_terminal
 77
 78
# Runtime environment detected once, when this module is imported.
runtime_env = RuntimeEnv.get()


# In any non-terminal environment, attempt to register the notebook magics.
if runtime_env.is_notebook:
    try:
        from sqlmesh.magics import register_magics

        register_magics()
    except ImportError:
        # Best effort: a failing import is deliberately ignored so that
        # importing the package still succeeds.
        pass
 89
 90
 91# SO: https://stackoverflow.com/questions/384076/how-can-i-color-python-logging-output
 92class CustomFormatter(logging.Formatter):
 93    """Custom logging formatter."""
 94
 95    grey = "\x1b[38;20m"
 96    yellow = "\x1b[33;20m"
 97    red = "\x1b[31;20m"
 98    bold_red = "\x1b[31;1m"
 99    reset = "\x1b[0m"
100    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
101
102    FORMATS = {
103        logging.DEBUG: grey + log_format + reset,
104        logging.INFO: grey + log_format + reset,
105        logging.WARNING: yellow + log_format + reset,
106        logging.ERROR: red + log_format + reset,
107        logging.CRITICAL: bold_red + log_format + reset,
108    }
109
110    def format(self, record: logging.LogRecord) -> str:
111        log_fmt = self.FORMATS.get(record.levelno)
112        formatter = logging.Formatter(log_fmt)
113        return formatter.format(record)
114
115
def enable_logging(level: t.Optional[int] = None) -> None:
    """Configure the root logger to emit level-coloured output on stdout.

    Args:
        level: Level applied to the stdout handler. When omitted (or falsy),
            DEBUG is used if debug mode is enabled and INFO otherwise.

    The root logger itself is set to DEBUG whenever debug mode is enabled and
    to ``level`` otherwise. A handler is attached only when the root logger
    does not have one yet.
    """
    if not level:
        level = logging.DEBUG if debug_mode_enabled() else logging.INFO

    root = logging.getLogger()
    root.setLevel(logging.DEBUG if debug_mode_enabled() else level)

    if root.hasHandlers():
        # Logging is already configured; leave the existing handlers alone.
        return

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(level)
    stdout_handler.setFormatter(CustomFormatter())
    root.addHandler(stdout_handler)
class RuntimeEnv(builtins.str, enum.Enum):
31class RuntimeEnv(str, Enum):
32    """Enum defining what environment SQLMesh is running in."""
33
34    TERMINAL = "terminal"
35    DATABRICKS = "databricks"
36    GOOGLE_COLAB = "google_colab"  # Not currently officially supported
37    JUPYTER = "jupyter"
38
39    @classmethod
40    def get(cls) -> RuntimeEnv:
41        """Get the console class to use based on the environment that the code is running in
42        Reference implementation: https://github.com/noklam/rich/blob/d3a1ae61a77d934844563514370084971bc3e143/rich/console.py#L511-L528
43
44        Unlike the rich implementation we try to split out by notebook type instead of treating it all as Jupyter.
45        """
46        try:
47            shell = get_ipython()  # type: ignore
48            if os.getenv("DATABRICKS_RUNTIME_VERSION"):
49                return RuntimeEnv.DATABRICKS
50            if "google.colab" in str(shell.__class__):  # type: ignore
51                return RuntimeEnv.GOOGLE_COLAB
52            if shell.__class__.__name__ == "ZMQInteractiveShell":  # type: ignore
53                return RuntimeEnv.JUPYTER
54        except NameError:
55            pass
56
57        return RuntimeEnv.TERMINAL
58
59    @property
60    def is_terminal(self) -> bool:
61        return self == RuntimeEnv.TERMINAL
62
63    @property
64    def is_databricks(self) -> bool:
65        return self == RuntimeEnv.DATABRICKS
66
67    @property
68    def is_jupyter(self) -> bool:
69        return self == RuntimeEnv.JUPYTER
70
71    @property
72    def is_google_colab(self) -> bool:
73        return self == RuntimeEnv.GOOGLE_COLAB
74
75    @property
76    def is_notebook(self) -> bool:
77        return not self.is_terminal

Enum defining what environment SQLMesh is running in.

TERMINAL = <RuntimeEnv.TERMINAL: 'terminal'>
DATABRICKS = <RuntimeEnv.DATABRICKS: 'databricks'>
GOOGLE_COLAB = <RuntimeEnv.GOOGLE_COLAB: 'google_colab'>
JUPYTER = <RuntimeEnv.JUPYTER: 'jupyter'>
@classmethod
def get(cls) -> sqlmesh.RuntimeEnv:
39    @classmethod
40    def get(cls) -> RuntimeEnv:
41        """Get the console class to use based on the environment that the code is running in
42        Reference implementation: https://github.com/noklam/rich/blob/d3a1ae61a77d934844563514370084971bc3e143/rich/console.py#L511-L528
43
44        Unlike the rich implementation we try to split out by notebook type instead of treating it all as Jupyter.
45        """
46        try:
47            shell = get_ipython()  # type: ignore
48            if os.getenv("DATABRICKS_RUNTIME_VERSION"):
49                return RuntimeEnv.DATABRICKS
50            if "google.colab" in str(shell.__class__):  # type: ignore
51                return RuntimeEnv.GOOGLE_COLAB
52            if shell.__class__.__name__ == "ZMQInteractiveShell":  # type: ignore
53                return RuntimeEnv.JUPYTER
54        except NameError:
55            pass
56
57        return RuntimeEnv.TERMINAL

Get the console class to use based on the environment that the code is running in Reference implementation: https://github.com/noklam/rich/blob/d3a1ae61a77d934844563514370084971bc3e143/rich/console.py#L511-L528

Unlike the rich implementation we try to split out by notebook type instead of treating it all as Jupyter.

Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
runtime_env = <RuntimeEnv.TERMINAL: 'terminal'>
class CustomFormatter(logging.Formatter):
 93class CustomFormatter(logging.Formatter):
 94    """Custom logging formatter."""
 95
 96    grey = "\x1b[38;20m"
 97    yellow = "\x1b[33;20m"
 98    red = "\x1b[31;20m"
 99    bold_red = "\x1b[31;1m"
100    reset = "\x1b[0m"
101    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
102
103    FORMATS = {
104        logging.DEBUG: grey + log_format + reset,
105        logging.INFO: grey + log_format + reset,
106        logging.WARNING: yellow + log_format + reset,
107        logging.ERROR: red + log_format + reset,
108        logging.CRITICAL: bold_red + log_format + reset,
109    }
110
111    def format(self, record: logging.LogRecord) -> str:
112        log_fmt = self.FORMATS.get(record.levelno)
113        formatter = logging.Formatter(log_fmt)
114        return formatter.format(record)

Custom logging formatter.

def format(self, record: logging.LogRecord) -> str:
111    def format(self, record: logging.LogRecord) -> str:
112        log_fmt = self.FORMATS.get(record.levelno)
113        formatter = logging.Formatter(log_fmt)
114        return formatter.format(record)

Format the specified record as text.

The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

Inherited Members
logging.Formatter
Formatter
converter
formatTime
formatException
usesTime
formatMessage
formatStack
def enable_logging(level: Union[int, NoneType] = None) -> None:
117def enable_logging(level: t.Optional[int] = None) -> None:
118    """Enable logging to send to stdout and color different levels"""
119    level = level or (logging.DEBUG if debug_mode_enabled() else logging.INFO)
120    logger = logging.getLogger()
121    logger.setLevel(level if not debug_mode_enabled() else logging.DEBUG)
122    if not logger.hasHandlers():
123        handler = logging.StreamHandler(sys.stdout)
124        handler.setLevel(level)
125        handler.setFormatter(CustomFormatter())
126        logger.addHandler(handler)

Enable logging to send to stdout and color different levels