Source code for dapper.met.adapters.era5

# dapper/met/adapters/era5.py
"""ERA5-Land adapter implementation."""

from __future__ import annotations
import numpy as np
import pandas as pd
from pathlib import Path

from dapper.met import temporal as dt
from dapper.met.adapters.base import BaseAdapter
from dapper.schemas.elm import elm_required_vars, is_nonnegative
from dapper.config.metsources.era5 import RAW_TO_ELM
from dapper.elm import utils as eu  # for compute_humidities, packing defaults


class ERA5Adapter(BaseAdapter):
    """ERA5-Land → ELM adapter.

    This adapter implements the ``BaseAdapter`` interface for ERA5-Land hourly
    data. It handles source-specific details (file discovery, unit
    conversions, humidity diagnostics, renaming to ELM short names, and
    nonnegativity enforcement) so the upstream ``Exporter`` can remain
    source-agnostic.

    Responsibilities
    ----------------
    - **discover_files**: Find CSV shards in a directory and infer the overall
      (start_year, end_year) using their date coverage.
    - **normalize_locations**: Validate and normalize the locations table
      (adds ``lon_0-360``, ensures/creates ``zone``, stable sorting).
      NOTE(review): not overridden in this class; presumably inherited from
      ``BaseAdapter`` -- confirm.
    - **id_column_for_csv**: Declare the identifier column name in the input
      CSVs. For ERA5 we require ``gid``.
    - **preprocess_shard**: Convert one merged shard (CSV rows joined to
      locations) into canonical ELM columns. Steps include:

      1. time filtering and optional "noleap" removal of Feb 29
      2. ERA5→ELM unit conversions (e.g., J/hr/m² → W/m², m/hr → mm/s)
      3. optional humidity computation (RH/Q) if temperature, dewpoint, and
         surface pressure are available
      4. renaming raw ERA5 fields to ELM short names via a mapping
      5. clipping canonical nonnegative variables
      6. returning only required columns in a deterministic order
    - **required_vars**: Report the canonical ELM variable names required for
      the requested output format.
    - **pack_params**: Provide robust ``(add_offset, scale_factor)`` for a
      canonical ELM variable, given optional data to tune ranges.

    Notes
    -----
    - Humidity computation is performed only when ``temperature_2m``,
      ``dewpoint_temperature_2m``, and ``surface_pressure`` are present.
    - Precipitation conversion uses ``m/hr → mm/s`` via division by ``3.6``.
    - ``wind_direction`` is a diagnostic only; it is reported in degrees in
      the half-open range ``[0, 360)``.
    """

    # These are just for netCDF metadata
    SOURCE_NAME = "ERA5-Land hourly reanalysis"
    DRIVER_TAG = "ERA5"

    # ---------------- discovery & locations ----------------
    def discover_files(self, csv_directory, calendar):
        """Discover ERA5 CSV shards in a directory and infer the inclusive year range.

        Parameters
        ----------
        csv_directory : str or path-like
            Directory that is scanned (non-recursively) for ``.csv`` files.
        calendar
            Calendar identifier forwarded unchanged to
            ``dt.get_start_end_years``.

        Returns
        -------
        tuple
            ``(csv_files, start_year, end_year)`` where ``csv_files`` is a
            sorted list of path strings and the years are whatever
            ``dt.get_start_end_years`` reports for those files.

        Raises
        ------
        FileNotFoundError
            If the directory contains no regular file ending in ``.csv``.
        """
        csv_directory = Path(csv_directory)

        # Ignore directories; only pick real files that end with .csv
        # (case-insensitive). ``iterdir`` yields entries in arbitrary,
        # OS-dependent order, so sort to keep shard processing reproducible.
        csv_files = sorted(
            str(p)
            for p in csv_directory.iterdir()
            if p.is_file() and p.suffix.lower() == ".csv"
        )
        if not csv_files:
            raise FileNotFoundError(f"No .csv files found in {csv_directory}")

        start_year, end_year = dt.get_start_end_years(csv_files, calendar=calendar)
        return csv_files, start_year, end_year

    def id_column_for_csv(self, df_csv, id_col):
        """Return the required identifier column name expected in ERA5 CSV shards ("gid").

        Parameters
        ----------
        df_csv : pandas.DataFrame
            A CSV shard; only its column labels are inspected.
        id_col
            Accepted for interface compatibility; not used by this adapter,
            which always requires ``"gid"``.

        Raises
        ------
        KeyError
            If ``df_csv`` has no ``"gid"`` column.
        """
        if "gid" not in df_csv.columns:
            raise KeyError("Expected 'gid' column in input CSV.")
        return "gid"

    # ---------------- preprocessing & requirements ----------------
    def preprocess_shard(self, df_merged, start_year, end_year, calendar, dformat):
        """Convert one merged shard into canonical ELM columns.

        Steps:

        1) Filter time & handle no-leap
        2) Apply ERA5 → ELM unit conversions
        3) Compute humidities (if columns available)
        4) Rename columns to canonical ELM names using RAW_TO_ELM
        5) Clip canonical nonnegative variables
        6) Return only the canonical vars required by
           ``elm_required_vars(dformat)``, plus LONGXY/LATIXY/time/gid/zone
           (coords/meta).

        Parameters
        ----------
        df_merged : pandas.DataFrame
            Shard rows; must contain a ``date`` column. ``lon``/``lat`` are
            renamed to ``LONGXY``/``LATIXY`` and are needed for the final sort.
            The input frame is not modified.
        start_year, end_year : int
            Inclusive bounds on the calendar year of ``date``.
        calendar
            If its lower-cased string form is ``"noleap"``, Feb 29 rows are
            dropped.
        dformat
            Output format key passed to ``elm_required_vars``.

        Returns
        -------
        pandas.DataFrame
            Required data variables followed by the coordinate/meta columns
            that exist, sorted by ``time``, ``LATIXY``, ``LONGXY`` with a
            fresh integer index.

        Raises
        ------
        KeyError
            If ``date`` is missing from ``df_merged``.
        """
        if "date" not in df_merged.columns:
            raise KeyError("Expected 'date' column in the CSV shard.")

        # --- time handling ---
        df = df_merged.copy()
        df["date"] = pd.to_datetime(df["date"])
        df = df.sort_values("date")

        years = df["date"].dt.year
        df = df[(years >= start_year) & (years <= end_year)]

        if str(calendar).lower() == "noleap":
            is_feb29 = (df["date"].dt.month == 2) & (df["date"].dt.day == 29)
            df = df[~is_feb29]

        # --- ERA5-specific unit conversions (kept local to adapter) ---
        # _unit_conversions returns a fresh copy, so the column assignments
        # below never write into a filtered view of the caller's frame.
        df = self._unit_conversions(df)

        # --- humidities if possible ---
        needed = {"temperature_2m", "dewpoint_temperature_2m", "surface_pressure"}
        if needed.issubset(df.columns):
            rel_hum, spec_hum = eu.compute_humidities(
                df["temperature_2m"].values,
                df["dewpoint_temperature_2m"].values,
                df["surface_pressure"].values,
            )
            df["relative_humidity"] = rel_hum
            df["specific_humidity"] = spec_hum

        # --- rename to canonical ELM names based on RAW_TO_ELM ---
        required = list(elm_required_vars(dformat))  # includes LONGXY/LATIXY/time
        want_canon = set(required)
        # Keep only mappings that land in required canonical vars.
        rename_map = {
            src: canon for src, canon in RAW_TO_ELM.items() if canon in want_canon
        }
        df = df.rename(columns=rename_map)
        # Coords/time to canonical names.
        df = df.rename(columns={"date": "time", "lon": "LONGXY", "lat": "LATIXY"})

        # --- enforce nonnegativity for canonical variables (post-rename) ---
        # Iterate over a snapshot of the labels because columns are reassigned.
        for col in list(df.columns):
            if is_nonnegative(col):
                df[col] = df[col].clip(lower=0)

        # --- final selection/order ---
        # Data variables first (in elm_required_vars order), coords/meta last.
        coord_meta = ["LONGXY", "LATIXY", "time", "gid", "zone"]
        required_data_vars = [v for v in required if v not in coord_meta]
        # Keep only those that exist (some formats/inputs may not provide all).
        final_cols = [c for c in required_data_vars + coord_meta if c in df.columns]

        df = df[final_cols]
        return df.sort_values(["time", "LATIXY", "LONGXY"]).reset_index(drop=True)

    def required_vars(self, dformat):
        """Return the canonical ELM variables required for the requested output format.

        Thin delegation to ``elm_required_vars(dformat)``.
        """
        return elm_required_vars(dformat)

    # ---------------- packing ----------------
    def pack_params(self, elm_var, data=None):
        """Return (add_offset, scale_factor) used to pack a variable for NetCDF output.

        Parameters
        ----------
        elm_var
            Canonical ELM variable name.
        data : optional
            Sample values forwarded to the packer; ``None`` is replaced by an
            empty list.

        Returns
        -------
        tuple of float
            ``(add_offset, scale_factor)``.
        """
        # Delegate to the existing robust packer (range → offset/scale).
        sample = data if data is not None else []
        add_offset, scale_factor = eu.elm_var_packing_params(elm_var, data=sample)
        return float(add_offset), float(scale_factor)

    # ---------------- internal: ERA5 unit conversions ----------------
    def _unit_conversions(self, df):
        """Align ERA5-Land hourly units with ELM.

        Works on a copy of ``df`` and only touches columns that are present:

        - ``u_component_of_wind_10m`` / ``v_component_of_wind_10m`` →
          adds ``wind_speed`` and the diagnostic ``wind_direction``
        - ``total_precipitation_hourly``: meters/hour → mm/s (divide by 3.6)
        - ``surface_solar_radiation_downwards_hourly`` and
          ``surface_thermal_radiation_downwards_hourly``: J/hr/m2 → W/m2
          (divide by 3600)

        Parameters
        ----------
        df : pandas.DataFrame
            Frame with raw ERA5 column names.

        Returns
        -------
        pandas.DataFrame
            Converted copy; the input is left untouched.
        """
        out = df.copy()

        # Wind speed from u,v
        if (
            "u_component_of_wind_10m" in out.columns
            and "v_component_of_wind_10m" in out.columns
        ):
            u = out["u_component_of_wind_10m"].values
            v = out["v_component_of_wind_10m"].values
            out["wind_speed"] = np.sqrt(u**2 + v**2)

            # Optional diagnostic (not used by ELM).
            # arctan2(u, v) lies in [-180, 180]; shifting by 180 and wrapping
            # with a modulo yields [0, 360). Assuming ERA5's u=eastward /
            # v=northward convention, this is the direction the wind blows
            # from. The previous two sequential masked shifts re-shifted the
            # values moved by the first mask, so exactly +180 came out as 180
            # instead of wrapping to 0.
            out["wind_direction"] = np.mod(
                np.degrees(np.arctan2(u, v)) + 180.0, 360.0
            )

        # Precip: meters/hour → mm/s  (1000 mm/m / 3600 s/hr == 1 / 3.6)
        if "total_precipitation_hourly" in out.columns:
            out["total_precipitation_hourly"] = (
                out["total_precipitation_hourly"].values / 3.6
            )

        # SW/LW: J/hr/m2 → W/m2
        for flux_col in (
            "surface_solar_radiation_downwards_hourly",
            "surface_thermal_radiation_downwards_hourly",
        ):
            if flux_col in out.columns:
                out[flux_col] = out[flux_col].values / 3600.0

        return out