"""Topographic unit generation utilities."""
try:
import ee # type: ignore
except Exception: # pragma: no cover
ee = None # type: ignore
def _require_ee_global():
    """Import and memoize the Earth Engine module, raising a clear error if absent."""
    global ee
    if ee is not None:
        return ee
    try:
        import ee as _ee  # type: ignore
    except Exception as exc:  # pragma: no cover
        raise ImportError(
            "Google Earth Engine (earthengine-api) is required for topounit creation. "
            "Install it (pip install earthengine-api) and authenticate (earthengine authenticate)."
        ) from exc
    ee = _ee
    return ee
if ee is None:  # pragma: no cover
    class _EEProxy:
        """Lazy stand-in that resolves the real ``ee`` module on first attribute access."""

        def __getattr__(self, attr):
            return getattr(_require_ee_global(), attr)

    ee = _EEProxy()  # type: ignore
import math
from dapper.domains.domain import Domain
from dapper.integrations.earthengine import gee_utils as gu
# ----------------------------
# Utilities
# ----------------------------
def _require_single_band_image(img, sid):
"""
Ensure 'img' behaves like an ee.Image with exactly one band.
We do not try to guess the band; users should select/mosaic beforehand.
"""
if not hasattr(img, 'bandNames') or not hasattr(img, 'select'):
raise TypeError(
f"binning['{sid}']['image'] must be an ee.Image (single band). "
f"Pass an ee.Image you've already prepared with .mosaic()/.select()."
)
try:
band_names = img.bandNames().getInfo() # tiny request; returns a small list
except Exception:
# If the server call fails, fall back to trying to select(0) and hope it errors if invalid
band_names = None
if band_names is None:
# try select(0) to trip a clear error message if multiple bands
_ = img.select(0)
# don't rename here; we rename after clipping
return img
if len(band_names) != 1:
raise ValueError(
f"Custom image for source '{sid}' must have exactly one band. "
f"Please do something like: myimg = ee.Image(path_or_ic).mosaic().select('band')."
)
return img
# -----------------------------------
# Multi-scale HAND auto-selection
# -----------------------------------
def choose_hand_image(desired_scale=None, hand_edges=None, verbose=False):
    """Auto-select among available HAND products based on a desired scale (m)
    and/or fixed edges. If edges extend >100 m, prefer *_1000 variants.
    Returns (hand_image_single_band, native_scale_m).
    """
    # Candidate HAND products
    hand30_100 = ee.ImageCollection("users/gena/global-hand/hand-100").mosaic()  # up to 100 m
    hand30_1000 = ee.Image("users/gena/GlobalHAND/30m/hand-1000")  # up to 1000 m
    hand90_1000 = ee.Image("users/gena/GlobalHAND/90m-global/hand-1000")  # up to 1000 m
    catalog = [
        {"name": "hand30_1000", "img": hand30_1000, "scale": gu._nominal_scale_m(hand30_1000), "maxval": 1000},
        {"name": "hand90_1000", "img": hand90_1000, "scale": gu._nominal_scale_m(hand90_1000), "maxval": 1000},
        {"name": "hand30_100", "img": hand30_100, "scale": gu._nominal_scale_m(hand30_100), "maxval": 100},
    ]
    # Edges above 100 m force the *_1000 products (hand-100 saturates at 100 m).
    wants_deep = hand_edges is not None and len(hand_edges) >= 2 and max(hand_edges) > 100
    preferred = [c for c in catalog if c["maxval"] >= 1000] if wants_deep else catalog
    if desired_scale is None:
        # With no target scale, the 90 m *_1000 product is a sensible default.
        pick = next((c for c in preferred if c["name"] == "hand90_1000"), preferred[0])
        if verbose:
            print(f"[HAND] Auto-selected {pick['name']} @ ~{pick['scale']} m (no desired scale).")
        return pick["img"].select(0).rename("v"), pick["scale"]
    # Otherwise pick the finest product that is not finer than desired_scale,
    # falling back to the coarsest available.
    by_scale = sorted(preferred, key=lambda c: c["scale"])
    coarse_enough = [c for c in by_scale if c["scale"] >= desired_scale]
    pick = coarse_enough[0] if coarse_enough else by_scale[-1]
    if verbose:
        print(f"[HAND] Desired ~{desired_scale:.1f} m → selected {pick['name']} @ ~{pick['scale']} m.")
    return pick["img"].select(0).rename("v"), pick["scale"]
