"""Per-day orchestrator for historical UDL downloads.
Drives :func:`swxsoc_reach.net.udl.download_UDL_reach_window` over an
inclusive UTC date range, recording one or more rows per day in a
:class:`~swxsoc_reach.historical.telemetry.HistoricalTelemetry` CSV. Days
that already completed (per telemetry + on-disk artifact) are skipped,
so reruns are idempotent and resumable.
The orchestrator is intentionally sequential at the day level —
concurrency lives inside ``download_UDL_reach_window`` via the existing
thread pool + AIMD rate controller.
"""
from __future__ import annotations
import time
import traceback
import uuid
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Callable
from astropy.time import Time
from swxsoc_reach import log
from swxsoc_reach.historical._dates import iter_dates as _iter_dates
from swxsoc_reach.historical.telemetry import (
HistoricalTelemetry,
STATUS_DOWNLOAD_PENDING,
STATUS_DOWNLOADED,
STATUS_FAILED,
STATUS_SKIPPED_NO_DATA,
TelemetryRow,
utcnow_iso,
)
from swxsoc_reach.net import udl as udl_module
# Per-satellite per-dosimeter samples per UTC day at 5-second cadence.
# Used as the upper-bound reference for ``availability_pct``.
_SAMPLES_PER_DOSIMETER_PER_DAY = 17_280 # 24 Hours at 5-second cadence
_REACH_NUM_SATELLITES = 32
_DOSIMETERS_PER_SATELLITE = 2
EXPECTED_RECORDS_ALL = (
_REACH_NUM_SATELLITES * _DOSIMETERS_PER_SATELLITE * _SAMPLES_PER_DOSIMETER_PER_DAY
) # 1,105,920
EXPECTED_RECORDS_SINGLE = (
_DOSIMETERS_PER_SATELLITE * _SAMPLES_PER_DOSIMETER_PER_DAY
) # 34,560
[docs]
@dataclass
class DownloadRunConfig:
"""Inputs to :func:`run_download`.
Mirrors the historical-download CLI flags one-for-one. The CLI
layer in :mod:`swxsoc_reach.__main__` parses argv into one of these
instances, then hands it to :func:`run_download`. Tests construct
it directly to drive the orchestrator without touching argparse.
Fields
------
- ``start_date`` (``datetime.date``): inclusive UTC start of the
date range to process.
- ``end_date`` (``datetime.date``): inclusive UTC end of the date
range. Must be ``>= start_date``.
- ``output_dir`` (``pathlib.Path``): directory where per-day
CSV/JSON artifacts are written. Created if missing.
- ``telemetry_path`` (``pathlib.Path``): path to the append-only
telemetry CSV. Created if missing. Conventionally lives inside
``output_dir``.
- ``sensor_id`` (``str``, default ``"ALL"``): REACH sensor
identifier or ``"ALL"``. Drives chunk size in
:func:`~swxsoc_reach.net.udl.get_reach_datetimelist` (5-min
chunks for ``ALL``, 6-hour chunks for a specific sensor) and the
expected-records baseline used for ``availability_pct``.
- ``descriptor`` (``str``, default ``"QUICKLOOK"``): UDL
``descriptor`` query value.
- ``output_format`` (``{'csv', 'json'}``, default ``'csv'``):
output serialization format passed through to the downloader.
- ``retry_failed`` (``bool``, default ``False``): when ``True``,
days whose latest telemetry row is ``FAILED`` are re-attempted;
otherwise they are skipped.
- ``limit_days`` (``int`` or ``None``): if set, cap the number of
days *attempted*, counted from the first day not already
``DOWNLOADED`` with its artifact on disk.
- ``dry_run`` (``bool``, default ``False``): when ``True``, no
network calls and no telemetry writes — only logs the planned
action per day.
- ``auth_token`` (``str``): UDL HTTP Basic auth header value.
Resolved by the CLI layer from
:func:`swxsoc_reach.net.auth.resolve_udl_auth` (Secrets Manager
or ``BASICAUTH`` env var).
- ``max_concurrent_requests`` (``int``, default ``4``): max
concurrent UDL chunk requests per day; forwarded to
:func:`~swxsoc_reach.net.udl.download_UDL_reach_window`.
- ``initial_rate``, ``additive_increase``,
``multiplicative_decrease``, ``min_rate``, ``max_rate``
(``float``): AIMD rate-controller tuning parameters; forwarded
to :func:`~swxsoc_reach.net.udl.download_UDL_reach_window`.
"""
start_date: date
end_date: date
output_dir: Path
telemetry_path: Path
sensor_id: str = "ALL"
descriptor: str = "QUICKLOOK"
output_format: str = "csv"
retry_failed: bool = False
limit_days: int | None = None
dry_run: bool = False
auth_token: str = ""
max_concurrent_requests: int = 4
initial_rate: float = 5.0
additive_increase: float = 1.0
multiplicative_decrease: float = 0.5
min_rate: float = 5.0
max_rate: float = 25.0
[docs]
@dataclass
class DownloadRunSummary:
"""Aggregate result of one :func:`run_download` invocation.
Returned to the CLI layer (or any direct caller) so it can log a
final "X downloaded, Y skipped, Z failed" line and choose a
process exit code. Per-day detail lives in the telemetry CSV;
this struct is intentionally just rollups.
Fields
------
- ``run_id`` (``str``): UUID4 stamped on every telemetry row
written by this run. Lets operators correlate rows in the
telemetry CSV with a specific invocation.
- ``days_planned`` (``int``): days in the inclusive date range
considered after applying ``--limit-days``. Equals
``days_attempted + days_skipped_existing + days_skipped_no_data
+ days_failed`` on a non-dry-run.
- ``days_attempted`` (``int``): days for which the downloader was
invoked (regardless of outcome).
- ``days_downloaded`` (``int``): days that ended in
``DOWNLOADED`` (artifact written, telemetry row appended).
- ``days_skipped_existing`` (``int``): days short-circuited
because a prior ``DOWNLOADED`` row exists and its CSV artifact
is still on disk.
- ``days_skipped_no_data`` (``int``): days that ended in
``SKIPPED_NO_DATA`` — either freshly (UDL returned zero
records, terminal) or via a prior ``SKIPPED_NO_DATA`` row.
- ``days_failed`` (``int``): days that ended in ``FAILED`` (or
were skipped because of a prior ``FAILED`` row without
``--retry-failed``).
"""
run_id: str
days_planned: int
days_attempted: int
days_downloaded: int
days_skipped_existing: int
days_skipped_no_data: int
days_failed: int
[docs]
def _expected_records(sensor_id: str) -> int:
"""Upper-bound reference count for ``availability_pct``."""
if sensor_id.upper() == "ALL":
return EXPECTED_RECORDS_ALL
return EXPECTED_RECORDS_SINGLE
[docs]
def _day_window(d: date) -> tuple[Time, Time, str, str]:
"""Return (start_time, end_time, start_iso, end_iso) for a UTC day.
``end_time`` is exclusive: ``start + 86400 s``. Matches the existing
chunk-list semantics so records timestamped in the last second of
the day are included.
"""
start_dt = datetime(d.year, d.month, d.day, tzinfo=timezone.utc)
end_dt = start_dt + timedelta(days=1)
start_t = Time(
start_dt.replace(tzinfo=None).isoformat(), format="isot", scale="utc"
)
end_t = Time(end_dt.replace(tzinfo=None).isoformat(), format="isot", scale="utc")
return start_t, end_t, start_dt.isoformat(), end_dt.isoformat()
[docs]
def _decide_action(
chunk_date: date,
prior: TelemetryRow | None,
retry_failed: bool,
) -> str:
"""Return one of ``run`` | ``skip_existing`` | ``skip_terminal`` | ``skip_failed``.
Decision table (see plan):
- no prior row → ``run``
- prior ``DOWNLOADED`` and CSV exists → ``skip_existing``
- prior ``DOWNLOADED`` and CSV missing → ``run`` (re-download)
- prior ``SKIPPED_NO_DATA`` → ``skip_terminal``
- prior ``FAILED`` → ``skip_failed`` unless ``retry_failed`` then ``run``
- prior ``DOWNLOAD_PENDING`` (interrupted) → ``run``
"""
if prior is None:
return "run"
if prior.status == STATUS_DOWNLOADED:
csv_path = prior.csv_path
if csv_path and Path(csv_path).exists():
return "skip_existing"
return "run"
if prior.status == STATUS_SKIPPED_NO_DATA:
return "skip_terminal"
if prior.status == STATUS_FAILED:
return "run" if retry_failed else "skip_failed"
# DOWNLOAD_PENDING or unknown → re-run.
return "run"
[docs]
def run_download(
config: DownloadRunConfig,
*,
download_fn: Callable[..., Path] | None = None,
telemetry: HistoricalTelemetry | None = None,
) -> DownloadRunSummary:
"""Run the historical UDL download orchestrator.
Steps performed, in order:
1. **Initialize.** Generate a fresh ``run_id`` (UUID4) used to
stamp every telemetry row written by this invocation. Ensure
``config.output_dir`` exists.
2. **Load prior state.** Read the telemetry CSV at
``config.telemetry_path`` and reduce it to a ``{date: latest
row}`` mapping via
:meth:`~swxsoc_reach.historical.telemetry.HistoricalTelemetry.load_state`.
Missing/empty file is treated as no prior state.
3. **Plan.** Expand ``[start_date, end_date]`` into one
UTC-midnight-bounded window per day, decide an action per day
via :func:`_decide_action` (``run`` / ``skip_existing`` /
``skip_terminal`` / ``skip_failed``), and apply
``--limit-days`` by counting only days whose action is not
``skip_existing``.
4. **Execute, sequentially per day.** For each planned day:
a. If ``config.dry_run``, log the action and continue (no
telemetry rows written, no network calls).
b. For skip actions, log the reason, increment the matching
summary counter, and continue.
c. For ``run`` actions, append a ``PENDING`` row, then invoke
``download_fn`` with absolute UTC ``start_time`` /
``end_time`` (00:00:00 → next day 00:00:00, exclusive end)
and the AIMD knobs from ``config``.
5. **Classify outcomes per attempted day.**
- On success: append a ``DOWNLOADED`` row with
``records_downloaded``, ``availability_pct`` (vs the
per-sensor expected-records baseline), ``download_seconds``,
``csv_size_mb``, and ``csv_path``.
- On :class:`ValueError` from the downloader (its "no records"
signal): append a terminal ``SKIPPED_NO_DATA`` row.
- On any other exception: append a ``FAILED`` row with
``error_type`` and ``error_message``. **The orchestrator
never aborts mid-range — it continues to the next day.**
6. **Return** an aggregated :class:`DownloadRunSummary`.
Concurrency: this function is sequential at the day level.
Per-day concurrency lives inside ``download_fn`` (the existing
thread pool + AIMD rate controller in
:func:`~swxsoc_reach.net.udl.download_UDL_reach_window`).
Parameters
----------
config : DownloadRunConfig
Run inputs, typically built from CLI args.
download_fn : callable, optional
Override for
:func:`swxsoc_reach.net.udl.download_UDL_reach_window`. Must
accept the same keyword arguments and return the path to the
written artifact, or raise ``ValueError`` for an empty window.
Tests inject a stub here; production callers leave it ``None``.
telemetry : HistoricalTelemetry, optional
Override for the telemetry writer/reader. Defaults to one
backed by ``config.telemetry_path``. Tests may inject a
pre-populated instance to simulate restart scenarios.
Returns
-------
DownloadRunSummary
Aggregate counts for the run. The CLI layer uses these to log
the final summary line and choose an exit code.
"""
if download_fn is None:
download_fn = udl_module.download_UDL_reach_window
if telemetry is None:
telemetry = HistoricalTelemetry(config.telemetry_path)
run_id = str(uuid.uuid4())
config.output_dir.mkdir(parents=True, exist_ok=True)
state = telemetry.load_state()
all_dates = list(_iter_dates(config.start_date, config.end_date))
# Determine which days need work in order — used for ``--limit-days``
# which counts from the first not-yet-complete day.
actionable: list[tuple[date, str]] = []
for d in all_dates:
action = _decide_action(d, state.get(d), config.retry_failed)
actionable.append((d, action))
# Apply --limit-days to the first N days that are not ``skip_existing``.
if config.limit_days is not None:
kept: list[tuple[date, str]] = []
worked = 0
for d, action in actionable:
if action == "skip_existing":
kept.append((d, action))
continue
if worked >= config.limit_days:
break
kept.append((d, action))
worked += 1
actionable = kept
summary = DownloadRunSummary(
run_id=run_id,
days_planned=len(actionable),
days_attempted=0,
days_downloaded=0,
days_skipped_existing=0,
days_skipped_no_data=0,
days_failed=0,
)
for d, action in actionable:
start_t, end_t, start_iso, end_iso = _day_window(d)
prior = state.get(d)
if config.dry_run:
log.info(
f"[dry-run] {d.isoformat()} action={action}"
+ (f" prior_status={prior.status}" if prior else "")
)
continue
if action == "skip_existing":
log.info(f"{d.isoformat()}: skip (already DOWNLOADED at {prior.csv_path})")
summary.days_skipped_existing += 1
continue
if action == "skip_terminal":
log.info(f"{d.isoformat()}: skip (prior SKIPPED_NO_DATA)")
summary.days_skipped_no_data += 1
continue
if action == "skip_failed":
log.info(
f"{d.isoformat()}: skip (prior FAILED; pass --retry-failed to retry)"
)
summary.days_failed += 1
continue
# action == "run"
summary.days_attempted += 1
started = utcnow_iso()
base_row = dict(
run_id=run_id,
chunk_date_utc=d.isoformat(),
window_start_utc=start_iso,
window_end_utc=end_iso,
sensor_id=config.sensor_id,
descriptor=config.descriptor,
output_format=config.output_format,
expected_records=str(_expected_records(config.sensor_id)),
started_at_utc=started,
)
telemetry.append_row(TelemetryRow(status=STATUS_DOWNLOAD_PENDING, **base_row))
t0 = time.monotonic()
try:
csv_path = download_fn(
auth_token=config.auth_token,
sensor_id=config.sensor_id,
descriptor=config.descriptor,
output_format=config.output_format,
start_time=start_t,
end_time=end_t,
output_dir=config.output_dir,
max_concurrent_requests=config.max_concurrent_requests,
initial_rate=config.initial_rate,
additive_increase=config.additive_increase,
multiplicative_decrease=config.multiplicative_decrease,
min_rate=config.min_rate,
max_rate=config.max_rate,
)
except ValueError as exc:
# ``download_UDL_reach_window`` raises ValueError for empty
# windows. Treat as terminal SKIPPED_NO_DATA.
elapsed = time.monotonic() - t0
log.info(f"{d.isoformat()}: SKIPPED_NO_DATA ({exc})")
telemetry.append_row(
TelemetryRow(
status=STATUS_SKIPPED_NO_DATA,
download_seconds=f"{elapsed:.3f}",
error_message=str(exc),
finished_at_utc=utcnow_iso(),
**base_row,
)
)
summary.days_skipped_no_data += 1
continue
except Exception as exc: # noqa: BLE001 — orchestrator must not abort
elapsed = time.monotonic() - t0
log.error(
f"{d.isoformat()}: FAILED {type(exc).__name__}: {exc}\n"
f"{traceback.format_exc()}"
)
telemetry.append_row(
TelemetryRow(
status=STATUS_FAILED,
download_seconds=f"{elapsed:.3f}",
error_type=type(exc).__name__,
error_message=str(exc),
finished_at_utc=utcnow_iso(),
**base_row,
)
)
summary.days_failed += 1
continue
elapsed = time.monotonic() - t0
records = _count_records(csv_path, config.output_format)
size_mb = csv_path.stat().st_size / (1024 * 1024) if csv_path.exists() else 0.0
expected = _expected_records(config.sensor_id)
availability = (records / expected * 100.0) if expected else 0.0
log.info(
f"{d.isoformat()}: DOWNLOADED records={records} "
f"availability={availability:.1f}% "
f"size={size_mb:.2f}MB in {elapsed:.1f}s -> {csv_path}"
)
telemetry.append_row(
TelemetryRow(
status=STATUS_DOWNLOADED,
records_downloaded=str(records),
availability_pct=f"{availability:.4f}",
download_seconds=f"{elapsed:.3f}",
csv_size_mb=f"{size_mb:.4f}",
csv_path=str(csv_path),
finished_at_utc=utcnow_iso(),
**base_row,
)
)
summary.days_downloaded += 1
return summary
[docs]
def _count_records(csv_path: Path, output_format: str) -> int:
"""Count data records in the just-written file.
Cheap line-count for CSV (header subtracted); for JSON, parse and
return ``len(...)``. Errors fall through as 0 — telemetry is not
worth crashing the run over.
"""
try:
if output_format == "csv":
with open(csv_path, "r", encoding="utf-8") as fh:
# subtract one for the header row
return max(sum(1 for _ in fh) - 1, 0)
if output_format == "json":
import json
with open(csv_path, "r", encoding="utf-8") as fh:
payload = json.load(fh)
return len(payload) if isinstance(payload, list) else 0
except OSError:
return 0
return 0