sqlmesh.core.test.runner
1from __future__ import annotations 2 3import time 4import threading 5import typing as t 6import unittest 7from io import StringIO 8 9import concurrent 10from concurrent.futures import ThreadPoolExecutor 11 12from sqlmesh.core.engine_adapter import EngineAdapter 13from sqlmesh.core.model import Model 14from sqlmesh.core.test.definition import ModelTest as ModelTest, generate_test as generate_test 15from sqlmesh.core.test.discovery import ( 16 ModelTestMetadata as ModelTestMetadata, 17) 18from sqlmesh.core.config.connection import BaseDuckDBConnectionConfig 19from sqlmesh.core.test.result import ModelTextTestResult as ModelTextTestResult 20from sqlmesh.utils import UniqueKeyDict, Verbosity 21 22 23if t.TYPE_CHECKING: 24 from sqlmesh.core.config.loader import C 25 26 27class ModelTextTestRunner(unittest.TextTestRunner): 28 def __init__( 29 self, 30 **kwargs: t.Any, 31 ) -> None: 32 # StringIO is used to capture the output of the tests since we'll 33 # run them in parallel and we don't want to mix the output streams 34 from io import StringIO 35 36 super().__init__( 37 stream=StringIO(), 38 resultclass=ModelTextTestResult, 39 **kwargs, 40 ) 41 42 43def create_testing_engine_adapters( 44 model_test_metadata: list[ModelTestMetadata], 45 config: C, 46 selected_gateway: str, 47 default_catalog: str | None = None, 48 default_catalog_dialect: str = "", 49) -> t.Dict[ModelTestMetadata, EngineAdapter]: 50 testing_adapter_by_gateway: t.Dict[str, EngineAdapter] = {} 51 metadata_to_adapter = {} 52 53 for metadata in model_test_metadata: 54 gateway = metadata.body.get("gateway") or selected_gateway 55 test_connection = config.get_test_connection( 56 gateway, default_catalog, default_catalog_dialect 57 ) 58 59 concurrent_tasks = test_connection.concurrent_tasks 60 61 is_duckdb_connection = isinstance(test_connection, BaseDuckDBConnectionConfig) 62 adapter = None 63 if is_duckdb_connection: 64 # Ensure DuckDB connections are fully isolated from each other 65 # by forcing the creation of a new adapter with SingletonConnectionPool 66 test_connection.concurrent_tasks = 1 67 adapter = test_connection.create_engine_adapter(register_comments_override=False) 68 test_connection.concurrent_tasks = concurrent_tasks 69 elif gateway not in testing_adapter_by_gateway: 70 # All other engines can share connections between threads 71 testing_adapter_by_gateway[gateway] = test_connection.create_engine_adapter( 72 register_comments_override=False 73 ) 74 75 metadata_to_adapter[metadata] = adapter or testing_adapter_by_gateway[gateway] 76 77 return metadata_to_adapter 78 79 80def run_tests( 81 model_test_metadata: list[ModelTestMetadata], 82 models: UniqueKeyDict[str, Model], 83 config: C, 84 selected_gateway: str, 85 dialect: str | None = None, 86 verbosity: Verbosity = Verbosity.DEFAULT, 87 preserve_fixtures: bool = False, 88 stream: t.TextIO | None = None, 89 default_catalog: str | None = None, 90 default_catalog_dialect: str = "", 91) -> ModelTextTestResult: 92 """Create a test suite of ModelTest objects and run it. 93 94 Args: 95 model_test_metadata: A list of ModelTestMetadata named tuples. 96 models: All models to use for expansion and mapping of physical locations. 97 verbosity: The verbosity level. 98 preserve_fixtures: Preserve the fixture tables in the testing database, useful for debugging. 99 """ 100 default_test_connection = config.get_test_connection( 101 gateway_name=selected_gateway, 102 default_catalog=default_catalog, 103 default_catalog_dialect=default_catalog_dialect, 104 ) 105 106 lock = threading.Lock() 107 108 from sqlmesh.core.console import get_console 109 110 combined_results = ModelTextTestResult( 111 stream=unittest.runner._WritelnDecorator(stream or StringIO()), # type: ignore 112 verbosity=2 if verbosity >= Verbosity.VERBOSE else 1, 113 descriptions=True, 114 console=get_console(), 115 ) 116 117 metadata_to_adapter = create_testing_engine_adapters( 118 model_test_metadata=model_test_metadata, 119 config=config, 120 selected_gateway=selected_gateway, 121 default_catalog=default_catalog, 122 default_catalog_dialect=default_catalog_dialect, 123 ) 124 125 # Ensure workers are not greater than the number of tests 126 num_workers = min(len(model_test_metadata) or 1, default_test_connection.concurrent_tasks) 127 128 def _run_single_test( 129 metadata: ModelTestMetadata, engine_adapter: EngineAdapter 130 ) -> t.Optional[ModelTextTestResult]: 131 test = ModelTest.create_test( 132 body=metadata.body, 133 test_name=metadata.test_name, 134 models=models, 135 engine_adapter=engine_adapter, 136 dialect=dialect, 137 path=metadata.path, 138 default_catalog=default_catalog, 139 preserve_fixtures=preserve_fixtures, 140 concurrency=num_workers > 1, 141 verbosity=verbosity, 142 ) 143 144 if not test: 145 return None 146 147 result = t.cast( 148 ModelTextTestResult, 149 ModelTextTestRunner().run(t.cast(unittest.TestCase, test)), 150 ) 151 152 with lock: 153 combined_results.merge(result) 154 155 return result 156 157 test_results = [] 158 159 start_time = time.perf_counter() 160 try: 161 with ThreadPoolExecutor(max_workers=num_workers) as pool: 162 futures = [ 163 pool.submit(_run_single_test, metadata=metadata, engine_adapter=engine_adapter) 164 for metadata, engine_adapter in metadata_to_adapter.items() 165 ] 166 167 for future in concurrent.futures.as_completed(futures): 168 test_results.append(future.result()) 169 finally: 170 for engine_adapter in set(metadata_to_adapter.values()): 171 # The engine adapters list might have duplicates, so we ensure that we close each adapter once 172 if engine_adapter: 173 engine_adapter.close() 174 175 end_time = time.perf_counter() 176 177 combined_results.duration = round(end_time - start_time, 2) 178 179 return combined_results
class
ModelTextTestRunner(unittest.runner.TextTestRunner):
28class ModelTextTestRunner(unittest.TextTestRunner): 29 def __init__( 30 self, 31 **kwargs: t.Any, 32 ) -> None: 33 # StringIO is used to capture the output of the tests since we'll 34 # run them in parallel and we don't want to mix the output streams 35 from io import StringIO 36 37 super().__init__( 38 stream=StringIO(), 39 resultclass=ModelTextTestResult, 40 **kwargs, 41 )
A test runner class that displays results in textual form.
It prints out the names of tests as they are run, errors as they occur, and a summary of the results at the end of the test run.
ModelTextTestRunner(**kwargs: Any)
29 def __init__( 30 self, 31 **kwargs: t.Any, 32 ) -> None: 33 # StringIO is used to capture the output of the tests since we'll 34 # run them in parallel and we don't want to mix the output streams 35 from io import StringIO 36 37 super().__init__( 38 stream=StringIO(), 39 resultclass=ModelTextTestResult, 40 **kwargs, 41 )
Construct a TextTestRunner.
Subclasses should accept **kwargs to ensure compatibility as the interface changes.
Inherited Members
- unittest.runner.TextTestRunner
- resultclass
- stream
- descriptions
- verbosity
- failfast
- buffer
- tb_locals
- warnings
- run
def
create_testing_engine_adapters( model_test_metadata: list[sqlmesh.core.test.discovery.ModelTestMetadata], config: ~C, selected_gateway: str, default_catalog: str | None = None, default_catalog_dialect: str = '') -> Dict[sqlmesh.core.test.discovery.ModelTestMetadata, sqlmesh.core.engine_adapter.base.EngineAdapter]:
44def create_testing_engine_adapters( 45 model_test_metadata: list[ModelTestMetadata], 46 config: C, 47 selected_gateway: str, 48 default_catalog: str | None = None, 49 default_catalog_dialect: str = "", 50) -> t.Dict[ModelTestMetadata, EngineAdapter]: 51 testing_adapter_by_gateway: t.Dict[str, EngineAdapter] = {} 52 metadata_to_adapter = {} 53 54 for metadata in model_test_metadata: 55 gateway = metadata.body.get("gateway") or selected_gateway 56 test_connection = config.get_test_connection( 57 gateway, default_catalog, default_catalog_dialect 58 ) 59 60 concurrent_tasks = test_connection.concurrent_tasks 61 62 is_duckdb_connection = isinstance(test_connection, BaseDuckDBConnectionConfig) 63 adapter = None 64 if is_duckdb_connection: 65 # Ensure DuckDB connections are fully isolated from each other 66 # by forcing the creation of a new adapter with SingletonConnectionPool 67 test_connection.concurrent_tasks = 1 68 adapter = test_connection.create_engine_adapter(register_comments_override=False) 69 test_connection.concurrent_tasks = concurrent_tasks 70 elif gateway not in testing_adapter_by_gateway: 71 # All other engines can share connections between threads 72 testing_adapter_by_gateway[gateway] = test_connection.create_engine_adapter( 73 register_comments_override=False 74 ) 75 76 metadata_to_adapter[metadata] = adapter or testing_adapter_by_gateway[gateway] 77 78 return metadata_to_adapter
def
run_tests( model_test_metadata: list[sqlmesh.core.test.discovery.ModelTestMetadata], models: sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]], config: ~C, selected_gateway: str, dialect: str | None = None, verbosity: sqlmesh.utils.Verbosity = <Verbosity.DEFAULT: 0>, preserve_fixtures: bool = False, stream: typing.TextIO | None = None, default_catalog: str | None = None, default_catalog_dialect: str = '') -> sqlmesh.core.test.result.ModelTextTestResult:
81def run_tests( 82 model_test_metadata: list[ModelTestMetadata], 83 models: UniqueKeyDict[str, Model], 84 config: C, 85 selected_gateway: str, 86 dialect: str | None = None, 87 verbosity: Verbosity = Verbosity.DEFAULT, 88 preserve_fixtures: bool = False, 89 stream: t.TextIO | None = None, 90 default_catalog: str | None = None, 91 default_catalog_dialect: str = "", 92) -> ModelTextTestResult: 93 """Create a test suite of ModelTest objects and run it. 94 95 Args: 96 model_test_metadata: A list of ModelTestMetadata named tuples. 97 models: All models to use for expansion and mapping of physical locations. 98 verbosity: The verbosity level. 99 preserve_fixtures: Preserve the fixture tables in the testing database, useful for debugging. 100 """ 101 default_test_connection = config.get_test_connection( 102 gateway_name=selected_gateway, 103 default_catalog=default_catalog, 104 default_catalog_dialect=default_catalog_dialect, 105 ) 106 107 lock = threading.Lock() 108 109 from sqlmesh.core.console import get_console 110 111 combined_results = ModelTextTestResult( 112 stream=unittest.runner._WritelnDecorator(stream or StringIO()), # type: ignore 113 verbosity=2 if verbosity >= Verbosity.VERBOSE else 1, 114 descriptions=True, 115 console=get_console(), 116 ) 117 118 metadata_to_adapter = create_testing_engine_adapters( 119 model_test_metadata=model_test_metadata, 120 config=config, 121 selected_gateway=selected_gateway, 122 default_catalog=default_catalog, 123 default_catalog_dialect=default_catalog_dialect, 124 ) 125 126 # Ensure workers are not greater than the number of tests 127 num_workers = min(len(model_test_metadata) or 1, default_test_connection.concurrent_tasks) 128 129 def _run_single_test( 130 metadata: ModelTestMetadata, engine_adapter: EngineAdapter 131 ) -> t.Optional[ModelTextTestResult]: 132 test = ModelTest.create_test( 133 body=metadata.body, 134 test_name=metadata.test_name, 135 models=models, 136 engine_adapter=engine_adapter, 137 dialect=dialect, 138 path=metadata.path, 139 default_catalog=default_catalog, 140 preserve_fixtures=preserve_fixtures, 141 concurrency=num_workers > 1, 142 verbosity=verbosity, 143 ) 144 145 if not test: 146 return None 147 148 result = t.cast( 149 ModelTextTestResult, 150 ModelTextTestRunner().run(t.cast(unittest.TestCase, test)), 151 ) 152 153 with lock: 154 combined_results.merge(result) 155 156 return result 157 158 test_results = [] 159 160 start_time = time.perf_counter() 161 try: 162 with ThreadPoolExecutor(max_workers=num_workers) as pool: 163 futures = [ 164 pool.submit(_run_single_test, metadata=metadata, engine_adapter=engine_adapter) 165 for metadata, engine_adapter in metadata_to_adapter.items() 166 ] 167 168 for future in concurrent.futures.as_completed(futures): 169 test_results.append(future.result()) 170 finally: 171 for engine_adapter in set(metadata_to_adapter.values()): 172 # The engine adapters list might have duplicates, so we ensure that we close each adapter once 173 if engine_adapter: 174 engine_adapter.close() 175 176 end_time = time.perf_counter() 177 178 combined_results.duration = round(end_time - start_time, 2) 179 180 return combined_results
Create a test suite of ModelTest objects and run it.
Arguments:
- model_test_metadata: A list of ModelTestMetadata named tuples.
- models: All models to use for expansion and mapping of physical locations.
- verbosity: The verbosity level.
- preserve_fixtures: Preserve the fixture tables in the testing database, useful for debugging.