# ----------------------------------------------------
# Sources: elevation, HAND, aspect (as a source)
# ----------------------------------------------------
def build_source(source_id, feature, desired_scale_hint, binning_spec, dem_source='arcticdem', verbose=False):
    """
    Build one topounit source image.

    Returns (single-band ee.Image renamed to 'v', native_scale_m, meta dict).

    Built-in source_ids:
      - 'elev'   : ArcticDEM elevation (meters)
      - 'hand'   : multi-scale HAND auto-selection (meters)
      - 'aspect' : aspect (degrees) derived from the DEM
      - 'cti'    : Compound Topographic Index (dimensionless), mosaicked & scaled

    Custom image:
      If binning[source_id] has an 'image' key, it MUST be a single-band ee.Image
      (already mosaicked/selected/masked as needed). When such an image advertises
      a degree-based/undefined native scale (~111,319 m), the native scale is
      coerced to desired_scale_hint (if provided) or 90 m to avoid over-clamping.
    """

    def _coerce_native_scale_m(img, fallback_m=90.0):
        """Return a reasonable native scale in meters; fix ~1° (≈111 km) projections."""
        try:
            nominal = float(gu._nominal_scale_m(img))
        except Exception:
            nominal = float(fallback_m)
        # A value above ~5 km means the projection is degrees/undefined;
        # trust the caller's hint (or the fallback) instead.
        if nominal > 5000.0:
            return float(desired_scale_hint) if (desired_scale_hint is not None) else float(fallback_m)
        return nominal

    region = gu._geom_from_any(feature)

    # --- 0) Generic custom image (takes precedence if provided) ---
    if isinstance(binning_spec, dict) and ('image' in binning_spec):
        img = _require_single_band_image(binning_spec['image'], source_id)
        img = img.clip(region).select(0).rename('v')
        scale = _coerce_native_scale_m(img, fallback_m=90.0)
        meta = {
            "units": binning_spec.get("units", ""),  # cosmetic only
            "name": binning_spec.get("name", str(source_id).upper()),
        }
        if verbose:
            print(f"[SRC {source_id}] custom image, native_scale≈{scale:.1f} m")
        return img, scale, meta

    # --- 1) Elevation (ArcticDEM) ---
    if source_id == 'elev':
        if dem_source != 'arcticdem':
            raise KeyError(f"DEM source '{dem_source}' not supported.")
        img = ee.Image("UMN/PGC/ArcticDEM/V4/2m_mosaic").select('elevation').rename('v').clip(region)
        scale = _coerce_native_scale_m(img, fallback_m=2.0)
        if verbose:
            print(f"[SRC elev] native_scale≈{scale:.1f} m")
        return img, scale, {"units": "m", "name": "Elevation"}

    # --- 2) HAND (auto-pick scale/product) ---
    if source_id == 'hand':
        hand_edges = binning_spec.get('edges') if binning_spec.get('strategy') == 'fixed' else None
        img, scale_native = choose_hand_image(desired_scale=desired_scale_hint, hand_edges=hand_edges, verbose=verbose)
        img = img.updateMask(img.mask()).rename('v').clip(region)
        scale = _coerce_native_scale_m(img, fallback_m=scale_native)
        if verbose:
            print(f"[SRC hand] native_scale≈{scale:.1f} m")
        return img, scale, {"units": "m", "name": "HAND"}

    # --- 3) Aspect (from DEM) ---
    if source_id == 'aspect':
        if dem_source != 'arcticdem':
            raise KeyError(f"DEM source '{dem_source}' not supported for aspect.")
        dem = ee.Image("UMN/PGC/ArcticDEM/V4/2m_mosaic").select('elevation').clip(region)
        aspect = ee.Terrain.aspect(dem).rename('v')
        scale = _coerce_native_scale_m(dem, fallback_m=2.0)  # aspect inherits the DEM's native scale
        if verbose:
            print(f"[SRC aspect] derived from DEM, native_scale≈{scale:.1f} m")
        return aspect, scale, {"units": "deg", "name": "Aspect", "from_dem": True}

    # --- 4) CTI (flow index) — unitless with scale factor 1e8 ---
    if source_id == 'cti':
        img = (ee.ImageCollection("projects/sat-io/open-datasets/HYDROGRAPHY90/flow_index/cti")
               .mosaic().select(0).toFloat().divide(1e8)  # undo the provider's 1e8 scale factor
               .rename('v').clip(region))
        scale = _coerce_native_scale_m(img, fallback_m=90.0)  # many tiles advertise ~1°; coerce to 90 m
        if verbose:
            print(f"[SRC cti] scaled by 1e8, native_scale≈{scale:.1f} m")
        return img, scale, {"units": "", "name": "CTI"}  # unitless

    # Unknown source id
    raise ValueError(f"Unknown source_id: {source_id}")
