# Source code for swxsoc_reach.historical.telemetry

"""Append-only CSV telemetry for the historical UDL download orchestrator.

One row is written per attempt at a per-day download. Older rows for the
same ``chunk_date_utc`` are *not* removed; :meth:`HistoricalTelemetry.load_state`
returns the most-recent row per date (by ``started_at_utc``), which is
how restart/resume decisions are made.
"""

from __future__ import annotations

import csv
import os
from dataclasses import dataclass, fields
from datetime import date, datetime
from pathlib import Path
from typing import Iterable

# The CSV header is written in exactly this column order, making this tuple
# the source of truth for the file layout. Columns are grouped by pipeline
# phase; the groups are concatenated in order to form the full header.
SCHEMA: tuple[str, ...] = (
    # Run / window / status columns
    ("run_id", "chunk_date_utc", "window_start_utc", "window_end_utc", "status")
    # Download Columns
    + (
        "records_downloaded",
        "expected_records",
        "availability_pct",
        "download_seconds",
        "csv_size_mb",
        "csv_path",
        "sensor_id",
        "descriptor",
        "output_format",
        "error_type",
        "error_message",
        "started_at_utc",
        "finished_at_utc",
    )
    # Processing Columns
    + ("process_seconds", "cdf_size_mb", "cdf_path")
    # Upload Columns
    + ("upload_seconds", "s3_bucket", "s3_key")
)

# ---- Download-phase status values ------------------------------------------
STATUS_DOWNLOAD_PENDING = "DOWNLOAD_PENDING"
STATUS_DOWNLOADED = "DOWNLOADED"
STATUS_SKIPPED_NO_DATA = "SKIPPED_NO_DATA"
STATUS_FAILED = "FAILED"

# ---- Process / upload-phase status values ----------------------------------
STATUS_PROCESS_PENDING = "PROCESS_PENDING"
STATUS_PROCESSED = "PROCESSED"
STATUS_UPLOAD_PENDING = "UPLOAD_PENDING"
STATUS_UPLOADED = "UPLOADED"
STATUS_SKIPPED_NO_INPUT = "SKIPPED_NO_INPUT"

# Closed set of status strings. ``HistoricalTelemetry.append_row`` raises
# ``ValueError`` for any non-empty ``status`` outside this set.
VALID_STATUSES: frozenset[str] = frozenset(
    # download phase
    (STATUS_DOWNLOAD_PENDING, STATUS_DOWNLOADED, STATUS_SKIPPED_NO_DATA, STATUS_FAILED)
    # process / upload phase
    + (
        STATUS_PROCESS_PENDING,
        STATUS_PROCESSED,
        STATUS_UPLOAD_PENDING,
        STATUS_UPLOADED,
        STATUS_SKIPPED_NO_INPUT,
    )
)


@dataclass
class TelemetryRow:
    """One row in the download telemetry CSV.

    All fields default to ``""`` so callers can populate just the columns
    relevant for a given status (e.g. a ``PENDING`` row has no
    ``finished_at_utc`` yet, a ``SKIPPED_NO_DATA`` row has no ``csv_path``,
    etc.).

    Fields are declared in the same order as the module-level ``SCHEMA``
    tuple; :meth:`to_dict` iterates fields in declaration order, so the two
    must be kept in sync when columns are added.
    """

    run_id: str = ""
    chunk_date_utc: str = ""
    window_start_utc: str = ""
    window_end_utc: str = ""
    status: str = ""
    # Download Columns
    records_downloaded: str = ""
    expected_records: str = ""
    availability_pct: str = ""
    download_seconds: str = ""
    csv_size_mb: str = ""
    csv_path: str = ""
    sensor_id: str = ""
    descriptor: str = ""
    output_format: str = ""
    error_type: str = ""
    error_message: str = ""
    started_at_utc: str = ""
    finished_at_utc: str = ""
    # Processing Columns
    process_seconds: str = ""
    cdf_size_mb: str = ""
    cdf_path: str = ""
    # Upload Columns
    upload_seconds: str = ""
    s3_bucket: str = ""
    s3_key: str = ""

    def to_dict(self) -> dict[str, str]:
        """Return the row as a ``{column: str}`` dict in schema order.

        Every value is passed through ``str()``, so a non-string value a
        caller assigned to a field is stringified.
        """
        return {f.name: str(getattr(self, f.name)) for f in fields(self)}

    @classmethod
    def from_dict(cls, raw: dict[str, str | None]) -> "TelemetryRow":
        """Build a row from a CSV-parsed dict, ignoring unknown columns.

        ``csv.DictReader`` (with its default ``restval=None`` /
        ``restkey=None``) fills columns missing from a short line -- e.g. a
        truncated final write -- with ``None`` and files surplus values under
        the ``None`` key. Unknown keys (including ``None``) are dropped and
        ``None`` values are coerced to ``""`` so every field stays a ``str``.
        """
        known = {f.name for f in fields(cls)}
        return cls(
            **{k: ("" if v is None else v) for k, v in raw.items() if k in known}
        )
