panoptes.utils.telemetry package

Submodules

panoptes.utils.telemetry.client module

Client helpers for the telemetry server.

class panoptes.utils.telemetry.client.TelemetryClient(host: str | None = None, port: int | str | None = None, timeout: float = 5.0, session: Any | None = None, base_url: str | None = None)

Bases: object

Simple Python client for the telemetry server.

The client wraps the telemetry HTTP API with small convenience methods for the common lifecycle: check readiness, optionally start a run, emit events, inspect the current materialized view, and stop the run or the server.

start_run() activates a run context. From then until stop_run() is called, every post_event(…) call is associated with the active run and stamped with meta.run_id.
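The lifecycle described above can be sketched as a small helper that accepts any object exposing the documented client methods. This is a minimal illustration, not part of the package: the helper name record_observation, the "exposure" event type, and the payload and metadata keys are all hypothetical.

```python
from typing import Any


def record_observation(client: Any, run_id: str, exposure_s: float) -> dict[str, Any]:
    """Walk one run through the documented lifecycle and return the snapshot.

    `client` is expected to provide ready(), start_run(), post_event(),
    current() and stop_run() as listed in this reference.
    """
    client.ready()  # readiness check before doing anything else
    # "observer" is an illustrative metadata key, not a documented field.
    client.start_run(run_id=run_id, meta={"observer": "example"})
    try:
        # "exposure" is a hypothetical event type; data may be any JSON-like value.
        client.post_event("exposure", {"seconds": exposure_s})
        return client.current()
    finally:
        # Always release the run context, even if posting fails.
        client.stop_run()
```

In real use the client argument would typically be a TelemetryClient instance pointed at a running telemetry server.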

current() → dict[str, Any]

Return the current snapshot for the public telemetry feed.

current_event(event_type: str) → dict[str, Any]

Return the current envelope for a single event type.

get_run() → dict[str, Any]

Return the current run metadata.

health() → dict[str, Any]

Return the telemetry health response.

post_event(event_type: str, data: Any, make_current: bool = True, meta: dict[str, Any] | None = None) → dict[str, Any]

Post a telemetry event to the current telemetry context.

ready() → dict[str, Any]

Return the telemetry readiness response.

shutdown() → dict[str, Any]

Request telemetry server shutdown.

start_run(run_dir: str | None = None, run_id: str | None = None, meta: dict[str, Any] | None = None) → dict[str, Any]

Start a telemetry run.

Relative run_dir values are resolved by the server under its configured site_dir. If run_dir is omitted, the server uses site_dir/run_id. If run_id is also omitted, the server derives the next numeric run ID from existing run directories under site_dir.

stop_run() → dict[str, Any]

Stop the current telemetry run.

exception panoptes.utils.telemetry.client.TelemetryClientError(status_code: int, detail: str)

Bases: RuntimeError

Raised when the telemetry server returns an error response.

panoptes.utils.telemetry.server module

Telemetry server implementation.

class panoptes.utils.telemetry.server.ActiveRun(run_dir: Path, run_id: str, meta: dict[str, Any] = <factory>, started_at: str = '')

Bases: object

Metadata describing the currently active run.

as_dict() → dict[str, Any]

Return a JSON-serializable representation of the run.

meta: dict[str, Any]
run_dir: Path
run_id: str
started_at: str

class panoptes.utils.telemetry.server.EventRequest(*, type: str, data: Any, make_current: bool = True, meta: dict[str, Any] = <factory>)

Bases: BaseModel

Request body for POST /event.
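As an illustration, a request body matching the fields of this model could be built as follows. The field names come from the signature above; the "weather" event type and the payload contents are hypothetical examples.

```python
import json

# Field names mirror EventRequest; the values are illustrative only.
event_body = {
    "type": "weather",                 # hypothetical event type name
    "data": {"temperature_c": 11.5},   # any JSON-serializable payload
    "make_current": True,              # documented default
    "meta": {},                        # optional extra metadata
}

# Serialized form suitable for sending as the body of POST /event.
payload = json.dumps(event_body)
```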

data: Any
make_current: bool
meta: dict[str, Any]
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

type: str

class panoptes.utils.telemetry.server.RunStartRequest(*, run_dir: str | None = None, run_id: str | None = None, meta: dict[str, Any] = <factory>)

Bases: BaseModel

Request body for POST /run/start.

meta: dict[str, Any]
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

run_dir: str | None
run_id: str | None

exception panoptes.utils.telemetry.server.TelemetryConflictError

Bases: RuntimeError

Raised when a requested telemetry action conflicts with server state.

exception panoptes.utils.telemetry.server.TelemetryNotFoundError

Bases: RuntimeError

Raised when a requested telemetry resource does not exist.

class panoptes.utils.telemetry.server.TelemetryService(site_dir: str | Path, now_provider: Callable[[], datetime] | None = None)

Bases: object

Manage telemetry state, file storage, and current snapshots.

append_event(request: EventRequest) → dict[str, Any]

Append an event to the current telemetry target and update the current view.

Parameters:

request – Event request payload.

Returns:

The NDJSON event envelope.

current_event(event_type: str) → dict[str, Any]

Return the current envelope for a single event type.

Raises:

TelemetryNotFoundError – If the event type is not present.

current_snapshot() → dict[str, Any]

Return the materialized current view for the public telemetry feed.

get_run() → dict[str, Any]

Return the active run metadata.

Raises:

TelemetryNotFoundError – If no run is active.

ready() → dict[str, Any]

Return a readiness payload.

property run_active: bool

Whether a run is currently active.

start_run(run_dir: str | Path | None = None, meta: dict[str, Any] | None = None, run_id: str | None = None) → dict[str, Any]

Start a new telemetry run context.

Parameters:
  • run_dir – Directory that will contain telemetry.ndjson. Relative paths are resolved under site_dir. If omitted, site_dir/run_id is used.

  • meta – Optional run metadata to expose via the API.

  • run_id – Optional identifier for the run. If omitted, one is taken from meta["run_id"] or, if that is not provided, the next numeric run directory under site_dir.

Returns:

The active run metadata.

Raises:

TelemetryConflictError – If a run is already active.
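The resolution order described in the parameters above can be sketched as a standalone function. This is an assumption-laden illustration rather than the server's code: the name resolve_run is hypothetical, and "next numeric run directory" is interpreted here as the highest all-digit directory name under site_dir plus one, without zero padding.

```python
from pathlib import Path
from typing import Any, Optional, Union


def resolve_run(
    site_dir: Union[str, Path],
    run_dir: Optional[Union[str, Path]] = None,
    meta: Optional[dict[str, Any]] = None,
    run_id: Optional[str] = None,
) -> tuple[Path, str]:
    """Return (run directory, run ID) following the documented fallbacks."""
    site_dir = Path(site_dir)

    # 1. An explicit run_id wins; otherwise fall back to meta["run_id"].
    if run_id is None:
        run_id = (meta or {}).get("run_id")

    # 2. Otherwise derive the next numeric ID (assumed: max existing + 1).
    if run_id is None:
        existing = []
        if site_dir.is_dir():
            existing = [
                int(p.name)
                for p in site_dir.iterdir()
                if p.is_dir() and p.name.isdecimal()
            ]
        run_id = str(max(existing, default=0) + 1)
    run_id = str(run_id)

    # 3. Relative run_dir values live under site_dir; default is site_dir/run_id.
    if run_dir is None:
        resolved = site_dir / run_id
    else:
        run_dir = Path(run_dir)
        resolved = run_dir if run_dir.is_absolute() else site_dir / run_dir
    return resolved, run_id
```

The sketch only computes paths; creating the directory and writing telemetry.ndjson are left to the service.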

stop_run() → dict[str, Any]

Stop the active telemetry run context.

Returns:

The run metadata that was active before stopping.

Raises:

TelemetryNotFoundError – If no run is active.

panoptes.utils.telemetry.server.create_app(service: TelemetryService) → FastAPI

Create a FastAPI telemetry app backed by service.

panoptes.utils.telemetry.server.get_site_day_key(local_dt: datetime) → str

Return the site stream day key using the local-time noon boundary.

Parameters:

local_dt – A timezone-aware datetime in the machine’s local timezone.

Returns:

The YYYYMMDD day key for the site stream file.

Raises:

ValueError – If local_dt is naive.
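One plausible reading of the noon boundary is sketched below. It assumes that times before local noon are assigned to the previous calendar day, so that a single observing night maps to one key; the reference does not state the direction of the shift, and the function name site_day_key_sketch is hypothetical.

```python
from datetime import datetime, timedelta


def site_day_key_sketch(local_dt: datetime) -> str:
    """Return a YYYYMMDD key, rolling over at local noon instead of midnight."""
    # Reject naive datetimes, as the documented function does.
    if local_dt.tzinfo is None or local_dt.utcoffset() is None:
        raise ValueError("local_dt must be timezone-aware")
    # Assumption: before noon the timestamp still belongs to the previous day.
    if local_dt.hour < 12:
        local_dt = local_dt - timedelta(days=1)
    return local_dt.strftime("%Y%m%d")
```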

panoptes.utils.telemetry.server.telemetry_server(site_dir: str | Path | None = None, host: str | None = None, port: str | int | None = None, auto_start: bool = True, access_logs: bool | None = None, verbose: bool = False) → Process

Start the telemetry server in a separate process.

Parameters:
  • site_dir – Base directory for site stream NDJSON files.

  • host – Host address to bind to. Defaults to localhost or the PANOPTES_TELEMETRY_HOST environment variable.

  • port – Port number to bind to. Defaults to 6562 or the PANOPTES_TELEMETRY_PORT environment variable.

  • auto_start – Whether to start the child process immediately.

  • access_logs – Whether to enable uvicorn access logs.

  • verbose – Whether to enable DEBUG-level server logging.

Returns:

The child process that hosts the telemetry API.
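The host and port defaults could be resolved along the following lines. The precedence shown (explicit argument, then environment variable, then built-in default) is an assumption, and the helper name resolve_bind_address is hypothetical; only the environment variable names and the default values come from the parameter descriptions above.

```python
import os
from typing import Optional, Union


def resolve_bind_address(
    host: Optional[str] = None,
    port: Optional[Union[str, int]] = None,
) -> tuple[str, int]:
    """Pick a host and port using argument > environment > default."""
    if host is None:
        host = os.environ.get("PANOPTES_TELEMETRY_HOST", "localhost")
    if port is None:
        port = os.environ.get("PANOPTES_TELEMETRY_PORT", 6562)
    # The port may arrive as a string (argument or environment), so normalize it.
    return host, int(port)
```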

panoptes.utils.telemetry.server.utc_iso_z(now: datetime | None = None) → str

Return a UTC ISO-8601 timestamp with a trailing Z.
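A timestamp of this shape can be produced with the standard library alone, as in the sketch below. The name utc_iso_z_sketch is hypothetical, and details such as sub-second precision in the real function are not specified here, so the output format is an assumption.

```python
from datetime import datetime, timezone
from typing import Optional


def utc_iso_z_sketch(now: Optional[datetime] = None) -> str:
    """Format a datetime as ISO-8601 in UTC, using "Z" instead of "+00:00"."""
    if now is None:
        now = datetime.now(timezone.utc)
    # astimezone() converts aware datetimes to UTC; a naive datetime would be
    # interpreted as local time by the standard library.
    return now.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
```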

Module contents

Telemetry utilities for PANOPTES.

class panoptes.utils.telemetry.TelemetryClient(host: str | None = None, port: int | str | None = None, timeout: float = 5.0, session: Any | None = None, base_url: str | None = None)

Bases: object

Simple Python client for the telemetry server.

The client wraps the telemetry HTTP API with small convenience methods for the common lifecycle: check readiness, optionally start a run, emit events, inspect the current materialized view, and stop the run or the server.

start_run() activates a run context. From then until stop_run() is called, every post_event(…) call is associated with the active run and stamped with meta.run_id.

current() → dict[str, Any]

Return the current snapshot for the public telemetry feed.

current_event(event_type: str) → dict[str, Any]

Return the current envelope for a single event type.

get_run() → dict[str, Any]

Return the current run metadata.

health() → dict[str, Any]

Return the telemetry health response.

post_event(event_type: str, data: Any, make_current: bool = True, meta: dict[str, Any] | None = None) → dict[str, Any]

Post a telemetry event to the current telemetry context.

ready() → dict[str, Any]

Return the telemetry readiness response.

shutdown() → dict[str, Any]

Request telemetry server shutdown.

start_run(run_dir: str | None = None, run_id: str | None = None, meta: dict[str, Any] | None = None) → dict[str, Any]

Start a telemetry run.

Relative run_dir values are resolved by the server under its configured site_dir. If run_dir is omitted, the server uses site_dir/run_id. If run_id is also omitted, the server derives the next numeric run ID from existing run directories under site_dir.

stop_run() → dict[str, Any]

Stop the current telemetry run.

exception panoptes.utils.telemetry.TelemetryClientError(status_code: int, detail: str)

Bases: RuntimeError

Raised when the telemetry server returns an error response.

class panoptes.utils.telemetry.TelemetryService(site_dir: str | Path, now_provider: Callable[[], datetime] | None = None)

Bases: object

Manage telemetry state, file storage, and current snapshots.

append_event(request: EventRequest) → dict[str, Any]

Append an event to the current telemetry target and update the current view.

Parameters:

request – Event request payload.

Returns:

The NDJSON event envelope.

current_event(event_type: str) → dict[str, Any]

Return the current envelope for a single event type.

Raises:

TelemetryNotFoundError – If the event type is not present.

current_snapshot() → dict[str, Any]

Return the materialized current view for the public telemetry feed.

get_run() → dict[str, Any]

Return the active run metadata.

Raises:

TelemetryNotFoundError – If no run is active.

ready() → dict[str, Any]

Return a readiness payload.

property run_active: bool

Whether a run is currently active.

start_run(run_dir: str | Path | None = None, meta: dict[str, Any] | None = None, run_id: str | None = None) → dict[str, Any]

Start a new telemetry run context.

Parameters:
  • run_dir – Directory that will contain telemetry.ndjson. Relative paths are resolved under site_dir. If omitted, site_dir/run_id is used.

  • meta – Optional run metadata to expose via the API.

  • run_id – Optional identifier for the run. If omitted, one is taken from meta["run_id"] or, if that is not provided, the next numeric run directory under site_dir.

Returns:

The active run metadata.

Raises:

TelemetryConflictError – If a run is already active.

stop_run() → dict[str, Any]

Stop the active telemetry run context.

Returns:

The run metadata that was active before stopping.

Raises:

TelemetryNotFoundError – If no run is active.

panoptes.utils.telemetry.create_app(service: TelemetryService) → FastAPI

Create a FastAPI telemetry app backed by service.

panoptes.utils.telemetry.get_site_day_key(local_dt: datetime) → str

Return the site stream day key using the local-time noon boundary.

Parameters:

local_dt – A timezone-aware datetime in the machine’s local timezone.

Returns:

The YYYYMMDD day key for the site stream file.

Raises:

ValueError – If local_dt is naive.

panoptes.utils.telemetry.telemetry_server(site_dir: str | Path | None = None, host: str | None = None, port: str | int | None = None, auto_start: bool = True, access_logs: bool | None = None, verbose: bool = False) → Process

Start the telemetry server in a separate process.

Parameters:
  • site_dir – Base directory for site stream NDJSON files.

  • host – Host address to bind to. Defaults to localhost or the PANOPTES_TELEMETRY_HOST environment variable.

  • port – Port number to bind to. Defaults to 6562 or the PANOPTES_TELEMETRY_PORT environment variable.

  • auto_start – Whether to start the child process immediately.

  • access_logs – Whether to enable uvicorn access logs.

  • verbose – Whether to enable DEBUG-level server logging.

Returns:

The child process that hosts the telemetry API.

panoptes.utils.telemetry.utc_iso_z(now: datetime | None = None) → str

Return a UTC ISO-8601 timestamp with a trailing Z.