Edit on GitHub

sqlmesh.utils.connection_pool

  1import abc
  2import logging
  3import typing as t
  4from collections import defaultdict
  5from threading import Lock, get_ident
  6
  7logger = logging.getLogger(__name__)
  8
  9
 10class ConnectionPool(abc.ABC):
 11    @abc.abstractmethod
 12    def get_cursor(self) -> t.Any:
 13        """Returns cached cursor instance.
 14
 15        Automatically creates a new instance if one is not available.
 16
 17        Returns:
 18            A cursor instance.
 19        """
 20
 21    @abc.abstractmethod
 22    def get(self) -> t.Any:
 23        """Returns cached connection instance.
 24
 25        Automatically opens a new connection if one is not available.
 26
 27        Returns:
 28            A connection instance.
 29        """
 30
 31    @abc.abstractmethod
 32    def get_attribute(self, key: str) -> t.Optional[t.Any]:
 33        """Returns an attribute associated with the connection.
 34
 35        Args:
 36            key: Attribute key.
 37
 38        Returns:
 39            Attribute value or None if not found.
 40        """
 41
 42    @abc.abstractmethod
 43    def set_attribute(self, key: str, value: t.Any) -> None:
 44        """Sets an attribute associated with the connection.
 45
 46        Args:
 47            key: Attribute key.
 48            value: Attribute value.
 49        """
 50
 51    @abc.abstractmethod
 52    def begin(self) -> None:
 53        """Starts a new transaction."""
 54
 55    @abc.abstractmethod
 56    def commit(self) -> None:
 57        """Commits the current transaction."""
 58
 59    @abc.abstractmethod
 60    def rollback(self) -> None:
 61        """Rolls back the current transaction."""
 62
 63    @property
 64    @abc.abstractmethod
 65    def is_transaction_active(self) -> bool:
 66        """Returns True if there is an active transaction and False otherwise."""
 67
 68    @abc.abstractmethod
 69    def close_cursor(self) -> None:
 70        """Closes the current cursor instance if exists."""
 71
 72    @abc.abstractmethod
 73    def close(self) -> None:
 74        """Closes the current connection instance if exists.
 75
 76        Note: if there is a cursor instance available it will be closed as well.
 77        """
 78
 79    @abc.abstractmethod
 80    def close_all(self, exclude_calling_thread: bool = False) -> None:
 81        """Closes all cached cursors and connections.
 82
 83        Args:
 84            exclude_calling_thread: If set to True excludes cursors and connections associated
 85                with the calling thread.
 86        """
 87
 88
 89class _TransactionManagementMixin(ConnectionPool):
 90    def _do_begin(self) -> None:
 91        cursor = self.get_cursor()
 92        if hasattr(cursor, "begin"):
 93            cursor.begin()
 94        else:
 95            conn = self.get()
 96            if hasattr(conn, "begin"):
 97                conn.begin()
 98
 99    def _do_commit(self) -> None:
