Edit on GitHub

sqlmesh.utils

  1from __future__ import annotations
  2
  3import copy
  4import importlib
  5import logging
  6import os
  7import random
  8import re
  9import string
 10import sys
 11import time
 12import traceback
 13import types
 14import typing as t
 15import uuid
 16from collections import defaultdict
 17from contextlib import contextmanager
 18from copy import deepcopy
 19from functools import lru_cache, reduce, wraps
 20from pathlib import Path
 21
 22from sqlglot import exp
 23from sqlglot.dialects.dialect import Dialects
 24
 25logger = logging.getLogger(__name__)
 26
 27T = t.TypeVar("T")
 28KEY = t.TypeVar("KEY", bound=t.Hashable)
 29VALUE = t.TypeVar("VALUE")
 30ITEM = t.TypeVar("ITEM")
 31GROUP = t.TypeVar("GROUP")
 32DECORATOR_RETURN_TYPE = t.TypeVar("DECORATOR_RETURN_TYPE")
 33
 34ALPHANUMERIC = string.ascii_lowercase + string.digits
 35
 36
 37def optional_import(name: str) -> t.Optional[types.ModuleType]:
 38    """Optionally import a module.
 39
 40    Args:
 41        name: The name of the module to import.
 42    Returns:
 43        The module if it is installed.
 44    """
 45    try:
 46        module = importlib.import_module(name)
 47    except ImportError:
 48        return None
 49    return module
 50
 51
 52def major_minor(version: str) -> t.Tuple[int, int]:
 53    """Returns a tuple of just the major.minor for a version string (major.minor.patch)."""
 54    return t.cast(t.Tuple[int, int], tuple(int(part) for part in version.split(".")[0:2]))
 55
 56
 57def unique(iterable: t.Iterable[T], by: t.Callable[[T], t.Any] = lambda i: i) -> t.List[T]:
 58    return list({by(i): None for i in iterable})
 59
 60
 61def random_id(short: bool = False) -> str:
 62    if short:
 63        return "".join(random.choices(ALPHANUMERIC, k=8))
 64
 65    return uuid.uuid4().hex
 66
 67
 68class UniqueKeyDict(dict, t.Mapping[KEY, VALUE]):
 69    """Dict that raises when a duplicate key is set."""
 70
 71    def __init__(self, name: str, *args: t.Dict[KEY, VALUE], **kwargs: VALUE) -> None:
 72        self.name = name
 73        super().__init__(*args, **kwargs)
 74
 75    def __setitem__(self, k: KEY, v: VALUE) -> None:
 76        if k in self:
 77            raise ValueError(
 78                f"Duplicate key '{k}' found in UniqueKeyDict<{self.name}>. Call dict.update(...) if this is intentional."
 79            )
 80        super().__setitem__(k, v)
 81
 82
 83class AttributeDict(dict, t.Mapping[KEY, VALUE]):
 84    __getattr__ = dict.get
 85
 86    def set(self, field: str, value: t.Any) -> str:
 87        self[field] = value
 88        # Return an empty string, so that this method can be used within Jinja
 89        return ""
 90
 91    def __deepcopy__(self, memo: t.Dict[t.Any, AttributeDict]) -> AttributeDict:
 92        copy: AttributeDict = AttributeDict()
 93        memo[id(self)] = copy
 94        for k, v in self.items():
 95            copy[k] = deepcopy(v, memo)
 96        return copy
 97
 98    def __call__(self, **kwargs: t.Dict[str, t.Any]) -> str:
 99        self.update(**kwargs)