# --------------------------------
# Binning helpers per source
# --------------------------------
def _clone_meta(acc_meta):
"""Deep-copy the nested bits so each branch mutates its own dicts."""
return {
**acc_meta,
'source_ids': list(acc_meta.get('source_ids', [])),
'labels': {**acc_meta.get('labels', {})},
'bin_bounds': {**acc_meta.get('bin_bounds', {})},
'bin_method': {**acc_meta.get('bin_method', {})},
}
def _compute_percentile_edges(image, region, n_bins, analysis_scale, max_samples=200_000, band_name='v'):
    """
    Return monotonically increasing bin edges (list of length n_bins+1) using
    sampling-based empirical quantiles inside 'region'.

    Falls back to equal-width edges from the region min/max when sampling
    returns nothing. Duplicate quantile values are nudged upward by 1e-9 so
    every bin keeps positive width.
    """
    samples = image.sample(
        region=region,
        scale=analysis_scale,
        geometries=False,
        dropNulls=True,
        numPixels=max_samples
    )
    arr = ee.List(samples.aggregate_array(band_name))
    size = ee.Number(arr.size())
    sorted_arr = arr.sort()

    def _edges_from_samples():
        # Map edge index k in 0..n_bins to the empirical quantile value.
        def idx_to_val(k):
            # floor((k / n_bins) * size), clamped to size-1 for k == n_bins
            idx = ee.Number(k).multiply(size).divide(n_bins).int()
            idx = idx.min(size.subtract(1))
            return sorted_arr.get(idx)
        return ee.List.sequence(0, n_bins).map(idx_to_val)

    def _edges_from_minmax():
        # Fallback: equal-width edges between the region's min and max.
        stats = image.reduceRegion(
            reducer=ee.Reducer.minMax(),
            geometry=region,
            scale=analysis_scale,
            bestEffort=True,
            maxPixels=1e13
        )
        vmin = ee.Number(stats.get(f"{band_name}_min"))
        vmax = ee.Number(stats.get(f"{band_name}_max"))
        step = vmax.subtract(vmin).divide(n_bins)
        return ee.List.sequence(vmin, vmax, step)

    quant_edges = ee.Algorithms.If(size.gt(0), _edges_from_samples(), _edges_from_minmax())
    edges = ee.List(quant_edges).getInfo()  # single client-side round trip

    # Ensure strict monotonicity (collapse duplicates with tiny epsilon).
    # BUGFIX: coerce the first edge to float too, so the returned list is
    # uniformly float-typed even when the server returns integers.
    cleaned = [float(edges[0])]
    for e in edges[1:]:
        if e <= cleaned[-1]:
            e = float(cleaned[-1]) + 1e-9
        cleaned.append(float(e))
    return cleaned
def _compute_equalwidth_edges(image, region, n_bins, analysis_scale, band_name='v'):
    """Equal-width bin edges over the image's min/max within ``region``.

    Returns a degenerate [0.0, 1.0] when min/max are unavailable, and a
    hair-width interval when the image is constant.
    """
    # BUGFIX: reduceRegion returns a server-side ee.Dictionary; without
    # .getInfo() the .get(...) calls return computed objects, the `is None`
    # checks never fire, and float(vmin) raises. Resolve client-side first
    # (mirrors _compute_percentile_edges, which also calls .getInfo()).
    stats = image.reduceRegion(
        reducer=ee.Reducer.minMax(),
        geometry=region,
        scale=analysis_scale,
        bestEffort=True,
        maxPixels=1e13
    ).getInfo()
    vmin = stats.get(f"{band_name}_min")
    vmax = stats.get(f"{band_name}_max")
    if (vmin is None) or (vmax is None):
        return [0.0, 1.0]
    vmin = float(vmin)
    vmax = float(vmax)
    if vmin == vmax:
        return [vmin, vmin + 1e-9]
    step = (vmax - vmin) / n_bins
    return [vmin + i * step for i in range(n_bins + 1)]
