swxsoc_reach.historical.download_orchestrator
Per-day orchestrator for historical UDL downloads.
Drives swxsoc_reach.net.udl.download_UDL_reach_window() over an
inclusive UTC date range, recording one or more rows per day in a
HistoricalTelemetry CSV. Days
that already completed (per telemetry + on-disk artifact) are skipped,
so reruns are idempotent and resumable.
The orchestrator is intentionally sequential at the day level —
concurrency lives inside download_UDL_reach_window via the existing
thread pool + AIMD rate controller.
Functions
run_download() | Run the historical UDL download orchestrator.
Classes
DownloadRunConfig | Inputs to run_download().
DownloadRunSummary | Aggregate result of one run_download() invocation.
- class swxsoc_reach.historical.download_orchestrator.DownloadRunConfig(start_date: date, end_date: date, output_dir: Path, telemetry_path: Path, sensor_id: str = 'ALL', descriptor: str = 'QUICKLOOK', output_format: str = 'csv', retry_failed: bool = False, limit_days: int | None = None, dry_run: bool = False, auth_token: str = '', max_concurrent_requests: int = 4, initial_rate: float = 5.0, additive_increase: float = 1.0, multiplicative_decrease: float = 0.5, min_rate: float = 5.0, max_rate: float = 25.0)
Inputs to run_download().
Mirrors the historical-download CLI flags one-for-one. The CLI layer in swxsoc_reach.__main__ parses argv into one of these instances, then hands it to run_download(). Tests construct it directly to drive the orchestrator without touching argparse.
Fields
start_date (datetime.date): inclusive UTC start of the date range to process.
end_date (datetime.date): inclusive UTC end of the date range. Must be >= start_date.
output_dir (pathlib.Path): directory where per-day CSV/JSON artifacts are written. Created if missing.
telemetry_path (pathlib.Path): path to the append-only telemetry CSV. Created if missing. Conventionally lives inside output_dir.
sensor_id (str, default "ALL"): REACH sensor identifier or "ALL". Drives chunk size in get_reach_datetimelist() (5-min chunks for ALL, 6-hour chunks for a specific sensor) and the expected-records baseline used for availability_pct.
descriptor (str, default "QUICKLOOK"): UDL descriptor query value.
output_format ({'csv', 'json'}, default 'csv'): output serialization format passed through to the downloader.
retry_failed (bool, default False): when True, days whose latest telemetry row is FAILED are re-attempted; otherwise they are skipped.
limit_days (int or None): if set, cap the number of days attempted, counted from the first day not already DOWNLOADED with its artifact on disk.
dry_run (bool, default False): when True, no network calls and no telemetry writes; only logs the planned action per day.
auth_token (str): UDL HTTP Basic auth header value. Resolved by the CLI layer from swxsoc_reach.net.auth.resolve_udl_auth() (Secrets Manager or BASICAUTH env var).
max_concurrent_requests (int, default 4): max concurrent UDL chunk requests per day; forwarded to download_UDL_reach_window().
initial_rate, additive_increase, multiplicative_decrease, min_rate, max_rate (float): AIMD rate-controller tuning parameters; forwarded to download_UDL_reach_window().
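The five AIMD fields follow the usual additive-increase / multiplicative-decrease pattern. The sketch below is a generic illustration of that pattern using the documented defaults; `AimdRateSketch` and its method names are hypothetical and are not the controller inside `download_UDL_reach_window()`, whose update rules and rate units are not described here.

```python
from dataclasses import dataclass


@dataclass
class AimdRateSketch:
    """Hypothetical stand-in; field defaults mirror DownloadRunConfig."""

    rate: float = 5.0                     # initial_rate
    additive_increase: float = 1.0
    multiplicative_decrease: float = 0.5
    min_rate: float = 5.0
    max_rate: float = 25.0

    def on_success(self) -> float:
        # Additive increase, capped at max_rate.
        self.rate = min(self.max_rate, self.rate + self.additive_increase)
        return self.rate

    def on_throttle(self) -> float:
        # Multiplicative decrease, floored at min_rate.
        self.rate = max(self.min_rate, self.rate * self.multiplicative_decrease)
        return self.rate


ctl = AimdRateSketch()
for _ in range(3):
    ctl.on_success()                      # 5.0 -> 6.0 -> 7.0 -> 8.0
rate_after_ramp = ctl.rate
rate_after_backoff = ctl.on_throttle()    # 8.0 * 0.5 = 4.0, floored to 5.0

for _ in range(100):
    ctl.on_success()
rate_at_ceiling = ctl.rate                # capped at max_rate
```

With the defaults, `min_rate` equals `initial_rate`, so under this sketch a backoff can never drop the rate below its starting value.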
- class swxsoc_reach.historical.download_orchestrator.DownloadRunSummary(run_id: str, days_planned: int, days_attempted: int, days_downloaded: int, days_skipped_existing: int, days_skipped_no_data: int, days_failed: int)
Aggregate result of one run_download() invocation.
Returned to the CLI layer (or any direct caller) so it can log a final “X downloaded, Y skipped, Z failed” line and choose a process exit code. Per-day detail lives in the telemetry CSV; this struct is intentionally just rollups.
Fields
run_id (str): UUID4 stamped on every telemetry row written by this run. Lets operators correlate rows in the telemetry CSV with a specific invocation.
days_planned (int): days in the inclusive date range considered after applying --limit-days. Equals days_attempted + days_skipped_existing + days_skipped_no_data + days_failed on a non-dry-run.
days_attempted (int): days for which the downloader was invoked (regardless of outcome).
days_downloaded (int): days that ended in DOWNLOADED (artifact written, telemetry row appended).
days_skipped_existing (int): days short-circuited because a prior DOWNLOADED row exists and its CSV artifact is still on disk.
days_skipped_no_data (int): days that ended in SKIPPED_NO_DATA, either freshly (UDL returned zero records, terminal) or via a prior SKIPPED_NO_DATA row.
days_failed (int): days that ended in FAILED (or were skipped because of a prior FAILED row without --retry-failed).
- swxsoc_reach.historical.download_orchestrator._count_records(csv_path: Path, output_format: str) → int
Count data records in the just-written file.
Cheap line-count for CSV (header subtracted); for JSON, parse and return len(...). Errors fall through as 0; telemetry is not worth crashing the run over.
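The counting rule described above can be sketched as follows. This is a minimal illustration, not the module's source: `count_records_sketch` is a hypothetical name, and both the choice of exceptions to swallow and the assumption that the JSON artifact is a top-level array are guesses.

```python
import json
import tempfile
from pathlib import Path


def count_records_sketch(path: Path, output_format: str) -> int:
    """Count data records; return 0 on any read or parse problem."""
    try:
        if output_format == "json":
            with path.open() as fh:
                return len(json.load(fh))   # assumes a top-level JSON array
        with path.open() as fh:
            lines = sum(1 for _ in fh)
        return max(lines - 1, 0)            # subtract the header row
    except (OSError, ValueError, TypeError):
        # json.JSONDecodeError is a ValueError; len() of a scalar is a TypeError.
        return 0


with tempfile.TemporaryDirectory() as tmp:
    csv_file = Path(tmp) / "day.csv"
    csv_file.write_text("time,value\n0,1.0\n1,2.0\n")
    json_file = Path(tmp) / "day.json"
    json_file.write_text(json.dumps([{"t": 0}, {"t": 1}, {"t": 2}]))

    csv_count = count_records_sketch(csv_file, "csv")
    json_count = count_records_sketch(json_file, "json")
    missing_count = count_records_sketch(Path(tmp) / "nope.csv", "csv")
```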
- swxsoc_reach.historical.download_orchestrator._day_window(d: date) → tuple[Time, Time, str, str]
Return (start_time, end_time, start_iso, end_iso) for a UTC day.
end_time is exclusive: start + 86400 s. Matches the existing chunk-list semantics so records timestamped in the last second of the day are included.
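The half-open window can be illustrated with the standard library alone. This sketch substitutes timezone-aware `datetime` objects for the `Time` type in the real signature, and the ISO string layout shown is whatever `datetime.isoformat()` produces, which may differ from the strings the real helper returns; `day_window_sketch` is a hypothetical name.

```python
from datetime import date, datetime, timedelta, timezone


def day_window_sketch(d: date):
    """Return (start, end, start_iso, end_iso) with an exclusive end."""
    start = datetime(d.year, d.month, d.day, tzinfo=timezone.utc)
    end = start + timedelta(seconds=86400)   # next day 00:00:00 UTC, exclusive
    return start, end, start.isoformat(), end.isoformat()


start, end, start_iso, end_iso = day_window_sketch(date(2024, 2, 29))

# A record in the last second of the day falls inside [start, end).
last_second = datetime(2024, 2, 29, 23, 59, 59, tzinfo=timezone.utc)
last_second_included = start <= last_second < end

# The next day's first instant belongs to the next window, not this one.
next_midnight_included = start <= end < end
```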
- swxsoc_reach.historical.download_orchestrator._decide_action(chunk_date: date, prior: TelemetryRow | None, retry_failed: bool) → str
Return one of run | skip_existing | skip_terminal | skip_failed.
Decision table (see plan):
no prior row → run
prior DOWNLOADED and CSV exists → skip_existing
prior DOWNLOADED and CSV missing → run (re-download)
prior SKIPPED_NO_DATA → skip_terminal
prior FAILED → skip_failed unless retry_failed, then run
prior DOWNLOAD_PENDING (interrupted) → run
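The decision table maps directly onto a short function. In this sketch `RowSketch` is a hypothetical stand-in for `TelemetryRow` (only a status and an artifact path are modeled), and treating any unrecognized status like an interrupted run is an assumption.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class RowSketch:
    """Hypothetical stand-in for TelemetryRow."""

    status: str
    csv_path: Optional[Path] = None


def decide_action_sketch(prior: Optional[RowSketch], retry_failed: bool) -> str:
    if prior is None:
        return "run"
    if prior.status == "DOWNLOADED":
        on_disk = prior.csv_path is not None and prior.csv_path.exists()
        return "skip_existing" if on_disk else "run"   # re-download if missing
    if prior.status == "SKIPPED_NO_DATA":
        return "skip_terminal"
    if prior.status == "FAILED":
        return "run" if retry_failed else "skip_failed"
    # DOWNLOAD_PENDING (interrupted); other statuses handled the same way here.
    return "run"


with tempfile.NamedTemporaryFile(suffix=".csv") as fh:
    downloaded = RowSketch("DOWNLOADED", Path(fh.name))
    while_on_disk = decide_action_sketch(downloaded, retry_failed=False)
# The temporary file is deleted on exit, so the artifact is now missing.
after_deletion = decide_action_sketch(downloaded, retry_failed=False)

failed = RowSketch("FAILED")
failed_default = decide_action_sketch(failed, retry_failed=False)
failed_retry = decide_action_sketch(failed, retry_failed=True)
```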
- swxsoc_reach.historical.download_orchestrator._expected_records(sensor_id: str) → int
Upper-bound reference count for availability_pct.
- swxsoc_reach.historical.download_orchestrator.run_download(config: DownloadRunConfig, *, download_fn: Callable[[...], Path] | None = None, telemetry: HistoricalTelemetry | None = None) → DownloadRunSummary
Run the historical UDL download orchestrator.
Steps performed, in order:
1. Initialize. Generate a fresh run_id (UUID4) used to stamp every telemetry row written by this invocation. Ensure config.output_dir exists.
2. Load prior state. Read the telemetry CSV at config.telemetry_path and reduce it to a {date: latest row} mapping via load_state(). Missing/empty file is treated as no prior state.
3. Plan. Expand [start_date, end_date] into one UTC-midnight-bounded window per day, decide an action per day via _decide_action() (run / skip_existing / skip_terminal / skip_failed), and apply --limit-days by counting only days whose action is not skip_existing.
4. Execute, sequentially per day. For each planned day:
   - If config.dry_run, log the action and continue (no telemetry rows written, no network calls).
   - For skip actions, log the reason, increment the matching summary counter, and continue.
   - For run actions, append a PENDING row, then invoke download_fn with absolute UTC start_time / end_time (00:00:00 → next day 00:00:00, exclusive end) and the AIMD knobs from config.
5. Classify outcomes per attempted day.
   - On success: append a DOWNLOADED row with records_downloaded, availability_pct (vs the per-sensor expected-records baseline), download_seconds, csv_size_mb, and csv_path.
   - On ValueError from the downloader (its “no records” signal): append a terminal SKIPPED_NO_DATA row.
   - On any other exception: append a FAILED row with error_type and error_message. The orchestrator never aborts mid-range; it continues to the next day.
6. Return an aggregated DownloadRunSummary.
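The planning step, and in particular how --limit-days interacts with skip_existing days, can be sketched as below. `plan_days_sketch` and `action_for` are hypothetical names, and the choice to stop planning at the first day that would exceed the limit is an assumption about behavior the text does not spell out.

```python
from datetime import date, timedelta
from typing import Callable, List, Optional, Tuple


def plan_days_sketch(
    start: date,
    end: date,
    action_for: Callable[[date], str],
    limit_days: Optional[int] = None,
) -> List[Tuple[date, str]]:
    """Expand an inclusive date range into (day, action) pairs."""
    planned: List[Tuple[date, str]] = []
    counted = 0
    day = start
    while day <= end:
        action = action_for(day)
        if action != "skip_existing":
            # Only days not already downloaded count toward the limit.
            if limit_days is not None and counted >= limit_days:
                break
            counted += 1
        planned.append((day, action))
        day += timedelta(days=1)
    return planned


# Days 1 and 2 are already on disk; days 3 to 5 still need downloading.
already_done = {date(2024, 1, 1), date(2024, 1, 2)}
plan = plan_days_sketch(
    date(2024, 1, 1),
    date(2024, 1, 5),
    lambda d: "skip_existing" if d in already_done else "run",
    limit_days=2,
)
planned_actions = [action for _, action in plan]
```

Under these assumptions the two already-downloaded days do not consume the limit, so the plan covers days 1 through 4 and leaves day 5 for a later run.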
Concurrency: this function is sequential at the day level. Per-day concurrency lives inside download_fn (the existing thread pool + AIMD rate controller in download_UDL_reach_window()).
- Parameters:
config (DownloadRunConfig) – Run inputs, typically built from CLI args.
download_fn (callable, optional) – Override for swxsoc_reach.net.udl.download_UDL_reach_window(). Must accept the same keyword arguments and return the path to the written artifact, or raise ValueError for an empty window. Tests inject a stub here; production callers leave it None.
telemetry (HistoricalTelemetry, optional) – Override for the telemetry writer/reader. Defaults to one backed by config.telemetry_path. Tests may inject a pre-populated instance to simulate restart scenarios.
- Returns:
DownloadRunSummary – Aggregate counts for the run. The CLI layer uses these to log the final summary line and choose an exit code.
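A test stub for download_fn only has to honor the contract stated above: accept keyword arguments, return the artifact path, and raise ValueError for an empty window. The sketch below is one hypothetical way to write such a stub; only the `start_time` and `end_time` keywords are taken from this page, the file naming is invented, and deriving the day from the first ten characters of `str(start_time)` is an assumption about how the real time objects render.

```python
import tempfile
from pathlib import Path


def make_stub_download(out_dir: Path, empty_days=frozenset()):
    """Build a fake downloader that writes a tiny CSV per day."""

    def stub(**kwargs) -> Path:
        # Assumption: the leading 10 characters of start_time are YYYY-MM-DD.
        day = str(kwargs["start_time"])[:10]
        if day in empty_days:
            raise ValueError(f"no records for {day}")   # the "no records" signal
        path = out_dir / f"stub_{day}.csv"
        path.write_text("time,value\n0,1.0\n")
        return path

    return stub


with tempfile.TemporaryDirectory() as tmp:
    stub = make_stub_download(Path(tmp), empty_days={"2024-01-02"})

    artifact = stub(start_time="2024-01-01T00:00:00", end_time="2024-01-02T00:00:00")
    wrote_file = artifact.exists()

    try:
        stub(start_time="2024-01-02T00:00:00", end_time="2024-01-03T00:00:00")
        raised_for_empty_day = False
    except ValueError:
        raised_for_empty_day = True
```

A stub like this would be handed to run_download() through its download_fn keyword, so the orchestrator's DOWNLOADED and SKIPPED_NO_DATA paths can be exercised without network access.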