swxsoc_reach.historical.process_orchestrator

Per-day orchestrator for historical CSV → CDF processing.

Drives swxsoc_reach.calibration.calibration.process_file() over an inclusive UTC date range, picking up CSVs produced by Phase 1’s download orchestrator from --input-dir. Each day produces one CDF in --output-dir and (optionally) one S3 upload via swxsoc_reach.historical.s3_upload.upload_cdf_to_s3(). Telemetry is appended to the same CSV file Phase 1 wrote (extended schema), so the full download → process → upload lifecycle for a given day is visible in one place.

Days are processed sequentially, mirroring the download orchestrator. Switching the working directory into --output-dir is a deliberate workaround: process_file writes to Path.cwd() when LAMBDA_ENVIRONMENT is unset, which is the case for local historical runs. The original working directory is restored in a finally block, so a failure mid-day does not leave the process in the wrong directory.

Functions

run_process(config, *[, process_fn, ...])

Run the historical CSV -> CDF (-> S3) orchestrator.

Classes

ProcessRunConfig(start_date, end_date, ...)

Inputs to run_process().

ProcessRunSummary(run_id, days_planned, ...)

Aggregate result of one run_process() invocation.

class swxsoc_reach.historical.process_orchestrator.ProcessRunConfig(start_date: date, end_date: date, input_dir: Path, output_dir: Path, telemetry_path: Path, sensor_id: str = 'ALL', descriptor: str = 'QUICKLOOK', output_format: str = 'csv', retry_failed: bool = False, limit_days: int | None = None, dry_run: bool = False, upload_to_s3: bool = False, s3_bucket: str | None = None)

Inputs to run_process().

Mirrors the historical-process CLI flags one-for-one. The CLI layer parses argv into one of these instances.

Fields

  • start_date, end_date : inclusive UTC date range.

  • input_dir : directory holding Phase 1 download artifacts (CSV/JSON files named per build_reach_output_filename()).

  • output_dir : directory where CDF files are written.

  • telemetry_path : append-only telemetry CSV (typically the same file Phase 1 wrote).

  • sensor_id, descriptor, output_format : used to reconstruct the expected per-day input filename when scanning input_dir.

  • retry_failed : when True, days with prior status FAILED are re-attempted.

  • limit_days : cap on attempted days (counted from the first not-yet-complete day).

  • dry_run : plan only; no processing, no uploads, and no telemetry writes.

  • upload_to_s3 : if True, attempt an S3 upload after a successful CDF write. If False, PROCESSED is the terminal status for the day.

  • s3_bucket : destination bucket (required iff upload_to_s3 is True).

descriptor: str = 'QUICKLOOK'
dry_run: bool = False
end_date: date
input_dir: Path
limit_days: int | None = None
output_dir: Path
output_format: str = 'csv'
retry_failed: bool = False
s3_bucket: str | None = None
sensor_id: str = 'ALL'
start_date: date
telemetry_path: Path
upload_to_s3: bool = False
class swxsoc_reach.historical.process_orchestrator.ProcessRunSummary(run_id: str, days_planned: int, days_attempted: int, days_processed: int, days_uploaded: int, days_skipped_existing: int, days_skipped_no_input: int, days_failed: int)

Aggregate result of one run_process() invocation.

days_attempted: int
days_failed: int
days_planned: int
days_processed: int
days_skipped_existing: int
days_skipped_no_input: int
days_uploaded: int
run_id: str
swxsoc_reach.historical.process_orchestrator._carry_forward(prior: TelemetryRow | None) → dict[str, str]

Carry Phase 1 download columns forward onto a Phase 2 row.

Keeps the most recent row for each day self-describing in the telemetry CSV. When no prior row exists, the download columns are returned as blank values so that every column in the new row is still populated.

swxsoc_reach.historical.process_orchestrator._decide_process_action(prior: TelemetryRow | None, *, upload_to_s3: bool, csv_available: bool, retry_failed: bool) → str

Return one of:

  • run_process: (re)run process_file from CSV (and upload if configured)

  • run_upload_only: CDF already exists; just upload

  • skip_existing: day already terminal; nothing to do

  • skip_terminal: prior SKIPPED_NO_INPUT and CSV still missing

  • skip_failed: prior FAILED and --retry-failed not set

swxsoc_reach.historical.process_orchestrator._match_csv_for_date(input_dir: Path, day: date, sensor_id: str, output_format: str) → Path | None

Return the per-day input artifact path or None if missing.

Phase 1 names files via build_reach_output_filename() as {sensor_prefix}_{startTIME}_{endTIME}.{output_format}, where both timestamps use the format %Y%m%dT%H%M%S. For a UTC day, the start component is YYYYMMDDT000000 and the end component is the next day’s YYYYMMDDT000000. We glob loosely on the leading sensor prefix and start timestamp, so a file with a matching start is accepted regardless of its end timestamp.

swxsoc_reach.historical.process_orchestrator._process_one_day(csv_path: Path, output_dir: Path, process_fn: Callable[[Path], list[Path]]) → list[Path]

Run process_file(csv_path) with cwd switched to output_dir.

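process_file writes to Path.cwd() when LAMBDA_ENVIRONMENT is unset, so we chdir into output_dir for the duration of the call and restore the previous working directory in finally. The working directory is process-wide state, so this is safe only because days are processed sequentially, never concurrently.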
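A minimal sketch of the chdir-and-restore pattern. Resolving csv_path before switching directories and creating output_dir on demand are assumptions of this sketch, not documented behaviour of _process_one_day.

```python
import os
from pathlib import Path
from typing import Callable


def process_one_day(
    csv_path: Path,
    output_dir: Path,
    process_fn: Callable[[Path], list[Path]],
) -> list[Path]:
    """Call process_fn(csv_path) with the working directory set to output_dir."""
    # Resolve first: a relative csv_path would otherwise be re-interpreted
    # relative to output_dir once the working directory changes.
    csv_path = csv_path.resolve()
    output_dir.mkdir(parents=True, exist_ok=True)
    previous_cwd = Path.cwd()
    os.chdir(output_dir)
    try:
        return process_fn(csv_path)
    finally:
        # Runs on success and on exceptions, so a failed day cannot leave
        # the process sitting in output_dir.
        os.chdir(previous_cwd)
```

On Python 3.11 and later, contextlib.chdir(output_dir) expresses the same save-and-restore as a context manager; it changes the same process-wide state, so the sequential-execution requirement still applies.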

swxsoc_reach.historical.process_orchestrator._relocate_to_nested_layout(flat_path: Path, output_dir: Path) → Path

Move flat_path into a nested subdirectory of output_dir.

The subdirectory mirrors the S3 key produced by sdc_aws_utils.aws.create_s3_file_key() (e.g. l1c/prelim/2026/01/01/). Falls back to returning flat_path unchanged if sdc_aws_utils or swxsoc are not importable, or if key computation raises for any reason.
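The relocation with its fallback can be sketched as below. The sdc_aws_utils call is deliberately not reproduced: key_fn is a hypothetical stand-in that returns a full S3 key such as l1c/prelim/2026/01/01/<file name>, and the sketch assumes the key's directory part is what gets mirrored under output_dir.

```python
import shutil
from pathlib import Path, PurePosixPath
from typing import Callable


def nested_destination(flat_path: Path, output_dir: Path, s3_key: str) -> Path:
    """Mirror the key's directory part (e.g. l1c/prelim/2026/01/01/) under output_dir."""
    subdir = PurePosixPath(s3_key).parent
    return output_dir.joinpath(*subdir.parts) / flat_path.name


def relocate_to_nested_layout(
    flat_path: Path, output_dir: Path, key_fn: Callable[[Path], str]
) -> Path:
    """Move flat_path into the nested layout, or return it unchanged on failure."""
    try:
        # key_fn is a hypothetical stand-in for the S3-key computation;
        # an ImportError or any other exception triggers the fallback.
        destination = nested_destination(flat_path, output_dir, key_fn(flat_path))
    except Exception:
        return flat_path
    destination.parent.mkdir(parents=True, exist_ok=True)
    return Path(shutil.move(str(flat_path), str(destination)))
```

Only key computation is guarded, mirroring the description above; a failing mkdir or move still raises.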

swxsoc_reach.historical.process_orchestrator.run_process(config: ProcessRunConfig, *, process_fn: Callable[[Path], list[Path]] | None = None, upload_fn: Callable[..., tuple[str, str]] | None = None, telemetry: HistoricalTelemetry | None = None) → ProcessRunSummary

Run the historical CSV -> CDF (-> S3) orchestrator.

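Parameters:

  • config : run inputs; see ProcessRunConfig.

  • process_fn : optional replacement for process_file(); takes a CSV path and returns a list of output paths.

  • upload_fn : optional replacement for the S3 upload step (upload_cdf_to_s3()).

  • telemetry : optional HistoricalTelemetry instance used for telemetry writes.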
Returns:

ProcessRunSummary – Aggregate counts. The CLI layer uses these to log the final summary line and choose an exit code.
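One plausible shape for the CLI side is sketched here. RunSummary is a hypothetical stand-in that copies ProcessRunSummary's field names; the log-line format and the "any failed day exits non-zero" policy are assumptions, not documented behaviour.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RunSummary:
    """Hypothetical stand-in with the same fields as ProcessRunSummary."""

    run_id: str
    days_planned: int
    days_attempted: int
    days_processed: int
    days_uploaded: int
    days_skipped_existing: int
    days_skipped_no_input: int
    days_failed: int


def summary_line(s: RunSummary) -> str:
    """Render all counts on one line for the final log message."""
    return (
        f"run {s.run_id}: planned={s.days_planned} attempted={s.days_attempted} "
        f"processed={s.days_processed} uploaded={s.days_uploaded} "
        f"skipped_existing={s.days_skipped_existing} "
        f"skipped_no_input={s.days_skipped_no_input} failed={s.days_failed}"
    )


def exit_code(s: RunSummary) -> int:
    # Assumed policy: any failed day makes the run exit non-zero.
    return 1 if s.days_failed else 0
```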