def build_bins_for_source(source_id, image, region, binning_spec, analysis_scale, aspect_ranges_default=None):
    """
    Produce the list of bin definitions for one source.

    Numeric sources (elev, hand): [{'id','low','high','label','method','units'}]
    Aspect (circular):            [{'id','start','end','wrap','label','method','units'}]
    """
    strategy = binning_spec.get('strategy')
    label_prefix = binning_spec.get('label_prefix', source_id.upper())

    # Circular source: fixed angular ranges only ('wrap' marks ranges crossing 0°).
    if source_id == 'aspect':
        if strategy != 'fixed':
            raise ValueError("Aspect currently supports 'fixed' ranges only.")
        ranges = binning_spec.get('ranges') or aspect_ranges_default or [(270, 90, 'N'), (90.01, 269.99, 'S')]
        return [
            {
                'id': i,
                'label': f"{label_prefix}_{name}",
                'start': float(start),
                'end': float(end),
                'wrap': float(start) > float(end),
                'method': 'fixed',
                'units': 'deg',
            }
            for i, (start, end, name) in enumerate(ranges, start=1)
        ]

    # Numeric sources (elev, hand): derive edges per the requested strategy.
    if strategy == 'percentiles':
        n_bins = int(binning_spec.get('n_bins', 5))
        edges = _compute_percentile_edges(image, region, n_bins, analysis_scale, band_name='v')
    elif strategy == 'equalwidth':
        n_bins = int(binning_spec.get('n_bins', 5))
        edges = _compute_equalwidth_edges(image, region, n_bins, analysis_scale, band_name='v')
    elif strategy == 'fixed':
        edges = binning_spec.get('edges', None)
        if edges is None or len(edges) < 2:
            raise ValueError("Fixed-edge binning requires 'edges' with at least two values.")
        edges = [float(x) for x in edges]
        n_bins = len(edges) - 1
    else:
        raise ValueError(f"Unknown strategy: {strategy}")

    bins = []
    for i in range(n_bins):
        low, high = float(edges[i]), float(edges[i + 1])
        if high <= low:
            continue  # skip degenerate intervals
        bins.append({
            'id': i + 1,
            'label': f"{label_prefix}_{low:.3f}-{high:.3f}",
            'low': low,
            'high': high,
            'method': strategy,
            'units': 'm',
        })
    return bins
# ----------------------------------------
# Mask builders & combination logic
# ----------------------------------------
def _numeric_bin_mask(image, low, high):
    """Mask of pixels with low <= value < high (all other pixels masked out)."""
    in_range = image.gte(low).And(image.lt(high))
    return in_range.selfMask()
def _aspect_bin_mask(aspect_img, start, end, wrap):
    """Mask of pixels whose aspect lies in [start, end); wrapping ranges cross 0/360°."""
    if wrap:
        # e.g. 270–90: union of [start, 360) and [0, end)
        in_range = aspect_img.gte(start).Or(aspect_img.lt(end))
    else:
        in_range = aspect_img.gte(start).And(aspect_img.lt(end))
    return in_range.selfMask()
def _apply_min_patch(mask_img, min_patch_pixels):
if min_patch_pixels is None or min_patch_pixels <= 1:
return mask_img
cpp = mask_img.connectedPixelCount(100, True)
return mask_img.updateMask(cpp.gte(min_patch_pixels))
def _combine_cartesian(bin_masks_by_source, max_topounits=None, min_patch_pixels=None):
    """Cartesian product of per-source bins → list of {'band_name','mask','meta'}.

    Emission stops early once ``max_topounits`` combinations exist.
    """
    items = list(bin_masks_by_source.items())
    if not items:
        return []
    combined = []

    def _recurse(idx, acc_meta, acc_mask, acc_code):
        nonlocal combined
        if max_topounits is not None and len(combined) >= max_topounits:
            return
        if idx == len(items):
            # All sources consumed: finalize one topounit.
            band_name = "topounit_" + "__".join(acc_code)
            mask = _apply_min_patch(acc_mask, min_patch_pixels)
            combined.append({'band_name': band_name, 'mask': mask, 'meta': acc_meta})
            return
        sid, binlist = items[idx]
        for entry in binlist:
            bdef = entry['def']
            meta = _clone_meta(acc_meta)  # deep-clone nested dicts/lists
            meta.setdefault('source_ids', []).append(sid)
            meta.setdefault('labels', {})[sid] = bdef['label']
            if 'low' in bdef:
                meta.setdefault('bin_bounds', {})[sid] = {'low': bdef['low'], 'high': bdef['high']}
            else:
                meta.setdefault('bin_bounds', {})[sid] = {
                    'start': bdef['start'], 'end': bdef['end'], 'wrap': bdef['wrap']
                }
            meta.setdefault('bin_method', {})[sid] = bdef['method']
            piece = f"{sid}{bdef['id']}"
            mask = entry['mask'] if acc_mask is None else acc_mask.And(entry['mask'])
            _recurse(idx + 1, meta, mask, acc_code + [piece])

    _recurse(0, {'topounit_schema': 'cartesian'}, None, [])
    return combined
