sqlmesh.utils.connection_pool
"""Connection pools that cache connection and cursor instances and track transactions."""

import abc
import logging
import typing as t
from collections import defaultdict
from threading import Lock, get_ident

logger = logging.getLogger(__name__)


class ConnectionPool(abc.ABC):
    """Interface of a pool that caches connection and cursor instances.

    Besides handing out cached instances, implementations store key/value attributes
    associated with a connection and report whether a transaction is active.
    """

    @abc.abstractmethod
    def get_cursor(self) -> t.Any:
        """Returns the cached cursor instance.

        Automatically creates a new instance if one is not available.

        Returns:
            A cursor instance.
        """

    @abc.abstractmethod
    def get(self) -> t.Any:
        """Returns the cached connection instance.

        Automatically opens a new connection if one is not available.

        Returns:
            A connection instance.
        """

    @abc.abstractmethod
    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns an attribute associated with the connection.

        Args:
            key: Attribute key.

        Returns:
            Attribute value or None if not found.
        """

    @abc.abstractmethod
    def set_attribute(self, key: str, value: t.Any) -> None:
        """Sets an attribute associated with the connection.

        Args:
            key: Attribute key.
            value: Attribute value.
        """

    @abc.abstractmethod
    def get_all_attributes(self, key: str) -> t.List[t.Any]:
        """Returns all attributes with the given key across all connections/threads.

        Args:
            key: Attribute key.

        Returns:
            List of attribute values from all connections/threads.
        """

    @abc.abstractmethod
    def begin(self) -> None:
        """Starts a new transaction."""

    @abc.abstractmethod
    def commit(self) -> None:
        """Commits the current transaction."""

    @abc.abstractmethod
    def rollback(self) -> None:
        """Rolls back the current transaction."""

    @property
    @abc.abstractmethod
    def is_transaction_active(self) -> bool:
        """Returns True if there is an active transaction and False otherwise."""

    @abc.abstractmethod
    def close_cursor(self) -> None:
        """Closes the current cursor instance if one exists."""

    @abc.abstractmethod
    def close(self) -> None:
        """Closes the current connection instance if one exists.

        Note: if there is a cursor instance available it will be closed as well.
        """

    @abc.abstractmethod
    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes all cached cursors and connections.

        Args:
            exclude_calling_thread: If set to True excludes cursors and connections associated
                with the calling thread.
        """


class _TransactionManagementMixin(ConnectionPool):
    """Issues begin/commit/rollback on the cursor when it has such a method, else on the connection."""

    def _do_begin(self) -> None:
        """Calls ``begin`` on the cursor or, failing that, on the connection if it has one."""
        cursor = self.get_cursor()
        if hasattr(cursor, "begin"):
            cursor.begin()
        else:
            conn = self.get()
            # Unlike commit/rollback, a missing ``begin`` is tolerated and simply skipped.
            if hasattr(conn, "begin"):
                conn.begin()

    def _do_commit(self) -> None:
        """Calls ``commit`` on the cursor if it has one, otherwise on the connection."""
        cursor = self.get_cursor()
        if hasattr(cursor, "commit"):
            cursor.commit()
        else:
            self.get().commit()

    def _do_rollback(self) -> None:
        """Calls ``rollback`` on the cursor if it has one, otherwise on the connection."""
        cursor = self.get_cursor()
        if hasattr(cursor, "rollback"):
            cursor.rollback()
        else:
            self.get().rollback()


