"""Client helpers for the telemetry server."""
from __future__ import annotations

import os
from typing import Any
from urllib.parse import quote

import requests
class TelemetryClientError(RuntimeError):
    """Raised when the telemetry server returns an error response.

    The HTTP status code and the error detail are kept as the ``status_code``
    and ``detail`` attributes; ``str(error)`` is the formatted message
    ``"Telemetry server error <status_code>: <detail>"``.
    """

    def __init__(self, status_code: int, detail: str) -> None:
        """Initialize the error.

        Args:
            status_code: HTTP status code returned by the server.
            detail: Error detail returned by the server.
        """
        self.status_code = status_code
        self.detail = detail
        super().__init__(f"Telemetry server error {status_code}: {detail}")

    def __reduce__(self) -> tuple[Any, ...]:
        """Describe how to rebuild this error for ``copy`` and ``pickle``.

        The default ``BaseException`` reduction re-invokes the class with
        ``self.args``, which here is the single formatted message. That does
        not match the two-argument constructor and fails with ``TypeError``,
        so the original constructor arguments are supplied instead.

        Returns:
            A ``(callable, args, state)`` tuple: the class, the constructor
            arguments, and a copy of the instance attributes.
        """
        return (type(self), (self.status_code, self.detail), dict(vars(self)))
class TelemetryClient:
    """Simple Python client for the telemetry server.

    The client wraps the telemetry HTTP API with small convenience methods for the
    common lifecycle: check readiness, optionally start a run, emit events, inspect
    the current materialized view, and stop the run or the server.

    `start_run` activates a run context. After that, `post_event(...)` calls are
    associated with the active run and stamped with `meta.run_id` until
    `stop_run()` is called.

    Every public method returns the decoded JSON body of the server response and
    raises `TelemetryClientError` when the response status code is 400 or higher.
    """

    def __init__(
        self,
        host: str | None = None,
        port: int | str | None = None,
        timeout: float = 5.0,
        session: Any | None = None,
        base_url: str | None = None,
    ) -> None:
        """Create a telemetry client.

        Args:
            host: Telemetry server host. Defaults to `PANOPTES_TELEMETRY_HOST` or `localhost`.
            port: Telemetry server port. Defaults to `PANOPTES_TELEMETRY_PORT` or `6562`.
            timeout: Request timeout in seconds.
            session: Optional requests-compatible client object for dependency injection.
            base_url: Optional explicit base URL. If provided, this takes precedence over
                `host` and `port`.

        Raises:
            ValueError: If `base_url` is not given and the resolved port cannot be
                converted to an integer.
        """
        if base_url:
            # An explicit URL wins outright; host/port (and their environment
            # variables) are not consulted, so a malformed port cannot break it.
            resolved_url = base_url
        else:
            # `or` chains treat an empty environment variable like an unset one.
            resolved_host = host or os.getenv("PANOPTES_TELEMETRY_HOST") or "localhost"
            resolved_port = int(port or os.getenv("PANOPTES_TELEMETRY_PORT") or 6562)
            resolved_url = f"http://{resolved_host}:{resolved_port}"
        # Request paths all start with "/", so drop any trailing slash here.
        self.base_url = resolved_url.rstrip("/")
        self.timeout = timeout
        # Compare against None so an injected object that happens to be falsy
        # is still used instead of silently falling back to `requests`.
        self._session = session if session is not None else requests

    def health(self) -> dict[str, Any]:
        """Return the telemetry health response."""
        return self._request("GET", "/health")

    def ready(self) -> dict[str, Any]:
        """Return the telemetry readiness response."""
        return self._request("GET", "/ready")

    def get_run(self) -> dict[str, Any]:
        """Return the current run metadata."""
        return self._request("GET", "/run")

    def start_run(
        self,
        run_dir: str | None = None,
        run_id: str | None = None,
        meta: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Start a telemetry run.

        Relative `run_dir` values are resolved by the server under its configured
        `site_dir`. If `run_dir` is omitted, the server uses `site_dir/run_id`.
        If `run_id` is also omitted, the server derives the next numeric run ID
        from existing run directories under `site_dir`.

        Args:
            run_dir: Optional run directory sent to the server.
            run_id: Optional run identifier. When given it is also written into
                the posted metadata under the `run_id` key.
            meta: Optional run metadata. The mapping is copied, never mutated.

        Returns:
            The decoded JSON body of the server response.
        """
        # Copy so that stamping `run_id` never mutates the caller's dict.
        payload_meta = dict(meta or {})
        if run_id is not None:
            payload_meta["run_id"] = run_id
        return self._request(
            "POST",
            "/run/start",
            json={"run_dir": run_dir, "run_id": run_id, "meta": payload_meta},
        )

    def stop_run(self) -> dict[str, Any]:
        """Stop the current telemetry run."""
        return self._request("POST", "/run/stop")

    def post_event(
        self,
        event_type: str,
        data: Any,
        make_current: bool = True,
        meta: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Post a telemetry event to the current telemetry context.

        Args:
            event_type: Sent as the `type` field of the event payload.
            data: Sent as the `data` field of the event payload.
            make_current: Sent as the `make_current` field of the event payload.
            meta: Optional event metadata; an empty mapping is sent when omitted.

        Returns:
            The decoded JSON body of the server response.
        """
        payload = {
            "type": event_type,
            "data": data,
            "make_current": make_current,
            "meta": meta or {},
        }
        return self._request("POST", "/event", json=payload)

    def current(self) -> dict[str, Any]:
        """Return the current snapshot for the public telemetry feed."""
        return self._request("GET", "/current")

    def current_event(self, event_type: str) -> dict[str, Any]:
        """Return the current envelope for a single event type.

        Args:
            event_type: Event type to look up. It is percent-encoded as a single
                path segment, so characters such as `/`, `?` and `#` cannot
                change the request path or leak into the query string.
        """
        return self._request("GET", f"/current/{quote(event_type, safe='')}")

    def shutdown(self) -> dict[str, Any]:
        """Request telemetry server shutdown."""
        return self._request("POST", "/shutdown")

    @staticmethod
    def _error_detail(response: Any) -> str:
        """Extract a human-readable error detail from an error response.

        Uses the `detail` field when the body decodes to a JSON object that has
        one; otherwise falls back to the raw response text. A JSON body that is
        not an object (for example a list or a bare string) also falls back to
        the raw text instead of failing on a missing `.get`.
        """
        detail = response.text
        try:
            payload = response.json()
        except ValueError:
            # Body is not JSON at all; the raw text is the best we have.
            return str(detail)
        if isinstance(payload, dict):
            detail = payload.get("detail", detail)
        return str(detail)

    def _request(
        self,
        method: str,
        path: str,
        json: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send one request to the server and decode the JSON response.

        Args:
            method: HTTP method name.
            path: Request path, appended verbatim to `base_url`.
            json: Optional JSON body.
            params: Optional query parameters.

        Returns:
            The decoded JSON body of the response.

        Raises:
            TelemetryClientError: If the response status code is 400 or higher.
        """
        response = self._session.request(
            method,
            f"{self.base_url}{path}",
            json=json,
            params=params,
            timeout=self.timeout,
        )
        if response.status_code >= 400:
            raise TelemetryClientError(response.status_code, self._error_detail(response))
        # Any decoding error from a successful response propagates to the caller.
        return response.json()