def _combine_hierarchical(order, bin_masks_by_source, max_topounits=None, min_patch_pixels=None):
    """Nested (hierarchical) combination following ``order``; leaves become topounits."""
    if not order:
        return []
    combined = []

    def _recurse(idx, parent_mask, parent_meta, parent_code):
        nonlocal combined
        if max_topounits is not None and len(combined) >= max_topounits:
            return
        sid = order[idx]
        for entry in bin_masks_by_source[sid]:
            bdef = entry['def']
            this_mask = entry['mask'] if parent_mask is None else parent_mask.And(entry['mask'])
            meta = _clone_meta(parent_meta)  # deep-clone nested dicts/lists
            meta.setdefault('source_ids', []).append(sid)
            meta.setdefault('labels', {})[sid] = bdef['label']
            if 'low' in bdef:
                meta.setdefault('bin_bounds', {})[sid] = {'low': bdef['low'], 'high': bdef['high']}
            else:
                meta.setdefault('bin_bounds', {})[sid] = {
                    'start': bdef['start'], 'end': bdef['end'], 'wrap': bdef['wrap']
                }
            meta.setdefault('bin_method', {})[sid] = bdef['method']
            piece = f"{sid}{bdef['id']}"
            if idx == len(order) - 1:
                # Leaf level: finalize this combination.
                band_name = "topounit_" + "__".join(parent_code + [piece])
                mask = _apply_min_patch(this_mask, min_patch_pixels)
                combined.append({'band_name': band_name, 'mask': mask,
                                 'meta': {**meta, 'topounit_schema': 'hierarchical'}})
                if max_topounits is not None and len(combined) >= max_topounits:
                    return
            else:
                _recurse(idx + 1, this_mask, meta, parent_code + [piece])
                if max_topounits is not None and len(combined) >= max_topounits:
                    return

    _recurse(0, None, {}, [])
    return combined
# -----------------------------------------
# Topounit characteristics sampling
# -----------------------------------------
def _attach_topounit_terrain_stats(
    polygons_fc,
    feature_for_gee,
    dem_source,
    analysis_scale,
    verbose=False,
):
    """
    Attach per-topounit terrain properties needed for topounit-enabled surface files:
      - TopounitAveElv : mean elevation [m]
      - TopounitSlope  : mean slope [deg]
      - TopounitAspect : mean aspect [deg, 0–360, cw from north]

    Parameters
    ----------
    polygons_fc : ee.FeatureCollection
        One feature per topounit (unioned geometry).
    feature_for_gee :
        Same AOI object given to make_topounits (ee.Feature/Geometry/etc.).
    dem_source : str
        DEM source used by 'elev' (currently 'arcticdem').
    analysis_scale : float
        Scale in meters at which to compute zonal statistics.
    verbose : bool
        If True, prints a short status message.
    """
    # Reuse the exact DEM that the 'elev' topounit source is built from.
    dem_img, _native_scale, _meta = build_source(
        source_id='elev',
        feature=feature_for_gee,
        desired_scale_hint=analysis_scale,
        binning_spec={},  # not used for 'elev'
        dem_source=dem_source,
        verbose=verbose,
    )
    dem = dem_img.rename('elev')  # elevation band (meters)
    slope_deg = ee.Terrain.slope(dem).rename('slope_deg')
    aspect_deg = ee.Terrain.aspect(dem).rename('aspect_deg')  # 0–360, cw from north
    # Mean aspect must be circular: average sin/cos, then atan2 back to degrees.
    aspect_rad = aspect_deg.multiply(math.pi / 180.0)
    # Multi-band image so one reduceRegion call covers all statistics.
    stats_image = (
        dem
        .addBands(slope_deg)
        .addBands(aspect_deg)
        .addBands(aspect_rad.sin().rename('aspect_sin'))
        .addBands(aspect_rad.cos().rename('aspect_cos'))
    )

    def _add_stats(feat):
        zonal = stats_image.reduceRegion(
            reducer=ee.Reducer.mean(),
            geometry=feat.geometry(),
            scale=analysis_scale,
            maxPixels=1e13,
            tileScale=2,
        )
        mean_sin = zonal.get('aspect_sin')
        mean_cos = zonal.get('aspect_cos')
        # Circular mean of aspect, wrapped into [0, 360) degrees.
        aspect_mean_deg = (ee.Number(mean_sin).atan2(ee.Number(mean_cos))
                           .multiply(180.0 / math.pi).mod(360.0))
        return feat.set({
            'TopounitAveElv': zonal.get('elev'),       # m
            'TopounitSlope': zonal.get('slope_deg'),   # deg
            'TopounitAspect': aspect_mean_deg,         # deg
        })

    fc_with_stats = polygons_fc.map(_add_stats)
    if verbose:
        print("[topounits] Attached terrain stats: TopounitAveElv, TopounitSlope, TopounitAspect.")
    return fc_with_stats
