Edit on GitHub

sqlmesh.core.test.runner

  1from __future__ import annotations
  2
  3import time
  4import threading
  5import typing as t
  6import unittest
  7from io import StringIO
  8
  9import concurrent
 10from concurrent.futures import ThreadPoolExecutor
 11
 12from sqlmesh.core.engine_adapter import EngineAdapter
 13from sqlmesh.core.model import Model
 14from sqlmesh.core.test.definition import ModelTest as ModelTest, generate_test as generate_test
 15from sqlmesh.core.test.discovery import (
 16    ModelTestMetadata as ModelTestMetadata,
 17)
 18from sqlmesh.core.config.connection import BaseDuckDBConnectionConfig
 19from sqlmesh.core.test.result import ModelTextTestResult as ModelTextTestResult
 20from sqlmesh.utils import UniqueKeyDict, Verbosity
 21
 22
 23if t.TYPE_CHECKING:
 24    from sqlmesh.core.config.loader import C
 25
 26
 27class ModelTextTestRunner(unittest.TextTestRunner):
 28    def __init__(
 29        self,
 30        **kwargs: t.Any,
 31    ) -> None:
 32        # StringIO is used to capture the output of the tests since we'll
 33        # run them in parallel and we don't want to mix the output streams
 34        from io import StringIO
 35
 36        super().__init__(
 37            stream=StringIO(),
 38            resultclass=ModelTextTestResult,
 39            **kwargs,
 40        )
 41
 42
 43def create_testing_engine_adapters(
 44    model_test_metadata: list[ModelTestMetadata],
 45    config: C,
 46    selected_gateway: str,
 47    default_catalog: str | None = None,
 48    default_catalog_dialect: str = "",
 49) -> t.Dict[ModelTestMetadata, EngineAdapter]:
 50    testing_adapter_by_gateway: t.Dict[str, EngineAdapter] = {}
 51    metadata_to_adapter = {}
 52
 53    for metadata in model_test_metadata:
 54        gateway = metadata.body.get("gateway") or selected_gateway
 55        test_connection = config.get_test_connection(
 56            gateway, default_catalog, default_catalog_dialect
 57        )
 58
 59        concurrent_tasks = test_connection.concurrent_tasks
 60
 61        is_duckdb_connection = isinstance(test_connection, BaseDuckDBConnectionConfig)
 62        adapter = None
 63        if is_duckdb_connection:
 64            # Ensure DuckDB connections are fully isolated from each other
 65            # by forcing the creation of a new adapter with SingletonConnectionPool
 66            test_connection.concurrent_tasks = 1
 67            adapter = test_connection.create_engine_adapter(register_comments_override=False)
 68            test_connection.concurrent_tasks = concurrent_tasks
 69        elif gateway not in testing_adapter_by_gateway:
 70            # All other engines can share connections between threads
 71            testing_adapter_by_gateway[gateway] = test_connection.create_engine_adapter(
 72                register_comments_override=False
 73            )
 74
 75        metadata_to_adapter[metadata] = adapter or testing_adapter_by_gateway[gateway]
 76
 77    return metadata_to_adapter
 78
 79
 80def run_tests(
 81    model_test_metadata: list[ModelTestMetadata],
 82    models: UniqueKeyDict[str, Model],
 83    config: C,
 84    selected_gateway: str,
 85    dialect: str | None = None,
 86    verbosity: Verbosity = Verbosity.DEFAULT,
 87    preserve_fixtures: bool = False,
 88    stream: t.TextIO | None = None,
 89    default_catalog: str | None = None,
 90    default_catalog_dialect: str = "",
 91) -> ModelTextTestResult:
 92    """Create a test suite of ModelTest objects and run it.
 93
 94    Args:
 95        model_test_metadata: A list of ModelTestMetadata named tuples.
 96        models: All models to use for expansion and mapping of physical locations.
 97        verbosity: The verbosity level.
 98        preserve_fixtures: Preserve the fixture tables in the testing database, useful for debugging.
 99    """