100        cursor = self.get_cursor()
101        if hasattr(cursor, "commit"):
102            cursor.commit()
103        else:
104            self.get().commit()
105
106    def _do_rollback(self) -> None:
107        cursor = self.get_cursor()
108        if hasattr(cursor, "rollback"):
109            cursor.rollback()
110        else:
111            self.get().rollback()
112
113
class ThreadLocalConnectionPool(_TransactionManagementMixin):
    """Connection pool that caches one connection and one cursor per thread.

    Connections, cursors, transaction flags and attributes are all keyed by
    the identifier returned from ``threading.get_ident()``.
    """

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Creates an empty pool.

        Args:
            connection_factory: Zero-argument callable that opens a new connection.
            cursor_kwargs: Keyword arguments passed to ``connection.cursor()``.
            cursor_init: Optional callback invoked with every newly created cursor.
        """
        self._connection_factory = connection_factory
        self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
        self._thread_cursors: t.Dict[t.Hashable, t.Any] = {}
        self._thread_transactions: t.Set[t.Hashable] = set()
        self._thread_attributes: t.Dict[t.Hashable, t.Dict[str, t.Any]] = defaultdict(dict)
        self._thread_connections_lock = Lock()
        self._thread_cursors_lock = Lock()
        self._thread_transactions_lock = Lock()
        self._cursor_kwargs = cursor_kwargs or {}
        self._cursor_init = cursor_init

    def get_cursor(self) -> t.Any:
        """Returns the calling thread's cached cursor, creating it if needed.

        A new cursor is cached only after ``cursor_init`` (if any) has completed.
        If the callback raises, the cursor is closed on a best-effort basis, nothing
        is cached and the exception propagates, so the next call starts from scratch.

        Returns:
            A cursor instance.
        """
        thread_id = get_ident()
        with self._thread_cursors_lock:
            if thread_id not in self._thread_cursors:
                cursor = self.get().cursor(**self._cursor_kwargs)
                if self._cursor_init:
                    try:
                        self._cursor_init(cursor)
                    except Exception:
                        # Never cache a cursor whose initialisation did not finish.
                        _try_close(cursor, "cursor")
                        raise
                self._thread_cursors[thread_id] = cursor
            return self._thread_cursors[thread_id]

    def get(self) -> t.Any:
        """Returns the calling thread's cached connection, opening it if needed.

        Returns:
            A connection instance.
        """
        thread_id = get_ident()
        with self._thread_connections_lock:
            if thread_id not in self._thread_connections:
                self._thread_connections[thread_id] = self._connection_factory()
            return self._thread_connections[thread_id]

    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns an attribute stored for the calling thread.

        Args:
            key: Attribute key.

        Returns:
            Attribute value or None if not found.
        """
        # Use .get() rather than indexing: indexing the defaultdict would insert an
        # empty dict for every thread that merely reads an attribute.
        attributes = self._thread_attributes.get(get_ident())
        if attributes is None:
            return None
        return attributes.get(key)

    def set_attribute(self, key: str, value: t.Any) -> None:
        """Stores an attribute for the calling thread.

        Args:
            key: Attribute key.
            value: Attribute value.
        """
        thread_id = get_ident()
        self._thread_attributes[thread_id][key] = value

    def begin(self) -> None:
        """Starts a new transaction and marks the calling thread as being in one."""
        self._do_begin()
        with self._thread_transactions_lock:
            self._thread_transactions.add(get_ident())

    def commit(self) -> None:
        """Commits the current transaction and clears the calling thread's flag."""
        self._do_commit()
        self._discard_transaction(get_ident())

    def rollback(self) -> None:
        """Rolls back the current transaction and clears the calling thread's flag."""
        self._do_rollback()
        self._discard_transaction(get_ident())

    @property
    def is_transaction_active(self) -> bool:
        """Returns True if the calling thread is marked as being in a transaction."""
        with self._thread_transactions_lock:
            return get_ident() in self._thread_transactions

    def close_cursor(self) -> None:
        """Closes and forgets the calling thread's cursor if one is cached."""
        thread_id = get_ident()
        with self._thread_cursors_lock:
            if thread_id in self._thread_cursors:
                _try_close(self._thread_cursors[thread_id], "cursor")
                self._thread_cursors.pop(thread_id)

    def close(self) -> None:
        """Closes the calling thread's connection if one is cached.

        The cached cursor is dropped from the pool without an explicit ``close()``
        call and the thread's transaction flag is cleared. The thread's attributes
        are removed whether or not a connection was cached.
        """
        thread_id = get_ident()
        with self._thread_cursors_lock, self._thread_connections_lock:
            if thread_id in self._thread_connections:
                _try_close(self._thread_connections[thread_id], "connection")
                self._thread_connections.pop(thread_id)
                self._thread_cursors.pop(thread_id, None)
                self._discard_transaction(thread_id)
            self._thread_attributes.pop(thread_id, None)

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes all cached connections and forgets their cursors.

        Args:
            exclude_calling_thread: If set to True the calling thread's connection,
                cursor and transaction flag are left in place.
        """
        calling_thread_id = get_ident()
        with self._thread_cursors_lock, self._thread_connections_lock:
            # Iterate over a copy because entries are removed inside the loop.
            for thread_id, connection in self._thread_connections.copy().items():
                if not exclude_calling_thread or thread_id != calling_thread_id:
                    # NOTE: the access to the connection instance itself is not thread-safe here.
                    _try_close(connection, "connection")
                    self._thread_connections.pop(thread_id)
                    self._thread_cursors.pop(thread_id, None)
                    self._discard_transaction(thread_id)
                # Attributes are dropped for every thread that has a cached connection,
                # including the calling thread even when its connection is kept.
                self._thread_attributes.pop(thread_id, None)

    def _discard_transaction(self, thread_id: t.Hashable) -> None:
        """Clears the transaction flag of the given thread, if set."""
        with self._thread_transactions_lock:
            self._thread_transactions.discard(thread_id)
206
207
class SingletonConnectionPool(_TransactionManagementMixin):
    """Connection pool that caches a single connection and cursor on the instance.

    The same connection, cursor, attributes and transaction flag are returned to
    every caller regardless of the calling thread.
    """

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Creates an empty pool.

        Args:
            connection_factory: Zero-argument callable that opens a new connection.
            cursor_kwargs: Keyword arguments passed to ``connection.cursor()``.
            cursor_init: Optional callback invoked with every newly created cursor.
        """
        self._connection_factory = connection_factory
        self._connection: t.Optional[t.Any] = None
        self._cursor: t.Optional[t.Any] = None
        self._cursor_kwargs = cursor_kwargs or {}
        self._attributes: t.Dict[str, t.Any] = {}
        self._is_transaction_active: bool = False
        self._cursor_init = cursor_init

    def get_cursor(self) -> t.Any:
        """Returns the cached cursor, creating it if needed.

        A new cursor is cached only after ``cursor_init`` (if any) has completed.
        If the callback raises, the cursor is closed on a best-effort basis, nothing
        is cached and the exception propagates.

        Returns:
            A cursor instance.
        """
        # Compare against None explicitly: a driver object that evaluates as falsy
        # must still count as "already cached".
        if self._cursor is None:
            cursor = self.get().cursor(**self._cursor_kwargs)
            if self._cursor_init:
                try:
                    self._cursor_init(cursor)
                except Exception:
                    # Never cache a cursor whose initialisation did not finish.
                    _try_close(cursor, "cursor")
                    raise
            self._cursor = cursor
        return self._cursor

    def get(self) -> t.Any:
        """Returns the cached connection, opening it if needed.

        Returns:
            A connection instance.
        """
        if self._connection is None:
            self._connection = self._connection_factory()
        return self._connection

    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns an attribute stored on the pool.

        Args:
            key: Attribute key.

        Returns:
            Attribute value or None if not found.
        """
        return self._attributes.get(key)

    def set_attribute(self, key: str, value: t.Any) -> None:
        """Stores an attribute on the pool.

        Args:
            key: Attribute key.
            value: Attribute value.
        """
        self._attributes[key] = value

    def begin(self) -> None:
        """Starts a new transaction and sets the transaction flag."""
        self._do_begin()
        self._is_transaction_active = True

    def commit(self) -> None:
        """Commits the current transaction and clears the transaction flag."""
        self._do_commit()
        self._is_transaction_active = False

    def rollback(self) -> None:
        """Rolls back the current transaction and clears the transaction flag."""
        self._do_rollback()
        self._is_transaction_active = False

    @property
    def is_transaction_active(self) -> bool:
        """Returns True if the transaction flag is set and False otherwise."""
        return self._is_transaction_active

    def close_cursor(self) -> None:
        """Closes and forgets the cached cursor if one exists."""
        _try_close(self._cursor, "cursor")
        self._cursor = None

    def close(self) -> None:
        """Closes the cached connection if one exists and resets the pool state.

        The cached cursor reference is dropped without an explicit ``close()`` call;
        the transaction flag is cleared and all attributes are removed.
        """
        _try_close(self._connection, "connection")
        self._connection = None
        self._cursor = None
        self._is_transaction_active = False
        self._attributes.clear()

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the single cached connection.

        Args:
            exclude_calling_thread: If set to True nothing is closed.
        """
        if not exclude_calling_thread:
            self.close()
