Source code for swxsoc_reach.historical.process_orchestrator

"""Per-day orchestrator for historical CSV \u2192 CDF processing.

Drives :func:`swxsoc_reach.calibration.calibration.process_file` over
an inclusive UTC date range, picking up CSVs produced by Phase 1's
download orchestrator from ``--input-dir``. Each day produces one
CDF in ``--output-dir`` and (optionally) one S3 upload via
:func:`swxsoc_reach.historical.s3_upload.upload_cdf_to_s3`. Telemetry
is appended to the same CSV file Phase 1 wrote (extended schema), so
the full download \u2192 process \u2192 upload lifecycle for a given day is
visible in one place.

Sequential by day, mirroring the download orchestrator. The
chdir-into-output-dir hack is required because
``process_file`` writes to ``Path.cwd()`` when ``LAMBDA_ENVIRONMENT``
is unset (which is the case for historical local runs); we restore
``cwd`` in ``finally`` so a failure mid-day does not leave the
process in a bad state.
"""

from __future__ import annotations

import os
import shutil
import time
import traceback
import uuid
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Callable

from swxsoc_reach import log
from swxsoc_reach.historical._dates import iter_dates as _iter_dates
from swxsoc_reach.historical.telemetry import (
    HistoricalTelemetry,
    STATUS_DOWNLOAD_PENDING,
    STATUS_DOWNLOADED,
    STATUS_FAILED,
    STATUS_PROCESS_PENDING,
    STATUS_PROCESSED,
    STATUS_SKIPPED_NO_DATA,
    STATUS_SKIPPED_NO_INPUT,
    STATUS_UPLOAD_PENDING,
    STATUS_UPLOADED,
    TelemetryRow,
    utcnow_iso,
)

# Lazy: imported only when needed, to keep ``process_orchestrator``
# importable on machines without the calibration stack ready.


