Edit on GitHub

sqlmesh.core.config.base

  1from __future__ import annotations
  2
  3import typing as t
  4from enum import Enum, auto
  5
  6from sqlmesh.utils.errors import ConfigError
  7from sqlmesh.utils.pydantic import PydanticModel
  8
  9T = t.TypeVar("T", bound="BaseConfig")
 10
 11
 12class UpdateStrategy(Enum):
 13    """Supported strategies for adding new config to existing config"""
 14
 15    REPLACE = auto()  # Replace with new value
 16    EXTEND = auto()  # Extend list to existing list
 17    KEY_UPDATE = auto()  # Update dict key value with new dict key value
 18    KEY_EXTEND = auto()  # Extend dict key value to existing dict key value
 19    IMMUTABLE = auto()  # Raise if a key tries to change this value
 20    NESTED_UPDATE = auto()  # Recursively updates the nested config
 21
 22
 23def update_field(
 24    old: t.Optional[t.Any],
 25    new: t.Any,
 26    update_strategy: t.Optional[UpdateStrategy] = None,
 27) -> t.Any:
 28    """
 29    Update config field with new config value
 30
 31    Args:
 32        old: The existing config value
 33        new: The new config value
 34        update_strategy: The strategy to use when updating the field
 35
 36
 37        The updated field
 38    """
 39
 40    def _update_pydantic_config(old: BaseConfig, new: BaseConfig) -> PydanticModel:
 41        if type(new) != type(old):
 42            raise ConfigError(
 43                "NESTED_UPDATE behavior requires both values to have the same type. "
 44                f"{type(old)} and {type(new)} were given instead."
 45            )
 46        return old.update_with(new)
 47
 48    if not old:
 49        return new
 50
 51    update_strategy = update_strategy or UpdateStrategy.REPLACE
 52
 53    if update_strategy == UpdateStrategy.IMMUTABLE:
 54        raise ConfigError(f"Cannot modify property: {old}.")
 55
 56    if update_strategy == UpdateStrategy.REPLACE:
 57        return new
 58    if update_strategy == UpdateStrategy.EXTEND:
 59        if not isinstance(old, list) or not isinstance(new, list):
 60            raise ConfigError("EXTEND behavior requires list field.")
 61
 62        return old + new
 63    if update_strategy == UpdateStrategy.KEY_UPDATE:
 64        if not isinstance(old, dict) or not isinstance(new, dict):
 65            raise ConfigError("KEY_UPDATE behavior requires dictionary field.")
 66
 67        combined = old.copy()
 68        combined.update(new)
 69        return combined
 70    if update_strategy == UpdateStrategy.KEY_EXTEND:
 71        if not isinstance(old, dict) or not isinstance(new, dict):
 72            raise ConfigError("KEY_EXTEND behavior requires dictionary field.")
 73
 74        combined = old.copy()
 75        for key, value in new.items():
 76            if not isinstance(value, list):
 77                raise ConfigError("KEY_EXTEND behavior requires list values in dictionary.")
 78
 79            old_value = combined.get(key)
 80            if old_value:
 81                if not isinstance(old_value, list):
 82                    raise ConfigError("KEY_EXTEND behavior requires list values in dictionary.")
 83
 84                combined[key] = old_value + value
 85            else:
 86                combined[key] = value
 87
 88        return combined
 89    if update_strategy == UpdateStrategy.NESTED_UPDATE:
 90        if not isinstance(old, BaseConfig) and not isinstance(old, dict):
 91            raise ConfigError(
 92                f"NESTED_UPDATE behavior requires a config object and a dict of config objects as values. {type(old)} was given instead."
 93            )
 94
 95        if isinstance(old, dict):
 96            for k, pydantic_model in new.items():
 97                if k in old:
 98                    old[k] = _update_pydantic_config(old[k], pydantic_model)
 99                else:
