sqlmesh.engines.commands
1import typing as t 2from enum import Enum 3 4from sqlmesh.core.environment import Environment, EnvironmentNamingInfo 5from sqlmesh.core.snapshot import ( 6 DeployabilityIndex, 7 Snapshot, 8 SnapshotEvaluator, 9 SnapshotId, 10 SnapshotTableCleanupTask, 11 SnapshotTableInfo, 12) 13from sqlmesh.core.state_sync import cleanup_expired_views 14from sqlmesh.utils.date import TimeLike 15from sqlmesh.utils.pydantic import PydanticModel 16 17COMMAND_PAYLOAD_FILE_NAME = "payload.json" 18 19 20class CommandType(str, Enum): 21 EVALUATE = "evaluate" 22 PROMOTE = "promote" 23 DEMOTE = "demote" 24 CLEANUP = "cleanup" 25 CREATE_TABLES = "create_tables" 26 MIGRATE_TABLES = "migrate_tables" 27 28 # This makes it easy to integrate with argparse 29 def __str__(self) -> str: 30 return self.value 31 32 33class EvaluateCommandPayload(PydanticModel): 34 snapshot: Snapshot 35 parent_snapshots: t.Dict[str, Snapshot] 36 start: TimeLike 37 end: TimeLike 38 execution_time: TimeLike 39 deployability_index: DeployabilityIndex 40 41 42class PromoteCommandPayload(PydanticModel): 43 snapshots: t.List[Snapshot] 44 environment_naming_info: EnvironmentNamingInfo 45 deployability_index: DeployabilityIndex 46 47 48class DemoteCommandPayload(PydanticModel): 49 snapshots: t.List[SnapshotTableInfo] 50 environment_naming_info: EnvironmentNamingInfo 51 52 53class CleanupCommandPayload(PydanticModel): 54 environments: t.List[Environment] 55 tasks: t.List[SnapshotTableCleanupTask] 56 57 58class CreateTablesCommandPayload(PydanticModel): 59 target_snapshot_ids: t.List[SnapshotId] 60 snapshots: t.List[Snapshot] 61 deployability_index: DeployabilityIndex 62 63 64class MigrateTablesCommandPayload(PydanticModel): 65 target_snapshot_ids: t.List[SnapshotId] 66 snapshots: t.List[Snapshot] 67 68 69def evaluate( 70 evaluator: SnapshotEvaluator, command_payload: t.Union[str, EvaluateCommandPayload] 71) -> None: 72 if isinstance(command_payload, str): 73 command_payload = EvaluateCommandPayload.parse_raw(command_payload) 74 75 parent_snapshots = command_payload.parent_snapshots 76 parent_snapshots[command_payload.snapshot.name] = command_payload.snapshot 77 78 wap_id = evaluator.evaluate( 79 command_payload.snapshot, 80 start=command_payload.start, 81 end=command_payload.end, 82 execution_time=command_payload.execution_time, 83 snapshots=parent_snapshots, 84 deployability_index=command_payload.deployability_index, 85 ) 86 evaluator.audit( 87 snapshot=command_payload.snapshot, 88 start=command_payload.start, 89 end=command_payload.end, 90 execution_time=command_payload.execution_time, 91 snapshots=parent_snapshots, 92 deployability_index=command_payload.deployability_index, 93 wap_id=wap_id, 94 ) 95 96 97def promote( 98 evaluator: SnapshotEvaluator, command_payload: t.Union[str, PromoteCommandPayload] 99) -> None: 100 if isinstance(command_payload, str): 101 command_payload = PromoteCommandPayload.parse_raw(command_payload) 102 evaluator.promote( 103 command_payload.snapshots, 104 command_payload.environment_naming_info, 105 deployability_index=command_payload.deployability_index, 106 ) 107 108 109def demote( 110 evaluator: SnapshotEvaluator, command_payload: t.Union[str, DemoteCommandPayload] 111) -> None: 112 if isinstance(command_payload, str): 113 command_payload = DemoteCommandPayload.parse_raw(command_payload) 114 evaluator.demote( 115 command_payload.snapshots, 116 command_payload.environment_naming_info, 117 ) 118 119 120def cleanup( 121 evaluator: SnapshotEvaluator, command_payload: t.Union[str, CleanupCommandPayload] 122) -> None: 123 if isinstance(command_payload, str): 124 command_payload = CleanupCommandPayload.parse_raw(command_payload) 125 126 cleanup_expired_views(evaluator.adapter, command_payload.environments) 127 evaluator.cleanup(command_payload.tasks) 128 129 130def create_tables( 131 evaluator: SnapshotEvaluator, 132 command_payload: t.Union[str, CreateTablesCommandPayload], 133) -> None: 134 if isinstance(command_payload, str): 135 command_payload = CreateTablesCommandPayload.parse_raw(command_payload) 136 137 snapshots_by_id = {s.snapshot_id: s for s in command_payload.snapshots} 138 target_snapshots = [snapshots_by_id[sid] for sid in command_payload.target_snapshot_ids] 139 evaluator.create( 140 target_snapshots, 141 snapshots_by_id, 142 deployability_index=command_payload.deployability_index, 143 ) 144 145 146def migrate_tables( 147 evaluator: SnapshotEvaluator, 148 command_payload: t.Union[str, MigrateTablesCommandPayload], 149) -> None: 150 if isinstance(command_payload, str): 151 command_payload = MigrateTablesCommandPayload.parse_raw(command_payload) 152 snapshots_by_id = {s.snapshot_id: s for s in command_payload.snapshots} 153 target_snapshots = [snapshots_by_id[sid] for sid in command_payload.target_snapshot_ids] 154 evaluator.migrate(target_snapshots, snapshots_by_id) 155 156 157COMMAND_HANDLERS: t.Dict[CommandType, t.Callable[[SnapshotEvaluator, str], None]] = { 158 CommandType.EVALUATE: evaluate, 159 CommandType.PROMOTE: promote, 160 CommandType.DEMOTE: demote, 161 CommandType.CLEANUP: cleanup, 162 CommandType.CREATE_TABLES: create_tables, 163 CommandType.MIGRATE_TABLES: migrate_tables, 164}
class
CommandType(builtins.str, enum.Enum):
21class CommandType(str, Enum): 22 EVALUATE = "evaluate" 23 PROMOTE = "promote" 24 DEMOTE = "demote" 25 CLEANUP = "cleanup" 26 CREATE_TABLES = "create_tables" 27 MIGRATE_TABLES = "migrate_tables" 28 29 # This makes it easy to integrate with argparse 30 def __str__(self) -> str: 31 return self.value
An enumeration.
EVALUATE =
<CommandType.EVALUATE: 'evaluate'>
PROMOTE =
<CommandType.PROMOTE: 'promote'>
DEMOTE =
<CommandType.DEMOTE: 'demote'>
CLEANUP =
<CommandType.CLEANUP: 'cleanup'>
CREATE_TABLES =
<CommandType.CREATE_TABLES: 'create_tables'>
MIGRATE_TABLES =
<CommandType.MIGRATE_TABLES: 'migrate_tables'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
34class EvaluateCommandPayload(PydanticModel): 35 snapshot: Snapshot 36 parent_snapshots: t.Dict[str, Snapshot] 37 start: TimeLike 38 end: TimeLike 39 execution_time: TimeLike 40 deployability_index: DeployabilityIndex
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a
RootModel
. - __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when
model_config['extra'] == 'allow'
. - __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
43class PromoteCommandPayload(PydanticModel): 44 snapshots: t.List[Snapshot] 45 environment_naming_info: EnvironmentNamingInfo 46 deployability_index: DeployabilityIndex
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a
RootModel
. - __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when
model_config['extra'] == 'allow'
. - __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
49class DemoteCommandPayload(PydanticModel): 50 snapshots: t.List[SnapshotTableInfo] 51 environment_naming_info: EnvironmentNamingInfo
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a
RootModel
. - __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when
model_config['extra'] == 'allow'
. - __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
54class CleanupCommandPayload(PydanticModel): 55 environments: t.List[Environment] 56 tasks: t.List[SnapshotTableCleanupTask]
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a
RootModel
. - __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when
model_config['extra'] == 'allow'
. - __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
59class CreateTablesCommandPayload(PydanticModel): 60 target_snapshot_ids: t.List[SnapshotId] 61 snapshots: t.List[Snapshot] 62 deployability_index: DeployabilityIndex
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a
RootModel
. - __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when
model_config['extra'] == 'allow'
. - __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
65class MigrateTablesCommandPayload(PydanticModel): 66 target_snapshot_ids: t.List[SnapshotId] 67 snapshots: t.List[Snapshot]
Usage docs: https://docs.pydantic.dev/2.7/concepts/models/
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of classvars defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The signature for instantiating the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
- __pydantic_custom_init__: Whether the model has a custom
__init__
function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a
RootModel
. - __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
- __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
- __pydantic_extra__: An instance attribute with the values of extra fields from validation when
model_config['extra'] == 'allow'
. - __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
- __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def
evaluate( evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, command_payload: Union[str, sqlmesh.engines.commands.EvaluateCommandPayload]) -> None:
70def evaluate( 71 evaluator: SnapshotEvaluator, command_payload: t.Union[str, EvaluateCommandPayload] 72) -> None: 73 if isinstance(command_payload, str): 74 command_payload = EvaluateCommandPayload.parse_raw(command_payload) 75 76 parent_snapshots = command_payload.parent_snapshots 77 parent_snapshots[command_payload.snapshot.name] = command_payload.snapshot 78 79 wap_id = evaluator.evaluate( 80 command_payload.snapshot, 81 start=command_payload.start, 82 end=command_payload.end, 83 execution_time=command_payload.execution_time, 84 snapshots=parent_snapshots, 85 deployability_index=command_payload.deployability_index, 86 ) 87 evaluator.audit( 88 snapshot=command_payload.snapshot, 89 start=command_payload.start, 90 end=command_payload.end, 91 execution_time=command_payload.execution_time, 92 snapshots=parent_snapshots, 93 deployability_index=command_payload.deployability_index, 94 wap_id=wap_id, 95 )
def
promote( evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, command_payload: Union[str, sqlmesh.engines.commands.PromoteCommandPayload]) -> None:
98def promote( 99 evaluator: SnapshotEvaluator, command_payload: t.Union[str, PromoteCommandPayload] 100) -> None: 101 if isinstance(command_payload, str): 102 command_payload = PromoteCommandPayload.parse_raw(command_payload) 103 evaluator.promote( 104 command_payload.snapshots, 105 command_payload.environment_naming_info, 106 deployability_index=command_payload.deployability_index, 107 )
def
demote( evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, command_payload: Union[str, sqlmesh.engines.commands.DemoteCommandPayload]) -> None:
110def demote( 111 evaluator: SnapshotEvaluator, command_payload: t.Union[str, DemoteCommandPayload] 112) -> None: 113 if isinstance(command_payload, str): 114 command_payload = DemoteCommandPayload.parse_raw(command_payload) 115 evaluator.demote( 116 command_payload.snapshots, 117 command_payload.environment_naming_info, 118 )
def
cleanup( evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, command_payload: Union[str, sqlmesh.engines.commands.CleanupCommandPayload]) -> None:
121def cleanup( 122 evaluator: SnapshotEvaluator, command_payload: t.Union[str, CleanupCommandPayload] 123) -> None: 124 if isinstance(command_payload, str): 125 command_payload = CleanupCommandPayload.parse_raw(command_payload) 126 127 cleanup_expired_views(evaluator.adapter, command_payload.environments) 128 evaluator.cleanup(command_payload.tasks)
def
create_tables( evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, command_payload: Union[str, sqlmesh.engines.commands.CreateTablesCommandPayload]) -> None:
131def create_tables( 132 evaluator: SnapshotEvaluator, 133 command_payload: t.Union[str, CreateTablesCommandPayload], 134) -> None: 135 if isinstance(command_payload, str): 136 command_payload = CreateTablesCommandPayload.parse_raw(command_payload) 137 138 snapshots_by_id = {s.snapshot_id: s for s in command_payload.snapshots} 139 target_snapshots = [snapshots_by_id[sid] for sid in command_payload.target_snapshot_ids] 140 evaluator.create( 141 target_snapshots, 142 snapshots_by_id, 143 deployability_index=command_payload.deployability_index, 144 )
def
migrate_tables( evaluator: sqlmesh.core.snapshot.evaluator.SnapshotEvaluator, command_payload: Union[str, sqlmesh.engines.commands.MigrateTablesCommandPayload]) -> None:
147def migrate_tables( 148 evaluator: SnapshotEvaluator, 149 command_payload: t.Union[str, MigrateTablesCommandPayload], 150) -> None: 151 if isinstance(command_payload, str): 152 command_payload = MigrateTablesCommandPayload.parse_raw(command_payload) 153 snapshots_by_id = {s.snapshot_id: s for s in command_payload.snapshots} 154 target_snapshots = [snapshots_by_id[sid] for sid in command_payload.target_snapshot_ids] 155 evaluator.migrate(target_snapshots, snapshots_by_id)