# Source code for swxsoc_reach.historical.s3_upload

"""Local CDF upload helper for the historical process orchestrator.

Mirrors the executor Lambda's ``_upload_reach_file_to_s3`` exactly:
sets ``SWXSOC_MISSION=swxsoc_pipeline``, calls
:func:`swxsoc._reconfigure`, and delegates the upload to
:func:`sdc_aws_utils.aws.push_science_file`.

``push_science_file`` (via :func:`sdc_aws_utils.aws.upload_file_to_s3`)
hard-codes ``/tmp/{filename}`` as the source path because it was
written for the Lambda runtime where the CDF already lives in
``/tmp``. For a local historical run the CDF is in
``--output-dir`` instead, so this helper stages a copy of the file
into ``/tmp`` before invoking ``push_science_file`` and removes the
staged copy afterwards. The original CDF in ``--output-dir`` is left
untouched.

``boto3`` and ``sdc_aws_utils`` are imported lazily inside the
function so the package still imports on dev machines that have not
installed the ``[net]`` extra.
"""

from __future__ import annotations

import shutil
import tempfile
from pathlib import Path

from swxsoc_reach import log

# Message prefix for the RuntimeError raised by ``upload_cdf_to_s3`` when the
# optional upload dependencies cannot be imported; tells the user which extra
# to install.
_INSTALL_HINT = (
    "S3 upload requires the optional 'net' extra: pip install "
    "'swxsoc_reach[net]' (provides boto3 + sdc_aws_utils)."
)


def upload_cdf_to_s3(cdf_path: Path, *, destination_bucket: str) -> tuple[str, str]:
    """Upload a single CDF file to S3 via ``sdc_aws_utils``.

    The uploader is handed only the file's basename, so the CDF is first
    copied under that name into the system temporary directory, uploaded,
    and the staged copy is removed again. ``cdf_path`` itself is never
    modified or deleted. If ``cdf_path`` already *is* the staging path, no
    copy is made and nothing is removed afterwards.

    Parameters
    ----------
    cdf_path : pathlib.Path
        Local path to the CDF file. Must exist on disk.
    destination_bucket : str
        Target S3 bucket name (no ``s3://`` prefix).

    Returns
    -------
    tuple[str, str]
        ``(destination_bucket, s3_key)`` where ``s3_key`` is the value
        returned by :func:`sdc_aws_utils.aws.push_science_file`.

    Raises
    ------
    FileNotFoundError
        If ``cdf_path`` does not exist or is not a regular file. This is
        checked before the optional dependencies are imported.
    RuntimeError
        If ``boto3`` or ``sdc_aws_utils`` are not importable. The
        message includes the install hint.

    Notes
    -----
    Any exception raised by ``push_science_file`` propagates to the caller
    after the staged copy has been cleaned up.
    """
    cdf_path = Path(cdf_path)
    if not cdf_path.is_file():
        raise FileNotFoundError(f"CDF not found for upload: {cdf_path}")

    # Imported lazily so the package still imports without the 'net' extra.
    try:
        import boto3  # noqa: F401 -- imported for availability check
        from sdc_aws_utils.aws import push_science_file
        from sdc_aws_utils.config import parser as science_filename_parser
    except ImportError as exc:  # pragma: no cover - exercised via test stub
        raise RuntimeError(f"{_INSTALL_HINT} (import error: {exc})") from exc

    # ``upload_file_to_s3`` (called inside push_science_file) reads from
    # ``/tmp/{basename}``. Stage the file there for the duration of the
    # upload, then remove the staging copy.
    # NOTE(review): ``tempfile.gettempdir()`` honours TMPDIR/TEMP/TMP and is
    # not ``/tmp`` on every platform; if the uploader really reads the
    # literal ``/tmp`` path, staging only lines up where the two coincide.
    # Confirm against sdc_aws_utils before changing.
    filename = cdf_path.name
    staged = Path(tempfile.gettempdir()) / filename

    # Only copy (and later delete) when the source is not already the
    # staging path, so the caller's own file is never removed.
    staged_was_copied = staged.resolve() != cdf_path.resolve()
    if staged_was_copied:
        shutil.copy2(cdf_path, staged)

    try:
        s3_key = push_science_file(
            science_filename_parser=science_filename_parser,
            destination_bucket=destination_bucket,
            calibrated_filename=filename,
        )
    finally:
        if staged_was_copied:
            # Best-effort cleanup: a leftover staged file must not mask the
            # upload result (or the upload's own exception).
            try:
                staged.unlink()
            except OSError as exc:
                log.warning(f"Failed to remove staged upload {staged}: {exc}")

    log.info(
        "Uploaded REACH CDF to S3",
        extra={
            "cdf_path": str(cdf_path),
            "destination_bucket": destination_bucket,
            "s3_key": s3_key,
        },
    )
    return destination_bucket, s3_key