Edit on GitHub

sqlmesh.core.analytics.dispatcher

  1from __future__ import annotations
  2
  3import abc
  4import gzip
  5import json
  6import logging
  7import platform
  8import typing as t
  9from functools import cached_property
 10from threading import Event, Lock, Thread
 11from urllib.parse import urljoin
 12
 13import requests
 14from sqlglot import __version__ as sqlglot_version
 15
 16from sqlmesh.utils.errors import ApiClientError, raise_for_status
 17
 18TOBIKO_COLLECTOR_URL = "https://analytics.tobikodata.com/v1/"
 19
 20logger = logging.getLogger(__name__)
 21
 22
 23class EventEmitter:
 24    """Emits events to a remote collector."""
 25
 26    def __init__(
 27        self,
 28        base_url: str = TOBIKO_COLLECTOR_URL,
 29        read_timeout_sec: int = 10,
 30        connect_timeout_sec: int = 10,
 31    ):
 32        from sqlmesh import __version__ as sqlmesh_version
 33
 34        self.sqlmesh_url = urljoin(base_url, "sqlmesh/")
 35        self.read_timeout = read_timeout_sec
 36        self.connect_timeout = connect_timeout_sec
 37
 38        self._session = requests.Session()
 39        self._session.headers.update(
 40            {
 41                "Content-Type": "application/json",
 42                "Content-Encoding": "gzip",
 43                "User-Agent": f"SQLMesh/{sqlmesh_version}",
 44            }
 45        )
 46
 47    def emit(self, events: t.List[t.Dict[str, t.Any]]) -> None:
 48        data = json.dumps({"events": events, "versions": self._versions}).encode("utf-8")
 49        data = gzip.compress(data)
 50        response = self._session.post(
 51            self.sqlmesh_url, data=data, timeout=(self.connect_timeout, self.read_timeout)
 52        )
 53        raise_for_status(response)
 54
 55    def close(self) -> None:
 56        self._session.close()
 57
 58    @cached_property
 59    def _versions(self) -> str:
 60        import pydantic
 61
 62        from sqlmesh import __version__ as sqlmesh_version
 63
 64        versions = {
 65            "sqlmesh_version": sqlmesh_version,
 66            "sqlglot_version": sqlglot_version,
 67            "python_version": platform.python_version(),
 68            "pydantic_version": pydantic.__version__,
 69            "os_name": platform.system(),
 70            "platform": platform.platform(),
 71        }
 72        return json.dumps(versions)
 73
 74
 75class EventDispatcher(abc.ABC):
 76    """Dispatches events to an emitter."""
 77
 78    @abc.abstractmethod
 79    def add_event(self, event: t.Dict[str, t.Any]) -> None:
 80        """Add an event to be emitted.
 81
 82        Args:
 83            event: The event data.
 84        """
 85
 86    @abc.abstractmethod
 87    def flush(self) -> None:
 88        """Flushes the collected events to the emitter."""
 89
 90    @abc.abstractmethod
 91    def shutdown(self, flush: bool = True) -> None:
 92        """Shuts down this dispatcher and releases any resources.
 93
 94        Args:
 95            flush: Whether to flush the collected events before shutting down.
 96        """
 97
 98
 99class AsyncEventDispatcher(EventDispatcher):
