swxsoc_reach.historical.download_orchestrator

Per-day orchestrator for historical UDL downloads.

Drives swxsoc_reach.net.udl.download_UDL_reach_window() over an inclusive UTC date range, recording one or more rows per day in a HistoricalTelemetry CSV. Days that already completed (per telemetry + on-disk artifact) are skipped, so reruns are idempotent and resumable.

The orchestrator is intentionally sequential at the day level — concurrency lives inside download_UDL_reach_window via the existing thread pool + AIMD rate controller.

Functions

run_download(config, *[, download_fn, telemetry])

Run the historical UDL download orchestrator.

Classes

DownloadRunConfig(start_date, end_date, ...)

Inputs to run_download().

DownloadRunSummary(run_id, days_planned, ...)

Aggregate result of one run_download() invocation.

class swxsoc_reach.historical.download_orchestrator.DownloadRunConfig(start_date: date, end_date: date, output_dir: Path, telemetry_path: Path, sensor_id: str = 'ALL', descriptor: str = 'QUICKLOOK', output_format: str = 'csv', retry_failed: bool = False, limit_days: int | None = None, dry_run: bool = False, auth_token: str = '', max_concurrent_requests: int = 4, initial_rate: float = 5.0, additive_increase: float = 1.0, multiplicative_decrease: float = 0.5, min_rate: float = 5.0, max_rate: float = 25.0)

Inputs to run_download().

Mirrors the historical-download CLI flags one-for-one. The CLI layer in swxsoc_reach.__main__ parses argv into one of these instances, then hands it to run_download(). Tests construct it directly to drive the orchestrator without touching argparse.

Fields

  • start_date (datetime.date): inclusive UTC start of the date range to process.

  • end_date (datetime.date): inclusive UTC end of the date range. Must be >= start_date.

  • output_dir (pathlib.Path): directory where per-day CSV/JSON artifacts are written. Created if missing.

  • telemetry_path (pathlib.Path): path to the append-only telemetry CSV. Created if missing. Conventionally lives inside output_dir.

  • sensor_id (str, default "ALL"): REACH sensor identifier or "ALL". Drives chunk size in get_reach_datetimelist() (5-min chunks for ALL, 6-hour chunks for a specific sensor) and the expected-records baseline used for availability_pct.

  • descriptor (str, default "QUICKLOOK"): UDL descriptor query value.

  • output_format ({'csv', 'json'}, default 'csv'): output serialization format passed through to the downloader.

  • retry_failed (bool, default False): when True, days whose latest telemetry row is FAILED are re-attempted; otherwise they are skipped.

  • limit_days (int or None): if set, cap the number of days attempted, counted from the first day not already DOWNLOADED with its artifact on disk.

  • dry_run (bool, default False): when True, no network calls and no telemetry writes — only logs the planned action per day.

  • auth_token (str): UDL HTTP Basic auth header value. Resolved by the CLI layer from swxsoc_reach.net.auth.resolve_udl_auth() (Secrets Manager or BASICAUTH env var).

  • max_concurrent_requests (int, default 4): max concurrent UDL chunk requests per day; forwarded to download_UDL_reach_window().

  • initial_rate, additive_increase, multiplicative_decrease, min_rate, max_rate (float): AIMD rate-controller tuning parameters; forwarded to download_UDL_reach_window().
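The inclusive date range and the limit_days rule described above can be sketched as a small planning helper. This is a hypothetical illustration, not the module's code: `plan_days` and `is_done` are invented names, `is_done(d)` stands in for "latest telemetry row is DOWNLOADED and its artifact is on disk", and stopping as soon as the cap is reached (dropping any later already-downloaded days) is an assumption.

```python
from datetime import date, timedelta
from typing import Callable, Optional


def plan_days(
    start_date: date,
    end_date: date,
    limit_days: Optional[int],
    is_done: Callable[[date], bool],
) -> list[date]:
    """Expand [start_date, end_date] (both inclusive) and apply a limit_days-style cap.

    Hypothetical helper: days for which is_done() is True (the skip_existing
    case) are kept in the plan but do not count against limit_days.
    """
    if end_date < start_date:
        raise ValueError(f"end_date {end_date} precedes start_date {start_date}")

    planned: list[date] = []
    counted = 0  # days that count toward limit_days
    d = start_date
    while d <= end_date:  # inclusive end
        if limit_days is not None and counted >= limit_days:
            break  # assumption: stop as soon as the cap is reached
        planned.append(d)
        if not is_done(d):
            counted += 1
        d += timedelta(days=1)
    return planned
```

With two already-downloaded days at the start of the range and `limit_days=2`, the plan keeps those two days (they will be skipped cheaply) plus the next two days that actually need work.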

additive_increase: float = 1.0
auth_token: str = ''
descriptor: str = 'QUICKLOOK'
dry_run: bool = False
end_date: date
initial_rate: float = 5.0
limit_days: int | None = None
max_concurrent_requests: int = 4
max_rate: float = 25.0
min_rate: float = 5.0
multiplicative_decrease: float = 0.5
output_dir: Path
output_format: str = 'csv'
retry_failed: bool = False
sensor_id: str = 'ALL'
start_date: date
telemetry_path: Path
class swxsoc_reach.historical.download_orchestrator.DownloadRunSummary(run_id: str, days_planned: int, days_attempted: int, days_downloaded: int, days_skipped_existing: int, days_skipped_no_data: int, days_failed: int)

Aggregate result of one run_download() invocation.

Returned to the CLI layer (or any direct caller) so it can log a final “X downloaded, Y skipped, Z failed” line and choose a process exit code. Per-day detail lives in the telemetry CSV; this struct is intentionally just rollups.

Fields

  • run_id (str): UUID4 stamped on every telemetry row written by this run. Lets operators correlate rows in the telemetry CSV with a specific invocation.

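  • days_planned (int): days in the inclusive date range considered after applying --limit-days. On a non-dry run every planned day ends in exactly one outcome, so days_planned equals days_downloaded + days_skipped_existing + days_skipped_no_data + days_failed. (Summing days_attempted with the outcome counters would double-count attempted days that ended in SKIPPED_NO_DATA or FAILED.)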

  • days_attempted (int): days for which the downloader was invoked (regardless of outcome).

  • days_downloaded (int): days that ended in DOWNLOADED (artifact written, telemetry row appended).

  • days_skipped_existing (int): days short-circuited because a prior DOWNLOADED row exists and its CSV artifact is still on disk.

  • days_skipped_no_data (int): days that ended in SKIPPED_NO_DATA — either freshly (UDL returned zero records, terminal) or via a prior SKIPPED_NO_DATA row.

  • days_failed (int): days that ended in FAILED (or were skipped because of a prior FAILED row without --retry-failed).
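A minimal sketch of how per-day outcomes could roll up into these counters, following the field definitions above. `Rollup` and `roll_up` are hypothetical names (the real orchestrator may simply increment counters inline), and encoding each day as an `(action, final_status)` pair is an assumption. Note that a freshly attempted day that ends in SKIPPED_NO_DATA or FAILED counts toward both days_attempted and the matching outcome counter.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class Rollup:
    # Hypothetical stand-in for DownloadRunSummary's counters (run_id omitted).
    days_planned: int
    days_attempted: int
    days_downloaded: int
    days_skipped_existing: int
    days_skipped_no_data: int
    days_failed: int


def roll_up(outcomes: list[tuple[str, Optional[str]]]) -> Rollup:
    """Aggregate per-day (action, final_status) pairs.

    action is run | skip_existing | skip_terminal | skip_failed; final_status is
    DOWNLOADED | SKIPPED_NO_DATA | FAILED for run days and None for skipped days.
    """
    attempted = [status for action, status in outcomes if action == "run"]
    by_status = Counter(attempted)
    by_action = Counter(action for action, _ in outcomes)
    return Rollup(
        days_planned=len(outcomes),
        days_attempted=len(attempted),
        days_downloaded=by_status["DOWNLOADED"],
        days_skipped_existing=by_action["skip_existing"],
        # Fresh empty windows plus days short-circuited by a prior SKIPPED_NO_DATA row.
        days_skipped_no_data=by_status["SKIPPED_NO_DATA"] + by_action["skip_terminal"],
        # Fresh failures plus prior FAILED days skipped without retry_failed.
        days_failed=by_status["FAILED"] + by_action["skip_failed"],
    )
```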

days_attempted: int
days_downloaded: int
days_failed: int
days_planned: int
days_skipped_existing: int
days_skipped_no_data: int
run_id: str
swxsoc_reach.historical.download_orchestrator._count_records(csv_path: Path, output_format: str) → int

Count data records in the just-written file.

Cheap line count for CSV (header line subtracted); for JSON, parse the file and return len(...). Any error is swallowed and reported as 0, since telemetry is not worth crashing the run over.
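The counting behaviour described above can be sketched as follows. `count_records` is a hypothetical re-implementation, not the module's function, and it assumes a JSON artifact whose top level is a list of records. Like any line count, it would over-count a CSV whose quoted fields contain embedded newlines.

```python
import json
from pathlib import Path


def count_records(path: Path, output_format: str) -> int:
    """Hypothetical sketch of the documented _count_records behaviour."""
    try:
        if output_format == "csv":
            with path.open("r", encoding="utf-8") as fh:
                n_lines = sum(1 for _ in fh)
            # Subtract the header line; an empty file yields 0, not -1.
            return max(n_lines - 1, 0)
        if output_format == "json":
            with path.open("r", encoding="utf-8") as fh:
                # Assumes the top-level JSON value is a list of records.
                return len(json.load(fh))
    except (OSError, ValueError, TypeError):
        # Best-effort telemetry: missing file, bad JSON, or non-sized value -> 0.
        return 0
    return 0  # unknown output_format
```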

swxsoc_reach.historical.download_orchestrator._day_window(d: date) → tuple[Time, Time, str, str]

Return (start_time, end_time, start_iso, end_iso) for a UTC day.

end_time is exclusive: start + 86400 s. Matches the existing chunk-list semantics so records timestamped in the last second of the day are included.
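A sketch of the day-window arithmetic, using timezone-aware `datetime` objects in place of the `Time` objects the real helper returns (presumably `astropy.time.Time`). The function name `day_window` and the ISO string format are assumptions for illustration only.

```python
from datetime import date, datetime, time, timedelta, timezone


def day_window(d: date) -> tuple[datetime, datetime, str, str]:
    """Hypothetical datetime-based analogue of _day_window.

    Returns (start, end, start_iso, end_iso) for one UTC day; end is exclusive.
    """
    start = datetime.combine(d, time(0, 0), tzinfo=timezone.utc)
    end = start + timedelta(seconds=86400)  # next UTC midnight, excluded
    fmt = "%Y-%m-%dT%H:%M:%SZ"  # assumed ISO rendering
    return start, end, start.strftime(fmt), end.strftime(fmt)
```

Because the end is exclusive, a record stamped 23:59:59 falls inside the window, while one stamped at the next midnight belongs to the following day, so adjacent days never share a record.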

swxsoc_reach.historical.download_orchestrator._decide_action(chunk_date: date, prior: TelemetryRow | None, retry_failed: bool) → str

Return one of run | skip_existing | skip_terminal | skip_failed.

Decision table (see plan):

  • no prior row → run

  • prior DOWNLOADED and CSV exists → skip_existing

  • prior DOWNLOADED and CSV missing → run (re-download)

  • prior SKIPPED_NO_DATA → skip_terminal

  • prior FAILED → skip_failed, or run when retry_failed is True

  • prior DOWNLOAD_PENDING (interrupted) → run
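The decision table translates almost line for line into code. In this sketch `PriorRow` is a hypothetical minimal stand-in for TelemetryRow (only `status` and `csv_path`), and treating any unrecognized status like DOWNLOAD_PENDING (that is, run again) is an assumption.

```python
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Optional


@dataclass
class PriorRow:
    # Hypothetical stand-in for TelemetryRow: only the fields the decision needs.
    status: str
    csv_path: str = ""


def decide_action(chunk_date: date, prior: Optional[PriorRow], retry_failed: bool) -> str:
    """Return run | skip_existing | skip_terminal | skip_failed.

    chunk_date is kept only to mirror the documented signature.
    """
    if prior is None:
        return "run"
    if prior.status == "DOWNLOADED":
        # Re-download if the artifact was deleted or moved since the prior run.
        return "skip_existing" if prior.csv_path and Path(prior.csv_path).exists() else "run"
    if prior.status == "SKIPPED_NO_DATA":
        return "skip_terminal"
    if prior.status == "FAILED":
        return "run" if retry_failed else "skip_failed"
    # DOWNLOAD_PENDING (interrupted attempt) or an unknown status: try again.
    return "run"
```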

swxsoc_reach.historical.download_orchestrator._expected_records(sensor_id: str) → int

Upper-bound reference count for availability_pct.
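Assuming availability_pct is a plain percentage of this baseline, the computation reduces to a single ratio. The helper below is hypothetical; whether the real code caps, rounds, or otherwise guards the value is not stated here, and returning 0.0 for a non-positive baseline is an assumption.

```python
def availability_pct(records_downloaded: int, expected_records: int) -> float:
    """Hypothetical: records written as a percentage of the expected-records baseline."""
    if expected_records <= 0:
        return 0.0  # assumption: no usable baseline means no availability figure
    return 100.0 * records_downloaded / expected_records
```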

swxsoc_reach.historical.download_orchestrator.run_download(config: DownloadRunConfig, *, download_fn: Callable[..., Path] | None = None, telemetry: HistoricalTelemetry | None = None) → DownloadRunSummary

Run the historical UDL download orchestrator.

Steps performed, in order:

  1. Initialize. Generate a fresh run_id (UUID4) used to stamp every telemetry row written by this invocation. Ensure config.output_dir exists.

  2. Load prior state. Read the telemetry CSV at config.telemetry_path and reduce it to a {date: latest row} mapping via load_state(). Missing/empty file is treated as no prior state.

  3. Plan. Expand [start_date, end_date] into one UTC-midnight-bounded window per day, decide an action per day via _decide_action() (run / skip_existing / skip_terminal / skip_failed), and apply --limit-days by counting only days whose action is not skip_existing.

  4. Execute, sequentially per day. For each planned day:

    1. If config.dry_run, log the action and continue (no telemetry rows written, no network calls).

    2. For skip actions, log the reason, increment the matching summary counter, and continue.

    3. For run actions, append a PENDING row, then invoke download_fn with absolute UTC start_time / end_time (00:00:00 → next day 00:00:00, exclusive end) and the AIMD knobs from config.

  5. Classify outcomes per attempted day.

    • On success: append a DOWNLOADED row with records_downloaded, availability_pct (vs the per-sensor expected-records baseline), download_seconds, csv_size_mb, and csv_path.

    • On ValueError from the downloader (its “no records” signal): append a terminal SKIPPED_NO_DATA row.

    • On any other exception: append a FAILED row with error_type and error_message. The orchestrator never aborts mid-range — it continues to the next day.

  6. Return an aggregated DownloadRunSummary.
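Step 2's reduction of the append-only telemetry CSV to one row per day can be sketched as below. `latest_by_date` and `load_state_sketch` are hypothetical names standing in for load_state(); the `date` and `status` column names, and the assumption that file order is chronological (so a later row for the same date wins), are illustrative only. The real function returns TelemetryRow objects rather than plain dicts.

```python
import csv
from pathlib import Path
from typing import Iterable


def latest_by_date(rows: Iterable[dict[str, str]]) -> dict[str, dict[str, str]]:
    """Keep only the last row seen for each date (later rows overwrite earlier ones)."""
    latest: dict[str, dict[str, str]] = {}
    for row in rows:
        latest[row["date"]] = row
    return latest


def load_state_sketch(telemetry_path: Path) -> dict[str, dict[str, str]]:
    """Hypothetical stand-in for load_state(): a missing or empty file means no prior state."""
    if not telemetry_path.exists() or telemetry_path.stat().st_size == 0:
        return {}
    with telemetry_path.open(newline="", encoding="utf-8") as fh:
        return latest_by_date(csv.DictReader(fh))
```

An interrupted day shows up as a DOWNLOAD_PENDING row followed by nothing, so its latest row stays DOWNLOAD_PENDING and the decision table sends it back to run.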

Concurrency: this function is sequential at the day level. Per-day concurrency lives inside download_fn (the existing thread pool + AIMD rate controller in download_UDL_reach_window()).
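The AIMD controller itself lives in download_UDL_reach_window() and is not documented here. Assuming it follows the textbook additive-increase/multiplicative-decrease rule, the DownloadRunConfig tuning fields would interact roughly as in this sketch. The class name `AimdRate` and the notion of a discrete "throttle" event (for example an HTTP 429 response) are assumptions.

```python
from dataclasses import dataclass


@dataclass
class AimdRate:
    """Textbook AIMD rate controller (a sketch, not the library's class).

    Defaults mirror the DownloadRunConfig tuning fields.
    """
    rate: float = 5.0                    # initial_rate
    additive_increase: float = 1.0
    multiplicative_decrease: float = 0.5
    min_rate: float = 5.0
    max_rate: float = 25.0

    def on_success(self) -> float:
        # Additive increase: probe for more throughput, capped at max_rate.
        self.rate = min(self.rate + self.additive_increase, self.max_rate)
        return self.rate

    def on_throttle(self) -> float:
        # Multiplicative decrease: back off sharply, floored at min_rate.
        self.rate = max(self.rate * self.multiplicative_decrease, self.min_rate)
        return self.rate
```

With the documented defaults, min_rate equals initial_rate (both 5.0), so under this rule the rate starts at its floor, climbs toward 25.0 on success, and never backs off below the starting value.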

Parameters:
  • config (DownloadRunConfig) – Run inputs, typically built from CLI args.

  • download_fn (callable, optional) – Override for swxsoc_reach.net.udl.download_UDL_reach_window(). Must accept the same keyword arguments and return the path to the written artifact, or raise ValueError for an empty window. Tests inject a stub here; production callers leave it None.

  • telemetry (HistoricalTelemetry, optional) – Override for the telemetry writer/reader. Defaults to one backed by config.telemetry_path. Tests may inject a pre-populated instance to simulate restart scenarios.
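Following the download_fn contract above (return the artifact path, or raise ValueError for an empty window), a test double might look like this sketch. The factory name, the artifact filename, and the assumption that `str(start_time)` begins with a `YYYY-MM-DD` date are hypothetical. Because the downloader's full keyword list is not given here, the stub accepts and ignores any extra keywords. It could then be passed as `run_download(config, download_fn=stub)`.

```python
from pathlib import Path
from typing import Any, Callable


def make_stub_download_fn(out_dir: Path, empty_days: set[str]) -> Callable[..., Path]:
    """Build a hypothetical download_fn test double.

    Writes a tiny CSV per day, or raises ValueError (the documented
    "no records" signal) for days listed in empty_days.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    def stub(*, start_time: Any, end_time: Any, **_ignored: Any) -> Path:
        day = str(start_time)[:10]  # assumes an ISO-like rendering of start_time
        if day in empty_days:
            raise ValueError(f"no records for {day}")
        path = out_dir / f"reach_{day}.csv"  # hypothetical artifact name
        path.write_text(f"timestamp,value\n{day}T00:00:00,1\n", encoding="utf-8")
        return path

    return stub
```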

Returns:

DownloadRunSummary – Aggregate counts for the run. The CLI layer uses these to log the final summary line and choose an exit code.
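To make steps 4 and 5 concrete, here is a stripped-down sketch of the per-day execute-and-classify loop. `execute_days` is a hypothetical name, the in-memory `log` list stands in for the append-only telemetry CSV, and the simplified `download_fn(day)` signature (one date argument instead of keyword start/end times and AIMD knobs) is an assumption made to keep the example short.

```python
from datetime import date
from pathlib import Path
from typing import Callable


def execute_days(
    days: list[date],
    download_fn: Callable[[date], Path],
    log: list[tuple[date, str]],
) -> dict[date, str]:
    """Run each day sequentially and classify the outcome.

    ValueError -> SKIPPED_NO_DATA, any other Exception -> FAILED,
    success -> DOWNLOADED. One bad day never stops the loop.
    """
    final: dict[date, str] = {}
    for day in days:  # sequential at the day level
        # Written before the attempt, so an interrupted run leaves a trace.
        log.append((day, "DOWNLOAD_PENDING"))
        try:
            download_fn(day)
        except ValueError:
            status = "SKIPPED_NO_DATA"  # downloader's "no records" signal; terminal
        except Exception:
            status = "FAILED"  # record it and move on to the next day
        else:
            status = "DOWNLOADED"
        log.append((day, status))
        final[day] = status
    return final
```

Catching `Exception` rather than `BaseException` lets Ctrl-C (KeyboardInterrupt) still stop the run; the day it interrupts keeps a DOWNLOAD_PENDING row and is retried on the next invocation.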