class _ThreadLocalBase(_TransactionManagementMixin):
    """Base for pools that key cursors, transaction flags and attributes by thread identifier."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes the per-thread bookkeeping.

        Args:
            connection_factory: Zero-argument callable that returns a new connection.
            cursor_init: Optional callable invoked with every newly created cursor.
        """
        self._connection_factory = connection_factory
        self._thread_cursors: t.Dict[t.Hashable, t.Any] = {}
        self._thread_transactions: t.Set[t.Hashable] = set()
        self._thread_attributes: t.Dict[t.Hashable, t.Dict[str, t.Any]] = defaultdict(dict)
        self._thread_cursors_lock = Lock()
        self._thread_transactions_lock = Lock()
        self._cursor_init = cursor_init

    def get_cursor(self) -> t.Any:
        """Returns the calling thread's cursor, creating and initializing it on first use."""
        thread_id = get_ident()
        with self._thread_cursors_lock:
            if thread_id not in self._thread_cursors:
                self._thread_cursors[thread_id] = self.get().cursor()
                if self._cursor_init:
                    self._cursor_init(self._thread_cursors[thread_id])
            return self._thread_cursors[thread_id]

    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns the calling thread's attribute stored under ``key`` or None if not set."""
        # Look the thread up with ``dict.get``: indexing the defaultdict would insert an
        # empty entry for every thread that merely reads an attribute.
        thread_attrs = self._thread_attributes.get(get_ident())
        return None if thread_attrs is None else thread_attrs.get(key)

    def set_attribute(self, key: str, value: t.Any) -> None:
        """Stores ``value`` under ``key`` for the calling thread."""
        thread_id = get_ident()
        self._thread_attributes[thread_id][key] = value

    def get_all_attributes(self, key: str) -> t.List[t.Any]:
        """Returns all attributes with the given key across all threads."""
        return [
            thread_attrs[key]
            for thread_attrs in self._thread_attributes.values()
            if key in thread_attrs
        ]

    def begin(self) -> None:
        """Starts a transaction and marks the calling thread as having one active."""
        self._do_begin()
        with self._thread_transactions_lock:
            self._thread_transactions.add(get_ident())

    def commit(self) -> None:
        """Commits and clears the calling thread's active-transaction mark."""
        self._do_commit()
        self._discard_transaction(get_ident())

    def rollback(self) -> None:
        """Rolls back and clears the calling thread's active-transaction mark."""
        self._do_rollback()
        self._discard_transaction(get_ident())

    @property
    def is_transaction_active(self) -> bool:
        """Returns True if the calling thread is marked as having an active transaction."""
        with self._thread_transactions_lock:
            return get_ident() in self._thread_transactions

    def close_cursor(self) -> None:
        """Closes and forgets the calling thread's cursor if one is cached."""
        thread_id = get_ident()
        with self._thread_cursors_lock:
            if thread_id in self._thread_cursors:
                _try_close(self._thread_cursors[thread_id], "cursor")
                self._thread_cursors.pop(thread_id)

    def _discard_transaction(self, thread_id: t.Hashable) -> None:
        """Removes the active-transaction mark of the given thread, if present."""
        with self._thread_transactions_lock:
            self._thread_transactions.discard(thread_id)