100    def __init__(
101        self,
102        emit_interval_sec: int = 5,
103        max_emit_interval_sec: int = 120,
104        max_queue_size: int = 50,
105        emitter: t.Optional[EventEmitter] = None,
106    ):
107        self._emit_interval_sec = emit_interval_sec
108        self._max_emit_interval_sec = max_emit_interval_sec
109        self._max_queue_size = max_queue_size
110        self._emitter = emitter
111
112        self._events: t.List[t.Dict[str, t.Any]] = []
113        self._events_lock = Lock()
114
115        self._shutdown_event = Event()
116        self._emitter_thread = Thread(target=self._run_flush, name="event-emitter", daemon=True)
117        self._emitter_thread.start()
118
119    @cached_property
120    def emitter(self) -> EventEmitter:
121        return self._emitter or EventEmitter()
122
123    def add_event(self, event: t.Dict[str, t.Any]) -> None:
124        self._add_events([event])
125
126    def flush(self) -> None:
127        with self._events_lock:
128            events_to_emit = self._events
129            self._events = []
130
131        if not events_to_emit:
132            return
133
134        logger.debug("Emitting %d events", len(events_to_emit))
135        try:
136            self.emitter.emit(events_to_emit)
137        except Exception as e:
138            logger.info("Failed to emit events: %s", e)
139            if isinstance(e, ApiClientError):
140                if e.code == 429 and self._emit_interval_sec < self._max_emit_interval_sec:
141                    self._emit_interval_sec = min(
142                        self._emit_interval_sec * 2, self._max_emit_interval_sec
143                    )
144                    logger.debug(
145                        "Increasing the emit interval to %s seconds", self._emit_interval_sec
146                    )
147                elif e.code in (400, 403, 404, 405, 426):
148                    logger.info(
149                        "Non-retriable client error (%s) occurred, shutting down the event dispatcher",
150                        e.code,
151                    )
152                    self.shutdown(flush=False)
153                    return
154            self._add_events(events_to_emit, prepend=True)
155
156    def shutdown(self, flush: bool = True) -> None:
157        if self._shutdown_event.is_set():
158            return
159        logging.info("Shutting down the event dispatcher")
160        self._shutdown_event.set()
161        self._emitter_thread.join()
162        if flush and self._events:
163            # Reduce the connect timeout to avoid blocking the shutdown.
164            self.emitter.connect_timeout = 2
165            self.flush()
166        self.emitter.close()
167
168    def _add_events(self, events: t.List[t.Dict[str, t.Any]], prepend: bool = False) -> None:
169        with self._events_lock:
170            if not prepend:
171                self._events.extend(events)
172            else:
173                self._events = events + self._events
174
175            if len(self._events) > self._max_queue_size:
176                drop_count = len(self._events) - self._max_queue_size
177                logger.info("Event queue is full, dropping %s events", drop_count)
178                self._events = self._events[drop_count:]
179
180    def _run_flush(self) -> None:
181        while not self._shutdown_event.wait(self._emit_interval_sec):
182            self.flush()
183
184
class NoopEventDispatcher(EventDispatcher):
    """Dispatcher used when analytics is disabled; it buffers and sends nothing."""

    def add_event(self, event: t.Dict[str, t.Any]) -> None:
        """Discard the event, noting at debug level that it was dropped."""
        logger.debug("Analytics is disabled, dropping event")

    def flush(self) -> None:
        """No-op: nothing is ever buffered."""

    def shutdown(self, flush: bool = True) -> None:
        """No-op: no resources are held."""
TOBIKO_COLLECTOR_URL = 'https://analytics.tobikodata.com/v1/'
logger = <Logger sqlmesh.core.analytics.dispatcher (WARNING)>
class EventEmitter:
24class EventEmitter:
25    """Emits events to a remote collector."""
26
27    def __init__(
28        self,
29        base_url: str = TOBIKO_COLLECTOR_URL,
30        read_timeout_sec: int = 10,
31        connect_timeout_sec: int = 10,
32    ):
33        from sqlmesh import __version__ as sqlmesh_version
34
35        self.sqlmesh_url = urljoin(base_url, "sqlmesh/")
36        self.read_timeout = read_timeout_sec
37        self.connect_timeout = connect_timeout_sec
38
39        self._session = requests.Session()
40        self._session.headers.update(
41            {
42                "Content-Type": "application/json",
43                "Content-Encoding": "gzip",
44                "User-Agent": f"SQLMesh/{sqlmesh_version}",
45            }
46        )
47
48    def emit(self, events: t.List[t.Dict[str, t.Any]]) -> None:
49        data = json.dumps({"events": events, "versions": self._versions}).encode("utf-8")
50        data = gzip.compress(data)
51        response = self._session.post(
52            self.sqlmesh_url, data=data, timeout=(self.connect_timeout, self.read_timeout)
53        )
54        raise_for_status(response)
55
56    def close(self) -> None:
57        self._session.close()
58
59    @cached_property
60    def _versions(self) -> str:
61        import pydantic
62
63        from sqlmesh import __version__ as sqlmesh_version
64
65        versions = {
66            "sqlmesh_version": sqlmesh_version,
67            "sqlglot_version": sqlglot_version,
68            "python_version": platform.python_version(),
69            "pydantic_version": pydantic.__version__,
70            "os_name": platform.system(),
71            "platform": platform.platform(),
72        }
73        return json.dumps(versions)

Emits events to a remote collector.