100                    old[k] = pydantic_model
101
102            return old
103        return _update_pydantic_config(old, new)
104
105    raise ConfigError(f"Unknown update strategy {update_strategy}.")
106
107
108class BaseConfig(PydanticModel):
109    """Base configuration functionality for configuration classes."""
110
111    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {}
112
113    def update_with(self: T, other: t.Union[t.Dict[str, t.Any], T]) -> T:
114        """Updates this instance's fields with the passed in config fields and returns a new instance.
115
116        Args:
117            other: Other configuration.
118
119        Returns:
120            New instance updated with the passed in config fields
121        """
122        if isinstance(other, dict):
123            other = self.__class__(**other)
124
125        updated_fields = {}
126
127        for field in other.fields_set:
128            if field in self.all_field_infos():
129                updated_fields[field] = update_field(
130                    getattr(self, field),
131                    getattr(other, field),
132                    self._FIELD_UPDATE_STRATEGY.get(field),
133                )
134            else:
135                updated_fields[field] = getattr(other, field)
136
137        # Assign each field to trigger assignment validators
138        updated = self.copy()
139        for field, value in updated_fields.items():
140            setattr(updated, field, value)
141
142        return updated
class UpdateStrategy(enum.Enum):
13class UpdateStrategy(Enum):
14    """Supported strategies for adding new config to existing config"""
15
16    REPLACE = auto()  # Replace with new value
17    EXTEND = auto()  # Extend list to existing list
18    KEY_UPDATE = auto()  # Update dict key value with new dict key value
19    KEY_EXTEND = auto()  # Extend dict key value to existing dict key value
20    IMMUTABLE = auto()  # Raise if a key tries to change this value
21    NESTED_UPDATE = auto()  # Recursively updates the nested config

Supported strategies for adding new config to existing config

REPLACE = <UpdateStrategy.REPLACE: 1>
EXTEND = <UpdateStrategy.EXTEND: 2>
KEY_UPDATE = <UpdateStrategy.KEY_UPDATE: 3>
KEY_EXTEND = <UpdateStrategy.KEY_EXTEND: 4>
IMMUTABLE = <UpdateStrategy.IMMUTABLE: 5>
NESTED_UPDATE = <UpdateStrategy.NESTED_UPDATE: 6>
Inherited Members
enum.Enum
name
value
def update_field( old: Optional[Any], new: Any, update_strategy: Optional[UpdateStrategy] = None) -> Any:
 24def update_field(
 25    old: t.Optional[t.Any],
 26    new: t.Any,
 27    update_strategy: t.Optional[UpdateStrategy] = None,
 28) -> t.Any:
 29    """
 30    Update config field with new config value
 31
 32    Args:
 33        old: The existing config value
 34        new: The new config value
 35        update_strategy: The strategy to use when updating the field
 36
 37
 38        The updated field
 39    """
 40
 41    def _update_pydantic_config(old: BaseConfig, new: BaseConfig) -> PydanticModel:
 42        if type(new) != type(old):
 43            raise ConfigError(
 44                "NESTED_UPDATE behavior requires both values to have the same type. "
 45                f"{type(old)} and {type(new)} were given instead."
 46            )
 47        return old.update_with(new)
 48
 49    if not old:
 50        return new
 51
 52    update_strategy = update_strategy or UpdateStrategy.REPLACE
 53
 54    if update_strategy == UpdateStrategy.IMMUTABLE:
 55        raise ConfigError(f"Cannot modify property: {old}.")
 56
 57    if update_strategy == UpdateStrategy.REPLACE:
 58        return new
 59    if update_strategy == UpdateStrategy.EXTEND:
 60        if not isinstance(old, list) or not isinstance(new, list):
 61            raise ConfigError("EXTEND behavior requires list field.")
 62
 63        return old + new
 64    if update_strategy == UpdateStrategy.KEY_UPDATE:
 65        if not isinstance(old, dict) or not isinstance(new, dict):
 66            raise ConfigError("KEY_UPDATE behavior requires dictionary field.")
 67
 68        combined = old.copy()
 69        combined.update(new)
 70        return combined
 71    if update_strategy == UpdateStrategy.KEY_EXTEND:
 72        if not isinstance(old, dict) or not isinstance(new, dict):
 73            raise ConfigError("KEY_EXTEND behavior requires dictionary field.")
 74
 75        combined = old.copy()
 76        for key, value in new.items():
 77            if not isinstance(value, list):
 78                raise ConfigError("KEY_EXTEND behavior requires list values in dictionary.")
 79
 80            old_value = combined.get(key)
 81            if old_value:
 82                if not isinstance(old_value, list):
 83                    raise ConfigError("KEY_EXTEND behavior requires list values in dictionary.")
 84
 85                combined[key] = old_value + value
 86            else:
 87                combined[key] = value
 88
 89        return combined
 90    if update_strategy == UpdateStrategy.NESTED_UPDATE:
 91        if not isinstance(old, BaseConfig) and not isinstance(old, dict):
 92            raise ConfigError(
 93                f"NESTED_UPDATE behavior requires a config object and a dict of config objects as values. {type(old)} was given instead."
 94            )
 95
 96        if isinstance(old, dict):
 97            for k, pydantic_model in new.items():
 98                if k in old:
 99                    old[k] = _update_pydantic_config(old[k], pydantic_model)