[docs] @dataclass class ProcessRunConfig: """Inputs to :func:`run_process`. Mirrors the historical-process CLI flags one-for-one. The CLI layer parses argv into one of these instances. Fields ------ - ``start_date``, ``end_date`` : inclusive UTC date range. - ``input_dir`` : directory holding Phase 1 download artifacts (CSV/JSON files named per :func:`~swxsoc_reach.net.udl.build_reach_output_filename`). - ``output_dir`` : directory where CDF files are written. - ``telemetry_path`` : append-only telemetry CSV (typically the same file Phase 1 wrote). - ``sensor_id``, ``descriptor``, ``output_format`` : used to reconstruct the expected per-day input filename when scanning ``input_dir``. - ``retry_failed`` : when True, days with prior status ``FAILED`` are re-attempted. - ``limit_days`` : cap on attempted days (counted from the first not-yet-complete day). - ``dry_run`` : plan only - no work, no telemetry writes. - ``upload_to_s3`` : if True, attempt an S3 upload after a successful CDF write. If False, ``PROCESSED`` is the terminal status for the day. - ``s3_bucket`` : destination bucket (required iff ``upload_to_s3`` is True). """ start_date: date end_date: date input_dir: Path output_dir: Path telemetry_path: Path sensor_id: str = "ALL" descriptor: str = "QUICKLOOK" output_format: str = "csv" retry_failed: bool = False limit_days: int | None = None dry_run: bool = False upload_to_s3: bool = False s3_bucket: str | None = None
[docs] @dataclass class ProcessRunSummary: """Aggregate result of one :func:`run_process` invocation.""" run_id: str days_planned: int days_attempted: int days_processed: int days_uploaded: int days_skipped_existing: int days_skipped_no_input: int days_failed: int
[docs] def _match_csv_for_date( input_dir: Path, day: date, sensor_id: str, output_format: str, ) -> Path | None: """Return the per-day input artifact path or ``None`` if missing. Phase 1 names files via :func:`~swxsoc_reach.net.udl.build_reach_output_filename`: ``{sensor_prefix}_{startTIME}_{endTIME}.{output_format}`` where the time format is ``%Y%m%dT%H%M%S``. For a UTC day, the start component is ``YYYYMMDDT000000`` and the end is the next day's ``YYYYMMDDT000000``. We glob loosely on the leading sensor + start-time prefix so any pair of matching start/end timestamps is accepted. """ sensor_prefix = "REACH-ALL" if sensor_id.upper() == "ALL" else sensor_id start_str = day.strftime("%Y%m%dT000000") pattern = f"{sensor_prefix}_{start_str}_*.{output_format}" matches = sorted(input_dir.glob(pattern)) if not matches: return None if len(matches) > 1: log.warning( f"{day.isoformat()}: multiple input files match {pattern!r} in " f"{input_dir}; using {matches[0].name}" ) return matches[0]
[docs] def _decide_process_action( prior: TelemetryRow | None, *, upload_to_s3: bool, csv_available: bool, retry_failed: bool, ) -> str: """Return one of: - ``run_process``: (re)run process_file from CSV (and upload if configured) - ``run_upload_only``: CDF already exists; just upload - ``skip_existing``: day already terminal; nothing to do - ``skip_terminal``: prior SKIPPED_NO_INPUT and CSV still missing - ``skip_failed``: prior FAILED and ``--retry-failed`` not set """ if prior is None: return "run_process" if csv_available else "skip_no_input" status = prior.status if status == STATUS_UPLOADED: return "skip_existing" if status == STATUS_PROCESSED: cdf = prior.cdf_path cdf_exists = bool(cdf) and Path(cdf).exists() if upload_to_s3: if cdf_exists: return "run_upload_only" return "run_process" if csv_available else "skip_no_input" # local-only mode: PROCESSED is terminal if cdf_exists: return "skip_existing" return "run_process" if csv_available else "skip_no_input" if status == STATUS_UPLOAD_PENDING: cdf = prior.cdf_path if cdf and Path(cdf).exists(): return "run_upload_only" return "run_process" if csv_available else "skip_no_input" if status == STATUS_PROCESS_PENDING: return "run_process" if csv_available else "skip_no_input" if status == STATUS_FAILED: if not retry_failed: return "skip_failed" return "run_process" if csv_available else "skip_no_input" if status == STATUS_SKIPPED_NO_INPUT: return "run_process" if csv_available else "skip_terminal" # Phase 1 statuses (DOWNLOAD_PENDING / DOWNLOADED / SKIPPED_NO_DATA): # treat as no prior process-stage row. if status in (STATUS_DOWNLOAD_PENDING, STATUS_DOWNLOADED, STATUS_SKIPPED_NO_DATA): return "run_process" if csv_available else "skip_no_input" # Unknown status \u2192 attempt to process if we have an input. return "run_process" if csv_available else "skip_no_input"
[docs] def _carry_forward(prior: TelemetryRow | None) -> dict[str, str]: """Carry Phase 1 download columns forward onto a Phase 2 row. Keeps the most-recent row per day self-describing in the telemetry CSV. When no prior row exists, returns blank values so column positions are populated. """ if prior is None: return { "records_downloaded": "", "expected_records": "", "availability_pct": "", "download_seconds": "", "csv_size_mb": "", "csv_path": "", } return { "records_downloaded": prior.records_downloaded, "expected_records": prior.expected_records, "availability_pct": prior.availability_pct, "download_seconds": prior.download_seconds, "csv_size_mb": prior.csv_size_mb, "csv_path": prior.csv_path, }
[docs] def _process_one_day( csv_path: Path, output_dir: Path, process_fn: Callable[[Path], list[Path]], ) -> list[Path]: """Run ``process_file(csv_path)`` with cwd switched to ``output_dir``. ``process_file`` writes to ``Path.cwd()`` when ``LAMBDA_ENVIRONMENT`` is unset, so we chdir for the duration of the call and restore in ``finally``. Sequential per-day execution makes this safe. """ output_dir.mkdir(parents=True, exist_ok=True) saved_cwd = os.getcwd() try: os.chdir(output_dir) return list(process_fn(csv_path)) finally: os.chdir(saved_cwd)
[docs] def _relocate_to_nested_layout(flat_path: Path, output_dir: Path) -> Path: """Move *flat_path* into a nested subdirectory of *output_dir*. The subdirectory mirrors the S3 key produced by :func:`sdc_aws_utils.aws.create_s3_file_key` (e.g. ``l1c/prelim/2026/01/01/``). Falls back to returning *flat_path* unchanged if ``sdc_aws_utils`` or ``swxsoc`` are not importable, or if key computation raises for any reason. """ try: from sdc_aws_utils.aws import create_s3_file_key from swxsoc.util.util import parse_science_filename except ImportError: log.debug( f"_relocate_to_nested_layout: sdc_aws_utils/swxsoc not available; " f"keeping flat layout for {flat_path.name!r}" ) return flat_path try: nested_key = create_s3_file_key(parse_science_filename, flat_path.name) except Exception as exc: # noqa: BLE001 log.warning( f"Could not compute nested layout key for {flat_path.name!r}; " f"keeping flat ({type(exc).__name__}: {exc})" ) return flat_path dest = output_dir / nested_key if dest.resolve() == flat_path.resolve(): return flat_path dest.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(flat_path), dest) log.debug(f"Relocated CDF to nested layout: {dest}") return dest
[docs] def run_process( config: ProcessRunConfig, *, process_fn: Callable[[Path], list[Path]] | None = None, upload_fn: Callable[..., tuple[str, str]] | None = None, telemetry: HistoricalTelemetry | None = None, ) -> ProcessRunSummary: """Run the historical CSV -> CDF (-> S3) orchestrator. Parameters ---------- config : ProcessRunConfig Run inputs, typically built from CLI args. process_fn : callable, optional Override for :func:`swxsoc_reach.calibration.calibration.process_file`. Receives the CSV path and must return a ``list[Path]`` of produced CDFs (written into ``cwd`` per the existing contract). Tests inject a stub here. upload_fn : callable, optional Override for :func:`swxsoc_reach.historical.s3_upload.upload_cdf_to_s3`. Must accept ``(cdf_path, destination_bucket=...)`` keyword args and return ``(bucket, s3_key)``. telemetry : HistoricalTelemetry, optional Override for the telemetry writer/reader. Returns ------- ProcessRunSummary Aggregate counts. The CLI layer uses these to log the final summary line and choose an exit code. """ if process_fn is None: from swxsoc_reach.calibration.calibration import process_file process_fn = process_file if upload_fn is None and config.upload_to_s3: from swxsoc_reach.historical.s3_upload import upload_cdf_to_s3 upload_fn = upload_cdf_to_s3 if telemetry is None: telemetry = HistoricalTelemetry(config.telemetry_path) if config.upload_to_s3 and not config.s3_bucket: raise ValueError("upload_to_s3=True requires s3_bucket to be set") run_id = str(uuid.uuid4()) config.output_dir.mkdir(parents=True, exist_ok=True) state = telemetry.load_state() all_dates = list(_iter_dates(config.start_date, config.end_date)) # Plan: per-day action. actionable: list[tuple[date, str, Path | None]] = [] for d in all_dates: csv_path = _match_csv_for_date( config.input_dir, d, config.sensor_id, config.output_format ) action = _decide_process_action( state.get(d), upload_to_s3=config.upload_to_s3, csv_available=csv_path is not None, retry_failed=config.retry_failed, ) actionable.append((d, action, csv_path)) # --limit-days counts only days that need work (not skip_existing). if config.limit_days is not None: kept: list[tuple[date, str, Path | None]] = [] worked = 0 for entry in actionable: _, action, _ = entry if action == "skip_existing": kept.append(entry) continue if worked >= config.limit_days: break kept.append(entry) worked += 1 actionable = kept summary = ProcessRunSummary( run_id=run_id, days_planned=len(actionable), days_attempted=0, days_processed=0, days_uploaded=0, days_skipped_existing=0, days_skipped_no_input=0, days_failed=0, ) for d, action, csv_path in actionable: prior = state.get(d) date_iso = d.isoformat() if config.dry_run: log.info(f"[dry-run] {date_iso} action={action}") continue if action == "skip_existing": log.info(f"{date_iso}: skip (already complete)") summary.days_skipped_existing += 1 continue if action == "skip_terminal": log.info(f"{date_iso}: skip (prior SKIPPED_NO_INPUT, still no input)") summary.days_skipped_no_input += 1 continue if action == "skip_failed": log.info(f"{date_iso}: skip (prior FAILED; pass --retry-failed to retry)") summary.days_failed += 1 continue if action == "skip_no_input": log.info(f"{date_iso}: SKIPPED_NO_INPUT (no matching CSV in input-dir)") telemetry.append_row( TelemetryRow( run_id=run_id, chunk_date_utc=date_iso, status=STATUS_SKIPPED_NO_INPUT, sensor_id=config.sensor_id, descriptor=config.descriptor, output_format=config.output_format, started_at_utc=utcnow_iso(), finished_at_utc=utcnow_iso(), **_carry_forward(prior), ) ) summary.days_skipped_no_input += 1 continue # action is run_process or run_upload_only carried = _carry_forward(prior) base_row = dict( run_id=run_id, chunk_date_utc=date_iso, sensor_id=config.sensor_id, descriptor=config.descriptor, output_format=config.output_format, **carried, ) cdf_path: Path | None = None process_seconds = "" cdf_size_mb = "" if action == "run_process": assert csv_path is not None # invariant from _decide_process_action base_row["csv_path"] = str(csv_path) summary.days_attempted += 1 started = utcnow_iso() telemetry.append_row( TelemetryRow( status=STATUS_PROCESS_PENDING, started_at_utc=started, **base_row, ) ) t0 = time.monotonic() try: produced = _process_one_day(csv_path, config.output_dir, process_fn) except Exception as exc: # noqa: BLE001 - never abort mid-range elapsed = time.monotonic() - t0 log.error( f"{date_iso}: FAILED at process stage " f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}" ) telemetry.append_row( TelemetryRow( status=STATUS_FAILED, started_at_utc=started, finished_at_utc=utcnow_iso(), process_seconds=f"{elapsed:.3f}", error_type=type(exc).__name__, error_message=str(exc), **base_row, ) ) summary.days_failed += 1 continue elapsed = time.monotonic() - t0 if not produced: log.error(f"{date_iso}: FAILED process_file returned no paths") telemetry.append_row( TelemetryRow( status=STATUS_FAILED, started_at_utc=started, finished_at_utc=utcnow_iso(), process_seconds=f"{elapsed:.3f}", error_type="RuntimeError", error_message="process_file returned no output paths", **base_row, ) ) summary.days_failed += 1 continue if len(produced) > 1: log.warning( f"{date_iso}: process_file returned {len(produced)} paths; " f"recording the first ({produced[0].name})" ) cdf_path = _relocate_to_nested_layout(Path(produced[0]), config.output_dir) cdf_size_mb = ( f"{cdf_path.stat().st_size / (1024 * 1024):.4f}" if cdf_path.exists() else "0.0000" ) process_seconds = f"{elapsed:.3f}" base_row["cdf_path"] = str(cdf_path) base_row["cdf_size_mb"] = cdf_size_mb base_row["process_seconds"] = process_seconds log.info( f"{date_iso}: PROCESSED size={cdf_size_mb}MB in {process_seconds}s " f"-> {cdf_path}" ) telemetry.append_row( TelemetryRow( status=STATUS_PROCESSED, started_at_utc=started, finished_at_utc=utcnow_iso(), **base_row, ) ) summary.days_processed += 1 if not config.upload_to_s3: continue # fall through to upload using cdf_path elif action == "run_upload_only": assert prior is not None cdf_path = Path(prior.cdf_path) if prior.cdf_path else None if cdf_path is None or not cdf_path.exists(): # Should not reach here given _decide_process_action, # but be defensive. log.warning( f"{date_iso}: expected existing CDF for upload-only " f"but path is missing; falling back to skip_failed" ) summary.days_failed += 1 continue base_row["cdf_path"] = str(cdf_path) base_row["cdf_size_mb"] = prior.cdf_size_mb base_row["process_seconds"] = prior.process_seconds # === Upload stage === if not config.upload_to_s3 or upload_fn is None: continue upload_started = utcnow_iso() telemetry.append_row( TelemetryRow( status=STATUS_UPLOAD_PENDING, started_at_utc=upload_started, **base_row, ) ) u0 = time.monotonic() try: bucket, s3_key = upload_fn(cdf_path, destination_bucket=config.s3_bucket) except Exception as exc: # noqa: BLE001 u_elapsed = time.monotonic() - u0 log.error( f"{date_iso}: FAILED at upload stage " f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}" ) telemetry.append_row( TelemetryRow( status=STATUS_FAILED, started_at_utc=upload_started, finished_at_utc=utcnow_iso(), upload_seconds=f"{u_elapsed:.3f}", error_type=type(exc).__name__, error_message=str(exc), **base_row, ) ) summary.days_failed += 1 continue u_elapsed = time.monotonic() - u0 log.info(f"{date_iso}: UPLOADED s3://{bucket}/{s3_key} in {u_elapsed:.1f}s") telemetry.append_row( TelemetryRow( status=STATUS_UPLOADED, started_at_utc=upload_started, finished_at_utc=utcnow_iso(), upload_seconds=f"{u_elapsed:.3f}", s3_bucket=bucket, s3_key=s3_key, **base_row, ) ) summary.days_uploaded += 1 return summary