EventEmitter( base_url: str = 'https://analytics.tobikodata.com/v1/', read_timeout_sec: int = 10, connect_timeout_sec: int = 10)
27    def __init__(
28        self,
29        base_url: str = TOBIKO_COLLECTOR_URL,
30        read_timeout_sec: int = 10,
31        connect_timeout_sec: int = 10,
32    ):
33        from sqlmesh import __version__ as sqlmesh_version
34
35        self.sqlmesh_url = urljoin(base_url, "sqlmesh/")
36        self.read_timeout = read_timeout_sec
37        self.connect_timeout = connect_timeout_sec
38
39        self._session = requests.Session()
40        self._session.headers.update(
41            {
42                "Content-Type": "application/json",
43                "Content-Encoding": "gzip",
44                "User-Agent": f"SQLMesh/{sqlmesh_version}",
45            }
46        )
sqlmesh_url
read_timeout
connect_timeout
def emit(self, events: List[Dict[str, Any]]) -> None:
48    def emit(self, events: t.List[t.Dict[str, t.Any]]) -> None:
49        data = json.dumps({"events": events, "versions": self._versions}).encode("utf-8")
50        data = gzip.compress(data)
51        response = self._session.post(
52            self.sqlmesh_url, data=data, timeout=(self.connect_timeout, self.read_timeout)
53        )
54        raise_for_status(response)
def close(self) -> None:
56    def close(self) -> None:
57        self._session.close()
class EventDispatcher(abc.ABC):
76class EventDispatcher(abc.ABC):
77    """Dispatches events to an emitter."""
78
79    @abc.abstractmethod
80    def add_event(self, event: t.Dict[str, t.Any]) -> None:
81        """Add an event to be emitted.
82
83        Args:
84            event: The event data.
85        """
86
87    @abc.abstractmethod
88    def flush(self) -> None:
89        """Flushes the collected events to the emitter."""
90
91    @abc.abstractmethod
92    def shutdown(self, flush: bool = True) -> None:
93        """Shuts down this dispatcher and releases any resources.
94
95        Args:
96            flush: Whether to flush the collected events before shutting down.
97        """

Dispatches events to an emitter.

@abc.abstractmethod
def add_event(self, event: Dict[str, Any]) -> None:
79    @abc.abstractmethod
80    def add_event(self, event: t.Dict[str, t.Any]) -> None:
81        """Add an event to be emitted.
82
83        Args:
84            event: The event data.
85        """

Add an event to be emitted.

Arguments:
  • event: The event data.
@abc.abstractmethod
def flush(self) -> None:
87    @abc.abstractmethod
88    def flush(self) -> None:
89        """Flushes the collected events to the emitter."""

Flushes the collected events to the emitter.

@abc.abstractmethod
def shutdown(self, flush: bool = True) -> None:
91    @abc.abstractmethod
92    def shutdown(self, flush: bool = True) -> None:
93        """Shuts down this dispatcher and releases any resources.
94
95        Args:
96            flush: Whether to flush the collected events before shutting down.
97        """

Shuts down this dispatcher and releases any resources.

Arguments:
  • flush: Whether to flush the collected events before shutting down.
class AsyncEventDispatcher(EventDispatcher):
100class AsyncEventDispatcher(EventDispatcher):
101    def __init__(
102        self,
103        emit_interval_sec: int = 5,
104        max_emit_interval_sec: int = 120,
105        max_queue_size: int = 50,
106        emitter: t.Optional[EventEmitter] = None,
107    ):
108        self._emit_interval_sec = emit_interval_sec
109        self._max_emit_interval_sec = max_emit_interval_sec
110        self._max_queue_size = max_queue_size
111        self._emitter = emitter
112
113        self._events: t.List[t.Dict[str, t.Any]] = []
114        self._events_lock = Lock()
115
116        self._shutdown_event = Event()
117        self._emitter_thread = Thread(target=self._run_flush, name="event-emitter", daemon=True)
118        self._emitter_thread.start()
119
120    @cached_property
121    def emitter(self) -> EventEmitter:
122        return self._emitter or EventEmitter()
123
124    def add_event(self, event: t.Dict[str, t.Any]) -> None:
125        self._add_events([event])
126
127    def flush(self) -> None:
128        with self._events_lock:
129            events_to_emit = self._events
130            self._events = []
131
132        if not events_to_emit:
133            return
134
135        logger.debug("Emitting %d events", len(events_to_emit))
136        try:
137            self.emitter.emit(events_to_emit)
138        except Exception as e:
139            logger.info("Failed to emit events: %s", e)
140            if isinstance(e, ApiClientError):
141                if e.code == 429 and self._emit_interval_sec < self._max_emit_interval_sec:
142                    self._emit_interval_sec = min(
143                        self._emit_interval_sec * 2, self._max_emit_interval_sec
144                    )
145                    logger.debug(
146                        "Increasing the emit interval to %s seconds", self._emit_interval_sec
147                    )
148                elif e.code in (400, 403, 404, 405, 426):
149                    logger.info(
150                        "Non-retriable client error (%s) occurred, shutting down the event dispatcher",
151                        e.code,
152                    )
153                    self.shutdown(flush=False)
154                    return
155            self._add_events(events_to_emit, prepend=True)
156
157    def shutdown(self, flush: bool = True) -> None:
158        if self._shutdown_event.is_set():
159            return
160        logging.info("Shutting down the event dispatcher")
161        self._shutdown_event.set()
162        self._emitter_thread.join()
163        if flush and self._events:
164            # Reduce the connect timeout to avoid blocking the shutdown.
165            self.emitter.connect_timeout = 2
166            self.flush()
167        self.emitter.close()
168
169    def _add_events(self, events: t.List[t.Dict[str, t.Any]], prepend: bool = False) -> None:
170        with self._events_lock:
171            if not prepend:
172                self._events.extend(events)
173            else:
174                self._events = events + self._events
175
176            if len(self._events) > self._max_queue_size:
177                drop_count = len(self._events) - self._max_queue_size
178                logger.info("Event queue is full, dropping %s events", drop_count)
179                self._events = self._events[drop_count:]
180
181    def _run_flush(self) -> None:
182        while not self._shutdown_event.wait(self._emit_interval_sec):
183            self.flush()

