# dapper/met/temporal.py
"""
Temporal helpers used by Exporter and adapters.
NetCDF I/O is handled in dapper.met.writers. This module is intentionally small.
"""

import numpy as np
import pandas as pd


# Cumulative days preceding the first of each month in a fixed 365-day
# (noleap) year, e.g. March 1 falls on day-of-year index 59 (31 + 28).
_CUMDAYS_NONLEAP = np.asarray(
    [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334], dtype=np.int64
)


def _drop_feb29(times):
    """Drop Feb 29 timestamps from a 1D array-like of datetimes."""
    tidx = pd.DatetimeIndex(times)
    mask = ~((tidx.month == 2) & (tidx.day == 29))
    return tidx[mask].to_numpy()


def _noleap_offset(dtime_units: str, target_times, ref_date):
    """Compute numeric offsets in a 365-day (noleap) calendar."""

    tidx = pd.DatetimeIndex(target_times)
    ref = pd.Timestamp(ref_date)

    # Days since 0001-01-01 in a 365-day calendar (non-leap month lengths)
    y = tidx.year.astype(np.int64)
    m = tidx.month.astype(np.int64)
    d = tidx.day.astype(np.int64)
    day0 = 365 * (y - 1) + _CUMDAYS_NONLEAP[m - 1] + (d - 1)

    ref_day0 = 365 * (ref.year - 1) + _CUMDAYS_NONLEAP[ref.month - 1] + (ref.day - 1)

    # Fractional day from time-of-day
    frac = (
        tidx.hour.astype(np.float64) * 3600.0
        + tidx.minute.astype(np.float64) * 60.0
        + tidx.second.astype(np.float64)
        + tidx.microsecond.astype(np.float64) / 1.0e6
        + tidx.nanosecond.astype(np.float64) / 1.0e9
    ) / 86400.0
    ref_frac = (
        ref.hour * 3600.0
        + ref.minute * 60.0
        + ref.second
        + ref.microsecond / 1.0e6
        + ref.nanosecond / 1.0e9
    ) / 86400.0

    days = (day0.astype(np.float64) + frac) - (float(ref_day0) + float(ref_frac))

    if dtime_units == "days":
        return days
    if dtime_units == "hours":
        return days * 24.0
    raise ValueError("Unsupported dtime_units: choose 'days' or 'hours'")


def create_dtime(
    df,
    calendar: str = "standard",
    dtime_units: str = "days",
    dtime_resolution_hrs: float = 1.0,
):
    """
    Construct a numeric DTIME axis and align data onto it at an arbitrary cadence.

    Accepts fractional hours, e.g., 0.5 (30 min), 0.3 (18 min), 1.5 (90 min).
    """
    if "time" not in df.columns:
        raise ValueError("DataFrame must contain a 'time' column.")
    if dtime_resolution_hrs <= 0:
        raise ValueError("dtime_resolution_hrs must be > 0.")

    df = df.copy()
    df["time"] = pd.to_datetime(df["time"])
    df = df.sort_values("time")

    if calendar.lower() == "noleap":
        df = df[~((df["time"].dt.month == 2) & (df["time"].dt.day == 29))]

    # Variable categories (ELM-ish)
    linear_vars = [
        'TBOT', 'DTBOT', 'RH', 'QBOT', 'PSRF', 'ZBOT', 'UWIND', 'VWIND', 'WIND'
    ]
    ffill_vars = ['FSDS', 'FLDS', 'PRECTmms']
    accum_vars = []  # put true accumulations here if needed

    # --- derive target step in minutes (rounded to nearest minute) ---
    step_minutes = int(round(float(dtime_resolution_hrs) * 60.0))
    if step_minutes < 1:
        step_minutes = 1  # minimum of 1 minute

    # --- infer native cadence (median minute delta) ---
    if len(df) >= 2:
        diffs_min = (
            df["time"].sort_values().diff().dropna() / np.timedelta64(1, "m")
        ).to_numpy()
        native_step_minutes = (
            int(round(np.median(diffs_min))) if diffs_min.size else step_minutes
        )
        if native_step_minutes < 1:
            native_step_minutes = 1
    else:
        native_step_minutes = step_minutes  # trivial series

    # --- build target grid ---
    if step_minutes > native_step_minutes:
        # Downsample (coarser): resample to the target cadence
        df = df.set_index("time")
        rule = f"{step_minutes}min"
        df = df.resample(rule).mean(numeric_only=True).dropna().reset_index()
        target_times = df["time"].drop_duplicates().sort_values().to_numpy()
    elif step_minutes == native_step_minutes:
        # Keep native timestamps (no-op)
        target_times = df["time"].drop_duplicates().sort_values().to_numpy()
    else:
        # Upsample (finer): construct evenly spaced grid and align/interpolate
        t0, t1 = df["time"].iloc[0], df["time"].iloc[-1]
        rule = f"{step_minutes}min"
        target_times = pd.date_range(t0, t1, freq=rule, inclusive="both").to_numpy()

    # In noleap calendars we must *also* ensure the target grid contains no Feb 29.
    if calendar.lower() == "noleap":
        target_times = _drop_feb29(target_times)

    ref_date = target_times[0]

    # Numeric DTIME
    if calendar.lower() == "noleap":
        # IMPORTANT: DTIME must be computed in the declared calendar.
        # Using real (Gregorian) timedeltas in leap years will shift Mar 1 → Mar 2
        # and push the end of year into the next year when interpreted as 'noleap'.
        dtime_vals = _noleap_offset(dtime_units, target_times, ref_date)
    else:
        if dtime_units == "days":
            dtime_vals = (target_times - ref_date) / np.timedelta64(1, "D")
        elif dtime_units == "hours":
            dtime_vals = (target_times - ref_date) / np.timedelta64(1, "h")
        else:
            raise ValueError("Unsupported dtime_units: choose 'days' or 'hours'")

    dtime_attr = (
        f"{dtime_units} since "
        f"{pd.Timestamp(ref_date).strftime('%Y-%m-%d %H:%M:%S')}"
    )

    # Align onto the target axis using the category rules above
    df = df.set_index("time").sort_index()
    target_index = pd.DatetimeIndex(target_times, name="time")
    df_out = pd.DataFrame(index=target_index)

    # (1) Interpolate state vars
    cols = [c for c in linear_vars if c in df.columns]
    if cols:
        df_out[cols] = (
            df[cols]
            .reindex(target_index)
            .interpolate(method="time", limit_direction="both")
            .ffill()
            .bfill()
        )

    # (2) Forward-fill rates/fluxes
    cols = [c for c in ffill_vars if c in df.columns]
    if cols:
        df_out[cols] = df[cols].reindex(target_index).ffill().bfill()

    # (3) True accumulations (none by default)
    for v in accum_vars:
        if v in df.columns:
            df_out[v] = df[v].reindex(target_index).ffill().bfill()

    # (4) Carry through other columns (meta), fill both ways
    other_cols = [
        c for c in df.columns if c not in (linear_vars + ffill_vars + accum_vars)
    ]
    if other_cols:
        df_out[other_cols] = df[other_cols].reindex(target_index).ffill().bfill()

    df_out.index.name = "time"
    df_out = (
        df_out.reset_index()
        .sort_values("time")
        .drop_duplicates(subset="time", keep="first")
    )

    assert np.array_equal(df_out["time"].to_numpy(), target_times), \
        "df_out['time'] does not match generated target_times"

    return dtime_vals.astype("float64"), dtime_attr, df_out
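
# Usage sketch (column names follow the ELM-ish categories above):
#
#   dtime, units_attr, aligned = create_dtime(
#       df, calendar="noleap", dtime_units="days", dtime_resolution_hrs=0.5
#   )
#   # dtime[0] == 0.0; units_attr reads "days since <first target timestamp>";
#   # TBOT is time-interpolated, FSDS/PRECTmms are forward-filled.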


def get_start_end_years(csv_filepaths, calendar: str = "standard"):
    """
    Inspect CSVs (each must contain a 'date' column) and return the earliest
    and latest full years present. If no full year exists, fall back to the
    min/max year in the data.
    """
    dates = [pd.read_csv(file, usecols=["date"]) for file in csv_filepaths]
    dates = pd.concat(dates, ignore_index=True)
    dates["date"] = pd.to_datetime(dates["date"])
    dates.sort_values(by="date", inplace=True)

    if calendar.lower() == "noleap":
        dates = dates[~((dates["date"].dt.month == 2) & (dates["date"].dt.day == 29))]

    dates["year"] = dates["date"].dt.year
    # Encode month/day as the integer MMDD, e.g. Jan 1 -> 101, Dec 31 -> 1231.
    dates["month_day"] = dates["date"].dt.month * 100 + dates["date"].dt.day

    # A year counts as "full" when both Jan 1 (101) and Dec 31 (1231) appear.
    full = dates.groupby("year")["month_day"].agg(
        lambda x: {101, 1231}.issubset(set(x))
    )
    full_years = full[full].index

    if len(full_years) > 0:
        return int(full_years[0]), int(full_years[-1])
    return int(dates["date"].dt.year.min()), int(dates["date"].dt.year.max())
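

if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative only, not part of the library
    # API): exercises create_dtime on a leap-day series and get_start_end_years
    # on two throwaway CSVs.
    import os
    import tempfile

    # Hourly series spanning the 2020 leap day, realigned to 30-minute cadence.
    demo = pd.DataFrame(
        {"time": pd.date_range("2020-02-28", "2020-03-01", freq="h")}
    )
    demo["TBOT"] = np.linspace(270.0, 275.0, len(demo))
    dtime_vals, dtime_attr, aligned = create_dtime(
        demo, calendar="noleap", dtime_units="days", dtime_resolution_hrs=0.5
    )
    print(dtime_attr)                    # "days since 2020-02-28 00:00:00"
    print(dtime_vals[-1], len(aligned))  # 1.0 49: Feb 29 absent from the axis

    # Two throwaway CSVs: 2019 is a full year, 2020 is partial -> (2019, 2019).
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for name, days in [
            ("full_2019.csv", pd.date_range("2019-01-01", "2019-12-31", freq="D")),
            ("part_2020.csv", pd.date_range("2020-01-01", "2020-03-01", freq="D")),
        ]:
            path = os.path.join(tmp, name)
            pd.DataFrame({"date": days}).to_csv(path, index=False)
            paths.append(path)
        print(get_start_end_years(paths))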