271
272
def create_connection_pool(
    connection_factory: t.Callable[[], t.Any],
    multithreaded: bool,
    cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
) -> ConnectionPool:
    """Build the connection pool implementation selected by ``multithreaded``.

    Args:
        connection_factory: Zero-argument callable that opens a new connection.
        multithreaded: When True a ThreadLocalConnectionPool is created,
            otherwise a SingletonConnectionPool.
        cursor_kwargs: Forwarded unchanged to the pool constructor.
        cursor_init: Forwarded unchanged to the pool constructor.

    Returns:
        The newly created pool.
    """
    pool_cls = ThreadLocalConnectionPool if multithreaded else SingletonConnectionPool
    return pool_cls(connection_factory, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init)
288
289
290def _try_close(closeable: t.Any, kind: str) -> None:
291    if closeable is None:
292        return
293    try:
294        closeable.close()
295    except Exception:
296        logger.exception("Failed to close %s", kind)
class ConnectionPool(abc.ABC):
11class ConnectionPool(abc.ABC):
12    @abc.abstractmethod
13    def get_cursor(self) -> t.Any:
14        """Returns cached cursor instance.
15
16        Automatically creates a new instance if one is not available.
17
18        Returns:
19            A cursor instance.
20        """
21
22    @abc.abstractmethod
23    def get(self) -> t.Any:
24        """Returns cached connection instance.
25
26        Automatically opens a new connection if one is not available.
27
28        Returns:
29            A connection instance.
30        """
31
32    @abc.abstractmethod
33    def get_attribute(self, key: str) -> t.Optional[t.Any]:
34        """Returns an attribute associated with the connection.
35
36        Args:
37            key: Attribute key.
38
39        Returns:
40            Attribute value or None if not found.
41        """
42
43    @abc.abstractmethod
44    def set_attribute(self, key: str, value: t.Any) -> None:
45        """Sets an attribute associated with the connection.
46
47        Args:
48            key: Attribute key.
49            value: Attribute value.
50        """
51
52    @abc.abstractmethod
53    def begin(self) -> None:
54        """Starts a new transaction."""
55
56    @abc.abstractmethod
57    def commit(self) -> None:
58        """Commits the current transaction."""
59
60    @abc.abstractmethod
61    def rollback(self) -> None:
62        """Rolls back the current transaction."""
63
64    @property
65    @abc.abstractmethod
66    def is_transaction_active(self) -> bool:
67        """Returns True if there is an active transaction and False otherwise."""
68
69    @abc.abstractmethod
70    def close_cursor(self) -> None:
71        """Closes the current cursor instance if exists."""
72
73    @abc.abstractmethod
74    def close(self) -> None:
75        """Closes the current connection instance if exists.
76
77        Note: if there is a cursor instance available it will be closed as well.
78        """
79
80    @abc.abstractmethod
81    def close_all(self, exclude_calling_thread: bool = False) -> None:
82        """Closes all cached cursors and connections.
83
84        Args:
85            exclude_calling_thread: If set to True excludes cursors and connections associated
86                with the calling thread.
87        """