# -----------------------------------------
# Main API
# -----------------------------------------
def make_topounits(
    aoi,
    sources=None,            # if None, inferred from binning keys (insertion order)
    binning=None,            # dict: { sid: {...}, ... }
    combine='cartesian',
    combine_order=None,
    max_topounits=256,
    dem_source='arcticdem',
    return_as='gdf',
    export_scale='native',
    asset_name='topounits',
    asset_ftype='GeoJSON',
    min_patch_pixels=None,
    target_pixels_per_topounit=500,
    target_scale=None,
    verbose=False
):
    """
    SINGLE-AOI topounit builder.

    aoi can be:
      - Domain (must have exactly 1 geometry in domain.support / domain.gdf)
      - shapely geometry
      - ee.Geometry / ee.Feature / ee.FeatureCollection
      - str (GEE FeatureCollection asset id)
    If sources is None, sources = list(binning.keys()) (dict insertion order).
    """
    if binning is None or not isinstance(binning, dict) or len(binning) == 0:
        raise ValueError("make_topounits requires a non-empty 'binning' dict.")
    if sources is None:
        sources = list(binning.keys())  # keep dict insertion order
    missing = [sid for sid in sources if sid not in binning]
    if missing:
        raise KeyError(f"Sources missing from binning: {missing}. Provide binning['sid'] for each source.")

    # Coerce AOI input to a single shapely/ee object.
    if isinstance(aoi, Domain):
        gdf = getattr(aoi, "support", None)
        if gdf is None:
            gdf = getattr(aoi, "gdf", None)  # legacy fallback
        if gdf is None or gdf.empty:
            raise ValueError("Domain has no geometries; cannot build topounits.")
        if len(gdf) != 1:
            raise ValueError(
                "make_topounits(aoi=Domain) expects a single-geometry Domain. "
                "Use make_topounits_for_domain(domain, ...) for multi-geometry Domains."
            )
        aoi_obj = gdf.geometry.iloc[0]  # shapely geom
    else:
        aoi_obj = aoi
    # IMPORTANT: always use ee.Geometry for region-based EE ops (sample/reduceRegion/reduceToVectors)
    region = gu.parse_geometry_object(aoi_obj, name="aoi")

    # 1) Planned total bins (product across sources)
    def _planned_bins_for_source(sid):
        """Client-side estimate of how many bins this source will produce."""
        spec = binning[sid]
        if sid == 'aspect' and spec.get('strategy') == 'fixed':
            rngs = spec.get('ranges') or [(270, 90, 'N'), (90.01, 269.99, 'S')]
            return len(rngs)
        st = spec.get('strategy')
        if st in ('percentiles', 'equalwidth'):
            return int(spec.get('n_bins', 5))
        if st == 'fixed':
            edges = spec.get('edges', [])
            return max(0, len(edges) - 1)
        raise ValueError(f"Unknown/unsupported strategy for planned bin count: {st}")

    planned_counts = {sid: _planned_bins_for_source(sid) for sid in sources}
    total_planned = 1
    for count in planned_counts.values():
        total_planned *= max(1, count)

    # 2) Build sources, record native scales
    source_images, native_scales, source_meta = {}, {}, {}
    for sid in sources:
        img, nat_scale, meta = build_source(
            sid,
            feature=region,  # build_source runs it through gu._geom_from_any
            desired_scale_hint=target_scale,
            binning_spec=binning[sid],
            dem_source=dem_source,
            verbose=verbose
        )
        source_images[sid] = img
        native_scales[sid] = float(nat_scale)
        source_meta[sid] = meta

    # 3) Choose analysis scale from AOI area or user target; clamp to coarsest native
    coarsest_native = max(native_scales.values())
    if target_scale is not None:
        analysis_scale = max(float(target_scale), coarsest_native)
    else:
        # Size pixels so each planned topounit gets ~target_pixels_per_topounit of them.
        aoi_area = region.area()  # m^2
        required_pixel_area = aoi_area.divide(total_planned * target_pixels_per_topounit)
        analysis_scale = float(required_pixel_area.sqrt().getInfo())
        analysis_scale = max(analysis_scale, coarsest_native)
    if verbose:
        print(f"[Scale] planned bins={total_planned}, coarsest_native={coarsest_native:.1f} m, "
              f"analysis_scale={analysis_scale:.1f} m")

    # 4) Build per-source bins and masks
    bin_defs_by_source = {}
    bin_masks_by_source = {}
    for sid in sources:
        img = source_images[sid]
        bin_defs = build_bins_for_source(
            sid,
            img,
            region=region,
            binning_spec=binning[sid],
            analysis_scale=analysis_scale,
            aspect_ranges_default=binning.get('aspect', {}).get('ranges') if 'aspect' in binning else None
        )
        bin_defs_by_source[sid] = bin_defs
        if sid == 'aspect':
            entries = [{'def': b, 'mask': _aspect_bin_mask(img, b['start'], b['end'], b['wrap'])}
                       for b in bin_defs]
        else:
            entries = [{'def': b, 'mask': _numeric_bin_mask(img, b['low'], b['high'])}
                       for b in bin_defs]
        bin_masks_by_source[sid] = entries

    # 5) Combine masks
    if combine == 'cartesian':
        combined_entries = _combine_cartesian(
            bin_masks_by_source,
            max_topounits=max_topounits,
            min_patch_pixels=min_patch_pixels
        )
        if len(combined_entries) == 0:
            raise RuntimeError("No combined topounit masks produced (cartesian).")
    elif combine == 'hierarchical':
        if not combine_order:
            raise ValueError("combine='hierarchical' requires combine_order (e.g., ['elev','hand']).")
        combined_entries = _combine_hierarchical(
            combine_order,
            bin_masks_by_source,
            max_topounits=max_topounits,
            min_patch_pixels=min_patch_pixels
        )
        if len(combined_entries) == 0:
            raise RuntimeError("No combined topounit masks produced (hierarchical).")
    else:
        raise ValueError(f"Unknown combine strategy: {combine}")

    # 6) Export scale ('native' means the analysis scale)
    if export_scale == 'native':
        export_scale = analysis_scale

    # 7) Attach reproducibility props
    extra_props = {
        'analysis_scale_m': analysis_scale,
        'sources': sources,
        'planned_counts': planned_counts,
        'combine': combine,
        'max_topounits': max_topounits,
        'target_pixels_per_topounit': target_pixels_per_topounit,
        'target_scale': target_scale
    }

    # 8) Vectorize to polygons
    polygons_fc = gu.masks_to_featurecollection(
        combined_entries,
        region=region,
        export_scale=export_scale,
        extra_image_props=extra_props
    )
    # Attach terrain stats (uses DEM; region is fine as "feature")
    polygons_fc = _attach_topounit_terrain_stats(
        polygons_fc=polygons_fc,
        feature_for_gee=region,
        dem_source=dem_source,
        analysis_scale=analysis_scale,
        verbose=verbose,
    )

    # 9) Return or export
    if return_as == 'gdf':
        gdf = gu.try_to_download_featurecollection(polygons_fc, verbose=verbose)
        if gdf is None:
            print("Could not return as GeoDataFrame; exporting to Google Drive. Check Tasks in your GEE browser.")
            gu.export_fc(polygons_fc, f'{asset_name}', asset_ftype, folder='topotest', verbose=True)
            return None
        return gdf
    gu.export_fc(polygons_fc, f'{asset_name}', asset_ftype, folder='topotest', verbose=True)
    return None
