swxsoc_reach.historical.process_orchestrator
Per-day orchestrator for historical CSV → CDF processing.
Drives swxsoc_reach.calibration.calibration.process_file() over
an inclusive UTC date range, reading the CSVs that Phase 1’s
download orchestrator wrote to --input-dir. Each processed day produces one
CDF in --output-dir and, optionally, one S3 upload via
swxsoc_reach.historical.s3_upload.upload_cdf_to_s3(). Telemetry
is appended to the same CSV file Phase 1 wrote, using an extended schema, so
the full download → process → upload lifecycle for a given day is
visible in one place.
Days are processed sequentially, mirroring the download orchestrator. The
chdir-into-output-dir hack is required because
process_file writes to Path.cwd() when LAMBDA_ENVIRONMENT
is unset (as it is for local historical runs); the original cwd is restored
in a finally block so that a failure mid-day does not leave the
process in the wrong working directory.
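Functions
run_process(config, *[, process_fn, upload_fn, ...]) | Run the historical CSV -> CDF (-> S3) orchestrator.
Classes
ProcessRunConfig(start_date, end_date, input_dir, ...) | Inputs to run_process().
ProcessRunSummary(run_id, days_planned, ...) | Aggregate result of one run_process() invocation.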
- class swxsoc_reach.historical.process_orchestrator.ProcessRunConfig(start_date: date, end_date: date, input_dir: Path, output_dir: Path, telemetry_path: Path, sensor_id: str = 'ALL', descriptor: str = 'QUICKLOOK', output_format: str = 'csv', retry_failed: bool = False, limit_days: int | None = None, dry_run: bool = False, upload_to_s3: bool = False, s3_bucket: str | None = None)
Inputs to run_process(). Mirrors the historical-process CLI flags one-for-one; the CLI layer parses argv into an instance of this class.
Fields
  - start_date, end_date: inclusive UTC date range.
  - input_dir: directory holding Phase 1 download artifacts (CSV/JSON files named per build_reach_output_filename()).
  - output_dir: directory where CDF files are written.
  - telemetry_path: append-only telemetry CSV (typically the same file Phase 1 wrote).
  - sensor_id, descriptor, output_format: used to reconstruct the expected per-day input filename when scanning input_dir.
  - retry_failed: when True, days whose prior status is FAILED are re-attempted.
  - limit_days: cap on the number of attempted days (counted from the first not-yet-complete day).
  - dry_run: plan only; no work is done and no telemetry is written.
  - upload_to_s3: if True, attempt an S3 upload after a successful CDF write. If False, PROCESSED is the terminal status for the day.
  - s3_bucket: destination bucket (required if and only if upload_to_s3 is True).
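To make the inclusive date range and the s3_bucket/upload_to_s3 coupling concrete, here is a minimal sketch of a config object that validates both. It is hypothetical: the class name ProcessRunConfigSketch, the reduced field set, and validating in __post_init__ are assumptions, not the module's confirmed behavior.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta
from typing import Iterator, Optional


@dataclass(frozen=True)
class ProcessRunConfigSketch:
    """Hypothetical subset of ProcessRunConfig, for illustration only."""

    start_date: date
    end_date: date
    upload_to_s3: bool = False
    s3_bucket: Optional[str] = None

    def __post_init__(self) -> None:
        # Assumed check: the range is inclusive, so start == end is one day.
        if self.start_date > self.end_date:
            raise ValueError("start_date must not be after end_date")
        # s3_bucket is required if and only if upload_to_s3 is True.
        if self.upload_to_s3 != (self.s3_bucket is not None):
            raise ValueError("s3_bucket is required iff upload_to_s3 is True")

    def days(self) -> Iterator[date]:
        """Yield every UTC date in the inclusive range, in order."""
        day = self.start_date
        while day <= self.end_date:
            yield day
            day += timedelta(days=1)
```

In this sketch, passing upload_to_s3=True without s3_bucket (or a bucket without upload_to_s3) raises ValueError at construction time rather than mid-run.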
- class swxsoc_reach.historical.process_orchestrator.ProcessRunSummary(run_id: str, days_planned: int, days_attempted: int, days_processed: int, days_uploaded: int, days_skipped_existing: int, days_skipped_no_input: int, days_failed: int)
Aggregate result of one run_process() invocation.
- swxsoc_reach.historical.process_orchestrator._carry_forward(prior: TelemetryRow | None) → dict[str, str]
Carry Phase 1 download columns forward onto a Phase 2 row.
This keeps the most recent row for each day self-describing in the telemetry CSV. When no prior row exists, it returns blank values so that every download column position is still filled.
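A minimal sketch of the carry-forward idea, assuming a telemetry row can be read as a string-to-string mapping. The column names in DOWNLOAD_COLUMNS are hypothetical placeholders; the real Phase 1 schema is not documented here.

```python
from __future__ import annotations

from typing import Dict, Mapping, Optional

# Hypothetical Phase 1 column names; the real schema may differ.
DOWNLOAD_COLUMNS = ("download_status", "download_started_at", "download_completed_at")


def carry_forward(prior: Optional[Mapping[str, str]]) -> Dict[str, str]:
    """Copy Phase 1 download columns onto a new Phase 2 row (sketch)."""
    if prior is None:
        # No prior row: emit blanks so every download column is still present.
        return {column: "" for column in DOWNLOAD_COLUMNS}
    # Columns missing from the prior row also become blanks.
    return {column: prior.get(column, "") for column in DOWNLOAD_COLUMNS}
```

Only the download columns are copied; anything else on the prior row is left behind, so the Phase 2 writer controls its own columns.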
- swxsoc_reach.historical.process_orchestrator._decide_process_action(prior: TelemetryRow | None, *, upload_to_s3: bool, csv_available: bool, retry_failed: bool) → str
Return one of:
  - run_process: (re)run process_file from the CSV (and upload if configured).
  - run_upload_only: the CDF already exists; just upload it.
  - skip_existing: the day is already terminal; nothing to do.
  - skip_terminal: prior status is SKIPPED_NO_INPUT and the CSV is still missing.
  - skip_failed: prior status is FAILED and --retry-failed is not set.
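The decision table above can be sketched as a pure function. This version takes the prior row's status as a plain string (or None) instead of a TelemetryRow and assumes a terminal upload status named UPLOADED; both are assumptions, and the order of the checks is one plausible precedence rather than the module's confirmed logic.

```python
from __future__ import annotations

from typing import Optional


def decide_process_action(
    prior_status: Optional[str],
    *,
    upload_to_s3: bool,
    csv_available: bool,
    retry_failed: bool,
) -> str:
    """Map a day's prior telemetry status to the next action (sketch)."""
    if prior_status == "UPLOADED":  # hypothetical terminal upload status
        return "skip_existing"
    if prior_status == "PROCESSED":
        # The CDF already exists; upload it only if uploads are enabled.
        return "run_upload_only" if upload_to_s3 else "skip_existing"
    if prior_status == "SKIPPED_NO_INPUT" and not csv_available:
        return "skip_terminal"
    if prior_status == "FAILED" and not retry_failed:
        return "skip_failed"
    # No prior row, a Phase 1-only row, a newly available CSV, or a retry.
    return "run_process"
```

Keeping the decision free of I/O makes every branch testable with plain arguments.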
- swxsoc_reach.historical.process_orchestrator._match_csv_for_date(input_dir: Path, day: date, sensor_id: str, output_format: str) → Path | None
Return the per-day input artifact path, or None if it is missing.
Phase 1 names files via build_reach_output_filename(): {sensor_prefix}_{startTIME}_{endTIME}.{output_format}, where the time format is %Y%m%dT%H%M%S. For a UTC day, the start component is YYYYMMDDT000000 and the end is the next day’s YYYYMMDDT000000. We glob loosely on the leading sensor + start-time prefix, so any end timestamp is accepted as long as the sensor prefix and start time match.
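The loose glob can be sketched with pathlib. This sketch takes the filename prefix directly as sensor_prefix, because how the real function maps sensor_id to that prefix is not described here; returning the first match in sorted order when several files qualify is also an assumption.

```python
from __future__ import annotations

from datetime import date, datetime, time
from pathlib import Path
from typing import Optional

TIME_FORMAT = "%Y%m%dT%H%M%S"


def match_csv_for_date(
    input_dir: Path, day: date, sensor_prefix: str, output_format: str
) -> Optional[Path]:
    """Find the artifact whose start timestamp is midnight UTC of `day`."""
    start = datetime.combine(day, time.min).strftime(TIME_FORMAT)  # YYYYMMDDT000000
    # Wildcard the end timestamp: only the sensor prefix and start time must match.
    pattern = f"{sensor_prefix}_{start}_*.{output_format}"
    matches = sorted(input_dir.glob(pattern))
    return matches[0] if matches else None
```

Sorting makes the choice deterministic, since Path.glob does not guarantee any particular order.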
- swxsoc_reach.historical.process_orchestrator._process_one_day(csv_path: Path, output_dir: Path, process_fn: Callable[[Path], list[Path]]) → list[Path]
Run process_file(csv_path) with cwd switched to output_dir.
process_file writes to Path.cwd() when LAMBDA_ENVIRONMENT is unset, so we chdir for the duration of the call and restore the original cwd in finally. Because the working directory is process-wide state, sequential per-day execution is what makes this safe.
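The chdir-and-restore pattern looks roughly like this. It is a sketch, not the module's code: resolving csv_path before the chdir (so a relative input path still points at the right file) and creating output_dir if it is missing are assumptions added for robustness. On Python 3.11+, contextlib.chdir gives the same restore-on-exit behavior as a context manager.

```python
from __future__ import annotations

import os
from pathlib import Path
from typing import Callable, List


def process_one_day(
    csv_path: Path,
    output_dir: Path,
    process_fn: Callable[[Path], List[Path]],
) -> List[Path]:
    """Call process_fn with the working directory temporarily set to output_dir."""
    # Resolve first: a relative csv_path would point elsewhere after the chdir.
    csv_path = Path(csv_path).resolve()
    output_dir.mkdir(parents=True, exist_ok=True)  # assumption: create if missing
    previous_cwd = Path.cwd()
    os.chdir(output_dir)
    try:
        # process_fn writes its CDFs into Path.cwd(), i.e. output_dir.
        return process_fn(csv_path)
    finally:
        # Always restore, even if process_fn raises mid-day.
        os.chdir(previous_cwd)
```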
- swxsoc_reach.historical.process_orchestrator._relocate_to_nested_layout(flat_path: Path, output_dir: Path) → Path
Move flat_path into a nested subdirectory of output_dir.
The subdirectory mirrors the S3 key produced by sdc_aws_utils.aws.create_s3_file_key() (e.g. l1c/prelim/2026/01/01/). Falls back to returning flat_path unchanged if sdc_aws_utils or swxsoc is not importable, or if key computation raises for any reason.
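The move-with-fallback behavior can be sketched as follows. To avoid guessing the signature of sdc_aws_utils.aws.create_s3_file_key(), this sketch takes a hypothetical key_fn that maps the flat file to a full S3-style key (subdirectories plus filename); key_fn=None stands in for the case where the AWS helpers cannot be imported.

```python
from __future__ import annotations

import shutil
from pathlib import Path
from typing import Callable, Optional


def relocate_to_nested_layout(
    flat_path: Path,
    output_dir: Path,
    key_fn: Optional[Callable[[Path], str]] = None,
) -> Path:
    """Move flat_path under output_dir at the path given by key_fn (sketch)."""
    if key_fn is None:
        # Stand-in for "sdc_aws_utils / swxsoc not importable".
        return flat_path
    try:
        key = key_fn(flat_path)  # e.g. "l1c/prelim/2026/01/01/<file>.cdf"
    except Exception:
        # Any failure computing the key leaves the file where it is.
        return flat_path
    target = output_dir / key
    target.parent.mkdir(parents=True, exist_ok=True)
    return Path(shutil.move(str(flat_path), str(target)))
```

Catching a broad Exception mirrors the documented "raises for any reason" fallback: a key-computation problem degrades to the flat layout instead of failing the day.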
- swxsoc_reach.historical.process_orchestrator.run_process(config: ProcessRunConfig, *, process_fn: Callable[[Path], list[Path]] | None = None, upload_fn: Callable[..., tuple[str, str]] | None = None, telemetry: HistoricalTelemetry | None = None) → ProcessRunSummary
Run the historical CSV -> CDF (-> S3) orchestrator.
- Parameters:
  - config (ProcessRunConfig) – Run inputs, typically built from CLI args.
  - process_fn (callable, optional) – Override for swxsoc_reach.calibration.calibration.process_file(). Receives the CSV path and must return a list[Path] of produced CDFs (written into cwd per the existing contract). Tests inject a stub here.
  - upload_fn (callable, optional) – Override for swxsoc_reach.historical.s3_upload.upload_cdf_to_s3(). Must accept a call of the form (cdf_path, destination_bucket=...) and return (bucket, s3_key).
  - telemetry (HistoricalTelemetry, optional) – Override for the telemetry writer/reader.
- Returns:
ProcessRunSummary – Aggregate counts. The CLI layer uses these to log the final summary line and choose an exit code.
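Because process_fn and upload_fn are injectable, the per-day loop can be exercised with stubs and no AWS access. The sketch below is a heavily simplified, hypothetical loop: it omits telemetry, the action table, dry_run, and limit_days, and its counting rule (an uploaded day counts in both days_processed and days_uploaded) and exit-code policy are assumptions rather than documented behavior.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Callable, Iterable, List, Optional, Tuple


@dataclass
class SummarySketch:
    """Hypothetical subset of ProcessRunSummary's counters."""

    days_attempted: int = 0
    days_processed: int = 0
    days_uploaded: int = 0
    days_skipped_no_input: int = 0
    days_failed: int = 0


def run_days_sketch(
    days: Iterable[date],
    find_input: Callable[[date], Optional[Path]],
    process_fn: Callable[[Path], List[Path]],
    upload_fn: Optional[Callable[..., Tuple[str, str]]] = None,
    s3_bucket: Optional[str] = None,
) -> SummarySketch:
    summary = SummarySketch()
    for day in days:  # sequential, one day at a time
        csv_path = find_input(day)
        if csv_path is None:
            summary.days_skipped_no_input += 1
            continue
        summary.days_attempted += 1
        try:
            cdfs = process_fn(csv_path)
            summary.days_processed += 1
            if upload_fn is not None:
                for cdf in cdfs:
                    upload_fn(cdf, destination_bucket=s3_bucket)
                summary.days_uploaded += 1
        except Exception:
            # One bad day is counted and the loop moves on to the next.
            summary.days_failed += 1
    return summary


def exit_code(summary: SummarySketch) -> int:
    """Assumed policy: non-zero if any day failed."""
    return 1 if summary.days_failed else 0
```

A test would pass stub callables for process_fn and upload_fn, exactly as the parameter docs describe for the real function.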