100                else:
101                    old[k] = pydantic_model
102
103            return old
104        return _update_pydantic_config(old, new)
105
106    raise ConfigError(f"Unknown update strategy {update_strategy}.")

Update config field with new config value

Arguments:
  • old: The existing config value
  • new: The new config value
  • update_strategy: The strategy to use when updating the field
  • The updated field
class BaseConfig(sqlmesh.utils.pydantic.PydanticModel):
109class BaseConfig(PydanticModel):
110    """Base configuration functionality for configuration classes."""
111
112    _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {}
113
114    def update_with(self: T, other: t.Union[t.Dict[str, t.Any], T]) -> T:
115        """Updates this instance's fields with the passed in config fields and returns a new instance.
116
117        Args:
118            other: Other configuration.
119
120        Returns:
121            New instance updated with the passed in config fields
122        """
123        if isinstance(other, dict):
124            other = self.__class__(**other)
125
126        updated_fields = {}
127
128        for field in other.fields_set:
129            if field in self.all_field_infos():
130                updated_fields[field] = update_field(
131                    getattr(self, field),
132                    getattr(other, field),
133                    self._FIELD_UPDATE_STRATEGY.get(field),
134                )
135            else:
136                updated_fields[field] = getattr(other, field)
137
138        # Assign each field to trigger assignment validators
139        updated = self.copy()
140        for field, value in updated_fields.items():
141            setattr(updated, field, value)
142
143        return updated

Base configuration functionality for configuration classes.

def update_with(self: ~T, other: Union[Dict[str, Any], ~T]) -> ~T:
114    def update_with(self: T, other: t.Union[t.Dict[str, t.Any], T]) -> T:
115        """Updates this instance's fields with the passed in config fields and returns a new instance.
116
117        Args:
118            other: Other configuration.
119
120        Returns:
121            New instance updated with the passed in config fields
122        """
123        if isinstance(other, dict):
124            other = self.__class__(**other)
125
126        updated_fields = {}
127
128        for field in other.fields_set:
129            if field in self.all_field_infos():
130                updated_fields[field] = update_field(
131                    getattr(self, field),
132                    getattr(other, field),
133                    self._FIELD_UPDATE_STRATEGY.get(field),
134                )
135            else:
136                updated_fields[field] = getattr(other, field)
137
138        # Assign each field to trigger assignment validators
139        updated = self.copy()
140        for field, value in updated_fields.items():
141            setattr(updated, field, value)
142
143        return updated

Updates this instance's fields with the passed in config fields and returns a new instance.

Arguments:
  • other: Other configuration.
Returns:

New instance updated with the passed in config fields

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields