swxsoc_reach.historical.telemetry

Append-only CSV telemetry for the historical UDL download orchestrator.

One row is written per attempt at a per-day download. Older rows for the same chunk_date_utc are never removed; HistoricalTelemetry.load_state() returns the most recent row per date (by started_at_utc), and the orchestrator uses that row to make restart/resume decisions.

Functions

utcnow_iso()

Return an ISO-8601 UTC timestamp suitable for telemetry columns.

Classes

HistoricalTelemetry(telemetry_path)

Append-only CSV writer / reader for download telemetry.

TelemetryRow([run_id, chunk_date_utc, ...])

One row in the download telemetry CSV.

class swxsoc_reach.historical.telemetry.HistoricalTelemetry(telemetry_path: Path | str)

Append-only CSV writer / reader for download telemetry.

append_row(row: TelemetryRow | dict) → None

Append a single row, writing the header first if the file is being created.

The file is flushed and fsynced before returning, so an interrupted run leaves the telemetry on disk in a consistent state.
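The append-then-sync pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `append_row_sketch` and the three-column `FIELDS` list are hypothetical names chosen for the example, and the real schema has many more columns.

```python
import csv
import os
import tempfile
from pathlib import Path

# Hypothetical three-column subset of the telemetry schema.
FIELDS = ["chunk_date_utc", "status", "started_at_utc"]


def append_row_sketch(path: Path, row: dict) -> None:
    """Append one row, writing the header only when the file is new or empty."""
    write_header = not path.exists() or path.stat().st_size == 0
    with path.open("a", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=FIELDS, extrasaction="ignore")
        if write_header:
            writer.writeheader()
        # Missing columns are written as empty strings.
        writer.writerow({name: row.get(name, "") for name in FIELDS})
        fh.flush()             # push Python's buffer to the OS
        os.fsync(fh.fileno())  # ask the OS to commit the data to disk


with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "telemetry.csv"
    append_row_sketch(demo_path, {"chunk_date_utc": "2024-01-01", "status": "PENDING"})
    append_row_sketch(demo_path, {"chunk_date_utc": "2024-01-02", "status": "PENDING"})
    demo_lines = demo_path.read_text(encoding="utf-8").splitlines()
```

Calling `flush()` alone only hands the bytes to the operating system; `os.fsync()` is what requests that they reach stable storage before the function returns.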

iter_rows() → Iterable[TelemetryRow]

Yield every row in file order. Useful for tests and debugging.

load_state() → dict[date, TelemetryRow]

Return the most recent row per chunk_date_utc.

A missing telemetry file returns {}. Rows whose chunk_date_utc cannot be parsed are silently skipped (treated as if they did not exist), so a hand-edited file cannot crash the orchestrator on startup.
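The "latest row per date" reduction can be sketched as follows. This is an illustration under stated assumptions rather than the library's code: `load_state_sketch` is a hypothetical name, rows are kept as plain dicts instead of TelemetryRow objects, chunk_date_utc is assumed to be in YYYY-MM-DD form, and on equal started_at_utc values the later row in the file is assumed to win.

```python
import csv
import tempfile
from datetime import date
from pathlib import Path


def load_state_sketch(path: Path) -> dict[date, dict[str, str]]:
    """Return the row with the greatest started_at_utc for each chunk date."""
    if not path.exists():
        return {}  # no telemetry yet: nothing to resume
    latest: dict[date, dict[str, str]] = {}
    with path.open(newline="", encoding="utf-8") as fh:
        for raw in csv.DictReader(fh):
            try:
                day = date.fromisoformat(raw.get("chunk_date_utc") or "")
            except ValueError:
                continue  # unparseable date: skip the row without raising
            started = raw.get("started_at_utc") or ""
            previous = latest.get(day)
            # Fixed-width ISO timestamps compare correctly as plain strings.
            if previous is None or started >= (previous.get("started_at_utc") or ""):
                latest[day] = raw
    return latest


with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "telemetry.csv"
    missing_state = load_state_sketch(demo)  # file has not been created yet
    demo.write_text(
        "chunk_date_utc,status,started_at_utc\n"
        "2024-01-01,PENDING,2024-01-02T00:00:00.000000+00:00\n"
        "2024-01-01,SKIPPED_NO_DATA,2024-01-02T00:05:00.000000+00:00\n"
        "not-a-date,PENDING,2024-01-02T00:10:00.000000+00:00\n",
        encoding="utf-8",
    )
    state = load_state_sketch(demo)
```

Here `state` holds a single entry for 2024-01-01 whose status is SKIPPED_NO_DATA, because that row has the later started_at_utc; the `not-a-date` row is dropped.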

class swxsoc_reach.historical.telemetry.TelemetryRow(run_id: str = '', chunk_date_utc: str = '', window_start_utc: str = '', window_end_utc: str = '', status: str = '', records_downloaded: str = '', expected_records: str = '', availability_pct: str = '', download_seconds: str = '', csv_size_mb: str = '', csv_path: str = '', sensor_id: str = '', descriptor: str = '', output_format: str = '', error_type: str = '', error_message: str = '', started_at_utc: str = '', finished_at_utc: str = '', process_seconds: str = '', cdf_size_mb: str = '', cdf_path: str = '', upload_seconds: str = '', s3_bucket: str = '', s3_key: str = '')

One row in the download telemetry CSV.

All fields default to "" so callers can populate only the columns relevant to a given status. For example, a PENDING row has no finished_at_utc yet, and a SKIPPED_NO_DATA row has no csv_path.

availability_pct: str = ''
cdf_path: str = ''
cdf_size_mb: str = ''
chunk_date_utc: str = ''
csv_path: str = ''
csv_size_mb: str = ''
descriptor: str = ''
download_seconds: str = ''
error_message: str = ''
error_type: str = ''
expected_records: str = ''
finished_at_utc: str = ''
classmethod from_dict(raw: dict[str, str]) → TelemetryRow

Build a row from a CSV-parsed dict, ignoring unknown columns.
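One way to ignore unknown columns is to filter the incoming dict against the declared field names. The sketch below assumes a dataclass-style row; `MiniRow` is a hypothetical three-field stand-in, not the real TelemetryRow, and dropping `None` values (which `csv.DictReader` produces for short rows) is an added assumption.

```python
from dataclasses import dataclass, fields


@dataclass
class MiniRow:
    """Hypothetical three-column stand-in for TelemetryRow."""

    chunk_date_utc: str = ""
    status: str = ""
    csv_path: str = ""

    @classmethod
    def from_dict(cls, raw: dict) -> "MiniRow":
        known = {f.name for f in fields(cls)}
        # Keep only declared columns; anything else is dropped silently.
        kept = {k: v for k, v in raw.items() if k in known and v is not None}
        return cls(**kept)


row = MiniRow.from_dict(
    {"chunk_date_utc": "2024-01-01", "status": "PENDING", "note": "hand-added column"}
)
```

The extra `note` column does not raise a `TypeError`, and `csv_path` falls back to its `""` default because the input never supplied it.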

output_format: str = ''
process_seconds: str = ''
records_downloaded: str = ''
run_id: str = ''
s3_bucket: str = ''
s3_key: str = ''
sensor_id: str = ''
started_at_utc: str = ''
status: str = ''
to_dict() → dict[str, str]

Return the row as a {column: str} dict in schema order.
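A sketch of a schema-ordered export, assuming the schema order matches the field declaration order of a dataclass-style row. `OrderedMiniRow` is a hypothetical stand-in used only for illustration.

```python
from dataclasses import dataclass, fields


@dataclass
class OrderedMiniRow:
    """Hypothetical stand-in; fields are declared in the intended column order."""

    run_id: str = ""
    chunk_date_utc: str = ""
    status: str = ""

    def to_dict(self) -> dict[str, str]:
        # fields() yields declaration order, and dicts preserve insertion
        # order, so the keys line up with a CSV header built the same way.
        return {f.name: str(getattr(self, f.name)) for f in fields(self)}


# Keyword order at the call site does not affect the output order.
out = OrderedMiniRow(status="PENDING", chunk_date_utc="2024-01-01").to_dict()
columns = list(out)
```

Because the key order is stable, the same dict can be handed to `csv.DictWriter` without re-sorting columns on every write.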

upload_seconds: str = ''
window_end_utc: str = ''
window_start_utc: str = ''
swxsoc_reach.historical.telemetry.utcnow_iso() → str

Return an ISO-8601 UTC timestamp suitable for telemetry columns.

Format: YYYY-MM-DDTHH:MM:SS.ffffff+00:00. Because the format is fixed-width, lexicographic order matches chronological order. The value is parseable by datetime.datetime.fromisoformat().
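A timestamp in this format can be produced with the standard library as shown below. `utcnow_iso_sketch` is a hypothetical stand-in, not the library's function; it only illustrates one way to obtain the documented shape.

```python
from datetime import datetime, timedelta, timezone


def utcnow_iso_sketch() -> str:
    """Return the current UTC time as YYYY-MM-DDTHH:MM:SS.ffffff+00:00."""
    # timespec="microseconds" keeps the fractional part even when it is zero,
    # so every timestamp has the same width and sorts as a plain string.
    return datetime.now(timezone.utc).isoformat(timespec="microseconds")


stamp = utcnow_iso_sketch()
parsed = datetime.fromisoformat(stamp)  # round-trips to an aware datetime
is_utc = parsed.utcoffset() == timedelta(0)
```

Without the explicit `timespec`, `isoformat()` omits the fractional seconds whenever the microsecond field is exactly zero, which would break the fixed-width property that string comparison relies on.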