100    default_test_connection = config.get_test_connection(
101        gateway_name=selected_gateway,
102        default_catalog=default_catalog,
103        default_catalog_dialect=default_catalog_dialect,
104    )
105
106    lock = threading.Lock()
107
108    from sqlmesh.core.console import get_console
109
110    combined_results = ModelTextTestResult(
111        stream=unittest.runner._WritelnDecorator(stream or StringIO()),  # type: ignore
112        verbosity=2 if verbosity >= Verbosity.VERBOSE else 1,
113        descriptions=True,
114        console=get_console(),
115    )
116
117    metadata_to_adapter = create_testing_engine_adapters(
118        model_test_metadata=model_test_metadata,
119        config=config,
120        selected_gateway=selected_gateway,
121        default_catalog=default_catalog,
122        default_catalog_dialect=default_catalog_dialect,
123    )
124
125    # Ensure workers are not greater than the number of tests
126    num_workers = min(len(model_test_metadata) or 1, default_test_connection.concurrent_tasks)
127
128    def _run_single_test(
129        metadata: ModelTestMetadata, engine_adapter: EngineAdapter
130    ) -> t.Optional[ModelTextTestResult]:
131        test = ModelTest.create_test(
132            body=metadata.body,
133            test_name=metadata.test_name,
134            models=models,
135            engine_adapter=engine_adapter,
136            dialect=dialect,
137            path=metadata.path,
138            default_catalog=default_catalog,
139            preserve_fixtures=preserve_fixtures,
140            concurrency=num_workers > 1,
141            verbosity=verbosity,
142        )
143
144        if not test:
145            return None
146
147        result = t.cast(
148            ModelTextTestResult,
149            ModelTextTestRunner().run(t.cast(unittest.TestCase, test)),
150        )
151
152        with lock:
153            combined_results.merge(result)
154
155        return result
156
157    test_results = []
158
159    start_time = time.perf_counter()
160    try:
161        with ThreadPoolExecutor(max_workers=num_workers) as pool:
162            futures = [
163                pool.submit(_run_single_test, metadata=metadata, engine_adapter=engine_adapter)
164                for metadata, engine_adapter in metadata_to_adapter.items()
165            ]
166
167            for future in concurrent.futures.as_completed(futures):
168                test_results.append(future.result())
169    finally:
170        for engine_adapter in set(metadata_to_adapter.values()):
171            # The engine adapters list might have duplicates, so we ensure that we close each adapter once
172            if engine_adapter:
173                engine_adapter.close()
174
175    end_time = time.perf_counter()
176
177    combined_results.duration = round(end_time - start_time, 2)
178
179    return combined_results
class ModelTextTestRunner(unittest.runner.TextTestRunner):
28class ModelTextTestRunner(unittest.TextTestRunner):
29    def __init__(
30        self,
31        **kwargs: t.Any,
32    ) -> None:
33        # StringIO is used to capture the output of the tests since we'll
34        # run them in parallel and we don't want to mix the output streams
35        from io import StringIO
36
37        super().__init__(
38            stream=StringIO(),
39            resultclass=ModelTextTestResult,
40            **kwargs,
41        )

A test runner class that displays results in textual form.

It prints out the names of tests as they are run, errors as they occur, and a summary of the results at the end of the test run.

ModelTextTestRunner(**kwargs: Any)
29    def __init__(
30        self,
31        **kwargs: t.Any,
32    ) -> None:
33        # StringIO is used to capture the output of the tests since we'll
34        # run them in parallel and we don't want to mix the output streams
35        from io import StringIO
36
37        super().__init__(
38            stream=StringIO(),
39            resultclass=ModelTextTestResult,
40            **kwargs,
41        )

Construct a TextTestRunner.

Subclasses should accept **kwargs to ensure compatibility as the interface changes.

