sqlmesh.core.analytics.dispatcher
1from __future__ import annotations 2 3import abc 4import gzip 5import json 6import logging 7import platform 8import typing as t 9from functools import cached_property 10from threading import Event, Lock, Thread 11from urllib.parse import urljoin 12 13import requests 14from sqlglot import __version__ as sqlglot_version 15 16from sqlmesh.utils.errors import ApiClientError, raise_for_status 17 18TOBIKO_COLLECTOR_URL = "https://analytics.tobikodata.com/v1/" 19 20logger = logging.getLogger(__name__) 21 22 23class EventEmitter: 24 """Emits events to a remote collector.""" 25 26 def __init__( 27 self, 28 base_url: str = TOBIKO_COLLECTOR_URL, 29 read_timeout_sec: int = 10, 30 connect_timeout_sec: int = 10, 31 ): 32 from sqlmesh import __version__ as sqlmesh_version 33 34 self.sqlmesh_url = urljoin(base_url, "sqlmesh/") 35 self.read_timeout = read_timeout_sec 36 self.connect_timeout = connect_timeout_sec 37 38 self._session = requests.Session() 39 self._session.headers.update( 40 { 41 "Content-Type": "application/json", 42 "Content-Encoding": "gzip", 43 "User-Agent": f"SQLMesh/{sqlmesh_version}", 44 } 45 ) 46 47 def emit(self, events: t.List[t.Dict[str, t.Any]]) -> None: 48 data = json.dumps({"events": events, "versions": self._versions}).encode("utf-8") 49 data = gzip.compress(data) 50 response = self._session.post( 51 self.sqlmesh_url, data=data, timeout=(self.connect_timeout, self.read_timeout) 52 ) 53 raise_for_status(response) 54 55 def close(self) -> None: 56 self._session.close() 57 58 @cached_property 59 def _versions(self) -> str: 60 import pydantic 61 62 from sqlmesh import __version__ as sqlmesh_version 63 64 versions = { 65 "sqlmesh_version": sqlmesh_version, 66 "sqlglot_version": sqlglot_version, 67 "python_version": platform.python_version(), 68 "pydantic_version": pydantic.__version__, 69 "os_name": platform.system(), 70 "platform": platform.platform(), 71 } 72 return json.dumps(versions) 73 74 75class EventDispatcher(abc.ABC): 76 """Dispatches events to an 
emitter.""" 77 78 @abc.abstractmethod 79 def add_event(self, event: t.Dict[str, t.Any]) -> None: 80 """Add an event to be emitted. 81 82 Args: 83 event: The event data. 84 """ 85 86 @abc.abstractmethod 87 def flush(self) -> None: 88 """Flushes the collected events to the emitter.""" 89 90 @abc.abstractmethod 91 def shutdown(self, flush: bool = True) -> None: 92 """Shuts down this dispatcher and releases any resources. 93 94 Args: 95 flush: Whether to flush the collected events before shutting down. 96 """ 97 98 99class AsyncEventDispatcher(EventDispatcher): 100 def __init__( 101 self, 102 emit_interval_sec: int = 5, 103 max_emit_interval_sec: int = 120, 104 max_queue_size: int = 50, 105 emitter: t.Optional[EventEmitter] = None, 106 ): 107 self._emit_interval_sec = emit_interval_sec 108 self._max_emit_interval_sec = max_emit_interval_sec 109 self._max_queue_size = max_queue_size 110 self._emitter = emitter 111 112 self._events: t.List[t.Dict[str, t.Any]] = [] 113 self._events_lock = Lock() 114 115 self._shutdown_event = Event() 116 self._emitter_thread = Thread(target=self._run_flush, name="event-emitter", daemon=True) 117 self._emitter_thread.start() 118 119 @cached_property 120 def emitter(self) -> EventEmitter: 121 return self._emitter or EventEmitter() 122 123 def add_event(self, event: t.Dict[str, t.Any]) -> None: 124 self._add_events([event]) 125 126 def flush(self) -> None: 127 with self._events_lock: 128 events_to_emit = self._events 129 self._events = [] 130 131 if not events_to_emit: 132 return 133 134 logger.debug("Emitting %d events", len(events_to_emit)) 135 try: 136 self.emitter.emit(events_to_emit) 137 except Exception as e: 138 logger.info("Failed to emit events: %s", e) 139 if isinstance(e, ApiClientError): 140 if e.code == 429 and self._emit_interval_sec < self._max_emit_interval_sec: 141 self._emit_interval_sec = min( 142 self._emit_interval_sec * 2, self._max_emit_interval_sec 143 ) 144 logger.debug( 145 "Increasing the emit interval to %s 
seconds", self._emit_interval_sec 146 ) 147 elif e.code in (400, 403, 404, 405, 426): 148 logger.info( 149 "Non-retriable client error (%s) occurred, shutting down the event dispatcher", 150 e.code, 151 ) 152 self.shutdown(flush=False) 153 return 154 self._add_events(events_to_emit, prepend=True) 155 156 def shutdown(self, flush: bool = True) -> None: 157 if self._shutdown_event.is_set(): 158 return 159 logging.info("Shutting down the event dispatcher") 160 self._shutdown_event.set() 161 self._emitter_thread.join() 162 if flush and self._events: 163 # Reduce the connect timeout to avoid blocking the shutdown. 164 self.emitter.connect_timeout = 2 165 self.flush() 166 self.emitter.close() 167 168 def _add_events(self, events: t.List[t.Dict[str, t.Any]], prepend: bool = False) -> None: 169 with self._events_lock: 170 if not prepend: 171 self._events.extend(events) 172 else: 173 self._events = events + self._events 174 175 if len(self._events) > self._max_queue_size: 176 drop_count = len(self._events) - self._max_queue_size 177 logger.info("Event queue is full, dropping %s events", drop_count) 178 self._events = self._events[drop_count:] 179 180 def _run_flush(self) -> None: 181 while not self._shutdown_event.wait(self._emit_interval_sec): 182 self.flush() 183 184 185class NoopEventDispatcher(EventDispatcher): 186 def add_event(self, event: t.Dict[str, t.Any]) -> None: 187 logger.debug("Analytics is disabled, dropping event") 188 189 def flush(self) -> None: 190 pass 191 192 def shutdown(self, flush: bool = True) -> None: 193 pass
TOBIKO_COLLECTOR_URL =
'https://analytics.tobikodata.com/v1/'
logger =
<Logger sqlmesh.core.analytics.dispatcher (WARNING)>
class
EventEmitter:
class EventEmitter:
    """HTTP client that ships batches of events to a remote collector."""

    def __init__(
        self,
        base_url: str = TOBIKO_COLLECTOR_URL,
        read_timeout_sec: int = 10,
        connect_timeout_sec: int = 10,
    ):
        """Prepare the target URL, the timeouts and a session with default headers.

        Args:
            base_url: Collector base URL; ``sqlmesh/`` is joined onto it.
            read_timeout_sec: Read timeout in seconds used for each POST.
            connect_timeout_sec: Connect timeout in seconds used for each POST.
        """
        from sqlmesh import __version__ as sqlmesh_version

        self.sqlmesh_url = urljoin(base_url, "sqlmesh/")
        self.read_timeout = read_timeout_sec
        self.connect_timeout = connect_timeout_sec

        default_headers = {
            "Content-Type": "application/json",
            "Content-Encoding": "gzip",
            "User-Agent": f"SQLMesh/{sqlmesh_version}",
        }
        self._session = requests.Session()
        self._session.headers.update(default_headers)

    def emit(self, events: t.List[t.Dict[str, t.Any]]) -> None:
        """Send ``events`` as a gzip-compressed JSON document and check the response.

        Args:
            events: The events to send.
        """
        payload = {"events": events, "versions": self._versions}
        body = gzip.compress(json.dumps(payload).encode("utf-8"))
        timeouts = (self.connect_timeout, self.read_timeout)
        raise_for_status(self._session.post(self.sqlmesh_url, data=body, timeout=timeouts))

    def close(self) -> None:
        """Release the HTTP session."""
        self._session.close()

    @cached_property
    def _versions(self) -> str:
        """JSON string with library, interpreter and platform versions (computed once)."""
        import pydantic

        from sqlmesh import __version__ as sqlmesh_version

        return json.dumps(
            {
                "sqlmesh_version": sqlmesh_version,
                "sqlglot_version": sqlglot_version,
                "python_version": platform.python_version(),
                "pydantic_version": pydantic.__version__,
                "os_name": platform.system(),
                "platform": platform.platform(),
            }
        )
Emits events to a remote collector.
EventEmitter( base_url: str = 'https://analytics.tobikodata.com/v1/', read_timeout_sec: int = 10, connect_timeout_sec: int = 10)
def __init__(
    self,
    base_url: str = TOBIKO_COLLECTOR_URL,
    read_timeout_sec: int = 10,
    connect_timeout_sec: int = 10,
):
    """Prepare the target URL, the timeouts and a session with default headers.

    Args:
        base_url: Collector base URL; ``sqlmesh/`` is joined onto it.
        read_timeout_sec: Read timeout in seconds, stored as ``read_timeout``.
        connect_timeout_sec: Connect timeout in seconds, stored as ``connect_timeout``.
    """
    from sqlmesh import __version__ as sqlmesh_version

    self.sqlmesh_url = urljoin(base_url, "sqlmesh/")
    self.read_timeout = read_timeout_sec
    self.connect_timeout = connect_timeout_sec

    # Every request made through this session announces a gzip-compressed JSON body.
    default_headers = {
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
        "User-Agent": f"SQLMesh/{sqlmesh_version}",
    }
    self._session = requests.Session()
    self._session.headers.update(default_headers)
def
emit(self, events: List[Dict[str, Any]]) -> None:
def emit(self, events: t.List[t.Dict[str, t.Any]]) -> None:
    """Send ``events`` as a gzip-compressed JSON document and check the response.

    The document has two keys: ``events`` (the given list) and ``versions``
    (the value of ``self._versions``). The response is passed to
    ``raise_for_status``.

    Args:
        events: The events to send.
    """
    payload = {"events": events, "versions": self._versions}
    body = gzip.compress(json.dumps(payload).encode("utf-8"))
    timeouts = (self.connect_timeout, self.read_timeout)
    raise_for_status(self._session.post(self.sqlmesh_url, data=body, timeout=timeouts))
class
EventDispatcher(abc.ABC):
class EventDispatcher(abc.ABC):
    """Abstract interface for collecting events and handing them to an emitter."""

    @abc.abstractmethod
    def add_event(self, event: t.Dict[str, t.Any]) -> None:
        """Register one event for later emission.

        Args:
            event: The event data.
        """

    @abc.abstractmethod
    def flush(self) -> None:
        """Hand every collected event over to the emitter."""

    @abc.abstractmethod
    def shutdown(self, flush: bool = True) -> None:
        """Stop the dispatcher and release whatever resources it holds.

        Args:
            flush: Whether collected events should be flushed before stopping.
        """
Dispatches events to an emitter.
@abc.abstractmethod
def
add_event(self, event: Dict[str, Any]) -> None:
@abc.abstractmethod
def add_event(self, event: t.Dict[str, t.Any]) -> None:
    """Register one event for later emission.

    Args:
        event: The event data.
    """
Add an event to be emitted.
Arguments:
- event: The event data.
@abc.abstractmethod
def
flush(self) -> None:
@abc.abstractmethod
def flush(self) -> None:
    """Hand every collected event over to the emitter."""
Flushes the collected events to the emitter.
@abc.abstractmethod
def
shutdown(self, flush: bool = True) -> None:
@abc.abstractmethod
def shutdown(self, flush: bool = True) -> None:
    """Stop the dispatcher and release whatever resources it holds.

    Args:
        flush: Whether collected events should be flushed before stopping.
    """
Shuts down this dispatcher and releases any resources.
Arguments:
- flush: Whether to flush the collected events before shutting down.
class AsyncEventDispatcher(EventDispatcher):
    """Dispatcher that queues events in memory and flushes them from a background thread."""

    def __init__(
        self,
        emit_interval_sec: int = 5,
        max_emit_interval_sec: int = 120,
        max_queue_size: int = 50,
        emitter: t.Optional[EventEmitter] = None,
    ):
        """Initialize the queue and start the daemon flush thread.

        Args:
            emit_interval_sec: Initial number of seconds between periodic flushes.
            max_emit_interval_sec: Upper bound for the interval when it is doubled
                after a 429 response.
            max_queue_size: Maximum number of queued events; excess events are dropped.
            emitter: Emitter to use. When omitted, a default ``EventEmitter`` is
                created on first access of ``emitter``.
        """
        self._emit_interval_sec = emit_interval_sec
        self._max_emit_interval_sec = max_emit_interval_sec
        self._max_queue_size = max_queue_size
        self._emitter = emitter

        self._events: t.List[t.Dict[str, t.Any]] = []
        self._events_lock = Lock()

        self._shutdown_event = Event()
        self._emitter_thread = Thread(target=self._run_flush, name="event-emitter", daemon=True)
        self._emitter_thread.start()

    @cached_property
    def emitter(self) -> EventEmitter:
        """The emitter passed to the constructor, or a lazily created default one."""
        return self._emitter or EventEmitter()

    def add_event(self, event: t.Dict[str, t.Any]) -> None:
        """Append a single event to the queue.

        Args:
            event: The event data.
        """
        self._add_events([event])

    def flush(self) -> None:
        """Send all queued events to the emitter.

        The queue is swapped out while holding the lock and emitted outside of it.
        If emitting fails:

        * a 429 response doubles the emit interval (capped at the maximum) and the
          batch is re-queued;
        * a 400, 403, 404, 405 or 426 response shuts the dispatcher down without
          flushing and the batch is discarded;
        * any other failure re-queues the batch at the front of the queue.
        """
        with self._events_lock:
            events_to_emit = self._events
            self._events = []

        if not events_to_emit:
            return

        logger.debug("Emitting %d events", len(events_to_emit))
        try:
            self.emitter.emit(events_to_emit)
        except Exception as e:
            logger.info("Failed to emit events: %s", e)
            if isinstance(e, ApiClientError):
                if e.code == 429 and self._emit_interval_sec < self._max_emit_interval_sec:
                    self._emit_interval_sec = min(
                        self._emit_interval_sec * 2, self._max_emit_interval_sec
                    )
                    logger.debug(
                        "Increasing the emit interval to %s seconds", self._emit_interval_sec
                    )
                elif e.code in (400, 403, 404, 405, 426):
                    logger.info(
                        "Non-retriable client error (%s) occurred, shutting down the event dispatcher",
                        e.code,
                    )
                    self.shutdown(flush=False)
                    return
            self._add_events(events_to_emit, prepend=True)

    def shutdown(self, flush: bool = True) -> None:
        """Stop the flush thread, optionally flush once more, and close the emitter.

        Calling this again after the shutdown event has been set is a no-op.

        Args:
            flush: Whether to flush the collected events before shutting down.
        """
        from threading import current_thread

        if self._shutdown_event.is_set():
            return
        logger.info("Shutting down the event dispatcher")
        self._shutdown_event.set()
        # flush() invokes shutdown() from the emitter thread itself after a
        # non-retriable error. A thread cannot join itself (Thread.join raises
        # RuntimeError), so only wait for the worker when called from elsewhere;
        # the worker loop exits on its own once the shutdown event is set.
        if current_thread() is not self._emitter_thread:
            self._emitter_thread.join()
        if flush and self._events:
            # Reduce the connect timeout to avoid blocking the shutdown.
            self.emitter.connect_timeout = 2
            self.flush()
        self.emitter.close()

    def _add_events(self, events: t.List[t.Dict[str, t.Any]], prepend: bool = False) -> None:
        """Add events to the queue under the lock, enforcing the size limit.

        Args:
            events: Events to add.
            prepend: Put the events at the front of the queue instead of the back.
        """
        with self._events_lock:
            if not prepend:
                self._events.extend(events)
            else:
                self._events = events + self._events

            if len(self._events) > self._max_queue_size:
                # Overflow is trimmed from the front of the list.
                drop_count = len(self._events) - self._max_queue_size
                logger.info("Event queue is full, dropping %s events", drop_count)
                self._events = self._events[drop_count:]

    def _run_flush(self) -> None:
        """Worker loop: flush every ``_emit_interval_sec`` seconds until shutdown is signalled."""
        while not self._shutdown_event.wait(self._emit_interval_sec):
            self.flush()
Dispatches events to an emitter.
AsyncEventDispatcher( emit_interval_sec: int = 5, max_emit_interval_sec: int = 120, max_queue_size: int = 50, emitter: Optional[EventEmitter] = None)
def __init__(
    self,
    emit_interval_sec: int = 5,
    max_emit_interval_sec: int = 120,
    max_queue_size: int = 50,
    emitter: t.Optional[EventEmitter] = None,
):
    """Store the configuration, create the empty queue and launch the flush thread.

    Args:
        emit_interval_sec: Seconds the worker waits between flushes.
        max_emit_interval_sec: Stored as the ceiling for the emit interval.
        max_queue_size: Stored as the maximum number of queued events.
        emitter: Optional emitter instance to use.
    """
    # Configuration.
    self._emitter = emitter
    self._max_queue_size = max_queue_size
    self._max_emit_interval_sec = max_emit_interval_sec
    self._emit_interval_sec = emit_interval_sec

    # Pending events and the lock that guards them.
    self._events_lock = Lock()
    self._events: t.List[t.Dict[str, t.Any]] = []

    # The worker is started last so that every attribute it reads already exists.
    self._shutdown_event = Event()
    worker = Thread(target=self._run_flush, name="event-emitter", daemon=True)
    self._emitter_thread = worker
    worker.start()
emitter: EventEmitter
def
add_event(self, event: Dict[str, Any]) -> None:
Add an event to be emitted.
Arguments:
- event: The event data.
def
flush(self) -> None:
def flush(self) -> None:
    """Send every queued event to the emitter, handling failures.

    The queue is detached while holding the lock; emitting happens outside of it.
    On failure the batch is put back at the front of the queue, except after a
    400/403/404/405/426 response, where the dispatcher is shut down without
    flushing and the batch is discarded. A 429 response additionally doubles the
    emit interval, capped at the configured maximum.
    """
    with self._events_lock:
        pending, self._events = self._events, []

    if not pending:
        return

    logger.debug("Emitting %d events", len(pending))
    try:
        self.emitter.emit(pending)
    except Exception as e:
        logger.info("Failed to emit events: %s", e)
        if isinstance(e, ApiClientError):
            if e.code == 429 and self._emit_interval_sec < self._max_emit_interval_sec:
                # Back off: double the interval without exceeding the ceiling.
                doubled = self._emit_interval_sec * 2
                self._emit_interval_sec = min(doubled, self._max_emit_interval_sec)
                logger.debug("Increasing the emit interval to %s seconds", self._emit_interval_sec)
            elif e.code in (400, 403, 404, 405, 426):
                logger.info(
                    "Non-retriable client error (%s) occurred, shutting down the event dispatcher",
                    e.code,
                )
                self.shutdown(flush=False)
                return
        # Anything that was not given up on is retried on a later flush.
        self._add_events(pending, prepend=True)
Flushes the collected events to the emitter.
def
shutdown(self, flush: bool = True) -> None:
def shutdown(self, flush: bool = True) -> None:
    """Stop the flush thread, optionally flush once more, and close the emitter.

    Calling this again after the shutdown event has been set is a no-op.

    Args:
        flush: Whether to flush the collected events before shutting down.
    """
    from threading import current_thread

    if self._shutdown_event.is_set():
        return
    # Log through the module logger rather than the root ``logging`` functions.
    logger.info("Shutting down the event dispatcher")
    self._shutdown_event.set()
    # flush() invokes shutdown() from the emitter thread itself after a
    # non-retriable error. A thread cannot join itself (Thread.join raises
    # RuntimeError), so only wait for the worker when called from elsewhere.
    if current_thread() is not self._emitter_thread:
        self._emitter_thread.join()
    if flush and self._events:
        # Reduce the connect timeout to avoid blocking the shutdown.
        self.emitter.connect_timeout = 2
        self.flush()
    self.emitter.close()
Shuts down this dispatcher and releases any resources.
Arguments:
- flush: Whether to flush the collected events before shutting down.
class NoopEventDispatcher(EventDispatcher):
    """Dispatcher that discards every event and holds no resources."""

    def add_event(self, event: t.Dict[str, t.Any]) -> None:
        # The event is not stored anywhere; only a debug message is logged.
        logger.debug("Analytics is disabled, dropping event")

    def flush(self) -> None:
        # Nothing is ever queued, so there is nothing to send.
        return None

    def shutdown(self, flush: bool = True) -> None:
        # No thread or session exists that would need to be released.
        return None
Dispatches events to an emitter.
def
add_event(self, event: Dict[str, Any]) -> None:
def add_event(self, event: t.Dict[str, t.Any]) -> None:
    # The event is not stored anywhere; only a debug message is logged.
    logger.debug("Analytics is disabled, dropping event")
Add an event to be emitted.
Arguments:
- event: The event data.