100        # Return an empty string, so that this method can be used within Jinja
101        return ""
102
103
104class registry_decorator:
105    """A decorator that registers itself."""
106
107    registry_name = ""
108    _registry: t.Optional[UniqueKeyDict] = None
109
110    @classmethod
111    def registry(cls) -> UniqueKeyDict:
112        if cls._registry is None:
113            cls._registry = UniqueKeyDict(cls.registry_name)
114        return cls._registry
115
116    def __init__(self, name: str = "") -> None:
117        self.name = name
118
119    def __call__(
120        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
121    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
122        self.func = func
123
124        registry = self.registry()
125        func_name = (self.name or func.__name__).lower()
126
127        try:
128            registry[func_name] = self
129        except ValueError:
130            # No need to raise due to duplicate key if the functions are identical
131            if func.__code__.co_code != registry[func_name].func.__code__.co_code:
132                raise
133
134        @wraps(func)
135        def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE:
136            return func(*args, **kwargs)
137
138        return wrapper
139
140    @classmethod
141    def get_registry(cls) -> UniqueKeyDict:
142        """Get a copy of the registry"""
143        return UniqueKeyDict(cls.registry_name, **(cls._registry or {}))
144
145    @classmethod
146    def set_registry(cls, registry: UniqueKeyDict) -> None:
147        """Set the registry."""
148        cls._registry = registry
149
150
151@contextmanager
152def sys_path(*paths: Path) -> t.Iterator[None]:
153    """A context manager to temporarily add a path to 'sys.path'."""
154    inserted = set()
155
156    for path in paths:
157        path_str = str(path.absolute())
158
159        if path_str not in sys.path:
160            sys.path.insert(0, path_str)
161            inserted.add(path_str)
162
163    try:
164        yield
165    finally:
166        for path_str in inserted:
167            sys.path.remove(path_str)
168
169
170def format_exception(exception: BaseException) -> t.List[str]:
171    if sys.version_info < (3, 10):
172        return traceback.format_exception(
173            type(exception), exception, exception.__traceback__
174        )  # type: ignore
175    else:
176        return traceback.format_exception(exception)  # type: ignore
177
178
179def word_characters_only(s: str, replacement_char: str = "_") -> str:
180    """
181    Replace all non-word characters in string with the replacement character.
182    Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018
183
184    >>> word_characters_only("Hello, world!")
185    'Hello__world_'
186    >>> word_characters_only("Hello, world! 123", '')
187    'Helloworld123'
188    """
189    return re.sub(r"\W", replacement_char, s)
190
191
192def str_to_bool(s: t.Optional[str]) -> bool:
193    """
194    Convert a string to a boolean. disutils is being deprecated and it is recommended to implement your own version:
195    https://peps.python.org/pep-0632/
196
197    Unlike disutils, this actually returns a bool and never raises. If a value cannot be determined to be true
198    then false is returned.
199    """
200    if not s:
201        return False
202    return s.lower() in ("true", "1", "t", "y", "yes", "on")
203
204
205_debug_mode_enabled: bool = False
206
207
208def enable_debug_mode() -> None:
209    global _debug_mode_enabled
210    _debug_mode_enabled = True
211
212
213def debug_mode_enabled() -> bool:
214    return _debug_mode_enabled or str_to_bool(os.environ.get("SQLMESH_DEBUG"))
215
216
217def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> t.Callable:
218    """Caches a function that clears whenever the current epoch / ttl seconds changes.
219
220    TTL is not exact, it is used as a salt. So by default, at every minute mark, the cache will be cleared.
221    This is done for simplicity.
222
223    Args:
224        ttl: The number of seconds to hold the cache for.
225        maxsize: The maximum size of the cache.
226    """
227
228    def decorator(func: t.Callable) -> t.Any:
229        @lru_cache(maxsize=maxsize)
230        def cache(tick: int, *args: t.Any, **kwargs: t.Any) -> t.Any:
231            return func(*args, **kwargs)
232
233        @wraps(func)
234        def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any:
235            return cache(int(time.time() / ttl), *args, **kwargs)
236
237        return wrap
238
239    return decorator
240
241
242class classproperty(property):
243    """
244    Similar to a normal property but works for class methods
245    """
246
247    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
248        return classmethod(self.fget).__get__(None, owner)()  # type: ignore
249
250
251@contextmanager
252def env_vars(environ: dict[str, str]) -> t.Iterator[None]:
253    """A context manager to temporarily modify environment variables."""
254    old_environ = os.environ.copy()
255    os.environ.update(environ)
256
257    try:
258        yield
259    finally:
260        os.environ.clear()
261        os.environ.update(old_environ)
262
263
264def merge_dicts(*args: t.Dict) -> t.Dict:
265    """
266    Merges dicts. Just does key collision replacement
267    """
268
269    def merge(a: t.Dict, b: t.Dict) -> t.Dict:
270        for b_key, b_value in b.items():
271            a_value = a.get(b_key)
272            if isinstance(a_value, dict) and isinstance(b_value, dict):
273                merge(a_value, b_value)
274            elif isinstance(b_value, dict):
275                a[b_key] = copy.deepcopy(b_value)
276            else:
277                a[b_key] = b_value
278        return a
279
280    return reduce(merge, args, {})
281
282
283def sqlglot_dialects() -> str:
284    return "'" + "', '".join(Dialects.__members__.values()) + "'"
285
286
287NON_ALNUM = re.compile(r"[^a-zA-Z0-9_]")
288
289
290def sanitize_name(name: str) -> str:
291    return NON_ALNUM.sub("_", name)
292
293
294def groupby(
295    items: t.Iterable[ITEM],
296    func: t.Callable[[ITEM], GROUP],
297) -> t.DefaultDict[GROUP, t.List[ITEM]]:
298    grouped = defaultdict(list)
299    for item in items:
300        grouped[func(item)].append(item)
301    return grouped
302
303
304def columns_to_types_to_struct(
305    columns_to_types: t.Union[t.Dict[str, exp.DataType], t.Dict[str, str]]
306) -> exp.DataType:
307    """
308    Converts a dict of column names to types to a struct.
309    """
310    return exp.DataType(
311        this=exp.DataType.Type.STRUCT,
312        expressions=[
313            exp.ColumnDef(this=exp.to_identifier(k), kind=v) for k, v in columns_to_types.items()
314        ],
315        nested=True,
316    )
317
318
319def type_is_known(d_type: t.Union[exp.DataType, exp.ColumnDef]) -> bool:
320    """Checks that a given column type is known and not NULL."""
321    if isinstance(d_type, exp.ColumnDef):
322        if not d_type.kind:
323            return False
324        d_type = d_type.kind
325    if isinstance(d_type, exp.DataTypeParam):
326        return True
327    if d_type.is_type(exp.DataType.Type.UNKNOWN, exp.DataType.Type.NULL):
328        return False
329    if d_type.expressions:
330        return all(type_is_known(expression) for expression in d_type.expressions)
331    return True
332
333
334def columns_to_types_all_known(columns_to_types: t.Dict[str, exp.DataType]) -> bool:
335    """Checks that all column types are known and not NULL."""
336    return all(type_is_known(expression) for expression in columns_to_types.values())
def optional_import(name: str) -> Union[module, NoneType]:
38def optional_import(name: str) -> t.Optional[types.ModuleType]:
39    """Optionally import a module.
40
41    Args:
42        name: The name of the module to import.
43    Returns:
44        The module if it is installed.
45    """
46    try:
47        module = importlib.import_module(name)
48    except ImportError:
49        return None
50    return module

Optionally import a module.

Arguments:
  • name: The name of the module to import.
Returns:

The module if it is installed.

def major_minor(version: str) -> Tuple[int, int]:
53def major_minor(version: str) -> t.Tuple[int, int]:
54    """Returns a tuple of just the major.minor for a version string (major.minor.patch)."""
55    return t.cast(t.Tuple[int, int], tuple(int(part) for part in version.split(".")[0:2]))

Returns a tuple of just the major.minor for a version string (major.minor.patch).

def unique( iterable: Iterable[~T], by: Callable[[~T], Any] = <function <lambda>>) -> List[~T]:
58def unique(iterable: t.Iterable[T], by: t.Callable[[T], t.Any] = lambda i: i) -> t.List[T]:
59    return list({by(i): None for i in iterable})
def random_id(short: bool = False) -> str:
62def random_id(short: bool = False) -> str:
63    if short:
64        return "".join(random.choices(ALPHANUMERIC, k=8))
65
66    return uuid.uuid4().hex
class UniqueKeyDict(builtins.dict, typing.Mapping[~KEY, ~VALUE]):
69class UniqueKeyDict(dict, t.Mapping[KEY, VALUE]):
70    """Dict that raises when a duplicate key is set."""
71
72    def __init__(self, name: str, *args: t.Dict[KEY, VALUE], **kwargs: VALUE) -> None:
73        self.name = name
74        super().__init__(*args, **kwargs)
75
76    def __setitem__(self, k: KEY, v: VALUE) -> None:
77        if k in self:
78            raise ValueError(
79                f"Duplicate key '{k}' found in UniqueKeyDict<{self.name}>. Call dict.update(...) if this is intentional."
80            )
81        super().__setitem__(k, v)

Dict that raises when a duplicate key is set.

Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class AttributeDict(builtins.dict, typing.Mapping[~KEY, ~VALUE]):
 84class AttributeDict(dict, t.Mapping[KEY, VALUE]):
 85    __getattr__ = dict.get
 86
 87    def set(self, field: str, value: t.Any) -> str:
 88        self[field] = value
 89        # Return an empty string, so that this method can be used within Jinja
 90        return ""
 91
 92    def __deepcopy__(self, memo: t.Dict[t.Any, AttributeDict]) -> AttributeDict:
 93        copy: AttributeDict = AttributeDict()
 94        memo[id(self)] = copy
 95        for k, v in self.items():
 96            copy[k] = deepcopy(v, memo)
 97        return copy
 98
 99    def __call__(self, **kwargs: t.Dict[str, t.Any]) -> str:
100        self.update(**kwargs)
101        # Return an empty string, so that this method can be used within Jinja
102        return ""
def set(self, field: str, value: Any) -> str:
87    def set(self, field: str, value: t.Any) -> str:
88        self[field] = value
89        # Return an empty string, so that this method can be used within Jinja
90        return ""
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class registry_decorator:
105class registry_decorator:
106    """A decorator that registers itself."""
107
108    registry_name = ""
109    _registry: t.Optional[UniqueKeyDict] = None
110
111    @classmethod
112    def registry(cls) -> UniqueKeyDict:
113        if cls._registry is None:
114            cls._registry = UniqueKeyDict(cls.registry_name)
115        return cls._registry
116
117    def __init__(self, name: str = "") -> None:
118        self.name = name
119
120    def __call__(
121        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
122    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
123        self.func = func
124
125        registry = self.registry()
126        func_name = (self.name or func.__name__).lower()
127
128        try:
129            registry[func_name] = self
130        except ValueError:
131            # No need to raise due to duplicate key if the functions are identical
132            if func.__code__.co_code != registry[func_name].func.__code__.co_code:
133                raise
134
135        @wraps(func)
136        def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE:
137            return func(*args, **kwargs)
138
139        return wrapper
140
141    @classmethod
142    def get_registry(cls) -> UniqueKeyDict:
143        """Get a copy of the registry"""
144        return UniqueKeyDict(cls.registry_name, **(cls._registry or {}))
145
146    @classmethod
147    def set_registry(cls, registry: UniqueKeyDict) -> None:
148        """Set the registry."""
149        cls._registry = registry

A decorator that registers itself.

registry_decorator(name: str = '')
117    def __init__(self, name: str = "") -> None:
118        self.name = name
@classmethod
def registry(cls) -> sqlmesh.utils.UniqueKeyDict:
111    @classmethod
112    def registry(cls) -> UniqueKeyDict:
113        if cls._registry is None:
114            cls._registry = UniqueKeyDict(cls.registry_name)
115        return cls._registry
@classmethod
def get_registry(cls) -> sqlmesh.utils.UniqueKeyDict:
141    @classmethod
142    def get_registry(cls) -> UniqueKeyDict:
143        """Get a copy of the registry"""
144        return UniqueKeyDict(cls.registry_name, **(cls._registry or {}))

Get a copy of the registry

@classmethod
def set_registry(cls, registry: sqlmesh.utils.UniqueKeyDict) -> None:
146    @classmethod
147    def set_registry(cls, registry: UniqueKeyDict) -> None:
148        """Set the registry."""
149        cls._registry = registry

Set the registry.

@contextmanager
def sys_path(*paths: pathlib.Path) -> Iterator[NoneType]:
152@contextmanager
153def sys_path(*paths: Path) -> t.Iterator[None]:
154    """A context manager to temporarily add a path to 'sys.path'."""
155    inserted = set()
156
157    for path in paths:
158        path_str = str(path.absolute())
159
160        if path_str not in sys.path:
161            sys.path.insert(0, path_str)
162            inserted.add(path_str)
163
164    try:
165        yield
166    finally:
167        for path_str in inserted:
168            sys.path.remove(path_str)

A context manager to temporarily add a path to 'sys.path'.

def format_exception(exception: BaseException) -> List[str]:
171def format_exception(exception: BaseException) -> t.List[str]:
172    if sys.version_info < (3, 10):
173        return traceback.format_exception(
174            type(exception), exception, exception.__traceback__
175        )  # type: ignore
176    else:
177        return traceback.format_exception(exception)  # type: ignore
def word_characters_only(s: str, replacement_char: str = '_') -> str:
180def word_characters_only(s: str, replacement_char: str = "_") -> str:
181    """
182    Replace all non-word characters in string with the replacement character.
183    Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018
184
185    >>> word_characters_only("Hello, world!")
186    'Hello__world_'
187    >>> word_characters_only("Hello, world! 123", '')
188    'Helloworld123'
189    """
190    return re.sub(r"\W", replacement_char, s)

Replace all non-word characters in string with the replacement character. Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018

>>> word_characters_only("Hello, world!")
'Hello__world_'
>>> word_characters_only("Hello, world! 123", '')
'Helloworld123'
def str_to_bool(s: Union[str, NoneType]) -> bool:
193def str_to_bool(s: t.Optional[str]) -> bool:
194    """
195    Convert a string to a boolean. disutils is being deprecated and it is recommended to implement your own version:
196    https://peps.python.org/pep-0632/
197
198    Unlike disutils, this actually returns a bool and never raises. If a value cannot be determined to be true
199    then false is returned.
200    """
201    if not s:
202        return False
203    return s.lower() in ("true", "1", "t", "y", "yes", "on")

Convert a string to a boolean. disutils is being deprecated and it is recommended to implement your own version: https://peps.python.org/pep-0632/

Unlike disutils, this actually returns a bool and never raises. If a value cannot be determined to be true then false is returned.

def enable_debug_mode() -> None:
209def enable_debug_mode() -> None:
210    global _debug_mode_enabled
211    _debug_mode_enabled = True
def debug_mode_enabled() -> bool:
214def debug_mode_enabled() -> bool:
215    return _debug_mode_enabled or str_to_bool(os.environ.get("SQLMESH_DEBUG"))
def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> Callable:
218def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> t.Callable:
219    """Caches a function that clears whenever the current epoch / ttl seconds changes.
220
221    TTL is not exact, it is used as a salt. So by default, at every minute mark, the cache will be cleared.
222    This is done for simplicity.
223
224    Args:
225        ttl: The number of seconds to hold the cache for.
226        maxsize: The maximum size of the cache.
227    """
228
229    def decorator(func: t.Callable) -> t.Any:
230        @lru_cache(maxsize=maxsize)
231        def cache(tick: int, *args: t.Any, **kwargs: t.Any) -> t.Any:
232            return func(*args, **kwargs)
233
234        @wraps(func)
235        def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any:
236            return cache(int(time.time() / ttl), *args, **kwargs)
237
238        return wrap
239
240    return decorator

Caches a function that clears whenever the current epoch / ttl seconds changes.

TTL is not exact, it is used as a salt. So by default, at every minute mark, the cache will be cleared. This is done for simplicity.

Arguments:
  • ttl: The number of seconds to hold the cache for.
  • maxsize: The maximum size of the cache.
class classproperty(builtins.property):
243class classproperty(property):
244    """
245    Similar to a normal property but works for class methods
246    """
247
248    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
249        return classmethod(self.fget).__get__(None, owner)()  # type: ignore

Similar to a normal property but works for class methods

Inherited Members
builtins.property
property
getter
setter
deleter
@contextmanager
def env_vars(environ: 'dict[str, str]') -> Iterator[NoneType]:
252@contextmanager
253def env_vars(environ: dict[str, str]) -> t.Iterator[None]:
254    """A context manager to temporarily modify environment variables."""
255    old_environ = os.environ.copy()
256    os.environ.update(environ)
257
258    try:
259        yield
260    finally:
261        os.environ.clear()
262        os.environ.update(old_environ)

A context manager to temporarily modify environment variables.

def merge_dicts(*args: Dict) -> Dict:
265def merge_dicts(*args: t.Dict) -> t.Dict:
266    """
267    Merges dicts. Just does key collision replacement
268    """
269
270    def merge(a: t.Dict, b: t.Dict) -> t.Dict:
271        for b_key, b_value in b.items():
272            a_value = a.get(b_key)
273            if isinstance(a_value, dict) and isinstance(b_value, dict):
274                merge(a_value, b_value)
275            elif isinstance(b_value, dict):
276                a[b_key] = copy.deepcopy(b_value)
277            else:
278                a[b_key] = b_value
279        return a
280
281    return reduce(merge, args, {})

Merges dicts. Just does key collision replacement

def sqlglot_dialects() -> str:
284def sqlglot_dialects() -> str:
285    return "'" + "', '".join(Dialects.__members__.values()) + "'"
def sanitize_name(name: str) -> str:
291def sanitize_name(name: str) -> str:
292    return NON_ALNUM.sub("_", name)
def groupby( items: Iterable[~ITEM], func: Callable[[~ITEM], ~GROUP]) -> DefaultDict[~GROUP, List[~ITEM]]:
295def groupby(
296    items: t.Iterable[ITEM],
297    func: t.Callable[[ITEM], GROUP],
298) -> t.DefaultDict[GROUP, t.List[ITEM]]:
299    grouped = defaultdict(list)
300    for item in items:
301        grouped[func(item)].append(item)
302    return grouped
def columns_to_types_to_struct( columns_to_types: Union[Dict[str, sqlglot.expressions.DataType], Dict[str, str]]) -> sqlglot.expressions.DataType:
305def columns_to_types_to_struct(
306    columns_to_types: t.Union[t.Dict[str, exp.DataType], t.Dict[str, str]]
307) -> exp.DataType:
308    """
309    Converts a dict of column names to types to a struct.
310    """
311    return exp.DataType(
312        this=exp.DataType.Type.STRUCT,
313        expressions=[
314            exp.ColumnDef(this=exp.to_identifier(k), kind=v) for k, v in columns_to_types.items()
315        ],
316        nested=True,
317    )

Converts a dict of column names to types to a struct.

def type_is_known( d_type: Union[sqlglot.expressions.DataType, sqlglot.expressions.ColumnDef]) -> bool:
320def type_is_known(d_type: t.Union[exp.DataType, exp.ColumnDef]) -> bool:
321    """Checks that a given column type is known and not NULL."""
322    if isinstance(d_type, exp.ColumnDef):
323        if not d_type.kind:
324            return False
325        d_type = d_type.kind
326    if isinstance(d_type, exp.DataTypeParam):
327        return True
328    if d_type.is_type(exp.DataType.Type.UNKNOWN, exp.DataType.Type.NULL):
329        return False
330    if d_type.expressions:
331        return all(type_is_known(expression) for expression in d_type.expressions)
332    return True

Checks that a given column type is known and not NULL.

def columns_to_types_all_known(columns_to_types: Dict[str, sqlglot.expressions.DataType]) -> bool:
335def columns_to_types_all_known(columns_to_types: t.Dict[str, exp.DataType]) -> bool:
336    """Checks that all column types are known and not NULL."""
337    return all(type_is_known(expression) for expression in columns_to_types.values())

Checks that all column types are known and not NULL.