class ThreadLocalConnectionPool(_ThreadLocalBase):
    """Pool that keeps a separate connection for every thread that requests one."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes an empty pool.

        Args:
            connection_factory: Zero-argument callable that returns a new connection.
            cursor_init: Optional callable invoked with every newly created cursor.
        """
        super().__init__(connection_factory, cursor_init)
        self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
        self._thread_connections_lock = Lock()

    def get(self) -> t.Any:
        """Returns the calling thread's connection, opening it on first use."""
        thread_id = get_ident()
        with self._thread_connections_lock:
            if thread_id not in self._thread_connections:
                self._thread_connections[thread_id] = self._connection_factory()
            return self._thread_connections[thread_id]

    def close(self) -> None:
        """Closes the calling thread's connection and drops its cached state.

        The cached cursor is only removed from the cache; it is not closed explicitly.
        """
        thread_id = get_ident()
        with self._thread_cursors_lock, self._thread_connections_lock:
            if thread_id in self._thread_connections:
                _try_close(self._thread_connections[thread_id], "connection")
                self._thread_connections.pop(thread_id)
                self._thread_cursors.pop(thread_id, None)
                self._discard_transaction(thread_id)
            self._thread_attributes.pop(thread_id, None)

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the cached connections and drops the associated per-thread state.

        Args:
            exclude_calling_thread: If set to True the connection, cursor, transaction mark
                and attributes of the calling thread are left untouched.
        """
        calling_thread_id = get_ident()
        with self._thread_cursors_lock, self._thread_connections_lock:
            # Iterate over a copy because entries are removed inside the loop.
            for thread_id, connection in self._thread_connections.copy().items():
                if not exclude_calling_thread or thread_id != calling_thread_id:
                    _try_close(connection, "connection")
                    self._thread_connections.pop(thread_id)
                    self._thread_cursors.pop(thread_id, None)
                    self._discard_transaction(thread_id)

            if exclude_calling_thread:
                # The calling thread keeps its connection, so keep its attributes too.
                for thread_id in list(self._thread_attributes):
                    if thread_id != calling_thread_id:
                        self._thread_attributes.pop(thread_id, None)
            else:
                self._thread_attributes.clear()


class ThreadLocalSharedConnectionPool(_ThreadLocalBase):
    """Pool in which all threads share one connection but each thread has its own cursor."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes the pool without opening a connection.

        Args:
            connection_factory: Zero-argument callable that returns a new connection.
            cursor_init: Optional callable invoked with every newly created cursor.
        """
        super().__init__(connection_factory, cursor_init)
        self._connection: t.Optional[t.Any] = None
        self._connection_lock = Lock()

    def get(self) -> t.Any:
        """Returns the shared connection, opening it on first use."""
        with self._connection_lock:
            if self._connection is None:
                self._connection = self._connection_factory()
            return self._connection

    def close(self) -> None:
        """Closes the calling thread's cursor and drops its cached state.

        The shared connection itself stays open.
        """
        thread_id = get_ident()
        with self._thread_cursors_lock, self._connection_lock:
            if thread_id in self._thread_cursors:
                _try_close(self._thread_cursors[thread_id], "cursor")
                self._thread_cursors.pop(thread_id)
            # Dropped even when no cursor is cached (e.g. after ``close_cursor``).
            self._discard_transaction(thread_id)
            self._thread_attributes.pop(thread_id, None)

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the cached cursors and, unless a thread is excluded, the shared connection.

        Args:
            exclude_calling_thread: If set to True the calling thread's cursor and state are
                kept and the shared connection is left open.
        """
        calling_thread_id = get_ident()
        with self._thread_cursors_lock, self._connection_lock:
            # Iterate over a copy because entries are removed inside the loop.
            for thread_id, cursor in self._thread_cursors.copy().items():
                if not exclude_calling_thread or thread_id != calling_thread_id:
                    _try_close(cursor, "cursor")
                    self._thread_cursors.pop(thread_id)
                    self._discard_transaction(thread_id)
                    self._thread_attributes.pop(thread_id, None)

            if not exclude_calling_thread:
                _try_close(self._connection, "connection")
                self._connection = None