Abstract interface for a pool that caches a connection and its cursor, stores attributes associated with the connection, and tracks whether a transaction is active.

@abc.abstractmethod
def get_cursor(self) -> Any:
12    @abc.abstractmethod
13    def get_cursor(self) -> t.Any:
14        """Returns cached cursor instance.
15
16        Automatically creates a new instance if one is not available.
17
18        Returns:
19            A cursor instance.
20        """

Returns cached cursor instance.

Automatically creates a new instance if one is not available.

Returns:

A cursor instance.

@abc.abstractmethod
def get(self) -> Any:
22    @abc.abstractmethod
23    def get(self) -> t.Any:
24        """Returns cached connection instance.
25
26        Automatically opens a new connection if one is not available.
27
28        Returns:
29            A connection instance.
30        """

Returns cached connection instance.

Automatically opens a new connection if one is not available.

Returns:

A connection instance.

@abc.abstractmethod
def get_attribute(self, key: str) -> Union[Any, NoneType]:
32    @abc.abstractmethod
33    def get_attribute(self, key: str) -> t.Optional[t.Any]:
34        """Returns an attribute associated with the connection.
35
36        Args:
37            key: Attribute key.
38
39        Returns:
40            Attribute value or None if not found.
41        """

Returns an attribute associated with the connection.

Arguments:
  • key: Attribute key.
Returns:

Attribute value or None if not found.

