sqlmesh.utils.connection_pool
1import abc 2import logging 3import typing as t 4from collections import defaultdict 5from threading import Lock, get_ident 6 7logger = logging.getLogger(__name__) 8 9 10class ConnectionPool(abc.ABC): 11 @abc.abstractmethod 12 def get_cursor(self) -> t.Any: 13 """Returns cached cursor instance. 14 15 Automatically creates a new instance if one is not available. 16 17 Returns: 18 A cursor instance. 19 """ 20 21 @abc.abstractmethod 22 def get(self) -> t.Any: 23 """Returns cached connection instance. 24 25 Automatically opens a new connection if one is not available. 26 27 Returns: 28 A connection instance. 29 """ 30 31 @abc.abstractmethod 32 def get_attribute(self, key: str) -> t.Optional[t.Any]: 33 """Returns an attribute associated with the connection. 34 35 Args: 36 key: Attribute key. 37 38 Returns: 39 Attribute value or None if not found. 40 """ 41 42 @abc.abstractmethod 43 def set_attribute(self, key: str, value: t.Any) -> None: 44 """Sets an attribute associated with the connection. 45 46 Args: 47 key: Attribute key. 48 value: Attribute value. 49 """ 50 51 @abc.abstractmethod 52 def begin(self) -> None: 53 """Starts a new transaction.""" 54 55 @abc.abstractmethod 56 def commit(self) -> None: 57 """Commits the current transaction.""" 58 59 @abc.abstractmethod 60 def rollback(self) -> None: 61 """Rolls back the current transaction.""" 62 63 @property 64 @abc.abstractmethod 65 def is_transaction_active(self) -> bool: 66 """Returns True if there is an active transaction and False otherwise.""" 67 68 @abc.abstractmethod 69 def close_cursor(self) -> None: 70 """Closes the current cursor instance if exists.""" 71 72 @abc.abstractmethod 73 def close(self) -> None: 74 """Closes the current connection instance if exists. 75 76 Note: if there is a cursor instance available it will be closed as well. 77 """ 78 79 @abc.abstractmethod 80 def close_all(self, exclude_calling_thread: bool = False) -> None: 81 """Closes all cached cursors and connections. 82 83 Args: 84 exclude_calling_thread: If set to True excludes cursors and connections associated 85 with the calling thread. 86 """ 87 88 89class _TransactionManagementMixin(ConnectionPool): 90 def _do_begin(self) -> None: 91 cursor = self.get_cursor() 92 if hasattr(cursor, "begin"): 93 cursor.begin() 94 else: 95 conn = self.get() 96 if hasattr(conn, "begin"): 97 conn.begin() 98 99 def _do_commit(self) -> None: 100 cursor = self.get_cursor() 101 if hasattr(cursor, "commit"): 102 cursor.commit() 103 else: 104 self.get().commit() 105 106 def _do_rollback(self) -> None: 107 cursor = self.get_cursor() 108 if hasattr(cursor, "rollback"): 109 cursor.rollback() 110 else: 111 self.get().rollback() 112 113 114class ThreadLocalConnectionPool(_TransactionManagementMixin): 115 def __init__( 116 self, 117 connection_factory: t.Callable[[], t.Any], 118 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 119 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 120 ): 121 self._connection_factory = connection_factory 122 self._thread_connections: t.Dict[t.Hashable, t.Any] = {} 123 self._thread_cursors: t.Dict[t.Hashable, t.Any] = {} 124 self._thread_transactions: t.Set[t.Hashable] = set() 125 self._thread_attributes: t.Dict[t.Hashable, t.Dict[str, t.Any]] = defaultdict(dict) 126 self._thread_connections_lock = Lock() 127 self._thread_cursors_lock = Lock() 128 self._thread_transactions_lock = Lock() 129 self._cursor_kwargs = cursor_kwargs or {} 130 self._cursor_init = cursor_init 131 132 def get_cursor(self) -> t.Any: 133 thread_id = get_ident() 134 with self._thread_cursors_lock: 135 if thread_id not in self._thread_cursors: 136 self._thread_cursors[thread_id] = self.get().cursor(**self._cursor_kwargs) 137 if self._cursor_init: 138 self._cursor_init(self._thread_cursors[thread_id]) 139 return self._thread_cursors[thread_id] 140 141 def get(self) -> t.Any: 142 thread_id = get_ident() 143 with self._thread_connections_lock: 144 if thread_id not in self._thread_connections: 145 self._thread_connections[thread_id] = self._connection_factory() 146 return self._thread_connections[thread_id] 147 148 def get_attribute(self, key: str) -> t.Optional[t.Any]: 149 thread_id = get_ident() 150 return self._thread_attributes[thread_id].get(key) 151 152 def set_attribute(self, key: str, value: t.Any) -> None: 153 thread_id = get_ident() 154 self._thread_attributes[thread_id][key] = value 155 156 def begin(self) -> None: 157 self._do_begin() 158 with self._thread_transactions_lock: 159 self._thread_transactions.add(get_ident()) 160 161 def commit(self) -> None: 162 self._do_commit() 163 self._discard_transaction(get_ident()) 164 165 def rollback(self) -> None: 166 self._do_rollback() 167 self._discard_transaction(get_ident()) 168 169 @property 170 def is_transaction_active(self) -> bool: 171 with self._thread_transactions_lock: 172 return get_ident() in self._thread_transactions 173 174 def close_cursor(self) -> None: 175 thread_id = get_ident() 176 with self._thread_cursors_lock: 177 if thread_id in self._thread_cursors: 178 _try_close(self._thread_cursors[thread_id], "cursor") 179 self._thread_cursors.pop(thread_id) 180 181 def close(self) -> None: 182 thread_id = get_ident() 183 with self._thread_cursors_lock, self._thread_connections_lock: 184 if thread_id in self._thread_connections: 185 _try_close(self._thread_connections[thread_id], "connection") 186 self._thread_connections.pop(thread_id) 187 self._thread_cursors.pop(thread_id, None) 188 self._discard_transaction(thread_id) 189 self._thread_attributes.pop(thread_id, None) 190 191 def close_all(self, exclude_calling_thread: bool = False) -> None: 192 calling_thread_id = get_ident() 193 with self._thread_cursors_lock, self._thread_connections_lock: 194 for thread_id, connection in self._thread_connections.copy().items(): 195 if not exclude_calling_thread or thread_id != calling_thread_id: 196 # NOTE: the access to the connection instance itself is not thread-safe here. 197 _try_close(connection, "connection") 198 self._thread_connections.pop(thread_id) 199 self._thread_cursors.pop(thread_id, None) 200 self._discard_transaction(thread_id) 201 self._thread_attributes.pop(thread_id, None) 202 203 def _discard_transaction(self, thread_id: t.Hashable) -> None: 204 with self._thread_transactions_lock: 205 self._thread_transactions.discard(thread_id) 206 207 208class SingletonConnectionPool(_TransactionManagementMixin): 209 def __init__( 210 self, 211 connection_factory: t.Callable[[], t.Any], 212 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 213 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 214 ): 215 self._connection_factory = connection_factory 216 self._connection: t.Optional[t.Any] = None 217 self._cursor: t.Optional[t.Any] = None 218 self._cursor_kwargs = cursor_kwargs or {} 219 self._attributes: t.Dict[str, t.Any] = {} 220 self._is_transaction_active: bool = False 221 self._cursor_init = cursor_init 222 223 def get_cursor(self) -> t.Any: 224 if not self._cursor: 225 self._cursor = self.get().cursor(**self._cursor_kwargs) 226 if self._cursor_init: 227 self._cursor_init(self._cursor) 228 return self._cursor 229 230 def get(self) -> t.Any: 231 if not self._connection: 232 self._connection = self._connection_factory() 233 return self._connection 234 235 def get_attribute(self, key: str) -> t.Optional[t.Any]: 236 return self._attributes.get(key) 237 238 def set_attribute(self, key: str, value: t.Any) -> None: 239 self._attributes[key] = value 240 241 def begin(self) -> None: 242 self._do_begin() 243 self._is_transaction_active = True 244 245 def commit(self) -> None: 246 self._do_commit() 247 self._is_transaction_active = False 248 249 def rollback(self) -> None: 250 self._do_rollback() 251 self._is_transaction_active = False 252 253 @property 254 def is_transaction_active(self) -> bool: 255 return self._is_transaction_active 256 257 def close_cursor(self) -> None: 258 _try_close(self._cursor, "cursor") 259 self._cursor = None 260 261 def close(self) -> None: 262 _try_close(self._connection, "connection") 263 self._connection = None 264 self._cursor = None 265 self._is_transaction_active = False 266 self._attributes.clear() 267 268 def close_all(self, exclude_calling_thread: bool = False) -> None: 269 if not exclude_calling_thread: 270 self.close() 271 272 273def create_connection_pool( 274 connection_factory: t.Callable[[], t.Any], 275 multithreaded: bool, 276 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 277 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 278) -> ConnectionPool: 279 return ( 280 ThreadLocalConnectionPool( 281 connection_factory, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init 282 ) 283 if multithreaded 284 else SingletonConnectionPool( 285 connection_factory, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init 286 ) 287 ) 288 289 290def _try_close(closeable: t.Any, kind: str) -> None: 291 if closeable is None: 292 return 293 try: 294 closeable.close() 295 except Exception: 296 logger.exception("Failed to close %s", kind)
11class ConnectionPool(abc.ABC): 12 @abc.abstractmethod 13 def get_cursor(self) -> t.Any: 14 """Returns cached cursor instance. 15 16 Automatically creates a new instance if one is not available. 17 18 Returns: 19 A cursor instance. 20 """ 21 22 @abc.abstractmethod 23 def get(self) -> t.Any: 24 """Returns cached connection instance. 25 26 Automatically opens a new connection if one is not available. 27 28 Returns: 29 A connection instance. 30 """ 31 32 @abc.abstractmethod 33 def get_attribute(self, key: str) -> t.Optional[t.Any]: 34 """Returns an attribute associated with the connection. 35 36 Args: 37 key: Attribute key. 38 39 Returns: 40 Attribute value or None if not found. 41 """ 42 43 @abc.abstractmethod 44 def set_attribute(self, key: str, value: t.Any) -> None: 45 """Sets an attribute associated with the connection. 46 47 Args: 48 key: Attribute key. 49 value: Attribute value. 50 """ 51 52 @abc.abstractmethod 53 def begin(self) -> None: 54 """Starts a new transaction.""" 55 56 @abc.abstractmethod 57 def commit(self) -> None: 58 """Commits the current transaction.""" 59 60 @abc.abstractmethod 61 def rollback(self) -> None: 62 """Rolls back the current transaction.""" 63 64 @property 65 @abc.abstractmethod 66 def is_transaction_active(self) -> bool: 67 """Returns True if there is an active transaction and False otherwise.""" 68 69 @abc.abstractmethod 70 def close_cursor(self) -> None: 71 """Closes the current cursor instance if exists.""" 72 73 @abc.abstractmethod 74 def close(self) -> None: 75 """Closes the current connection instance if exists. 76 77 Note: if there is a cursor instance available it will be closed as well. 78 """ 79 80 @abc.abstractmethod 81 def close_all(self, exclude_calling_thread: bool = False) -> None: 82 """Closes all cached cursors and connections. 83 84 Args: 85 exclude_calling_thread: If set to True excludes cursors and connections associated 86 with the calling thread. 87 """
Helper class that provides a standard way to create an ABC using inheritance.
12 @abc.abstractmethod 13 def get_cursor(self) -> t.Any: 14 """Returns cached cursor instance. 15 16 Automatically creates a new instance if one is not available. 17 18 Returns: 19 A cursor instance. 20 """
Returns cached cursor instance.
Automatically creates a new instance if one is not available.
Returns:
A cursor instance.
22 @abc.abstractmethod 23 def get(self) -> t.Any: 24 """Returns cached connection instance. 25 26 Automatically opens a new connection if one is not available. 27 28 Returns: 29 A connection instance. 30 """
Returns cached connection instance.
Automatically opens a new connection if one is not available.
Returns:
A connection instance.
32 @abc.abstractmethod 33 def get_attribute(self, key: str) -> t.Optional[t.Any]: 34 """Returns an attribute associated with the connection. 35 36 Args: 37 key: Attribute key. 38 39 Returns: 40 Attribute value or None if not found. 41 """
Returns an attribute associated with the connection.
Arguments:
- key: Attribute key.
Returns:
Attribute value or None if not found.
43 @abc.abstractmethod 44 def set_attribute(self, key: str, value: t.Any) -> None: 45 """Sets an attribute associated with the connection. 46 47 Args: 48 key: Attribute key. 49 value: Attribute value. 50 """
Sets an attribute associated with the connection.
Arguments:
- key: Attribute key.
- value: Attribute value.
69 @abc.abstractmethod 70 def close_cursor(self) -> None: 71 """Closes the current cursor instance if exists."""
Closes the current cursor instance if exists.
73 @abc.abstractmethod 74 def close(self) -> None: 75 """Closes the current connection instance if exists. 76 77 Note: if there is a cursor instance available it will be closed as well. 78 """
Closes the current connection instance if exists.
Note: if there is a cursor instance available it will be closed as well.
80 @abc.abstractmethod 81 def close_all(self, exclude_calling_thread: bool = False) -> None: 82 """Closes all cached cursors and connections. 83 84 Args: 85 exclude_calling_thread: If set to True excludes cursors and connections associated 86 with the calling thread. 87 """
Closes all cached cursors and connections.
Arguments:
- exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
115class ThreadLocalConnectionPool(_TransactionManagementMixin): 116 def __init__( 117 self, 118 connection_factory: t.Callable[[], t.Any], 119 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 120 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 121 ): 122 self._connection_factory = connection_factory 123 self._thread_connections: t.Dict[t.Hashable, t.Any] = {} 124 self._thread_cursors: t.Dict[t.Hashable, t.Any] = {} 125 self._thread_transactions: t.Set[t.Hashable] = set() 126 self._thread_attributes: t.Dict[t.Hashable, t.Dict[str, t.Any]] = defaultdict(dict) 127 self._thread_connections_lock = Lock() 128 self._thread_cursors_lock = Lock() 129 self._thread_transactions_lock = Lock() 130 self._cursor_kwargs = cursor_kwargs or {} 131 self._cursor_init = cursor_init 132 133 def get_cursor(self) -> t.Any: 134 thread_id = get_ident() 135 with self._thread_cursors_lock: 136 if thread_id not in self._thread_cursors: 137 self._thread_cursors[thread_id] = self.get().cursor(**self._cursor_kwargs) 138 if self._cursor_init: 139 self._cursor_init(self._thread_cursors[thread_id]) 140 return self._thread_cursors[thread_id] 141 142 def get(self) -> t.Any: 143 thread_id = get_ident() 144 with self._thread_connections_lock: 145 if thread_id not in self._thread_connections: 146 self._thread_connections[thread_id] = self._connection_factory() 147 return self._thread_connections[thread_id] 148 149 def get_attribute(self, key: str) -> t.Optional[t.Any]: 150 thread_id = get_ident() 151 return self._thread_attributes[thread_id].get(key) 152 153 def set_attribute(self, key: str, value: t.Any) -> None: 154 thread_id = get_ident() 155 self._thread_attributes[thread_id][key] = value 156 157 def begin(self) -> None: 158 self._do_begin() 159 with self._thread_transactions_lock: 160 self._thread_transactions.add(get_ident()) 161 162 def commit(self) -> None: 163 self._do_commit() 164 self._discard_transaction(get_ident()) 165 166 def rollback(self) -> None: 167 self._do_rollback() 168 self._discard_transaction(get_ident()) 169 170 @property 171 def is_transaction_active(self) -> bool: 172 with self._thread_transactions_lock: 173 return get_ident() in self._thread_transactions 174 175 def close_cursor(self) -> None: 176 thread_id = get_ident() 177 with self._thread_cursors_lock: 178 if thread_id in self._thread_cursors: 179 _try_close(self._thread_cursors[thread_id], "cursor") 180 self._thread_cursors.pop(thread_id) 181 182 def close(self) -> None: 183 thread_id = get_ident() 184 with self._thread_cursors_lock, self._thread_connections_lock: 185 if thread_id in self._thread_connections: 186 _try_close(self._thread_connections[thread_id], "connection") 187 self._thread_connections.pop(thread_id) 188 self._thread_cursors.pop(thread_id, None) 189 self._discard_transaction(thread_id) 190 self._thread_attributes.pop(thread_id, None) 191 192 def close_all(self, exclude_calling_thread: bool = False) -> None: 193 calling_thread_id = get_ident() 194 with self._thread_cursors_lock, self._thread_connections_lock: 195 for thread_id, connection in self._thread_connections.copy().items(): 196 if not exclude_calling_thread or thread_id != calling_thread_id: 197 # NOTE: the access to the connection instance itself is not thread-safe here. 198 _try_close(connection, "connection") 199 self._thread_connections.pop(thread_id) 200 self._thread_cursors.pop(thread_id, None) 201 self._discard_transaction(thread_id) 202 self._thread_attributes.pop(thread_id, None) 203 204 def _discard_transaction(self, thread_id: t.Hashable) -> None: 205 with self._thread_transactions_lock: 206 self._thread_transactions.discard(thread_id)
Helper class that provides a standard way to create an ABC using inheritance.
116 def __init__( 117 self, 118 connection_factory: t.Callable[[], t.Any], 119 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 120 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 121 ): 122 self._connection_factory = connection_factory 123 self._thread_connections: t.Dict[t.Hashable, t.Any] = {} 124 self._thread_cursors: t.Dict[t.Hashable, t.Any] = {} 125 self._thread_transactions: t.Set[t.Hashable] = set() 126 self._thread_attributes: t.Dict[t.Hashable, t.Dict[str, t.Any]] = defaultdict(dict) 127 self._thread_connections_lock = Lock() 128 self._thread_cursors_lock = Lock() 129 self._thread_transactions_lock = Lock() 130 self._cursor_kwargs = cursor_kwargs or {} 131 self._cursor_init = cursor_init
133 def get_cursor(self) -> t.Any: 134 thread_id = get_ident() 135 with self._thread_cursors_lock: 136 if thread_id not in self._thread_cursors: 137 self._thread_cursors[thread_id] = self.get().cursor(**self._cursor_kwargs) 138 if self._cursor_init: 139 self._cursor_init(self._thread_cursors[thread_id]) 140 return self._thread_cursors[thread_id]
Returns cached cursor instance.
Automatically creates a new instance if one is not available.
Returns:
A cursor instance.
142 def get(self) -> t.Any: 143 thread_id = get_ident() 144 with self._thread_connections_lock: 145 if thread_id not in self._thread_connections: 146 self._thread_connections[thread_id] = self._connection_factory() 147 return self._thread_connections[thread_id]
Returns cached connection instance.
Automatically opens a new connection if one is not available.
Returns:
A connection instance.
149 def get_attribute(self, key: str) -> t.Optional[t.Any]: 150 thread_id = get_ident() 151 return self._thread_attributes[thread_id].get(key)
Returns an attribute associated with the connection.
Arguments:
- key: Attribute key.
Returns:
Attribute value or None if not found.
153 def set_attribute(self, key: str, value: t.Any) -> None: 154 thread_id = get_ident() 155 self._thread_attributes[thread_id][key] = value
Sets an attribute associated with the connection.
Arguments:
- key: Attribute key.
- value: Attribute value.
157 def begin(self) -> None: 158 self._do_begin() 159 with self._thread_transactions_lock: 160 self._thread_transactions.add(get_ident())
Starts a new transaction.
175 def close_cursor(self) -> None: 176 thread_id = get_ident() 177 with self._thread_cursors_lock: 178 if thread_id in self._thread_cursors: 179 _try_close(self._thread_cursors[thread_id], "cursor") 180 self._thread_cursors.pop(thread_id)
Closes the current cursor instance if exists.
182 def close(self) -> None: 183 thread_id = get_ident() 184 with self._thread_cursors_lock, self._thread_connections_lock: 185 if thread_id in self._thread_connections: 186 _try_close(self._thread_connections[thread_id], "connection") 187 self._thread_connections.pop(thread_id) 188 self._thread_cursors.pop(thread_id, None) 189 self._discard_transaction(thread_id) 190 self._thread_attributes.pop(thread_id, None)
Closes the current connection instance if exists.
Note: if there is a cursor instance available it will be closed as well.
192 def close_all(self, exclude_calling_thread: bool = False) -> None: 193 calling_thread_id = get_ident() 194 with self._thread_cursors_lock, self._thread_connections_lock: 195 for thread_id, connection in self._thread_connections.copy().items(): 196 if not exclude_calling_thread or thread_id != calling_thread_id: 197 # NOTE: the access to the connection instance itself is not thread-safe here. 198 _try_close(connection, "connection") 199 self._thread_connections.pop(thread_id) 200 self._thread_cursors.pop(thread_id, None) 201 self._discard_transaction(thread_id) 202 self._thread_attributes.pop(thread_id, None)
Closes all cached cursors and connections.
Arguments:
- exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
209class SingletonConnectionPool(_TransactionManagementMixin): 210 def __init__( 211 self, 212 connection_factory: t.Callable[[], t.Any], 213 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 214 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 215 ): 216 self._connection_factory = connection_factory 217 self._connection: t.Optional[t.Any] = None 218 self._cursor: t.Optional[t.Any] = None 219 self._cursor_kwargs = cursor_kwargs or {} 220 self._attributes: t.Dict[str, t.Any] = {} 221 self._is_transaction_active: bool = False 222 self._cursor_init = cursor_init 223 224 def get_cursor(self) -> t.Any: 225 if not self._cursor: 226 self._cursor = self.get().cursor(**self._cursor_kwargs) 227 if self._cursor_init: 228 self._cursor_init(self._cursor) 229 return self._cursor 230 231 def get(self) -> t.Any: 232 if not self._connection: 233 self._connection = self._connection_factory() 234 return self._connection 235 236 def get_attribute(self, key: str) -> t.Optional[t.Any]: 237 return self._attributes.get(key) 238 239 def set_attribute(self, key: str, value: t.Any) -> None: 240 self._attributes[key] = value 241 242 def begin(self) -> None: 243 self._do_begin() 244 self._is_transaction_active = True 245 246 def commit(self) -> None: 247 self._do_commit() 248 self._is_transaction_active = False 249 250 def rollback(self) -> None: 251 self._do_rollback() 252 self._is_transaction_active = False 253 254 @property 255 def is_transaction_active(self) -> bool: 256 return self._is_transaction_active 257 258 def close_cursor(self) -> None: 259 _try_close(self._cursor, "cursor") 260 self._cursor = None 261 262 def close(self) -> None: 263 _try_close(self._connection, "connection") 264 self._connection = None 265 self._cursor = None 266 self._is_transaction_active = False 267 self._attributes.clear() 268 269 def close_all(self, exclude_calling_thread: bool = False) -> None: 270 if not exclude_calling_thread: 271 self.close()
Helper class that provides a standard way to create an ABC using inheritance.
210 def __init__( 211 self, 212 connection_factory: t.Callable[[], t.Any], 213 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 214 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 215 ): 216 self._connection_factory = connection_factory 217 self._connection: t.Optional[t.Any] = None 218 self._cursor: t.Optional[t.Any] = None 219 self._cursor_kwargs = cursor_kwargs or {} 220 self._attributes: t.Dict[str, t.Any] = {} 221 self._is_transaction_active: bool = False 222 self._cursor_init = cursor_init
224 def get_cursor(self) -> t.Any: 225 if not self._cursor: 226 self._cursor = self.get().cursor(**self._cursor_kwargs) 227 if self._cursor_init: 228 self._cursor_init(self._cursor) 229 return self._cursor
Returns cached cursor instance.
Automatically creates a new instance if one is not available.
Returns:
A cursor instance.
231 def get(self) -> t.Any: 232 if not self._connection: 233 self._connection = self._connection_factory() 234 return self._connection
Returns cached connection instance.
Automatically opens a new connection if one is not available.
Returns:
A connection instance.
Returns an attribute associated with the connection.
Arguments:
- key: Attribute key.
Returns:
Attribute value or None if not found.
Sets an attribute associated with the connection.
Arguments:
- key: Attribute key.
- value: Attribute value.
262 def close(self) -> None: 263 _try_close(self._connection, "connection") 264 self._connection = None 265 self._cursor = None 266 self._is_transaction_active = False 267 self._attributes.clear()
Closes the current connection instance if exists.
Note: if there is a cursor instance available it will be closed as well.
269 def close_all(self, exclude_calling_thread: bool = False) -> None: 270 if not exclude_calling_thread: 271 self.close()
Closes all cached cursors and connections.
Arguments:
- exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
274def create_connection_pool( 275 connection_factory: t.Callable[[], t.Any], 276 multithreaded: bool, 277 cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None, 278 cursor_init: t.Optional[t.Callable[[t.Any], None]] = None, 279) -> ConnectionPool: 280 return ( 281 ThreadLocalConnectionPool( 282 connection_factory, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init 283 ) 284 if multithreaded 285 else SingletonConnectionPool( 286 connection_factory, cursor_kwargs=cursor_kwargs, cursor_init=cursor_init 287 ) 288 )