Edit on GitHub

sqlmesh.utils.connection_pool

  1import abc
  2import logging
  3import typing as t
  4from collections import defaultdict
  5from threading import Lock, get_ident
  6
  7logger = logging.getLogger(__name__)
  8
  9
 10class ConnectionPool(abc.ABC):
 11    @abc.abstractmethod
 12    def get_cursor(self) -> t.Any:
 13        """Returns cached cursor instance.
 14
 15        Automatically creates a new instance if one is not available.
 16
 17        Returns:
 18            A cursor instance.
 19        """
 20
 21    @abc.abstractmethod
 22    def get(self) -> t.Any:
 23        """Returns cached connection instance.
 24
 25        Automatically opens a new connection if one is not available.
 26
 27        Returns:
 28            A connection instance.
 29        """
 30
 31    @abc.abstractmethod
 32    def get_attribute(self, key: str) -> t.Optional[t.Any]:
 33        """Returns an attribute associated with the connection.
 34
 35        Args:
 36            key: Attribute key.
 37
 38        Returns:
 39            Attribute value or None if not found.
 40        """
 41
 42    @abc.abstractmethod
 43    def set_attribute(self, key: str, value: t.Any) -> None:
 44        """Sets an attribute associated with the connection.
 45
 46        Args:
 47            key: Attribute key.
 48            value: Attribute value.
 49        """
 50
 51    @abc.abstractmethod
 52    def get_all_attributes(self, key: str) -> t.List[t.Any]:
 53        """Returns all attributes with the given key across all connections/threads.
 54
 55        Args:
 56            key: Attribute key.
 57
 58        Returns:
 59            List of attribute values from all connections/threads.
 60        """
 61
 62    @abc.abstractmethod
 63    def begin(self) -> None:
 64        """Starts a new transaction."""
 65
 66    @abc.abstractmethod
 67    def commit(self) -> None:
 68        """Commits the current transaction."""
 69
 70    @abc.abstractmethod
 71    def rollback(self) -> None:
 72        """Rolls back the current transaction."""
 73
 74    @property
 75    @abc.abstractmethod
 76    def is_transaction_active(self) -> bool:
 77        """Returns True if there is an active transaction and False otherwise."""
 78
 79    @abc.abstractmethod
 80    def close_cursor(self) -> None:
 81        """Closes the current cursor instance if exists."""
 82
 83    @abc.abstractmethod
 84    def close(self) -> None:
 85        """Closes the current connection instance if exists.
 86
 87        Note: if there is a cursor instance available it will be closed as well.
 88        """
 89
 90    @abc.abstractmethod
 91    def close_all(self, exclude_calling_thread: bool = False) -> None:
 92        """Closes all cached cursors and connections.
 93
 94        Args:
 95            exclude_calling_thread: If set to True excludes cursors and connections associated
 96                with the calling thread.
 97        """
 98
 99
class _TransactionManagementMixin(ConnectionPool):
    """Transaction helpers shared by the concrete pools.

    Each helper prefers the method on the cached cursor and falls back to the
    connection when the cursor does not expose that method.
    """

    def _do_begin(self) -> None:
        """Calls ``begin`` on the cursor, or else on the connection, if either has it."""
        cursor = self.get_cursor()
        if hasattr(cursor, "begin"):
            cursor.begin()
            return

        # Neither object is required to support ``begin``; skip silently if both lack it.
        connection = self.get()
        if hasattr(connection, "begin"):
            connection.begin()

    def _do_commit(self) -> None:
        """Calls ``commit`` on the cursor when it has one, otherwise on the connection."""
        cursor = self.get_cursor()
        if not hasattr(cursor, "commit"):
            # Unlike ``begin``, the connection is called unconditionally here.
            self.get().commit()
            return
        cursor.commit()

    def _do_rollback(self) -> None:
        """Calls ``rollback`` on the cursor when it has one, otherwise on the connection."""
        cursor = self.get_cursor()
        if not hasattr(cursor, "rollback"):
            # Unlike ``begin``, the connection is called unconditionally here.
            self.get().rollback()
            return
        cursor.rollback()
123
124
class _ThreadLocalBase(_TransactionManagementMixin):
    """Common state for pools that cache one cursor per thread.

    Cursors, active-transaction markers and attributes are all keyed by
    ``threading.get_ident()``. This class does not implement ``get``, ``close``
    or ``close_all``; subclasses decide how connections are stored.
    """

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes the per-thread bookkeeping.

        Args:
            connection_factory: Zero-argument callable that opens a new connection.
            cursor_init: Optional callable invoked once with every newly created cursor.
        """
        self._connection_factory = connection_factory
        self._thread_cursors: t.Dict[t.Hashable, t.Any] = {}
        self._thread_transactions: t.Set[t.Hashable] = set()
        self._thread_attributes: t.Dict[t.Hashable, t.Dict[str, t.Any]] = defaultdict(dict)
        self._thread_cursors_lock = Lock()
        self._thread_transactions_lock = Lock()
        self._cursor_init = cursor_init

    def get_cursor(self) -> t.Any:
        """Returns the calling thread's cached cursor, creating it on first use.

        A new cursor is obtained from ``self.get().cursor()`` and passed to
        ``cursor_init`` (when provided) before being returned.

        Returns:
            The cursor cached for the calling thread.
        """
        thread_id = get_ident()
        with self._thread_cursors_lock:
            if thread_id not in self._thread_cursors:
                self._thread_cursors[thread_id] = self.get().cursor()
                if self._cursor_init:
                    self._cursor_init(self._thread_cursors[thread_id])
            return self._thread_cursors[thread_id]

    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns an attribute stored by the calling thread.

        Args:
            key: Attribute key.

        Returns:
            Attribute value or None if not found.
        """
        # Use .get() rather than indexing: indexing the defaultdict would insert
        # an empty entry for every thread that merely reads an attribute.
        thread_attrs = self._thread_attributes.get(get_ident())
        if thread_attrs is None:
            return None
        return thread_attrs.get(key)

    def set_attribute(self, key: str, value: t.Any) -> None:
        """Stores an attribute for the calling thread.

        Args:
            key: Attribute key.
            value: Attribute value.
        """
        thread_id = get_ident()
        # Indexing is intentional here: the defaultdict creates the thread's entry on demand.
        self._thread_attributes[thread_id][key] = value

    def get_all_attributes(self, key: str) -> t.List[t.Any]:
        """Returns all attributes with the given key across all threads.

        Args:
            key: Attribute key.

        Returns:
            The values stored under ``key`` by every thread that has set it.
        """
        # Iterate over a snapshot so that entries added or removed by other
        # threads in the meantime cannot invalidate the iteration.
        snapshot = list(self._thread_attributes.values())
        return [thread_attrs[key] for thread_attrs in snapshot if key in thread_attrs]

    def begin(self) -> None:
        """Starts a transaction and marks the calling thread as having one active."""
        self._do_begin()
        with self._thread_transactions_lock:
            self._thread_transactions.add(get_ident())

    def commit(self) -> None:
        """Commits and clears the calling thread's active-transaction marker."""
        self._do_commit()
        self._discard_transaction(get_ident())

    def rollback(self) -> None:
        """Rolls back and clears the calling thread's active-transaction marker."""
        self._do_rollback()
        self._discard_transaction(get_ident())

    @property
    def is_transaction_active(self) -> bool:
        """True if the calling thread is marked as having an active transaction."""
        with self._thread_transactions_lock:
            return get_ident() in self._thread_transactions

    def close_cursor(self) -> None:
        """Closes and forgets the calling thread's cached cursor, if it has one."""
        thread_id = get_ident()
        with self._thread_cursors_lock:
            if thread_id in self._thread_cursors:
                _try_close(self._thread_cursors[thread_id], "cursor")
                self._thread_cursors.pop(thread_id)

    def _discard_transaction(self, thread_id: t.Hashable) -> None:
        """Removes the active-transaction marker of the given thread, if present."""
        with self._thread_transactions_lock:
            self._thread_transactions.discard(thread_id)
192
193
class ThreadLocalConnectionPool(_ThreadLocalBase):
    """Pool that caches a separate connection (and cursor) for each thread."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes the pool.

        Args:
            connection_factory: Zero-argument callable that opens a new connection.
            cursor_init: Optional callable invoked once with every newly created cursor.
        """
        super().__init__(connection_factory, cursor_init)
        self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
        self._thread_connections_lock = Lock()

    def get(self) -> t.Any:
        """Returns the calling thread's connection, opening one on first use.

        Returns:
            The connection cached for the calling thread.
        """
        thread_id = get_ident()
        with self._thread_connections_lock:
            if thread_id not in self._thread_connections:
                self._thread_connections[thread_id] = self._connection_factory()
            return self._thread_connections[thread_id]

    def close(self) -> None:
        """Closes the calling thread's cursor and connection, if a connection exists.

        The thread's active-transaction marker is cleared together with the
        connection. The thread's attributes are dropped in every case.
        """
        thread_id = get_ident()
        with self._thread_cursors_lock, self._thread_connections_lock:
            if thread_id in self._thread_connections:
                # Close the cursor explicitly, and before its connection, instead
                # of just forgetting it: the pool contract promises that an
                # available cursor is closed as well. _try_close ignores None.
                _try_close(self._thread_cursors.pop(thread_id, None), "cursor")
                _try_close(self._thread_connections.pop(thread_id), "connection")
                self._discard_transaction(thread_id)
            self._thread_attributes.pop(thread_id, None)

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the cached cursors and connections of all threads.

        Attributes are cleared for every thread, including the calling one,
        regardless of ``exclude_calling_thread``.

        Args:
            exclude_calling_thread: If set to True the calling thread's cursor and
                connection are kept open.
        """
        calling_thread_id = get_ident()
        with self._thread_cursors_lock, self._thread_connections_lock:
            # Iterate over a copy because entries are removed inside the loop.
            for thread_id, connection in list(self._thread_connections.items()):
                if exclude_calling_thread and thread_id == calling_thread_id:
                    continue
                # Cursor first, then the connection it belongs to.
                _try_close(self._thread_cursors.pop(thread_id, None), "cursor")
                _try_close(connection, "connection")
                self._thread_connections.pop(thread_id)
                self._discard_transaction(thread_id)

            self._thread_attributes.clear()
232
233
class ThreadLocalSharedConnectionPool(_ThreadLocalBase):
    """Pool in which all threads share one connection but cache their own cursors."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes the pool.

        Args:
            connection_factory: Zero-argument callable that opens the shared connection.
            cursor_init: Optional callable invoked once with every newly created cursor.
        """
        super().__init__(connection_factory, cursor_init)
        self._connection: t.Optional[t.Any] = None
        self._connection_lock = Lock()

    def get(self) -> t.Any:
        """Returns the shared connection, opening it on first use.

        Returns:
            The connection shared by all threads.
        """
        with self._connection_lock:
            if self._connection is None:
                self._connection = self._connection_factory()
            return self._connection

    def close(self) -> None:
        """Releases the calling thread's resources without closing the shared connection.

        The thread's cursor is closed if it has one; its active-transaction
        marker and attributes are dropped.
        """
        thread_id = get_ident()
        with self._thread_cursors_lock, self._connection_lock:
            if thread_id in self._thread_cursors:
                _try_close(self._thread_cursors[thread_id], "cursor")
                self._thread_cursors.pop(thread_id)
            self._discard_transaction(thread_id)
            self._thread_attributes.pop(thread_id, None)

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the cached cursors of all threads and, unless excluded, the connection.

        Attributes are cleared for every thread, regardless of
        ``exclude_calling_thread``.

        Args:
            exclude_calling_thread: If set to True the calling thread's cursor and the
                shared connection are kept open.
        """
        calling_thread_id = get_ident()
        with self._thread_cursors_lock, self._connection_lock:
            # Iterate over a copy because entries are removed inside the loop.
            for thread_id, cursor in list(self._thread_cursors.items()):
                if exclude_calling_thread and thread_id == calling_thread_id:
                    continue
                _try_close(cursor, "cursor")
                self._thread_cursors.pop(thread_id)
                self._discard_transaction(thread_id)

            # Drop the attributes of every thread, not only of those that happen
            # to have a cached cursor; otherwise values set by cursor-less
            # threads would outlive the pool's resources.
            self._thread_attributes.clear()

            if not exclude_calling_thread:
                _try_close(self._connection, "connection")
                self._connection = None
272
273
class SingletonConnectionPool(_TransactionManagementMixin):
    """Pool that holds a single connection and a single cursor, with no per-thread state."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes the pool.

        Args:
            connection_factory: Zero-argument callable that opens a new connection.
            cursor_init: Optional callable invoked once with every newly created cursor.
        """
        self._connection_factory = connection_factory
        self._connection: t.Optional[t.Any] = None
        self._cursor: t.Optional[t.Any] = None
        self._attributes: t.Dict[str, t.Any] = {}
        self._is_transaction_active: bool = False
        self._cursor_init = cursor_init

    def get_cursor(self) -> t.Any:
        """Returns the cached cursor, creating it on first use.

        A new cursor is passed to ``cursor_init`` (when provided) before being returned.

        Returns:
            The cached cursor.
        """
        # Compare against None explicitly: a cursor object that happens to be
        # falsy must not be recreated on every call.
        if self._cursor is None:
            self._cursor = self.get().cursor()
            if self._cursor_init:
                self._cursor_init(self._cursor)
        return self._cursor

    def get(self) -> t.Any:
        """Returns the cached connection, opening it on first use.

        Returns:
            The cached connection.
        """
        # Same reasoning as in get_cursor: only a missing connection triggers the factory.
        if self._connection is None:
            self._connection = self._connection_factory()
        return self._connection

    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns an attribute associated with the connection.

        Args:
            key: Attribute key.

        Returns:
            Attribute value or None if not found.
        """
        return self._attributes.get(key)

    def set_attribute(self, key: str, value: t.Any) -> None:
        """Sets an attribute associated with the connection.

        Args:
            key: Attribute key.
            value: Attribute value.
        """
        self._attributes[key] = value

    def get_all_attributes(self, key: str) -> t.List[t.Any]:
        """Returns all attributes with the given key (single-threaded pool has at most one).

        Args:
            key: Attribute key.

        Returns:
            A one-element list with the stored value, or an empty list if the key is not set.
        """
        # Test for key presence rather than for a non-None value, so that an
        # attribute explicitly set to None is reported the same way the
        # thread-local pools report it.
        return [self._attributes[key]] if key in self._attributes else []

    def begin(self) -> None:
        """Starts a transaction and marks it as active."""
        self._do_begin()
        self._is_transaction_active = True

    def commit(self) -> None:
        """Commits and marks the transaction as no longer active."""
        self._do_commit()
        self._is_transaction_active = False

    def rollback(self) -> None:
        """Rolls back and marks the transaction as no longer active."""
        self._do_rollback()
        self._is_transaction_active = False

    @property
    def is_transaction_active(self) -> bool:
        """True between a ``begin`` and the following ``commit``, ``rollback`` or ``close``."""
        return self._is_transaction_active

    def close_cursor(self) -> None:
        """Closes and forgets the cached cursor, if there is one."""
        _try_close(self._cursor, "cursor")
        self._cursor = None

    def close(self) -> None:
        """Closes the cached cursor and connection and resets all pool state.

        The active-transaction flag is reset and all attributes are dropped.
        """
        # Close the cursor explicitly, and before its connection, instead of
        # just forgetting it: the pool contract promises that an available
        # cursor is closed as well. _try_close ignores None.
        _try_close(self._cursor, "cursor")
        _try_close(self._connection, "connection")
        self._connection = None
        self._cursor = None
        self._is_transaction_active = False
        self._attributes.clear()

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the pool unless the calling thread is excluded.

        Args:
            exclude_calling_thread: If set to True nothing is closed.
        """
        if not exclude_calling_thread:
            self.close()
340
341
def create_connection_pool(
    connection_factory: t.Callable[[], t.Any],
    multithreaded: bool,
    shared_connection: bool = False,
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
) -> ConnectionPool:
    """Builds the connection pool that matches the requested threading mode.

    Args:
        connection_factory: Zero-argument callable that opens a new connection.
        multithreaded: Whether to create one of the thread-local pools.
        shared_connection: Selects the shared-connection pool; only has an effect
            when ``multithreaded`` is true.
        cursor_init: Optional callable forwarded to the pool as ``cursor_init``.

    Returns:
        A new pool instance.
    """
    pool_class: t.Callable[..., ConnectionPool]
    if not multithreaded:
        pool_class = SingletonConnectionPool
    elif shared_connection:
        pool_class = ThreadLocalSharedConnectionPool
    else:
        pool_class = ThreadLocalConnectionPool
    return pool_class(connection_factory, cursor_init=cursor_init)
356
357
358def _try_close(closeable: t.Any, kind: str) -> None:
359    if closeable is None:
360        return
361    try:
362        closeable.close()
363    except Exception:
364        logger.exception("Failed to close %s", kind)
logger = <Logger sqlmesh.utils.connection_pool (WARNING)>
class ConnectionPool(abc.ABC):
11class ConnectionPool(abc.ABC):
12    @abc.abstractmethod
13    def get_cursor(self) -> t.Any:
14        """Returns cached cursor instance.
15
16        Automatically creates a new instance if one is not available.
17
18        Returns:
19            A cursor instance.
20        """
21
22    @abc.abstractmethod
23    def get(self) -> t.Any:
24        """Returns cached connection instance.
25
26        Automatically opens a new connection if one is not available.
27
28        Returns:
29            A connection instance.
30        """
31
32    @abc.abstractmethod
33    def get_attribute(self, key: str) -> t.Optional[t.Any]:
34        """Returns an attribute associated with the connection.
35
36        Args:
37            key: Attribute key.
38
39        Returns:
40            Attribute value or None if not found.
41        """
42
43    @abc.abstractmethod
44    def set_attribute(self, key: str, value: t.Any) -> None:
45        """Sets an attribute associated with the connection.
46
47        Args:
48            key: Attribute key.
49            value: Attribute value.
50        """
51
52    @abc.abstractmethod
53    def get_all_attributes(self, key: str) -> t.List[t.Any]:
54        """Returns all attributes with the given key across all connections/threads.
55
56        Args:
57            key: Attribute key.
58
59        Returns:
60            List of attribute values from all connections/threads.
61        """
62
63    @abc.abstractmethod
64    def begin(self) -> None:
65        """Starts a new transaction."""
66
67    @abc.abstractmethod
68    def commit(self) -> None:
69        """Commits the current transaction."""
70
71    @abc.abstractmethod
72    def rollback(self) -> None:
73        """Rolls back the current transaction."""
74
75    @property
76    @abc.abstractmethod
77    def is_transaction_active(self) -> bool:
78        """Returns True if there is an active transaction and False otherwise."""
79
80    @abc.abstractmethod
81    def close_cursor(self) -> None:
82        """Closes the current cursor instance if exists."""
83
84    @abc.abstractmethod
85    def close(self) -> None:
86        """Closes the current connection instance if exists.
87
88        Note: if there is a cursor instance available it will be closed as well.
89        """
90
91    @abc.abstractmethod
92    def close_all(self, exclude_calling_thread: bool = False) -> None:
93        """Closes all cached cursors and connections.
94
95        Args:
96            exclude_calling_thread: If set to True excludes cursors and connections associated
97                with the calling thread.
98        """

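Abstract interface for connection pools: cached connection and cursor access, attributes associated with a connection, and transaction control. (The class defines no docstring of its own; the text inherited from abc.ABC reads: "Helper class that provides a standard way to create an ABC using inheritance.")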

@abc.abstractmethod
def get_cursor(self) -> Any:
12    @abc.abstractmethod
13    def get_cursor(self) -> t.Any:
14        """Returns cached cursor instance.
15
16        Automatically creates a new instance if one is not available.
17
18        Returns:
19            A cursor instance.
20        """

Returns cached cursor instance.

Automatically creates a new instance if one is not available.

Returns:

A cursor instance.

@abc.abstractmethod
def get(self) -> Any:
22    @abc.abstractmethod
23    def get(self) -> t.Any:
24        """Returns cached connection instance.
25
26        Automatically opens a new connection if one is not available.
27
28        Returns:
29            A connection instance.
30        """

Returns cached connection instance.

Automatically opens a new connection if one is not available.

Returns:

A connection instance.

@abc.abstractmethod
def get_attribute(self, key: str) -> Optional[Any]:
32    @abc.abstractmethod
33    def get_attribute(self, key: str) -> t.Optional[t.Any]:
34        """Returns an attribute associated with the connection.
35
36        Args:
37            key: Attribute key.
38
39        Returns:
40            Attribute value or None if not found.
41        """

Returns an attribute associated with the connection.

Arguments:
  • key: Attribute key.
Returns:

Attribute value or None if not found.

@abc.abstractmethod
def set_attribute(self, key: str, value: Any) -> None:
43    @abc.abstractmethod
44    def set_attribute(self, key: str, value: t.Any) -> None:
45        """Sets an attribute associated with the connection.
46
47        Args:
48            key: Attribute key.
49            value: Attribute value.
50        """

Sets an attribute associated with the connection.

Arguments:
  • key: Attribute key.
  • value: Attribute value.
@abc.abstractmethod
def get_all_attributes(self, key: str) -> List[Any]:
52    @abc.abstractmethod
53    def get_all_attributes(self, key: str) -> t.List[t.Any]:
54        """Returns all attributes with the given key across all connections/threads.
55
56        Args:
57            key: Attribute key.
58
59        Returns:
60            List of attribute values from all connections/threads.
61        """

Returns all attributes with the given key across all connections/threads.

Arguments:
  • key: Attribute key.
Returns:

List of attribute values from all connections/threads.

@abc.abstractmethod
def begin(self) -> None:
63    @abc.abstractmethod
64    def begin(self) -> None:
65        """Starts a new transaction."""

Starts a new transaction.

@abc.abstractmethod
def commit(self) -> None:
67    @abc.abstractmethod
68    def commit(self) -> None:
69        """Commits the current transaction."""

Commits the current transaction.

@abc.abstractmethod
def rollback(self) -> None:
71    @abc.abstractmethod
72    def rollback(self) -> None:
73        """Rolls back the current transaction."""

Rolls back the current transaction.

is_transaction_active: bool
75    @property
76    @abc.abstractmethod
77    def is_transaction_active(self) -> bool:
78        """Returns True if there is an active transaction and False otherwise."""

Returns True if there is an active transaction and False otherwise.

@abc.abstractmethod
def close_cursor(self) -> None:
80    @abc.abstractmethod
81    def close_cursor(self) -> None:
82        """Closes the current cursor instance if exists."""

Closes the current cursor instance if one exists.

@abc.abstractmethod
def close(self) -> None:
84    @abc.abstractmethod
85    def close(self) -> None:
86        """Closes the current connection instance if exists.
87
88        Note: if there is a cursor instance available it will be closed as well.
89        """

Closes the current connection instance if one exists.

Note: if there is a cursor instance available it will be closed as well.

@abc.abstractmethod
def close_all(self, exclude_calling_thread: bool = False) -> None:
91    @abc.abstractmethod
92    def close_all(self, exclude_calling_thread: bool = False) -> None:
93        """Closes all cached cursors and connections.
94
95        Args:
96            exclude_calling_thread: If set to True excludes cursors and connections associated
97                with the calling thread.
98        """

Closes all cached cursors and connections.

Arguments:
  • exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
class ThreadLocalConnectionPool(_ThreadLocalBase):
195class ThreadLocalConnectionPool(_ThreadLocalBase):
196    def __init__(
197        self,
198        connection_factory: t.Callable[[], t.Any],
199        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
200    ):
201        super().__init__(connection_factory, cursor_init)
202        self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
203        self._thread_connections_lock = Lock()
204
205    def get(self) -> t.Any:
206        thread_id = get_ident()
207        with self._thread_connections_lock:
208            if thread_id not in self._thread_connections:
209                self._thread_connections[thread_id] = self._connection_factory()
210            return self._thread_connections[thread_id]
211
212    def close(self) -> None:
213        thread_id = get_ident()
214        with self._thread_cursors_lock, self._thread_connections_lock:
215            if thread_id in self._thread_connections:
216                _try_close(self._thread_connections[thread_id], "connection")
217                self._thread_connections.pop(thread_id)
218                self._thread_cursors.pop(thread_id, None)
219                self._discard_transaction(thread_id)
220            self._thread_attributes.pop(thread_id, None)
221
222    def close_all(self, exclude_calling_thread: bool = False) -> None:
223        calling_thread_id = get_ident()
224        with self._thread_cursors_lock, self._thread_connections_lock:
225            for thread_id, connection in self._thread_connections.copy().items():
226                if not exclude_calling_thread or thread_id != calling_thread_id:
227                    _try_close(connection, "connection")
228                    self._thread_connections.pop(thread_id)
229                    self._thread_cursors.pop(thread_id, None)
230                    self._discard_transaction(thread_id)
231
232            self._thread_attributes.clear()

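Connection pool that caches a separate connection and cursor for each thread. (The class defines no docstring of its own; the text inherited from abc.ABC reads: "Helper class that provides a standard way to create an ABC using inheritance.")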

ThreadLocalConnectionPool( connection_factory: Callable[[], Any], cursor_init: Optional[Callable[[Any], NoneType]] = None)
196    def __init__(
197        self,
198        connection_factory: t.Callable[[], t.Any],
199        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
200    ):
201        super().__init__(connection_factory, cursor_init)
202        self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
203        self._thread_connections_lock = Lock()
def get(self) -> Any:
205    def get(self) -> t.Any:
206        thread_id = get_ident()
207        with self._thread_connections_lock:
208            if thread_id not in self._thread_connections:
209                self._thread_connections[thread_id] = self._connection_factory()
210            return self._thread_connections[thread_id]

Returns cached connection instance.

Automatically opens a new connection if one is not available.

Returns:

A connection instance.

def close(self) -> None:
212    def close(self) -> None:
213        thread_id = get_ident()
214        with self._thread_cursors_lock, self._thread_connections_lock:
215            if thread_id in self._thread_connections:
216                _try_close(self._thread_connections[thread_id], "connection")
217                self._thread_connections.pop(thread_id)
218                self._thread_cursors.pop(thread_id, None)
219                self._discard_transaction(thread_id)
220            self._thread_attributes.pop(thread_id, None)

Closes the current connection instance if one exists.

Note: if there is a cursor instance available it will be closed as well.

def close_all(self, exclude_calling_thread: bool = False) -> None:
222    def close_all(self, exclude_calling_thread: bool = False) -> None:
223        calling_thread_id = get_ident()
224        with self._thread_cursors_lock, self._thread_connections_lock:
225            for thread_id, connection in self._thread_connections.copy().items():
226                if not exclude_calling_thread or thread_id != calling_thread_id:
227                    _try_close(connection, "connection")
228                    self._thread_connections.pop(thread_id)
229                    self._thread_cursors.pop(thread_id, None)
230                    self._discard_transaction(thread_id)
231
232            self._thread_attributes.clear()

Closes all cached cursors and connections.

Arguments:
  • exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
class ThreadLocalSharedConnectionPool(_ThreadLocalBase):
235class ThreadLocalSharedConnectionPool(_ThreadLocalBase):
236    def __init__(
237        self,
238        connection_factory: t.Callable[[], t.Any],
239        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
240    ):
241        super().__init__(connection_factory, cursor_init)
242        self._connection: t.Optional[t.Any] = None
243        self._connection_lock = Lock()
244
245    def get(self) -> t.Any:
246        with self._connection_lock:
247            if self._connection is None:
248                self._connection = self._connection_factory()
249            return self._connection
250
251    def close(self) -> None:
252        thread_id = get_ident()
253        with self._thread_cursors_lock, self._connection_lock:
254            if thread_id in self._thread_cursors:
255                _try_close(self._thread_cursors[thread_id], "cursor")
256                self._thread_cursors.pop(thread_id)
257            self._discard_transaction(thread_id)
258            self._thread_attributes.pop(thread_id, None)
259
260    def close_all(self, exclude_calling_thread: bool = False) -> None:
261        calling_thread_id = get_ident()
262        with self._thread_cursors_lock, self._connection_lock:
263            for thread_id, cursor in self._thread_cursors.copy().items():
264                if not exclude_calling_thread or thread_id != calling_thread_id:
265                    _try_close(cursor, "cursor")
266                    self._thread_cursors.pop(thread_id)
267                    self._discard_transaction(thread_id)
268                self._thread_attributes.pop(thread_id, None)
269
270            if not exclude_calling_thread:
271                _try_close(self._connection, "connection")
272                self._connection = None

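Connection pool in which all threads share a single connection while each thread caches its own cursor. (The class defines no docstring of its own; the text inherited from abc.ABC reads: "Helper class that provides a standard way to create an ABC using inheritance.")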

ThreadLocalSharedConnectionPool( connection_factory: Callable[[], Any], cursor_init: Optional[Callable[[Any], NoneType]] = None)
236    def __init__(
237        self,
238        connection_factory: t.Callable[[], t.Any],
239        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
240    ):
241        super().__init__(connection_factory, cursor_init)
242        self._connection: t.Optional[t.Any] = None
243        self._connection_lock = Lock()
def get(self) -> Any:
245    def get(self) -> t.Any:
246        with self._connection_lock:
247            if self._connection is None:
248                self._connection = self._connection_factory()
249            return self._connection

Returns cached connection instance.

Automatically opens a new connection if one is not available.

Returns:

A connection instance.

def close(self) -> None:
251    def close(self) -> None:
252        thread_id = get_ident()
253        with self._thread_cursors_lock, self._connection_lock:
254            if thread_id in self._thread_cursors:
255                _try_close(self._thread_cursors[thread_id], "cursor")
256                self._thread_cursors.pop(thread_id)
257            self._discard_transaction(thread_id)
258            self._thread_attributes.pop(thread_id, None)

Closes the current connection instance if one exists.

Note: if there is a cursor instance available it will be closed as well.

def close_all(self, exclude_calling_thread: bool = False) -> None:
260    def close_all(self, exclude_calling_thread: bool = False) -> None:
261        calling_thread_id = get_ident()
262        with self._thread_cursors_lock, self._connection_lock:
263            for thread_id, cursor in self._thread_cursors.copy().items():
264                if not exclude_calling_thread or thread_id != calling_thread_id:
265                    _try_close(cursor, "cursor")
266                    self._thread_cursors.pop(thread_id)
267                    self._discard_transaction(thread_id)
268                self._thread_attributes.pop(thread_id, None)
269
270            if not exclude_calling_thread:
271                _try_close(self._connection, "connection")
272                self._connection = None

Closes all cached cursors and connections.

Arguments:
  • exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
class SingletonConnectionPool(_TransactionManagementMixin):
275class SingletonConnectionPool(_TransactionManagementMixin):
276    def __init__(
277        self,
278        connection_factory: t.Callable[[], t.Any],
279        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
280    ):
281        self._connection_factory = connection_factory
282        self._connection: t.Optional[t.Any] = None
283        self._cursor: t.Optional[t.Any] = None
284        self._attributes: t.Dict[str, t.Any] = {}
285        self._is_transaction_active: bool = False
286        self._cursor_init = cursor_init
287
288    def get_cursor(self) -> t.Any:
289        if not self._cursor:
290            self._cursor = self.get().cursor()
291            if self._cursor_init:
292                self._cursor_init(self._cursor)
293        return self._cursor
294
295    def get(self) -> t.Any:
296        if not self._connection:
297            self._connection = self._connection_factory()
298        return self._connection
299
300    def get_attribute(self, key: str) -> t.Optional[t.Any]:
301        return self._attributes.get(key)
302
303    def set_attribute(self, key: str, value: t.Any) -> None:
304        self._attributes[key] = value
305
306    def get_all_attributes(self, key: str) -> t.List[t.Any]:
307        """Returns all attributes with the given key (single-threaded pool has at most one)."""
308        value = self._attributes.get(key)
309        return [value] if value is not None else []
310
311    def begin(self) -> None:
312        self._do_begin()
313        self._is_transaction_active = True
314
315    def commit(self) -> None:
316        self._do_commit()
317        self._is_transaction_active = False
318
319    def rollback(self) -> None:
320        self._do_rollback()
321        self._is_transaction_active = False
322
323    @property
324    def is_transaction_active(self) -> bool:
325        return self._is_transaction_active
326
327    def close_cursor(self) -> None:
328        _try_close(self._cursor, "cursor")
329        self._cursor = None
330
331    def close(self) -> None:
332        _try_close(self._connection, "connection")
333        self._connection = None
334        self._cursor = None
335        self._is_transaction_active = False
336        self._attributes.clear()
337
338    def close_all(self, exclude_calling_thread: bool = False) -> None:
339        if not exclude_calling_thread:
340            self.close()

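Connection pool that holds a single connection and a single cursor, with no per-thread state. (The class defines no docstring of its own; the text inherited from abc.ABC reads: "Helper class that provides a standard way to create an ABC using inheritance.")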

SingletonConnectionPool( connection_factory: Callable[[], Any], cursor_init: Optional[Callable[[Any], NoneType]] = None)
276    def __init__(
277        self,
278        connection_factory: t.Callable[[], t.Any],
279        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
280    ):
281        self._connection_factory = connection_factory
282        self._connection: t.Optional[t.Any] = None
283        self._cursor: t.Optional[t.Any] = None
284        self._attributes: t.Dict[str, t.Any] = {}
285        self._is_transaction_active: bool = False
286        self._cursor_init = cursor_init
def get_cursor(self) -> Any:
288    def get_cursor(self) -> t.Any:
289        if not self._cursor:
290            self._cursor = self.get().cursor()
291            if self._cursor_init:
292                self._cursor_init(self._cursor)
293        return self._cursor

Returns cached cursor instance.

Automatically creates a new instance if one is not available.

Returns:

A cursor instance.

def get(self) -> Any:
295    def get(self) -> t.Any:
296        if not self._connection:
297            self._connection = self._connection_factory()
298        return self._connection

Returns cached connection instance.

Automatically opens a new connection if one is not available.

Returns:

A connection instance.

def get_attribute(self, key: str) -> Optional[Any]:
300    def get_attribute(self, key: str) -> t.Optional[t.Any]:
301        return self._attributes.get(key)

Returns an attribute associated with the connection.

Arguments:
  • key: Attribute key.
Returns:

Attribute value or None if not found.

def set_attribute(self, key: str, value: Any) -> None:
303    def set_attribute(self, key: str, value: t.Any) -> None:
304        self._attributes[key] = value

Sets an attribute associated with the connection.

Arguments:
  • key: Attribute key.
  • value: Attribute value.
def get_all_attributes(self, key: str) -> List[Any]:
306    def get_all_attributes(self, key: str) -> t.List[t.Any]:
307        """Returns all attributes with the given key (single-threaded pool has at most one)."""
308        value = self._attributes.get(key)
309        return [value] if value is not None else []

Returns all attributes with the given key (single-threaded pool has at most one).

def begin(self) -> None:
311    def begin(self) -> None:
312        self._do_begin()
313        self._is_transaction_active = True

Starts a new transaction.

def commit(self) -> None:
315    def commit(self) -> None:
316        self._do_commit()
317        self._is_transaction_active = False

Commits the current transaction.

def rollback(self) -> None:
319    def rollback(self) -> None:
320        self._do_rollback()
321        self._is_transaction_active = False

Rolls back the current transaction.

is_transaction_active: bool
323    @property
324    def is_transaction_active(self) -> bool:
325        return self._is_transaction_active

Returns True if there is an active transaction and False otherwise.

def close_cursor(self) -> None:
327    def close_cursor(self) -> None:
328        _try_close(self._cursor, "cursor")
329        self._cursor = None

Closes the current cursor instance if exists.

def close(self) -> None:
331    def close(self) -> None:
332        _try_close(self._connection, "connection")
333        self._connection = None
334        self._cursor = None
335        self._is_transaction_active = False
336        self._attributes.clear()

Closes the current connection instance if exists.

Note: if there is a cursor instance available it will be closed as well.

def close_all(self, exclude_calling_thread: bool = False) -> None:
338    def close_all(self, exclude_calling_thread: bool = False) -> None:
339        if not exclude_calling_thread:
340            self.close()

Closes all cached cursors and connections.

Arguments:
  • exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
def create_connection_pool( connection_factory: Callable[[], Any], multithreaded: bool, shared_connection: bool = False, cursor_init: Optional[Callable[[Any], NoneType]] = None) -> ConnectionPool:
def create_connection_pool(
    connection_factory: t.Callable[[], t.Any],
    multithreaded: bool,
    shared_connection: bool = False,
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
) -> ConnectionPool:
    """Builds the connection pool implementation matching the requested mode.

    Args:
        connection_factory: Zero-argument callable that returns a new connection.
        multithreaded: When falsy a SingletonConnectionPool is created and
            ``shared_connection`` is ignored.
        shared_connection: When truthy together with ``multithreaded`` a
            ThreadLocalSharedConnectionPool is created; otherwise a
            multithreaded pool is a ThreadLocalConnectionPool.
        cursor_init: Optional callable forwarded to the pool as ``cursor_init``.

    Returns:
        A new instance of the selected pool class.
    """
    if not multithreaded:
        pool_class = SingletonConnectionPool
    elif shared_connection:
        pool_class = ThreadLocalSharedConnectionPool
    else:
        pool_class = ThreadLocalConnectionPool
    return pool_class(connection_factory, cursor_init=cursor_init)
356    return pool_class(connection_factory, cursor_init=cursor_init)