@abc.abstractmethod
def set_attribute(self, key: str, value: Any) -> None:
43    @abc.abstractmethod
44    def set_attribute(self, key: str, value: t.Any) -> None:
45        """Sets an attribute associated with the connection.
46
47        Args:
48            key: Attribute key.
49            value: Attribute value.
50        """

Sets an attribute associated with the connection.

Arguments:
  • key: Attribute key.
  • value: Attribute value.
@abc.abstractmethod
def begin(self) -> None:
52    @abc.abstractmethod
53    def begin(self) -> None:
54        """Starts a new transaction."""

Starts a new transaction.

@abc.abstractmethod
def commit(self) -> None:
56    @abc.abstractmethod
57    def commit(self) -> None:
58        """Commits the current transaction."""

Commits the current transaction.

@abc.abstractmethod
def rollback(self) -> None:
60    @abc.abstractmethod
61    def rollback(self) -> None:
62        """Rolls back the current transaction."""

Rolls back the current transaction.

is_transaction_active: bool

Returns True if there is an active transaction and False otherwise.

@abc.abstractmethod
def close_cursor(self) -> None:
69    @abc.abstractmethod
70    def close_cursor(self) -> None:
71        """Closes the current cursor instance if exists."""

Closes the current cursor instance if exists.

@abc.abstractmethod
def close(self) -> None:
73    @abc.abstractmethod
74    def close(self) -> None:
75        """Closes the current connection instance if exists.
76
77        Note: if there is a cursor instance available it will be closed as well.
78        """

Closes the current connection instance if exists.

Note: if there is a cursor instance available it will be closed as well.

@abc.abstractmethod
def close_all(self, exclude_calling_thread: bool = False) -> None:
80    @abc.abstractmethod
81    def close_all(self, exclude_calling_thread: bool = False) -> None:
82        """Closes all cached cursors and connections.
83
84        Args:
85            exclude_calling_thread: If set to True excludes cursors and connections associated
86                with the calling thread.
87        """

Closes all cached cursors and connections.

Arguments:
  • exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
class ThreadLocalConnectionPool(_TransactionManagementMixin):
115class ThreadLocalConnectionPool(_TransactionManagementMixin):
116    def __init__(
117        self,
118        connection_factory: t.Callable[[], t.Any],
119        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
120        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
121    ):
122        self._connection_factory = connection_factory
123        self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
124        self._thread_cursors: t.Dict[t.Hashable, t.Any] = {}
125        self._thread_transactions: t.Set[t.Hashable] = set()
126        self._thread_attributes: t.Dict[t.Hashable, t.Dict[str, t.Any]] = defaultdict(dict)
127        self._thread_connections_lock = Lock()
128        self._thread_cursors_lock = Lock()
129        self._thread_transactions_lock = Lock()
130        self._cursor_kwargs = cursor_kwargs or {}
131        self._cursor_init = cursor_init
132
133    def get_cursor(self) -> t.Any:
134        thread_id = get_ident()
135        with self._thread_cursors_lock:
136            if thread_id not in self._thread_cursors:
137                self._thread_cursors[thread_id] = self.get().cursor(**self._cursor_kwargs)
138                if self._cursor_init:
139                    self._cursor_init(self._thread_cursors[thread_id])
140            return self._thread_cursors[thread_id]
141
142    def get(self) -> t.Any:
143        thread_id = get_ident()
144        with self._thread_connections_lock:
145            if thread_id not in self._thread_connections:
146                self._thread_connections[thread_id] = self._connection_factory()
147            return self._thread_connections[thread_id]
148
149    def get_attribute(self, key: str) -> t.Optional[t.Any]:
150        thread_id = get_ident()
151        return self._thread_attributes[thread_id].get(key)
152
153    def set_attribute(self, key: str, value: t.Any) -> None:
154        thread_id = get_ident()
155        self._thread_attributes[thread_id][key] = value
156
157    def begin(self) -> None:
158        self._do_begin()
159        with self._thread_transactions_lock:
160            self._thread_transactions.add(get_ident())
161
162    def commit(self) -> None:
163        self._do_commit()
164        self._discard_transaction(get_ident())
165
166    def rollback(self) -> None:
167        self._do_rollback()
168        self._discard_transaction(get_ident())
169
170    @property
171    def is_transaction_active(self) -> bool:
172        with self._thread_transactions_lock:
173            return get_ident() in self._thread_transactions
174
175    def close_cursor(self) -> None:
176        thread_id = get_ident()
177        with self._thread_cursors_lock:
178            if thread_id in self._thread_cursors:
179                _try_close(self._thread_cursors[thread_id], "cursor")
180                self._thread_cursors.pop(thread_id)
181
182    def close(self) -> None:
183        thread_id = get_ident()
184        with self._thread_cursors_lock, self._thread_connections_lock:
185            if thread_id in self._thread_connections:
186                _try_close(self._thread_connections[thread_id], "connection")
187                self._thread_connections.pop(thread_id)
188                self._thread_cursors.pop(thread_id, None)
189                self._discard_transaction(thread_id)
190            self._thread_attributes.pop(thread_id, None)
191
192    def close_all(self, exclude_calling_thread: bool = False) -> None:
193        calling_thread_id = get_ident()
194        with self._thread_cursors_lock, self._thread_connections_lock:
195            for thread_id, connection in self._thread_connections.copy().items():
196                if not exclude_calling_thread or thread_id != calling_thread_id:
197                    # NOTE: the access to the connection instance itself is not thread-safe here.
198                    _try_close(connection, "connection")
199                    self._thread_connections.pop(thread_id)
200                    self._thread_cursors.pop(thread_id, None)
201                    self._discard_transaction(thread_id)
202                self._thread_attributes.pop(thread_id, None)
203
204    def _discard_transaction(self, thread_id: t.Hashable) -> None:
205        with self._thread_transactions_lock:
206            self._thread_transactions.discard(thread_id)