def add_topounit_image_samples(
    topounits_gdf,
    sampling_specs,
    default_scale=None,
    ensure_pixel_centers=True,
    verbose=False,
):
    """Attach additional sampled properties from arbitrary GEE images to each
    topounit.

    Parameters
    ----------
    topounits_gdf : GeoDataFrame
        Output from make_topounits(return_as='gdf'). Must contain 'band_name'.
    sampling_specs : list[dict]
        Each dict describes one sampled variable with keys:
          - 'image'   : ee.Image or image ID string
          - 'band'    : optional band name (if omitted, image must be single-band)
          - 'reducer' : str {'mean','min','max','std'} or ee.Reducer
          - 'out_name': name of the column to add
          - 'scale'   : optional scale [m]; overrides default_scale
    default_scale : float, optional
        Default scale [m] for specs that do not define 'scale'. If None, uses
        topounits_gdf['analysis_scale_m'].iloc[0] if present; otherwise the
        nominal scale of each image.
    ensure_pixel_centers : bool, default True
        If True, guard against polygons with no pixel centers.
    verbose : bool, default False
        Print which variables were attached.

    Returns
    -------
    GeoDataFrame
        Copy of topounits_gdf with new columns added.
    """
    if not sampling_specs:
        return topounits_gdf
    gdf = topounits_gdf.copy()
    # Prefer the stored analysis scale when no explicit default is given.
    if default_scale is None and "analysis_scale_m" in gdf.columns:
        default_scale = float(gdf["analysis_scale_m"].iloc[0])
    attached_names = []
    for spec in sampling_specs:
        if "image" not in spec:
            raise KeyError("Each sampling spec must include an 'image' key.")
        img_spec = spec["image"]
        if isinstance(img_spec, ee.Image):
            img = img_spec
        elif isinstance(img_spec, str):
            img = ee.Image(img_spec)
        else:
            raise TypeError(
                f"sampling_specs['image'] must be an ee.Image or image ID string; got {type(img_spec)}"
            )
        band = spec.get("band")
        reducer = spec.get("reducer", "mean")
        out_name = spec.get("out_name")
        scale = spec.get("scale", default_scale)
        if scale is None:
            scale = gu._nominal_scale_m(img)  # last resort: image's native scale
        gdf = gu.sample_image_over_polygons(
            gdf,
            image=img,
            geometry_id_field="band_name",
            band=band,
            reducer=reducer,
            out_name=out_name,
            scale=scale,
            ensure_pixel_centers=ensure_pixel_centers,
            verbose=verbose,
        )
        if out_name is None:
            # Reconstruct the default column name sample_image_over_polygons used.
            key = reducer.lower() if isinstance(reducer, str) else "custom"
            if band is None:
                # Rare (single-band images), but be robust.
                band_used = img.bandNames().getInfo()[0]
            else:
                band_used = band
            out_name = f"{band_used}_{key}"
        attached_names.append(out_name)
    if verbose:
        print(
            "[topounits] Attached sampled properties: "
            + ", ".join(attached_names)
        )
    return gdf