Inherited Members
unittest.runner.TextTestRunner
resultclass
stream
descriptions
verbosity
failfast
buffer
tb_locals
warnings
run
def create_testing_engine_adapters( model_test_metadata: list[sqlmesh.core.test.discovery.ModelTestMetadata], config: ~C, selected_gateway: str, default_catalog: str | None = None, default_catalog_dialect: str = '') -> Dict[sqlmesh.core.test.discovery.ModelTestMetadata, sqlmesh.core.engine_adapter.base.EngineAdapter]:
44def create_testing_engine_adapters(
45    model_test_metadata: list[ModelTestMetadata],
46    config: C,
47    selected_gateway: str,
48    default_catalog: str | None = None,
49    default_catalog_dialect: str = "",
50) -> t.Dict[ModelTestMetadata, EngineAdapter]:
51    testing_adapter_by_gateway: t.Dict[str, EngineAdapter] = {}
52    metadata_to_adapter = {}
53
54    for metadata in model_test_metadata:
55        gateway = metadata.body.get("gateway") or selected_gateway
56        test_connection = config.get_test_connection(
57            gateway, default_catalog, default_catalog_dialect
58        )
59
60        concurrent_tasks = test_connection.concurrent_tasks
61
62        is_duckdb_connection = isinstance(test_connection, BaseDuckDBConnectionConfig)
63        adapter = None
64        if is_duckdb_connection:
65            # Ensure DuckDB connections are fully isolated from each other
66            # by forcing the creation of a new adapter with SingletonConnectionPool
67            test_connection.concurrent_tasks = 1
68            adapter = test_connection.create_engine_adapter(register_comments_override=False)
69            test_connection.concurrent_tasks = concurrent_tasks
70        elif gateway not in testing_adapter_by_gateway:
71            # All other engines can share connections between threads
72            testing_adapter_by_gateway[gateway] = test_connection.create_engine_adapter(
73                register_comments_override=False
74            )
75
76        metadata_to_adapter[metadata] = adapter or testing_adapter_by_gateway[gateway]
77
78    return metadata_to_adapter
def run_tests( model_test_metadata: list[sqlmesh.core.test.discovery.ModelTestMetadata], models: sqlmesh.utils.UniqueKeyDict[str, typing.Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel]], config: ~C, selected_gateway: str, dialect: str | None = None, verbosity: sqlmesh.utils.Verbosity = <Verbosity.DEFAULT: 0>, preserve_fixtures: bool = False, stream: typing.TextIO | None = None, default_catalog: str | None = None, default_catalog_dialect: str = '') -> sqlmesh.core.test.result.ModelTextTestResult:
 81def run_tests(
 82    model_test_metadata: list[ModelTestMetadata],
 83    models: UniqueKeyDict[str, Model],
 84    config: C,
 85    selected_gateway: str,
 86    dialect: str | None = None,
 87    verbosity: Verbosity = Verbosity.DEFAULT,
 88    preserve_fixtures: bool = False,
 89    stream: t.TextIO | None = None,
 90    default_catalog: str | None = None,
 91    default_catalog_dialect: str = "",
 92) -> ModelTextTestResult:
 93    """Create a test suite of ModelTest objects and run it.
 94
 95    Args:
 96        model_test_metadata: A list of ModelTestMetadata named tuples.
 97        models: All models to use for expansion and mapping of physical locations.
 98        verbosity: The verbosity level.
 99        preserve_fixtures: Preserve the fixture tables in the testing database, useful for debugging.
100    """
101    default_test_connection = config.get_test_connection(
102        gateway_name=selected_gateway,
103        default_catalog=default_catalog,
104        default_catalog_dialect=default_catalog_dialect,
105    )
106
107    lock = threading.Lock()
108
109    from sqlmesh.core.console import get_console
110
111    combined_results = ModelTextTestResult(
112        stream=unittest.runner._WritelnDecorator(stream or StringIO()),  # type: ignore
113        verbosity=2 if verbosity >= Verbosity.VERBOSE else 1,
114        descriptions=True,
115        console=get_console(),
116    )
117
118    metadata_to_adapter = create_testing_engine_adapters(
119        model_test_metadata=model_test_metadata,
120        config=config,
121        selected_gateway=selected_gateway,
122        default_catalog=default_catalog,
123        default_catalog_dialect=default_catalog_dialect,
124    )
125
126    # Ensure workers are not greater than the number of tests
127    num_workers = min(len(model_test_metadata) or 1, default_test_connection.concurrent_tasks)
128
129    def _run_single_test(
130        metadata: ModelTestMetadata, engine_adapter: EngineAdapter
131    ) -> t.Optional[ModelTextTestResult]:
132        test = ModelTest.create_test(
133            body=metadata.body,
134            test_name=metadata.test_name,
135            models=models,
136            engine_adapter=engine_adapter,
137            dialect=dialect,
138            path=metadata.path,
139            default_catalog=default_catalog,
140            preserve_fixtures=preserve_fixtures,
141            concurrency=num_workers > 1,
142            verbosity=verbosity,
143        )
144
145        if not test:
146            return None
147
148        result = t.cast(
149            ModelTextTestResult,
150            ModelTextTestRunner().run(t.cast(unittest.TestCase, test)),
151        )
152
153        with lock:
154            combined_results.merge(result)
155
156        return result
157
158    test_results = []
159
160    start_time = time.perf_counter()
161    try:
162        with ThreadPoolExecutor(max_workers=num_workers) as pool:
163            futures = [
164                pool.submit(_run_single_test, metadata=metadata, engine_adapter=engine_adapter)
165                for metadata, engine_adapter in metadata_to_adapter.items()
166            ]
167
168            for future in concurrent.futures.as_completed(futures):
169                test_results.append(future.result())
170    finally:
171        for engine_adapter in set(metadata_to_adapter.values()):
172            # The engine adapters list might have duplicates, so we ensure that we close each adapter once
173            if engine_adapter:
174                engine_adapter.close()
175
176    end_time = time.perf_counter()
177
178    combined_results.duration = round(end_time - start_time, 2)
179
180    return combined_results

Create a test suite of ModelTest objects and run it.

Arguments:
  • model_test_metadata: A list of ModelTestMetadata named tuples.
  • models: All models to use for expansion and mapping of physical locations.
  • verbosity: The verbosity level.
  • preserve_fixtures: Preserve the fixture tables in the testing database, useful for debugging.