Connection pool that keeps a separate cached connection, cursor, transaction flag and attribute dictionary for each thread, keyed by the thread identifier.

ThreadLocalConnectionPool( connection_factory: Callable[[], Any], cursor_kwargs: Union[Dict[str, Any], NoneType] = None, cursor_init: Union[Callable[[Any], NoneType], NoneType] = None)
116    def __init__(
117        self,
118        connection_factory: t.Callable[[], t.Any],
119        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
120        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
121    ):
122        self._connection_factory = connection_factory
123        self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
124        self._thread_cursors: t.Dict[t.Hashable, t.Any] = {}
125        self._thread_transactions: t.Set[t.Hashable] = set()
126        self._thread_attributes: t.Dict[t.Hashable, t.Dict[str, t.Any]] = defaultdict(dict)
127        self._thread_connections_lock = Lock()
128        self._thread_cursors_lock = Lock()
129        self._thread_transactions_lock = Lock()
130        self._cursor_kwargs = cursor_kwargs or {}
131        self._cursor_init = cursor_init
def get_cursor(self) -> Any:
133    def get_cursor(self) -> t.Any:
134        thread_id = get_ident()
135        with self._thread_cursors_lock:
136            if thread_id not in self._thread_cursors:
137                self._thread_cursors[thread_id] = self.get().cursor(**self._cursor_kwargs)
138                if self._cursor_init:
139                    self._cursor_init(self._thread_cursors[thread_id])
140            return self._thread_cursors[thread_id]

Returns cached cursor instance.

Automatically creates a new instance if one is not available.

Returns:

A cursor instance.

def get(self) -> Any:
142    def get(self) -> t.Any:
143        thread_id = get_ident()
144        with self._thread_connections_lock:
145            if thread_id not in self._thread_connections:
146                self._thread_connections[thread_id] = self._connection_factory()
147            return self._thread_connections[thread_id]

Returns cached connection instance.

Automatically opens a new connection if one is not available.

Returns:

A connection instance.

def get_attribute(self, key: str) -> Union[Any, NoneType]:
149    def get_attribute(self, key: str) -> t.Optional[t.Any]:
150        thread_id = get_ident()
151        return self._thread_attributes[thread_id].get(key)

Returns an attribute associated with the connection.

Arguments:
  • key: Attribute key.
Returns:

Attribute value or None if not found.

def set_attribute(self, key: str, value: Any) -> None:
153    def set_attribute(self, key: str, value: t.Any) -> None:
154        thread_id = get_ident()
155        self._thread_attributes[thread_id][key] = value