class SingletonConnectionPool(_TransactionManagementMixin):
    """Pool that holds a single connection and a single cursor, without any locking."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes the pool without opening a connection.

        Args:
            connection_factory: Zero-argument callable that returns a new connection.
            cursor_init: Optional callable invoked with every newly created cursor.
        """
        self._connection_factory = connection_factory
        self._connection: t.Optional[t.Any] = None
        self._cursor: t.Optional[t.Any] = None
        self._attributes: t.Dict[str, t.Any] = {}
        self._is_transaction_active: bool = False
        self._cursor_init = cursor_init

    def get_cursor(self) -> t.Any:
        """Returns the cached cursor, creating and initializing it on first use."""
        # Compare against None: a cursor object may legitimately evaluate as falsy.
        if self._cursor is None:
            self._cursor = self.get().cursor()
            if self._cursor_init:
                self._cursor_init(self._cursor)
        return self._cursor

    def get(self) -> t.Any:
        """Returns the cached connection, opening it on first use."""
        # Compare against None: a connection object may legitimately evaluate as falsy.
        if self._connection is None:
            self._connection = self._connection_factory()
        return self._connection

    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns the attribute stored under ``key`` or None if not set."""
        return self._attributes.get(key)

    def set_attribute(self, key: str, value: t.Any) -> None:
        """Stores ``value`` under ``key``."""
        self._attributes[key] = value

    def get_all_attributes(self, key: str) -> t.List[t.Any]:
        """Returns all attributes with the given key (single-threaded pool has at most one)."""
        value = self._attributes.get(key)
        return [value] if value is not None else []

    def begin(self) -> None:
        """Starts a transaction and records it as active."""
        self._do_begin()
        self._is_transaction_active = True

    def commit(self) -> None:
        """Commits and records that no transaction is active."""
        self._do_commit()
        self._is_transaction_active = False

    def rollback(self) -> None:
        """Rolls back and records that no transaction is active."""
        self._do_rollback()
        self._is_transaction_active = False

    @property
    def is_transaction_active(self) -> bool:
        """Returns True if a transaction was begun and not yet committed or rolled back."""
        return self._is_transaction_active

    def close_cursor(self) -> None:
        """Closes and forgets the cached cursor if there is one."""
        _try_close(self._cursor, "cursor")
        self._cursor = None

    def close(self) -> None:
        """Closes the cached connection and resets cursor, transaction flag and attributes.

        The cached cursor is only forgotten; it is not closed explicitly.
        """
        _try_close(self._connection, "connection")
        self._connection = None
        self._cursor = None
        self._is_transaction_active = False
        self._attributes.clear()

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the pool unless the calling thread is excluded.

        Args:
            exclude_calling_thread: If set to True nothing is closed.
        """
        if not exclude_calling_thread:
            self.close()


def create_connection_pool(
    connection_factory: t.Callable[[], t.Any],
    multithreaded: bool,
    shared_connection: bool = False,
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
) -> ConnectionPool:
    """Creates the pool implementation that matches the requested threading model.

    Args:
        connection_factory: Zero-argument callable that returns a new connection.
        multithreaded: Whether the pool is going to be used from multiple threads.
        shared_connection: Whether threads share one connection; only considered when
            ``multithreaded`` is set.
        cursor_init: Optional callable invoked with every newly created cursor.

    Returns:
        A new connection pool instance.
    """
    pool_class = (
        ThreadLocalSharedConnectionPool
        if multithreaded and shared_connection
        else ThreadLocalConnectionPool
        if multithreaded
        else SingletonConnectionPool
    )
    return pool_class(connection_factory, cursor_init=cursor_init)


def _try_close(closeable: t.Any, kind: str) -> None:
    """Closes ``closeable`` on a best-effort basis, logging instead of raising on failure.

    Args:
        closeable: Object with a ``close`` method, or None, in which case nothing happens.
        kind: Label used in the log message (e.g. "cursor" or "connection").
    """
    if closeable is None:
        return
    try:
        closeable.close()
    except Exception:
        logger.exception("Failed to close %s", kind)
class ConnectionPool(abc.ABC):
    """Interface of a pool that caches connection and cursor instances.

    Besides handing out cached instances, implementations store key/value attributes
    associated with a connection and report whether a transaction is active.
    """

    @abc.abstractmethod
    def get_cursor(self) -> t.Any:
        """Returns the cached cursor instance.

        Automatically creates a new instance if one is not available.

        Returns:
            A cursor instance.
        """

    @abc.abstractmethod
    def get(self) -> t.Any:
        """Returns the cached connection instance.

        Automatically opens a new connection if one is not available.

        Returns:
            A connection instance.
        """

    @abc.abstractmethod
    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns an attribute associated with the connection.

        Args:
            key: Attribute key.

        Returns:
            Attribute value or None if not found.
        """

    @abc.abstractmethod
    def set_attribute(self, key: str, value: t.Any) -> None:
        """Sets an attribute associated with the connection.

        Args:
            key: Attribute key.
            value: Attribute value.
        """

    @abc.abstractmethod
    def get_all_attributes(self, key: str) -> t.List[t.Any]:
        """Returns all attributes with the given key across all connections/threads.

        Args:
            key: Attribute key.

        Returns:
            List of attribute values from all connections/threads.
        """

    @abc.abstractmethod
    def begin(self) -> None:
        """Starts a new transaction."""

    @abc.abstractmethod
    def commit(self) -> None:
        """Commits the current transaction."""

    @abc.abstractmethod
    def rollback(self) -> None:
        """Rolls back the current transaction."""

    @property
    @abc.abstractmethod
    def is_transaction_active(self) -> bool:
        """Returns True if there is an active transaction and False otherwise."""

    @abc.abstractmethod
    def close_cursor(self) -> None:
        """Closes the current cursor instance if one exists."""

    @abc.abstractmethod
    def close(self) -> None:
        """Closes the current connection instance if one exists.

        Note: if there is a cursor instance available it will be closed as well.
        """

    @abc.abstractmethod
    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes all cached cursors and connections.

        Args:
            exclude_calling_thread: If set to True excludes cursors and connections associated
                with the calling thread.
        """