Dispatches events to an emitter.

AsyncEventDispatcher( emit_interval_sec: int = 5, max_emit_interval_sec: int = 120, max_queue_size: int = 50, emitter: Optional[EventEmitter] = None)
101    def __init__(
102        self,
103        emit_interval_sec: int = 5,
104        max_emit_interval_sec: int = 120,
105        max_queue_size: int = 50,
106        emitter: t.Optional[EventEmitter] = None,
107    ):
108        self._emit_interval_sec = emit_interval_sec
109        self._max_emit_interval_sec = max_emit_interval_sec
110        self._max_queue_size = max_queue_size
111        self._emitter = emitter
112
113        self._events: t.List[t.Dict[str, t.Any]] = []
114        self._events_lock = Lock()
115
116        self._shutdown_event = Event()
117        self._emitter_thread = Thread(target=self._run_flush, name="event-emitter", daemon=True)
118        self._emitter_thread.start()
emitter: EventEmitter
120    @cached_property
121    def emitter(self) -> EventEmitter:
122        return self._emitter or EventEmitter()
def add_event(self, event: Dict[str, Any]) -> None:
124    def add_event(self, event: t.Dict[str, t.Any]) -> None:
125        self._add_events([event])

Add an event to be emitted.

Arguments:
  • event: The event data.
def flush(self) -> None:
127    def flush(self) -> None:
128        with self._events_lock:
129            events_to_emit = self._events
130            self._events = []
131
132        if not events_to_emit:
133            return
134
135        logger.debug("Emitting %d events", len(events_to_emit))
136        try:
137            self.emitter.emit(events_to_emit)
138        except Exception as e:
139            logger.info("Failed to emit events: %s", e)
140            if isinstance(e, ApiClientError):
141                if e.code == 429 and self._emit_interval_sec < self._max_emit_interval_sec:
142                    self._emit_interval_sec = min(
143                        self._emit_interval_sec * 2, self._max_emit_interval_sec
144                    )
145                    logger.debug(
146                        "Increasing the emit interval to %s seconds", self._emit_interval_sec
147                    )
148                elif e.code in (400, 403, 404, 405, 426):
149                    logger.info(
150                        "Non-retriable client error (%s) occurred, shutting down the event dispatcher",
151                        e.code,
152                    )
153                    self.shutdown(flush=False)
154                    return
155            self._add_events(events_to_emit, prepend=True)

Flushes the collected events to the emitter.

def shutdown(self, flush: bool = True) -> None:
157    def shutdown(self, flush: bool = True) -> None:
158        if self._shutdown_event.is_set():
159            return
160        logging.info("Shutting down the event dispatcher")
161        self._shutdown_event.set()
162        self._emitter_thread.join()
163        if flush and self._events:
164            # Reduce the connect timeout to avoid blocking the shutdown.
165            self.emitter.connect_timeout = 2
166            self.flush()
167        self.emitter.close()

Shuts down this dispatcher and releases any resources.

Arguments:
  • flush: Whether to flush the collected events before shutting down.
class NoopEventDispatcher(EventDispatcher):
186class NoopEventDispatcher(EventDispatcher):
187    def add_event(self, event: t.Dict[str, t.Any]) -> None:
188        logger.debug("Analytics is disabled, dropping event")
189
190    def flush(self) -> None:
191        pass
192
193    def shutdown(self, flush: bool = True) -> None:
194        pass

Dispatches events to an emitter.

def add_event(self, event: Dict[str, Any]) -> None:
187    def add_event(self, event: t.Dict[str, t.Any]) -> None:
188        logger.debug("Analytics is disabled, dropping event")

Add an event to be emitted.

Arguments:
  • event: The event data.
def flush(self) -> None:
190    def flush(self) -> None:
191        pass

Flushes the collected events to the emitter.

def shutdown(self, flush: bool = True) -> None:
193    def shutdown(self, flush: bool = True) -> None:
194        pass

Shuts down this dispatcher and releases any resources.

Arguments:
  • flush: Whether to flush the collected events before shutting down.