class HistoricalTelemetry:
    """Append-only CSV writer / reader for download telemetry.

    Parameters
    ----------
    telemetry_path
        Location of the telemetry CSV. The file and its parent directories
        are created lazily by :meth:`append_row`.
    """

    def __init__(self, telemetry_path: Path | str):
        self.path = Path(telemetry_path)

    def append_row(self, row: TelemetryRow | dict) -> None:
        """Append a single row, writing the header on first create.

        The file is flushed and ``fsync``-ed before returning so an
        interrupted run leaves the telemetry on disk in a consistent state.

        Parameters
        ----------
        row
            A :class:`TelemetryRow`, or a ``{column: value}`` mapping whose
            values are converted with ``str()``; absent columns are written
            as ``""``.

        Raises
        ------
        ValueError
            If a mapping contains columns not in ``SCHEMA``, or if a
            non-empty ``status`` is not in ``VALID_STATUSES``.
        """
        if isinstance(row, TelemetryRow):
            data = row.to_dict()
        else:
            unknown = set(row) - set(SCHEMA)
            if unknown:
                # key=str keeps non-string keys from turning this
                # ValueError into a TypeError raised by sorted().
                raise ValueError(
                    f"Unknown telemetry columns: {sorted(unknown, key=str)}. "
                    f"Allowed: {SCHEMA}."
                )
            data = {col: str(row.get(col, "")) for col in SCHEMA}

        # Reject invalid statuses early — they're the sole field the
        # orchestrator branches on.
        status = data.get("status", "")
        if status and status not in VALID_STATUSES:
            raise ValueError(
                f"Invalid telemetry status {status!r}. "
                f"Allowed: {sorted(VALID_STATUSES)}."
            )

        self.path.parent.mkdir(parents=True, exist_ok=True)
        # The header is written only for a new or empty file.
        # NOTE(review): an existing header is not compared against SCHEMA; a
        # file written under a different column list will not line up with
        # rows appended here when read back by csv.DictReader.
        write_header = not self.path.exists() or self.path.stat().st_size == 0
        # newline="" is required by the csv module on all platforms.
        with open(self.path, "a", newline="", encoding="utf-8") as fh:
            writer = csv.DictWriter(fh, fieldnames=SCHEMA)
            if write_header:
                writer.writeheader()
            writer.writerow({col: data.get(col, "") for col in SCHEMA})
            fh.flush()
            os.fsync(fh.fileno())

    def load_state(self) -> dict[date, TelemetryRow]:
        """Return the most-recent row per ``chunk_date_utc``.

        A missing telemetry file returns ``{}``. Rows whose
        ``chunk_date_utc`` is empty or unparseable are skipped with no error
        (treated as if they did not exist), and short/truncated lines have
        their missing columns read as ``""``, so a hand-edited or partially
        written file cannot crash the orchestrator on startup.
        """
        if not self.path.exists():
            return {}

        latest: dict[date, TelemetryRow] = {}
        latest_started: dict[date, str] = {}
        with open(self.path, "r", newline="", encoding="utf-8") as fh:
            # restval="": missing trailing columns become "" rather than the
            # DictReader default of None, which would make fromisoformat and
            # the string comparison below raise TypeError.
            reader = csv.DictReader(fh, restval="")
            for raw in reader:
                row = TelemetryRow.from_dict(raw)
                try:
                    chunk_date = date.fromisoformat(row.chunk_date_utc)
                except (TypeError, ValueError):
                    continue
                started = row.started_at_utc or ""
                # "Most recent" by ISO-8601 ``started_at_utc`` lexicographic
                # compare (we always write UTC). On ties — e.g. PENDING
                # and DOWNLOADED rows from the same attempt share a
                # ``started_at_utc`` — file order breaks the tie via
                # ``>=`` so the later-written row wins.
                if chunk_date not in latest or started >= latest_started[chunk_date]:
                    latest[chunk_date] = row
                    latest_started[chunk_date] = started
        return latest

    def iter_rows(self) -> Iterable[TelemetryRow]:
        """Yield every row in file order. Useful for tests/debug.

        A missing file yields nothing; missing trailing columns on a short
        line are read as ``""``.
        """
        if not self.path.exists():
            return
        with open(self.path, "r", newline="", encoding="utf-8") as fh:
            for raw in csv.DictReader(fh, restval=""):
                yield TelemetryRow.from_dict(raw)
def utcnow_iso() -> str:
    """Return an ISO-8601 UTC timestamp suitable for telemetry columns.

    Format: ``YYYY-MM-DDTHH:MM:SS.ffffff+00:00`` — always with six
    fractional digits, parseable by :func:`datetime.datetime.fromisoformat`.

    ``timespec="microseconds"`` is passed explicitly because the default
    :meth:`datetime.isoformat` omits the fractional part entirely when
    ``microsecond == 0``, which would break the fixed-width format. A fixed
    width keeps lexicographic order of these strings equal to chronological
    order (``HistoricalTelemetry.load_state`` compares ``started_at_utc``
    lexicographically).
    """
    from datetime import timezone

    return datetime.now(tz=timezone.utc).isoformat(timespec="microseconds")