Abstract interface of a pool that caches connection and cursor instances, stores attributes associated with a connection, and manages transactions.
@abc.abstractmethod
def get_cursor(self) -> t.Any:
    """Returns the cached cursor instance.

    Automatically creates a new instance if one is not available.

    Returns:
        A cursor instance.
    """
Returns cached cursor instance.
Automatically creates a new instance if one is not available.
Returns:
A cursor instance.
@abc.abstractmethod
def get(self) -> t.Any:
    """Returns the cached connection instance.

    Automatically opens a new connection if one is not available.

    Returns:
        A connection instance.
    """
Returns cached connection instance.
Automatically opens a new connection if one is not available.
Returns:
A connection instance.
@abc.abstractmethod
def get_attribute(self, key: str) -> t.Optional[t.Any]:
    """Returns the attribute stored under the given key for the connection.

    Args:
        key: Attribute key.

    Returns:
        Attribute value or None if not found.
    """
Returns an attribute associated with the connection.
Arguments:
- key: Attribute key.
Returns:
Attribute value or None if not found.
@abc.abstractmethod
def set_attribute(self, key: str, value: t.Any) -> None:
    """Stores an attribute under the given key for the connection.

    Args:
        key: Attribute key.
        value: Attribute value.
    """
Sets an attribute associated with the connection.
Arguments:
- key: Attribute key.
- value: Attribute value.
@abc.abstractmethod
def get_all_attributes(self, key: str) -> t.List[t.Any]:
    """Returns the attributes stored under the given key across all connections/threads.

    Args:
        key: Attribute key.

    Returns:
        List of attribute values from all connections/threads.
    """
Returns all attributes with the given key across all connections/threads.
Arguments:
- key: Attribute key.
Returns:
List of attribute values from all connections/threads.
@property
@abc.abstractmethod
def is_transaction_active(self) -> bool:
    """Whether a transaction is currently active.

    Returns:
        True if there is an active transaction and False otherwise.
    """
Returns True if there is an active transaction and False otherwise.
@abc.abstractmethod
def close_cursor(self) -> None:
    """Closes the current cursor instance if one exists."""
Closes the current cursor instance if one exists.
@abc.abstractmethod
def close(self) -> None:
    """Closes the current connection instance if one exists.

    Note: if there is a cursor instance available it will be closed as well.
    """
Closes the current connection instance if one exists.
Note: if there is a cursor instance available it will be closed as well.
@abc.abstractmethod
def close_all(self, exclude_calling_thread: bool = False) -> None:
    """Closes all cached cursors and connections.

    Args:
        exclude_calling_thread: If set to True, cursors and connections associated
            with the calling thread are excluded.
    """