Sets an attribute associated with the connection.

Arguments:
  • key: Attribute key.
  • value: Attribute value.
def begin(self) -> None:
157    def begin(self) -> None:
158        self._do_begin()
159        with self._thread_transactions_lock:
160            self._thread_transactions.add(get_ident())

Starts a new transaction.

def commit(self) -> None:
162    def commit(self) -> None:
163        self._do_commit()
164        self._discard_transaction(get_ident())

Commits the current transaction.

def rollback(self) -> None:
166    def rollback(self) -> None:
167        self._do_rollback()
168        self._discard_transaction(get_ident())

Rolls back the current transaction.

is_transaction_active: bool

Returns True if there is an active transaction and False otherwise.

def close_cursor(self) -> None:
175    def close_cursor(self) -> None:
176        thread_id = get_ident()
177        with self._thread_cursors_lock:
178            if thread_id in self._thread_cursors:
179                _try_close(self._thread_cursors[thread_id], "cursor")
180                self._thread_cursors.pop(thread_id)

Closes the current cursor instance if exists.

def close(self) -> None:
182    def close(self) -> None:
183        thread_id = get_ident()
184        with self._thread_cursors_lock, self._thread_connections_lock:
185            if thread_id in self._thread_connections:
186                _try_close(self._thread_connections[thread_id], "connection")
187                self._thread_connections.pop(thread_id)
188                self._thread_cursors.pop(thread_id, None)
189                self._discard_transaction(thread_id)
190            self._thread_attributes.pop(thread_id, None)

Closes the current connection instance if exists.

Note: if there is a cursor instance available it will be closed as well.

def close_all(self, exclude_calling_thread: bool = False) -> None:
192    def close_all(self, exclude_calling_thread: bool = False) -> None:
193        calling_thread_id = get_ident()
194        with self._thread_cursors_lock, self._thread_connections_lock:
195            for thread_id, connection in self._thread_connections.copy().items():
196                if not exclude_calling_thread or thread_id != calling_thread_id:
197                    # NOTE: the access to the connection instance itself is not thread-safe here.
198                    _try_close(connection, "connection")
199                    self._thread_connections.pop(thread_id)
200                    self._thread_cursors.pop(thread_id, None)
201                    self._discard_transaction(thread_id)
202                self._thread_attributes.pop(thread_id, None)

Closes all cached cursors and connections.

Arguments:
  • exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
class SingletonConnectionPool(_TransactionManagementMixin):
209class SingletonConnectionPool(_TransactionManagementMixin):
210    def __init__(
211        self,
212        connection_factory: t.Callable[[], t.Any],
213        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
214        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
215    ):
216        self._connection_factory = connection_factory
217        self._connection: t.Optional[t.Any] = None
218        self._cursor: t.Optional[t.Any] = None
219        self._cursor_kwargs = cursor_kwargs or {}
220        self._attributes: t.Dict[str, t.Any] = {}
221        self._is_transaction_active: bool = False
222        self._cursor_init = cursor_init
223
224    def get_cursor(self) -> t.Any:
225        if not self._cursor:
226            self._cursor = self.get().cursor(**self._cursor_kwargs)
227            if self._cursor_init:
228                self._cursor_init(self._cursor)
229        return self._cursor
230
231    def get(self) -> t.Any:
232        if not self._connection:
233            self._connection = self._connection_factory()
234        return self._connection
235
236    def get_attribute(self, key: str) -> t.Optional[t.Any]:
237        return self._attributes.get(key)
238
239    def set_attribute(self, key: str, value: t.Any) -> None:
240        self._attributes[key] = value
241
242    def begin(self) -> None:
243        self._do_begin()
244        self._is_transaction_active = True
245
246    def commit(self) -> None:
247        self._do_commit()
248        self._is_transaction_active = False
249
250    def rollback(self) -> None:
251        self._do_rollback()
252        self._is_transaction_active = False
253
254    @property
255    def is_transaction_active(self) -> bool:
256        return self._is_transaction_active
257
258    def close_cursor(self) -> None:
259        _try_close(self._cursor, "cursor")
260        self._cursor = None
261
262    def close(self) -> None:
263        _try_close(self._connection, "connection")
264        self._connection = None
265        self._cursor = None
266        self._is_transaction_active = False
267        self._attributes.clear()
268
269    def close_all(self, exclude_calling_thread: bool = False) -> None:
270        if not exclude_calling_thread:
271            self.close()

