swxsoc_reach.net.udl

Functions

build_reach_output_filename(sensor_id, ...)

Build a deterministic output filename for combined REACH data.

download_UDL_reach_to_file(auth_token, ...)

Download REACH data from UDL for a relative-time window.

download_UDL_reach_window(auth_token, ...[, ...])

Download REACH data from UDL for an explicit absolute UTC window.

fetch_reach_chunk(dt, url, auth_token[, ...])

Fetch one UDL chunk and normalize the payload into a list of records.

format_udl_timestamp(value)

Format an Astropy time value for UDL query parameters.

get_reach_datetimelist(start_time, end_time, ...)

Split a time range into UDL-safe query windows.

get_reach_urllist(dtlist, sensor_id, descriptor)

Build UDL request URLs for each time interval.

write_reach_output(filepath, obs, output_format)

Write REACH payload records to disk.

Classes

AdaptiveRateController([initial_rate, ...])

Thread-safe AIMD rate controller for throttling HTTP requests.

class swxsoc_reach.net.udl.AdaptiveRateController(initial_rate: float = 5.0, additive_increase: float = 1.0, multiplicative_decrease: float = 0.5, min_rate: float = 5.0, max_rate: float = 25.0)

Thread-safe AIMD rate controller for throttling HTTP requests.

Uses Additive Increase / Multiplicative Decrease to dynamically adjust the permitted request rate based on server feedback.

Parameters:
  • initial_rate (float, optional) – Starting request rate in requests per second.

  • additive_increase (float, optional) – Amount to add to the rate after each successful request.

  • multiplicative_decrease (float, optional) – Factor to multiply the rate by after a rate-limit response.

  • min_rate (float, optional) – Minimum permitted request rate.

  • max_rate (float, optional) – Maximum permitted request rate.

acquire() → None

Block until the next request is permitted under the current rate.

record_rate_limit() → None

Record a rate-limit (429) response and decrease the rate.

record_success() → None

Record a successful request and increase the rate additively.
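The AIMD behaviour described above can be illustrated with a small stand-alone class. This is a minimal sketch and not the library's implementation: the class name AimdRateSketch and the slot-reservation approach in acquire() are assumptions made for illustration, while the parameter names and defaults mirror the documented signature.

```python
import threading
import time


class AimdRateSketch:
    """Illustrative AIMD limiter (hypothetical; not the library's class)."""

    def __init__(self, initial_rate=5.0, additive_increase=1.0,
                 multiplicative_decrease=0.5, min_rate=5.0, max_rate=25.0):
        self._lock = threading.Lock()
        self.rate = initial_rate
        self.additive_increase = additive_increase
        self.multiplicative_decrease = multiplicative_decrease
        self.min_rate = min_rate
        self.max_rate = max_rate
        self._next_slot = time.monotonic()

    def acquire(self):
        # Reserve the next send slot under the lock, then sleep outside it
        # so other threads can reserve their own slots in the meantime.
        with self._lock:
            now = time.monotonic()
            slot = max(now, self._next_slot)
            self._next_slot = slot + 1.0 / self.rate
        delay = slot - now
        if delay > 0:
            time.sleep(delay)

    def record_success(self):
        # Additive increase, clamped to max_rate.
        with self._lock:
            self.rate = min(self.max_rate, self.rate + self.additive_increase)

    def record_rate_limit(self):
        # Multiplicative decrease, clamped to min_rate.
        with self._lock:
            self.rate = max(self.min_rate, self.rate * self.multiplicative_decrease)


ctrl = AimdRateSketch(initial_rate=10.0)
ctrl.record_success()       # 10.0 + 1.0
rate_after_success = ctrl.rate
ctrl.record_rate_limit()    # 11.0 * 0.5
rate_after_limit = ctrl.rate
ctrl.record_rate_limit()    # 2.75 would fall below min_rate, so it is clamped
rate_after_clamp = ctrl.rate
```

With the documented defaults, initial_rate equals min_rate, so in this sketch a rate-limit response only lowers the rate once earlier successes have raised it above 5.0.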

swxsoc_reach.net.udl._concatenate_chunk_files(filepath: Path, dtlist: list[str], chunk_files: dict[str, Path], output_format: Literal['json', 'csv']) → None

Stream-concatenate per-chunk temp files into one combined output file.

Reads one temp file at a time so peak memory stays proportional to a single chunk rather than the full dataset.

Parameters:
  • filepath (pathlib.Path) – Destination path for the combined output file.

  • dtlist (list[str]) – Chunk window identifiers in the desired output order.

  • chunk_files (dict[str, pathlib.Path]) – Mapping of chunk window identifiers to their temp file paths. Only chunks that produced records are present.

  • output_format ({'json', 'csv'}) – Serialization format of the output file.
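As a rough illustration of stream concatenation, the sketch below handles only the CSV case. It assumes each chunk file carries its own header row and that the combined file should keep just the first one; the real on-disk layout of the temp files is not documented here, and the helper name concatenate_csv_chunks is hypothetical.

```python
import tempfile
from pathlib import Path


def concatenate_csv_chunks(filepath, dtlist, chunk_files):
    """Append chunk files in dtlist order, keeping only the first header row."""
    wrote_header = False
    with filepath.open("w", newline="") as out:
        for dt in dtlist:
            chunk_path = chunk_files.get(dt)
            if chunk_path is None:
                continue  # this window produced no records
            with chunk_path.open("r", newline="") as src:
                header = src.readline()
                if not wrote_header:
                    out.write(header)
                    wrote_header = True
                for line in src:  # copy line by line; one chunk open at a time
                    out.write(line)


with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    a = tmp_path / "a.csv"
    b = tmp_path / "b.csv"
    a.write_text("obTime,value\nt0,1\n")
    b.write_text("obTime,value\nt2,3\n")
    combined = tmp_path / "combined.csv"
    # "w1" has no entry in the mapping, mirroring a chunk with no records.
    concatenate_csv_chunks(combined, ["w0", "w1", "w2"], {"w2": b, "w0": a})
    combined_text = combined.read_text()
```

Output order follows dtlist rather than the mapping's insertion order, which is why the windows are passed as a separate ordered list.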

swxsoc_reach.net.udl._fetch_and_spool_chunk(dt: str, url: str, auth_token: str, chunk_path: Path, output_format: Literal['json', 'csv'], rate_controller: AdaptiveRateController | None = None, max_retries: int = 5) → tuple[str, int, Path | None]

Fetch one UDL chunk and spool records to a temp file.

Wraps fetch_reach_chunk() so that the heavy record data is written to disk inside the worker thread and never stored in the Future result. This keeps peak memory proportional to one chunk regardless of how many futures are in flight.

Parameters:
  • dt (str) – Chunk window identifier.

  • url (str) – UDL request URL.

  • auth_token (str) – Authorization header value.

  • chunk_path (pathlib.Path) – Temp file path to write records into.

  • output_format ({'json', 'csv'}) – Serialization format for the temp file.

  • rate_controller (AdaptiveRateController or None, optional) – Shared AIMD rate controller.

  • max_retries (int, optional) – Maximum retry attempts on HTTP 429.

Returns:

tuple[str, int, pathlib.Path or None] – (dt, record_count, chunk_path), where chunk_path is None when the chunk contained no records.
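The spooling pattern can be sketched with a thread pool whose workers write to disk and return only a small summary tuple. This is an illustration under stated assumptions: fake_fetch and FAKE_PAYLOADS are hypothetical stand-ins for the network call, and the JSON temp-file layout is chosen for the example rather than taken from the library.

```python
import json
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

# Hypothetical stand-in payloads keyed by chunk window identifier.
FAKE_PAYLOADS = {"w0": [{"v": 1}, {"v": 2}], "w1": [], "w2": [{"v": 3}]}


def fake_fetch(dt):
    """Pretend network call; returns (dt, records)."""
    return dt, FAKE_PAYLOADS[dt]


def fetch_and_spool(dt, chunk_path):
    """Write records to disk inside the worker; return a small summary."""
    dt, records = fake_fetch(dt)
    if not records:
        return dt, 0, None  # nothing written for empty chunks
    chunk_path.write_text(json.dumps(records))
    return dt, len(records), chunk_path


with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    results = {}
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(fetch_and_spool, dt, tmp_path / f"{dt}.json")
            for dt in FAKE_PAYLOADS
        ]
        for fut in as_completed(futures):
            dt, count, path = fut.result()
            # Only the identifier, a count, and a path ever reach this thread.
            results[dt] = (count, path.name if path else None)
```

Because each Future holds only a tuple of three small values, the number of in-flight futures does not affect how much record data is resident in memory.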

swxsoc_reach.net.udl._write_chunk_file(chunk_path: Path, records: list[dict[str, Any]], output_format: Literal['json', 'csv']) → None

Write a single chunk’s records to a temporary file.

Parameters:
  • chunk_path (pathlib.Path) – Destination path for the chunk file.

  • records (list[dict[str, Any]]) – Non-empty list of observation records to serialize.

  • output_format ({'json', 'csv'}) – Serialization format.

swxsoc_reach.net.udl.build_reach_output_filename(sensor_id: str, start_time: Time, end_time: Time, output_format: Literal['json', 'csv']) → str

Build a deterministic output filename for combined REACH data.

Parameters:
  • sensor_id (str) – REACH sensor identifier, or ALL.

  • start_time (astropy.time.Time) – Start time used in the query.

  • end_time (astropy.time.Time) – End time used in the query.

  • output_format ({'json', 'csv'}) – Output serialization format.

Returns:

str – Filename with sensor prefix and query time range.

swxsoc_reach.net.udl.download_UDL_reach_to_file(auth_token: str, sensor_id: str, descriptor: str, output_format: Literal['json', 'csv'], delay_seconds: int, window_seconds: int, output_dir: Path | str, max_concurrent_requests: int = 4, initial_rate: float = 5.0, additive_increase: float = 1.0, multiplicative_decrease: float = 0.5, min_rate: float = 5.0, max_rate: float = 25.0) → Path

Download REACH data from UDL for a relative-time window.

Computes end_time = Time.now() - delay_seconds and start_time = end_time - window_seconds and delegates to download_UDL_reach_window(). This is the entry point used by the scheduled Lambda.

Parameters:
  • auth_token (str) – UDL authorization token value for the Authorization header.

  • sensor_id (str) – REACH sensor identifier, or ALL.

  • descriptor (str) – UDL descriptor value to include in each request.

  • output_format ({'json', 'csv'}) – Output serialization format.

  • delay_seconds (int) – Number of seconds to subtract from Time.now() before ending the query window.

  • window_seconds (int) – Duration of the query window in seconds.

  • output_dir (pathlib.Path or str) – Directory where the combined output file is written.

  • max_concurrent_requests (int, optional) – Maximum number of chunk requests to run concurrently.

  • initial_rate (float, optional) – AIMD rate controller tuning parameters; see download_UDL_reach_window().

  • additive_increase (float, optional) – AIMD rate controller tuning parameters; see download_UDL_reach_window().

  • multiplicative_decrease (float, optional) – AIMD rate controller tuning parameters; see download_UDL_reach_window().

  • min_rate (float, optional) – AIMD rate controller tuning parameters; see download_UDL_reach_window().

  • max_rate (float, optional) – AIMD rate controller tuning parameters; see download_UDL_reach_window().

Returns:

pathlib.Path – Path to the written output file.

Raises:
  • ValueError – If output_format is invalid or no records are returned.

  • requests.HTTPError – If any UDL request fails.
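The relative-window arithmetic can be shown in isolation. The sketch below uses the standard library's datetime in place of astropy.time.Time so that it runs without extra dependencies, and it takes now as an argument to stay deterministic; the helper name relative_window is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def relative_window(now, delay_seconds, window_seconds):
    """Return (start_time, end_time) for a window that trails `now`."""
    end_time = now - timedelta(seconds=delay_seconds)
    start_time = end_time - timedelta(seconds=window_seconds)
    return start_time, end_time


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# A one-hour window that ends ten minutes before `now`.
start_time, end_time = relative_window(now, delay_seconds=600, window_seconds=3600)
```

Here the window runs from 10:50 to 11:50 UTC, so delay_seconds shifts the whole window back in time while window_seconds sets its length.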

swxsoc_reach.net.udl.download_UDL_reach_window(auth_token: str, sensor_id: str, descriptor: str, output_format: Literal['json', 'csv'], start_time: Time, end_time: Time, output_dir: Path | str, max_concurrent_requests: int = 4, initial_rate: float = 5.0, additive_increase: float = 1.0, multiplicative_decrease: float = 0.5, min_rate: float = 5.0, max_rate: float = 25.0) → Path

Download REACH data from UDL for an explicit absolute UTC window.

Behaves identically to download_UDL_reach_to_file() except that the query window is provided directly as start_time / end_time rather than computed relative to Time.now(). Suitable for historical reprocessing where the operator drives the window.

Each chunk is written to a temporary file as it arrives, keeping peak memory proportional to one chunk instead of the full dataset. Temp files are concatenated in time order into the final output file and cleaned up automatically.

Parameters:
  • auth_token (str) – UDL authorization token value for the Authorization header.

  • sensor_id (str) – REACH sensor identifier, or ALL.

  • descriptor (str) – UDL descriptor value to include in each request.

  • output_format ({'json', 'csv'}) – Output serialization format.

  • start_time (astropy.time.Time) – Inclusive start of the UTC query window.

  • end_time (astropy.time.Time) – Exclusive end of the UTC query window. Must be strictly after start_time.

  • output_dir (pathlib.Path or str) – Directory where the combined output file is written.

  • max_concurrent_requests (int, optional) – Maximum number of chunk requests to run concurrently. Lower values are safer for unknown API limits; higher values can improve throughput.

  • initial_rate (float, optional) – Starting request rate in requests per second for the AIMD rate controller. Default is 5.0.

  • additive_increase (float, optional) – Amount added to the rate after each successful request. Default is 1.0.

  • multiplicative_decrease (float, optional) – Factor to multiply the rate by after a 429 response. Default is 0.5.

  • min_rate (float, optional) – Minimum permitted request rate. Default is 5.0.

  • max_rate (float, optional) – Maximum permitted request rate. Default is 25.0.

Returns:

pathlib.Path – Path to the written output file, absolute or relative depending on the output_dir provided.

Raises:
  • ValueError – If output_format is not one of 'json' or 'csv'.

  • ValueError – If no records are returned for the requested time window.

  • requests.HTTPError – If any UDL request returns an unsuccessful HTTP status code.

swxsoc_reach.net.udl.fetch_reach_chunk(dt: str, url: str, auth_token: str, timeout_seconds: int = 120, rate_controller: AdaptiveRateController | None = None, max_retries: int = 5) → tuple[str, list[dict[str, Any]]]

Fetch one UDL chunk and normalize the payload into a list of records.

Retries with exponential back-off and jitter after HTTP 429 responses and transient connection errors. When a rate_controller is provided, it throttles requests and receives success and rate-limit feedback.

Parameters:
  • dt (str) – Chunk window identifier (<start>..<end>) used for logging/order.

  • url (str) – UDL request URL for the chunk.

  • auth_token (str) – Authorization header value for UDL.

  • timeout_seconds (int, optional) – Request timeout in seconds. Default is 120 seconds to allow for large chunks or slow responses.

  • rate_controller (AdaptiveRateController or None, optional) – Shared rate controller for AIMD throttling. If None, no throttling or adaptive feedback is applied.

  • max_retries (int, optional) – Maximum number of retry attempts after a 429 response or transient connection error.

Returns:

tuple[str, list[dict[str, Any]]] – The chunk window string and its records.

Raises:
  • requests.HTTPError – If UDL responds with an unsuccessful status code after all retries.

  • requests.ConnectionError – If the connection fails on every retry attempt.
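Retry with exponential back-off and jitter can be sketched as follows. The base delay, the cap, and the use of "full jitter" (a uniform draw between zero and the exponential ceiling) are assumptions for illustration; the library's actual constants and jitter scheme are not documented here. RuntimeError stands in for an HTTP 429 response, and both helper names are hypothetical.

```python
import random
import time


def backoff_delays(max_retries, base_seconds=1.0, cap_seconds=60.0, rng=None):
    """Yield one sleep duration per retry: capped exponential, full jitter."""
    if rng is None:
        rng = random.Random()
    for attempt in range(max_retries):
        ceiling = min(cap_seconds, base_seconds * (2 ** attempt))
        yield rng.uniform(0.0, ceiling)


def call_with_retries(func, max_retries=5, sleep=time.sleep):
    """Call func(); retry on RuntimeError until retries are exhausted."""
    delays = backoff_delays(max_retries)
    while True:
        try:
            return func()
        except RuntimeError:
            delay = next(delays, None)
            if delay is None:
                raise  # no retries left; surface the last error
            sleep(delay)


attempts = {"n": 0}


def flaky():
    """Fail twice, then succeed."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("429")
    return "ok"


# A no-op sleep keeps the demonstration instant.
result = call_with_retries(flaky, sleep=lambda seconds: None)
```

Jitter spreads retries from concurrent workers over time, so they are less likely to hit the server again in the same instant.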

swxsoc_reach.net.udl.format_udl_timestamp(value: Time) → str

Format an Astropy time value for UDL query parameters.

Parameters:

value (astropy.time.Time) – Timestamp to convert into the UDL API timestamp format.

Returns:

str – Timestamp formatted as YYYY-MM-DDTHH:MM:SS.000Z.
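The documented output shape can be reproduced with the standard library. This sketch takes a timezone-aware datetime rather than an astropy.time.Time, and it assumes the fractional part is always written as the literal .000; the helper name format_timestamp is hypothetical.

```python
from datetime import datetime, timezone


def format_timestamp(value):
    """Format an aware datetime as YYYY-MM-DDTHH:MM:SS.000Z in UTC."""
    return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")


stamp = format_timestamp(datetime(2024, 3, 5, 7, 8, 9, tzinfo=timezone.utc))
```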

swxsoc_reach.net.udl.get_reach_datetimelist(start_time: Time, end_time: Time, sensor_id: str) → list[str]

Split a time range into UDL-safe query windows.

Parameters:
  • start_time (astropy.time.Time) – Inclusive start time of the requested observation window.

  • end_time (astropy.time.Time) – Inclusive end time of the requested observation window.

  • sensor_id (str) – REACH sensor identifier. IDs beginning with REACH- use 6-hour chunks; all other values use 5-minute chunks.

Returns:

list[str] – List of obTime interval strings in UDL range format (<start>..<end>).
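The chunking rule can be sketched with plain datetimes. The 6-hour and 5-minute steps and the <start>..<end> format come from the description above; everything else is an assumption, including the helper name split_window, the sensor ID REACH-101, and the choice to let adjacent windows share a boundary timestamp, which the real function may handle differently.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%dT%H:%M:%S.000Z"


def split_window(start_time, end_time, sensor_id):
    """Split a time range into '<start>..<end>' interval strings."""
    if sensor_id.startswith("REACH-"):
        step = timedelta(hours=6)    # single-sensor queries use larger chunks
    else:
        step = timedelta(minutes=5)  # e.g. ALL uses small chunks
    windows = []
    cursor = start_time
    while cursor < end_time:
        chunk_end = min(cursor + step, end_time)  # final chunk may be shorter
        windows.append(f"{cursor.strftime(FMT)}..{chunk_end.strftime(FMT)}")
        cursor = chunk_end
    return windows


start = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
end = start + timedelta(minutes=12)
all_windows = split_window(start, end, "ALL")
single_window = split_window(start, end, "REACH-101")
```

For the 12-minute range above, ALL yields three windows (two full 5-minute chunks and a 2-minute remainder), while the hypothetical single-sensor ID yields one.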

swxsoc_reach.net.udl.get_reach_urllist(dtlist: list[str], sensor_id: str, descriptor: str) → dict[str, str]

Build UDL request URLs for each time interval.

Parameters:
  • dtlist (list[str]) – List of UDL obTime interval strings.

  • sensor_id (str) – REACH sensor identifier, or ALL for all sensors.

  • descriptor (str) – UDL descriptor value to include in the query.

Returns:

dict[str, str] – Mapping of each interval string to its full UDL query URL.

swxsoc_reach.net.udl.write_reach_output(filepath: Path, obs: list[dict[str, Any]], output_format: Literal['json', 'csv']) → None

Write REACH payload records to disk.

Parameters:
  • filepath (pathlib.Path) – Destination file path.

  • obs (list[dict[str, Any]]) – Observation records to serialize.

  • output_format ({'json', 'csv'}) – Output serialization format.

Returns:

None – This function writes a file as a side effect.
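Serializing a list of dict records to either format can be sketched with the standard library. The column-ordering rule for CSV (union of keys in first-seen order) and the record fields are assumptions for illustration, and write_records is a hypothetical name; the library's own layout may differ.

```python
import csv
import json
import tempfile
from pathlib import Path


def write_records(filepath, obs, output_format):
    """Serialize a list of dict records as JSON or CSV."""
    if output_format == "json":
        filepath.write_text(json.dumps(obs))
    elif output_format == "csv":
        # Union of keys in first-seen order, so sparse records still fit.
        fieldnames = list(dict.fromkeys(key for record in obs for key in record))
        with filepath.open("w", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(obs)  # missing keys are written as empty cells
    else:
        raise ValueError(f"unsupported output_format: {output_format!r}")


obs = [{"obTime": "t0", "value": 1}, {"obTime": "t1", "flag": "x"}]
with tempfile.TemporaryDirectory() as tmp:
    json_path = Path(tmp) / "out.json"
    csv_path = Path(tmp) / "out.csv"
    write_records(json_path, obs, "json")
    write_records(csv_path, obs, "csv")
    json_roundtrip = json.loads(json_path.read_text())
    csv_lines = csv_path.read_text().splitlines()
```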