Closes all cached cursors and connections.
Arguments:
- exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
class ThreadLocalConnectionPool(_ThreadLocalBase):
    """Pool that keeps a separate connection for every thread that requests one."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes an empty pool.

        Args:
            connection_factory: Zero-argument callable that returns a new connection.
            cursor_init: Optional callable forwarded to the base class initializer.
        """
        super().__init__(connection_factory, cursor_init)
        self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
        self._thread_connections_lock = Lock()

    def get(self) -> t.Any:
        """Returns the calling thread's connection, opening it on first use."""
        thread_id = get_ident()
        with self._thread_connections_lock:
            if thread_id not in self._thread_connections:
                self._thread_connections[thread_id] = self._connection_factory()
            return self._thread_connections[thread_id]

    def close(self) -> None:
        """Closes the calling thread's connection and drops its cached state.

        The cached cursor is only removed from the cache; it is not closed explicitly.
        """
        thread_id = get_ident()
        with self._thread_cursors_lock, self._thread_connections_lock:
            if thread_id in self._thread_connections:
                _try_close(self._thread_connections[thread_id], "connection")
                self._thread_connections.pop(thread_id)
                self._thread_cursors.pop(thread_id, None)
                self._discard_transaction(thread_id)
            self._thread_attributes.pop(thread_id, None)

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the cached connections and drops the associated per-thread state.

        Args:
            exclude_calling_thread: If set to True the connection, cursor, transaction mark
                and attributes of the calling thread are left untouched.
        """
        calling_thread_id = get_ident()
        with self._thread_cursors_lock, self._thread_connections_lock:
            # Iterate over a copy because entries are removed inside the loop.
            for thread_id, connection in self._thread_connections.copy().items():
                if not exclude_calling_thread or thread_id != calling_thread_id:
                    _try_close(connection, "connection")
                    self._thread_connections.pop(thread_id)
                    self._thread_cursors.pop(thread_id, None)
                    self._discard_transaction(thread_id)

            if exclude_calling_thread:
                # The calling thread keeps its connection, so keep its attributes too.
                for thread_id in list(self._thread_attributes):
                    if thread_id != calling_thread_id:
                        self._thread_attributes.pop(thread_id, None)
            else:
                self._thread_attributes.clear()
Connection pool that keeps a separate connection, cursor, transaction flag and attribute set for each thread, keyed by thread identifier.
def __init__(
    self,
    connection_factory: t.Callable[[], t.Any],
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
):
    """Initializes an empty per-thread connection cache.

    Args:
        connection_factory: Zero-argument callable that returns a new connection.
        cursor_init: Optional callable forwarded to the base class initializer.
    """
    super().__init__(connection_factory, cursor_init)
    # Guards every access to the per-thread connection mapping below.
    self._thread_connections_lock = Lock()
    self._thread_connections: t.Dict[t.Hashable, t.Any] = {}
def get(self) -> t.Any:
    """Returns the calling thread's connection, opening it on first use."""
    ident = get_ident()
    with self._thread_connections_lock:
        cache = self._thread_connections
        if ident in cache:
            return cache[ident]
        # First request from this thread: open a connection and remember it.
        opened = self._connection_factory()
        cache[ident] = opened
        return opened
Returns cached connection instance.
Automatically opens a new connection if one is not available.
Returns:
A connection instance.
def close(self) -> None:
    """Closes the calling thread's connection and drops its cached state.

    The cached cursor is only removed from the cache; it is not closed explicitly.
    """
    thread_id = get_ident()
    # Both locks are held so the cursor and connection caches change together.
    with self._thread_cursors_lock, self._thread_connections_lock:
        if thread_id in self._thread_connections:
            # _try_close logs a failure instead of raising, so the clean-up below still runs.
            _try_close(self._thread_connections[thread_id], "connection")
            self._thread_connections.pop(thread_id)
            self._thread_cursors.pop(thread_id, None)
            self._discard_transaction(thread_id)
        self._thread_attributes.pop(thread_id, None)