Connection pool that caches a single connection, cursor, transaction flag and attribute dictionary on the pool instance itself.

SingletonConnectionPool( connection_factory: Callable[[], Any], cursor_kwargs: Union[Dict[str, Any], NoneType] = None, cursor_init: Union[Callable[[Any], NoneType], NoneType] = None)
210    def __init__(
211        self,
212        connection_factory: t.Callable[[], t.Any],
213        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
214        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
215    ):
216        self._connection_factory = connection_factory
217        self._connection: t.Optional[t.Any] = None
218        self._cursor: t.Optional[t.Any] = None
219        self._cursor_kwargs = cursor_kwargs or {}
220        self._attributes: t.Dict[str, t.Any] = {}
221        self._is_transaction_active: bool = False
222        self._cursor_init = cursor_init
def get_cursor(self) -> Any:
224    def get_cursor(self) -> t.Any:
225        if not self._cursor:
226            self._cursor = self.get().cursor(**self._cursor_kwargs)
227            if self._cursor_init:
228                self._cursor_init(self._cursor)
229        return self._cursor

Returns cached cursor instance.

Automatically creates a new instance if one is not available.

Returns:

A cursor instance.

def get(self) -> Any:
231    def get(self) -> t.Any:
232        if not self._connection:
233            self._connection = self._connection_factory()
234        return self._connection

Returns cached connection instance.

Automatically opens a new connection if one is not available.

Returns:

A connection instance.

def get_attribute(self, key: str) -> Union[Any, NoneType]:
236    def get_attribute(self, key: str) -> t.Optional[t.Any]:
237        return self._attributes.get(key)

Returns an attribute associated with the connection.

Arguments:
  • key: Attribute key.
Returns:

Attribute value or None if not found.

def set_attribute(self, key: str, value: Any) -> None:
239    def set_attribute(self, key: str, value: t.Any) -> None:
240        self._attributes[key] = value

Sets an attribute associated with the connection.

Arguments:
  • key: Attribute key.
  • value: Attribute value.
def begin(self) -> None:
242    def begin(self) -> None:
243        self._do_begin()
244        self._is_transaction_active = True

Starts a new transaction.

def commit(self) -> None:
246    def commit(self) -> None:
247        self._do_commit()
248        self._is_transaction_active = False

Commits the current transaction.

def rollback(self) -> None:
250    def rollback(self) -> None:
251        self._do_rollback()
252        self._is_transaction_active = False

Rolls back the current transaction.

is_transaction_active: bool

Returns True if there is an active transaction and False otherwise.

def close_cursor(self) -> None:
258    def close_cursor(self) -> None:
259        _try_close(self._cursor, "cursor")
260        self._cursor = None

Closes the current cursor instance if exists.

def close(self) -> None:
262    def close(self) -> None:
263        _try_close(self._connection, "connection")
264        self._connection = None
265        self._cursor = None
266        self._is_transaction_active = False
267        self._attributes.clear()

Closes the current connection instance if exists.

Note: if there is a cursor instance available it will be closed as well.

def close_all(self, exclude_calling_thread: bool = False) -> None:
269    def close_all(self, exclude_calling_thread: bool = False) -> None:
270        if not exclude_calling_thread:
271            self.close()

Closes all cached cursors and connections.

Arguments:
  • exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
def create_connection_pool( connection_factory: Callable[[], Any], multithreaded: bool, cursor_kwargs: Union[Dict[str, Any], NoneType] = None, cursor_init: Union[Callable[[Any], NoneType], NoneType] = None) -> sqlmesh.utils.connection_pool.ConnectionPool:
274def create_connection_pool(
275    connection_factory: t.Callable[[], t.Any],
276    multithreaded: bool,
277    cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
278    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
279) -> ConnectionPool:
280    return (
281        ThreadLocalConnectionPool(
282            connection_factory, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init
283        )
284        if multithreaded
285        else SingletonConnectionPool(
286            connection_factory, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init
287        )
288    )