def make_topounits_for_domain(
    domain: Domain,
    *,
    sources,
    binning,
    combine="cartesian",
    combine_order=None,
    max_topounits=256,
    dem_source="arcticdem",
    export_scale="native",
    min_patch_pixels=None,
    target_pixels_per_topounit=500,
    target_scale=None,
    verbose=False,
    allow_slow_ncells: int = 25,
):
    """
    Compute topounits per-domain-geometry and attach them to the Domain.

    - For domain.mode == 'sites': computes per-run (each run is 1 geom) and
      returns a Domain with a concatenated topounits table; iter_runs() can
      later subset by gid.
    - For domain.mode == 'cellset': loops over each cell geometry and computes
      topounits for each gid separately (can be slow for large N).

    Returns
    -------
    Domain
        domain with domain.topounits populated (expects Domain.with_topounits exists).
    """
    import pandas as pd
    import geopandas as gpd
    from pyproj import Geod

    # 'support' is the geometry view that topounits should be built from.
    gdf = getattr(domain, "support", None)
    if gdf is None:
        gdf = getattr(domain, "gdf", None)
    if gdf is None or gdf.empty:
        raise ValueError("Domain has no geometries; cannot compute topounits.")
    if "gid" not in gdf.columns:
        raise KeyError("Domain geometry table must include a 'gid' column.")
    gdf = gdf.copy()
    gdf["gid"] = gdf["gid"].astype(str)

    # Guard against accidentally launching many slow per-cell GEE builds.
    ncells = len(gdf)
    if ncells > allow_slow_ncells and not verbose:
        raise ValueError(
            f"make_topounits_for_domain would run {ncells} separate GEE topounit builds. "
            f"Set verbose=True to proceed intentionally, or raise allow_slow_ncells."
        )

    geod = Geod(ellps="WGS84")

    def _geod_area_m2(geom) -> float:
        """Geodesic area of a lon/lat geometry in m² (abs of the signed area)."""
        try:
            signed_area, _perim = geod.geometry_area_perimeter(geom)
            return float(abs(signed_area))
        except Exception:
            # NaN rather than crashing; user will see pct=nan downstream
            return float("nan")

    parts = []
    for row in gdf.itertuples(index=False):
        gid = str(getattr(row, "gid"))
        geom = getattr(row, "geometry")
        if verbose:
            print(f"[topounits] gid={gid}: building topounits...")
        tu = make_topounits(
            aoi=geom,
            sources=sources,
            binning=binning,
            combine=combine,
            combine_order=combine_order,
            max_topounits=max_topounits,
            dem_source=dem_source,
            return_as="gdf",
            export_scale=export_scale,
            min_patch_pixels=min_patch_pixels,
            target_pixels_per_topounit=target_pixels_per_topounit,
            target_scale=target_scale,
            verbose=verbose,
        )
        if tu is None:
            raise RuntimeError(
                f"make_topounits returned None for gid={gid} "
                "(likely fell back to Drive export). Use a smaller AOI/scale or run in an env where FC download works."
            )
        tu = tu.copy()
        tu["gid"] = gid
        if "band_name" not in tu.columns:
            raise KeyError("Topounits output must include 'band_name'.")
        # Stable ids across multi-cell domains: prefix band_name with gid.
        tu["topounit_id"] = tu["gid"].astype(str) + "__" + tu["band_name"].astype(str)

        # Area fraction of each topounit within its parent cell (percent).
        cell_area = _geod_area_m2(geom)
        if not (cell_area and cell_area > 0):
            tu["TopounitArea_m2"] = float("nan")
            tu["TopounitPctOfCell"] = float("nan")
        else:
            # Intersection is defensive; topounits should already lie inside geom.
            clipped = tu.geometry.intersection(geom)
            tu["TopounitArea_m2"] = clipped.apply(_geod_area_m2)
            tu["TopounitPctOfCell"] = 100.0 * (tu["TopounitArea_m2"] / cell_area)
        parts.append(tu)

    all_topounits = gpd.GeoDataFrame(pd.concat(parts, ignore_index=True), crs=gdf.crs)
    # Attach to Domain (expects Domain.with_topounits to exist).
    return domain.with_topounits(
        all_topounits,
        id_col="topounit_id",
        gid_col="gid",
        dim_name="topounit",
    )