Closes the current connection instance if one exists.
Note: if there is a cursor instance available it will be closed as well.
def close_all(self, exclude_calling_thread: bool = False) -> None:
    """Closes the cached connections and drops the associated per-thread state.

    Args:
        exclude_calling_thread: If set to True the connection, cursor, transaction mark
            and attributes of the calling thread are left untouched.
    """
    calling_thread_id = get_ident()
    with self._thread_cursors_lock, self._thread_connections_lock:
        # Iterate over a copy because entries are removed inside the loop.
        for thread_id, connection in self._thread_connections.copy().items():
            if not exclude_calling_thread or thread_id != calling_thread_id:
                _try_close(connection, "connection")
                self._thread_connections.pop(thread_id)
                self._thread_cursors.pop(thread_id, None)
                self._discard_transaction(thread_id)

        if exclude_calling_thread:
            # The calling thread keeps its connection, so keep its attributes too.
            for thread_id in list(self._thread_attributes):
                if thread_id != calling_thread_id:
                    self._thread_attributes.pop(thread_id, None)
        else:
            self._thread_attributes.clear()
Closes all cached cursors and connections.
Arguments:
- exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
class SingletonConnectionPool(_TransactionManagementMixin):
    """Pool that holds a single connection and a single cursor, without any locking."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
    ):
        """Initializes the pool without opening a connection.

        Args:
            connection_factory: Zero-argument callable that returns a new connection.
            cursor_init: Optional callable invoked with every newly created cursor.
        """
        self._connection_factory = connection_factory
        self._connection: t.Optional[t.Any] = None
        self._cursor: t.Optional[t.Any] = None
        self._attributes: t.Dict[str, t.Any] = {}
        self._is_transaction_active: bool = False
        self._cursor_init = cursor_init

    def get_cursor(self) -> t.Any:
        """Returns the cached cursor, creating and initializing it on first use."""
        # Compare against None: a cursor object may legitimately evaluate as falsy.
        if self._cursor is None:
            self._cursor = self.get().cursor()
            if self._cursor_init:
                self._cursor_init(self._cursor)
        return self._cursor

    def get(self) -> t.Any:
        """Returns the cached connection, opening it on first use."""
        # Compare against None: a connection object may legitimately evaluate as falsy.
        if self._connection is None:
            self._connection = self._connection_factory()
        return self._connection

    def get_attribute(self, key: str) -> t.Optional[t.Any]:
        """Returns the attribute stored under ``key`` or None if not set."""
        return self._attributes.get(key)

    def set_attribute(self, key: str, value: t.Any) -> None:
        """Stores ``value`` under ``key``."""
        self._attributes[key] = value

    def get_all_attributes(self, key: str) -> t.List[t.Any]:
        """Returns all attributes with the given key (single-threaded pool has at most one)."""
        value = self._attributes.get(key)
        return [value] if value is not None else []

    def begin(self) -> None:
        """Starts a transaction and records it as active."""
        self._do_begin()
        self._is_transaction_active = True

    def commit(self) -> None:
        """Commits and records that no transaction is active."""
        self._do_commit()
        self._is_transaction_active = False

    def rollback(self) -> None:
        """Rolls back and records that no transaction is active."""
        self._do_rollback()
        self._is_transaction_active = False

    @property
    def is_transaction_active(self) -> bool:
        """Returns True if a transaction was begun and not yet committed or rolled back."""
        return self._is_transaction_active

    def close_cursor(self) -> None:
        """Closes and forgets the cached cursor if there is one."""
        _try_close(self._cursor, "cursor")
        self._cursor = None

    def close(self) -> None:
        """Closes the cached connection and resets cursor, transaction flag and attributes.

        The cached cursor is only forgotten; it is not closed explicitly.
        """
        _try_close(self._connection, "connection")
        self._connection = None
        self._cursor = None
        self._is_transaction_active = False
        self._attributes.clear()

    def close_all(self, exclude_calling_thread: bool = False) -> None:
        """Closes the pool unless the calling thread is excluded.

        Args:
            exclude_calling_thread: If set to True nothing is closed.
        """
        if not exclude_calling_thread:
            self.close()
Connection pool that holds a single connection and a single cursor, with no per-thread bookkeeping or locking.
def __init__(
    self,
    connection_factory: t.Callable[[], t.Any],
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
):
    """Initializes the pool without opening a connection.

    Args:
        connection_factory: Zero-argument callable that returns a new connection.
        cursor_init: Optional callable invoked with every newly created cursor.
    """
    # Collaborators supplied by the caller.
    self._connection_factory = connection_factory
    self._cursor_init = cursor_init

    # Lazily populated state; nothing is opened until it is first requested.
    self._connection: t.Optional[t.Any] = None
    self._cursor: t.Optional[t.Any] = None
    self._is_transaction_active: bool = False
    self._attributes: t.Dict[str, t.Any] = {}
def get_cursor(self) -> t.Any:
    """Returns the cached cursor, creating and initializing it on first use.

    Returns:
        The cursor obtained from the pool's connection.
    """
    # Compare against None rather than relying on truthiness: a cursor object that
    # evaluates as falsy would otherwise be recreated on every call.
    if self._cursor is None:
        self._cursor = self.get().cursor()
        if self._cursor_init is not None:
            self._cursor_init(self._cursor)
    return self._cursor
Returns cached cursor instance.
Automatically creates a new instance if one is not available.
Returns:
A cursor instance.
def get(self) -> t.Any:
    """Returns the cached connection, opening it on first use.

    Returns:
        The connection produced by the connection factory.
    """
    # Compare against None rather than relying on truthiness: a connection object that
    # evaluates as falsy would otherwise be reopened on every call.
    if self._connection is None:
        self._connection = self._connection_factory()
    return self._connection
Returns cached connection instance.
Automatically opens a new connection if one is not available.
Returns:
A connection instance.
Returns an attribute associated with the connection.
Arguments:
- key: Attribute key.
Returns:
Attribute value or None if not found.
Sets an attribute associated with the connection.
Arguments:
- key: Attribute key.
- value: Attribute value.
def get_all_attributes(self, key: str) -> t.List[t.Any]:
    """Returns the value stored under ``key`` wrapped in a list, or an empty list.

    A key that is absent and a key whose stored value is None both yield an empty list.
    """
    found = self._attributes.get(key)
    if found is None:
        return []
    return [found]
Returns all attributes with the given key (single-threaded pool has at most one).
def close(self) -> None:
    """Closes the cached connection and resets the pool to its initial state.

    The cached cursor is only forgotten; it is not closed explicitly.
    """
    _try_close(self._connection, "connection")
    # Forget both cached handles so that the next request opens fresh ones.
    self._connection = self._cursor = None
    self._attributes.clear()
    self._is_transaction_active = False
Closes the current connection instance if one exists.
Note: if there is a cursor instance available it will be closed as well.
def close_all(self, exclude_calling_thread: bool = False) -> None:
    """Closes the pool unless the calling thread is excluded.

    Args:
        exclude_calling_thread: If set to True nothing is closed.
    """
    if exclude_calling_thread:
        return
    self.close()
Closes all cached cursors and connections.
Arguments:
- exclude_calling_thread: If set to True excludes cursors and connections associated with the calling thread.
def create_connection_pool(
    connection_factory: t.Callable[[], t.Any],
    multithreaded: bool,
    shared_connection: bool = False,
    cursor_init: t.Optional[t.Callable[[t.Any], None]] = None,
) -> ConnectionPool:
    """Creates the pool implementation that matches the requested threading model.

    Args:
        connection_factory: Zero-argument callable that returns a new connection.
        multithreaded: Whether the pool is going to be used from multiple threads.
        shared_connection: Whether threads share one connection; only considered when
            ``multithreaded`` is set.
        cursor_init: Optional callable passed on to the pool as ``cursor_init``.

    Returns:
        A new connection pool instance.
    """
    if not multithreaded:
        implementation = SingletonConnectionPool
    elif shared_connection:
        implementation = ThreadLocalSharedConnectionPool
    else:
        implementation = ThreadLocalConnectionPool
    return implementation(connection_factory, cursor_init=cursor_init)