Source code for panoptes.utils.telemetry.client

"""Client helpers for the telemetry server."""

from __future__ import annotations

import os
from typing import Any

import requests


[docs] class TelemetryClientError(RuntimeError): """Raised when the telemetry server returns an error response.""" def __init__(self, status_code: int, detail: str) -> None: """Initialize the error. Args: status_code: HTTP status code returned by the server. detail: Error detail returned by the server. """ self.status_code = status_code self.detail = detail super().__init__(f"Telemetry server error {status_code}: {detail}")
[docs] class TelemetryClient: """Simple Python client for the telemetry server. The client wraps the telemetry HTTP API with small convenience methods for the common lifecycle: check readiness, optionally start a run, emit events, inspect the current materialized view, and stop the run or the server. `start_run` activates a run context. After that, `post_event(...)` calls are associated with the active run and stamped with `meta.run_id` until `stop_run()` is called. """ def __init__( self, host: str | None = None, port: int | str | None = None, timeout: float = 5.0, session: Any | None = None, base_url: str | None = None, ) -> None: """Create a telemetry client. Args: host: Telemetry server host. Defaults to `PANOPTES_TELEMETRY_HOST` or `localhost`. port: Telemetry server port. Defaults to `PANOPTES_TELEMETRY_PORT` or `6562`. timeout: Request timeout in seconds. session: Optional requests-compatible client object for dependency injection. base_url: Optional explicit base URL. If provided, this takes precedence over `host` and `port`. """ resolved_host = host or os.getenv("PANOPTES_TELEMETRY_HOST", "localhost") resolved_port = int(port or os.getenv("PANOPTES_TELEMETRY_PORT", 6562)) self.base_url = (base_url or f"http://{resolved_host}:{resolved_port}").rstrip("/") self.timeout = timeout self._session = session or requests
[docs] def health(self) -> dict[str, Any]: """Return the telemetry health response.""" return self._request("GET", "/health")
[docs] def ready(self) -> dict[str, Any]: """Return the telemetry readiness response.""" return self._request("GET", "/ready")
[docs] def get_run(self) -> dict[str, Any]: """Return the current run metadata.""" return self._request("GET", "/run")
[docs] def start_run( self, run_dir: str | None = None, run_id: str | None = None, meta: dict[str, Any] | None = None, ) -> dict[str, Any]: """Start a telemetry run. Relative `run_dir` values are resolved by the server under its configured `site_dir`. If `run_dir` is omitted, the server uses `site_dir/run_id`. If `run_id` is also omitted, the server derives the next numeric run ID from existing run directories under `site_dir`. """ payload_meta = dict(meta or {}) if run_id is not None: payload_meta["run_id"] = run_id return self._request( "POST", "/run/start", json={"run_dir": run_dir, "run_id": run_id, "meta": payload_meta}, )
[docs] def stop_run(self) -> dict[str, Any]: """Stop the current telemetry run.""" return self._request("POST", "/run/stop")
[docs] def post_event( self, event_type: str, data: Any, make_current: bool = True, meta: dict[str, Any] | None = None, ) -> dict[str, Any]: """Post a telemetry event to the current telemetry context.""" payload = { "type": event_type, "data": data, "make_current": make_current, "meta": meta or {}, } return self._request("POST", "/event", json=payload)
[docs] def current(self) -> dict[str, Any]: """Return the current snapshot for the public telemetry feed.""" return self._request("GET", "/current")
[docs] def current_event(self, event_type: str) -> dict[str, Any]: """Return the current envelope for a single event type.""" return self._request("GET", f"/current/{event_type}")
[docs] def shutdown(self) -> dict[str, Any]: """Request telemetry server shutdown.""" return self._request("POST", "/shutdown")
def _request( self, method: str, path: str, json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, ) -> dict[str, Any]: response = self._session.request( method, f"{self.base_url}{path}", json=json, params=params, timeout=self.timeout, ) if response.status_code >= 400: detail = response.text try: payload = response.json() detail = payload.get("detail", detail) except ValueError: pass raise TelemetryClientError(response.status_code, str(detail)) return response.json()