"""
pybob.ddem_tools provides a number of tools for working with DEM differencing and calculating volume changes. Primarily
designed for glaciers, but could be useful for calculating other volume changes as well.
"""
from __future__ import print_function
# import multiprocessing as mp
import numpy as np
import datetime as dt
from scipy.interpolate import griddata, interp1d, Rbf, RectBivariateSpline as RBS
from numpy.polynomial.polynomial import polyval, polyfit
import pybob.coreg_tools as ct
import pybob.image_tools as it
from pybob.bob_tools import bin_data
from pybob.GeoImg import GeoImg
sensor_names = ['AST', 'SETSM', 'Map', 'SPOT', 'SDMI', 'HMA']
def difference(dem1, dem2, glaciermask=None, landmask=None, outdir='.'):
master, slave = ct.dem_coregistration(dem1, dem2, glaciermask, landmask, outdir)
master.unmask()
slave.unmask()
return master.copy(new_raster=(master.img - slave.img))
def fill_holes(dDEM, method, **kwargs):
if type(dDEM) is not GeoImg:
raise TypeError('dDEM must be a GeoImg')
else:
return method(dDEM, **kwargs)
# the following (linear, elevation_mean, elevation_median, elevation_poly, neighborhood_average)
# are all methods to pass to fill_holes. E.g., filled_DEM = fill_holes(DEM, linear)
# will use linear interpolation to fill any holes in the DEM (or GeoImg raster).
# linear and neighborhood_average can be used on either a DEM or a dDEM, but elevation_* requires
# both a DEM and a dDEM to work correctly.
def linear(DEM, **kwargs):
if type(DEM) is not GeoImg:
raise TypeError('DEM must be a GeoImg')
X, Y = DEM.xy()
if 'valid_area' in kwargs:
interp_mask = np.logical_and(kwargs['valid_area'], np.isfinite(DEM.img))
else:
interp_mask = np.isfinite(DEM.img)
interp_points = DEM.img[interp_mask]
interpX = X[interp_mask]
interpY = Y[interp_mask]
filled_DEM = griddata((interpX, interpY), interp_points, (X, Y))
if 'valid_area' in kwargs:
filled_DEM[np.logical_not(kwargs['valid_area'])] = np.nan
return DEM.copy(new_raster=filled_DEM)
def elevation_function(dDEM, **kwargs):
if 'functype' not in kwargs:
raise ValueError('Have to specify functype to use elevation_function.')
return parse_elev_func_args(kwargs['functype'], dDEM, **kwargs)
def parse_elev_func_args(func_name, dDEM, **kwargs):
# first, make sure we use one of the right function names
if func_name not in ['mean', 'median', 'poly']:
raise ValueError('{} not one of mean, median, poly'.format(func_name))
if 'DEM' not in kwargs:
raise ValueError('to use {}, you must supply a base DEM'.format(func_name))
else: # if we've been supplied a DEM, we have to check that it's the right size.
DEM = kwargs['DEM']
if type(DEM) is not GeoImg:
raise TypeError('DEM must be a GeoImg')
if DEM.img.shape != dDEM.img.shape:
raise ValueError('dDEM and DEM must have the same shape')
if 'glacier_mask' in kwargs:
ddem_data = dDEM.img[np.logical_and(np.logical_and(np.isfinite(dDEM.img),
np.isfinite(DEM.img)), kwargs['glacier_mask'])]
dem_data = DEM.img[np.logical_and(np.logical_and(np.isfinite(dDEM.img),
np.isfinite(DEM.img)), kwargs['glacier_mask'])]
else:
ddem_data = dDEM.img[np.logical_and(np.isfinite(dDEM.img), np.isfinite(DEM.img))]
dem_data = DEM.img[np.logical_and(np.isfinite(dDEM.img), np.isfinite(DEM.img))]
# now, figure out which of mean, median, polyfit we're using, and do it.
if func_name == 'poly':
if 'poly_order' not in kwargs:
raise ValueError('poly_order must be defined to use polynomial fitting')
# now we fit a polynomial of order poly_order to the data:
pfit = polyfit(dem_data, ddem_data, kwargs['poly_order'])
# need a way to put hole values in the right place
if 'glacier_mask' in kwargs:
hole_inds = np.where(np.logical_and(np.logical_and(np.isnan(dDEM.img),
np.isfinite(DEM.img)), kwargs['glacier_mask']))
else:
hole_inds = np.where(np.logical_and(np.isfinite(dDEM.img), np.isfinite(DEM.img)))
hole_elevs = DEM.img[hole_inds]
interp_points = polyval(hole_elevs, pfit)
dDEM.img[hole_inds] = interp_points
if 'valid_area' in kwargs:
dDEM.img[np.logical_not(kwargs['valid_area'])] = np.nan
return dDEM
elif func_name == 'mean' or func_name == 'median':
if 'bins' not in kwargs:
# if we haven't been handed bins, we have to make our own.
if 'bin_width' not in kwargs:
bin_width = 50 # if we haven't been told what bin width to use, default to 50.
else:
bin_width = kwargs['bin_width']
min_el = np.nanmin(dem_data) - (np.nanmin(dem_data) % bin_width)
max_el = np.nanmax(dem_data) + (bin_width - (np.nanmax(dem_data) % bin_width))
bins = np.arange(min_el, max_el+1, bin_width)
else:
bins = kwargs['bins']
binned_dH = bin_data(bins, ddem_data, dem_data, mode=func_name)
# now, we interpolate the missing DH values to the binned values.
f_elev = interp1d(bins, binned_dH, fill_value="extrapolate")
# pull out missing values from dDEM
if 'glacier_mask' in kwargs:
hole_inds = np.where(np.logical_and(np.logical_and(np.isnan(dDEM.img),
np.isfinite(DEM.img)), kwargs['glacier_mask']))
else:
hole_inds = np.where(np.logical_and(np.isfinite(dDEM.img), np.isfinite(DEM.img)))
hole_elevs = DEM.img[hole_inds]
filled = f_elev(hole_elevs)
tmp_dDEM = dDEM.copy()
tmp_dDEM.img[hole_inds] = filled
if 'valid_area' in kwargs:
tmp_dDEM.img[np.logical_not(kwargs['valid_area'])] = np.nan
return tmp_dDEM
else:
raise ValueError('Somehow we made it this far without naming a correct fitting function. Oops.')
def neighborhood_average(dDEM, **kwargs):
if type(dDEM) is not GeoImg:
raise TypeError('DEM must be a GeoImg')
if 'neighborhood_size' not in kwargs:
raise ValueError('neighborhood_size must be defined to use ddem_tools.neighborhood_poly()')
nradius = int(kwargs['neighborhood_size'] / dDEM.dx)
tmp_ddem = dDEM.copy().img
out_ddem = dDEM.copy()
if 'valid_area' in kwargs:
if 'glacier_mask' in kwargs:
valid_mask = np.logical_and(kwargs['valid_area'], kwargs['glacier_mask'])
else:
valid_mask = kwargs['valid_area']
missing_mask = np.logical_and(valid_mask, np.isnan(tmp_ddem))
else:
missing_mask = np.isnan(tmp_ddem)
missing = np.where(missing_mask)
missing_inds = zip(missing[0], missing[1])
if 'glacier_mask' in kwargs:
tmp_ddem[np.logical_not(kwargs['glacier_mask'])] = np.nan
for ind in missing_inds:
nmask = neighborhood_mask(ind, nradius, tmp_ddem)
out_ddem.img[ind] = np.nanmean(tmp_ddem[nmask])
return out_ddem
# kriging is like linear, can be used with a dDEM or DEM.
def kriging(DEM, **kwargs):
pass
# RBF is simliar to linear
def radial_basis(DEM, **kwargs):
rbfunclist = ['multiquadric', 'inverse', 'gaussian', 'linear', 'cubic', 'quintic', 'thin_plate']
if type(DEM) is not GeoImg:
raise TypeError('DEM must be a GeoImg')
if 'rbfunc' not in kwargs:
# default to linear radial basis function
rbfunc = 'linear'
elif kwargs['rbfunc'] not in rbfunclist:
raise ValueError('unknown radial basis function {} requested. \
Check scipy.interpolate.Rbf docs for details'.format(kwargs['rbfunc']))
else:
rbfunc = kwargs['rbfunc']
X, Y = DEM.xy()
vals = ~np.isnan(DEM.img)
f = Rbf(X[vals], Y[vals], DEM.img[vals], function=rbfunc)
out_dem = f(X, Y)
return DEM.copy(new_raster=out_dem)
# RBS
def RectBivariateSpline(DEM, **kwargs):
if type(DEM) is not GeoImg:
raise TypeError('DEM must be a GeoImg')
X, Y = DEM.xy()
f = RBS(Y[0:, 0], X[0, 0:], DEM.img)
out_dem = f.ev(Y, X) # might have to flip upside-down?
return DEM.copy(new_raster=out_dem)
# some helper functions
def neighborhood_mask(centerind, radius, array):
a, b = centerind
nx, ny = array.shape
y, x = np.ogrid[-a:nx-a, -b:ny-b]
mask = x*x + y*y <= radius*radius
return mask
def circular_kernel(radius):
kernel = np.zeros((2*radius+1, 2*radius+1))
centerind = (radius, radius)
mask = neighborhood_mask(centerind, radius, kernel)
kernel[mask] = 1
return kernel
# interpolate on a glacier-by-glacier basis, rather than on the DEM as a whole.
def fill_holes_individually(dDEM, glacshapes, functype, burn_handle=None, **kwargs):
# first, get the individual glacier mask.
ind_glac_mask, ind_glac_vals = it.rasterize_polygons(dDEM, glacshapes, burn_handle)
filled_ddem = dDEM.copy()
# if we already have a glacier mask defined, remove it.
if 'glacier_mask' in kwargs:
del kwargs['glacier_mask']
for glac in ind_glac_vals:
tmp_mask = ind_glac_mask == glac
try:
tmp_dem = fill_holes(dDEM, elevation_function, glacier_mask=tmp_mask, functype=functype, **kwargs)
filled_ddem.img[tmp_mask] = tmp_dem.img[tmp_mask]
except:
continue
return filled_ddem
# normalize glacier elevations given a shapefile of glacier elevations
def normalize_glacier_elevations(DEM, glacshapes, burn_handle=None):
ind_glac_mask, raw_inds = it.rasterize_polygons(DEM, glacshapes, burn_handle)
normed_els = DEM.copy().img
ind_glac_vals = clean_glacier_indices(DEM, ind_glac_mask, raw_inds)
for i in ind_glac_vals:
glac_inds = np.where(ind_glac_mask == i)
glac_els = DEM.img[glac_inds]
if glac_els.size > 0:
max_el = np.nanmax(glac_els)
min_el = np.nanmin(glac_els)
normed_els[glac_inds] = (glac_els - min_el) / (max_el - min_el)
normDEM = DEM.copy(new_raster=normed_els)
return normDEM, ind_glac_mask, ind_glac_vals
def clean_glacier_indices(geoimg, glacier_mask, raw_inds):
clean_glaciers = []
for glac in raw_inds:
imask = glacier_mask == glac
is_here = len(np.where(imask)[0]) > 0
is_finite = len(np.where(np.isfinite(geoimg.img[imask]))[0]) > 0
if is_here and is_finite:
clean_glaciers.append(glac)
return clean_glaciers
# now we actually sum up the ddem values that fall within each glacier
# input from a shapefile
def calculate_volume_changes(dDEM, glacier_shapes, burn_handle=None, ind_glac_vals=None):
if type(glacier_shapes) is str:
ind_glac_mask, ind_glac_vals = it.rasterize_polygons(dDEM, glacier_shapes, burn_handle=burn_handle)
elif type(glacier_shapes) is np.array:
if ind_glac_vals is None:
ind_glac_vals = np.unique(glacier_shapes)
elif type(ind_glac_vals) is list:
ind_glac_vals = np.array(ind_glac_vals)
ind_glac_mask = glacier_shapes
ind_vol_chgs = np.nan * np.zeros(ind_glac_vals.shape)
for i, ind in enumerate(ind_glac_vals):
glac_inds = np.where(ind_glac_mask == ind)
if glac_inds[0].size == 0:
continue
glac_chgs = dDEM.img[glac_inds]
# get the volume change by summing dh/dt, multiplying by cell
ind_vol_chgs[i] = np.nansum(glac_chgs) * np.abs(dDEM.dx) * np.abs(dDEM.dy)
return ind_glac_vals, ind_vol_chgs
# create an area-altitude distribution for a DEM and a glacier shapefile
[docs]def area_alt_dist(DEM, glacier_shapes, glacier_inds=None, bin_width=None):
"""
Calculate an Area-Altitude Distribution for a glacier outline(s), given an input DEM.
:param DEM: input DEM.
:param glacier_shapes: mask representing glacier outlines. Can be boolean or integer depending on whether one
or several AADs should be calculated.
:param glacier_inds: array representing glacier indices in glacier_shapes. If unspecified, only one AAD is returned.
:param bin_width: width of elevation bands to calculate area distribution in. If unspecified, result will be
the minimum of 50m or 10% of the elevation range.
:type DEM: pybob.GeoImg
:type glacier_shapes: array-like
:type glacier_inds: array-like
:type bin_width: float
:returns bins, aads: array, or list of arrays, representing elevation bands and glacier area (in DEM horizontal
units) per elevation band.
"""
if type(DEM) is not GeoImg:
raise TypeError('DEM must be a GeoImg')
if glacier_inds is None:
dem_data = DEM.img[glacier_shapes]
bins = get_bins(dem_data, bin_width=bin_width)
min_el = bins[0]
max_el = bins[-1]
aads, _ = np.histogram(dem_data, bins=bins, range=(min_el, max_el))
aads = aads * np.abs(DEM.dx) * np.abs(DEM.dy)
bins = bins[:-1] # remove the last element, because it's actually above the glacier range.
else:
bins = []
aads = []
for i in glacier_inds:
dem_data = DEM.img[glacier_shapes == i]
thisbin = get_bins(dem_data, bin_width=bin_width)
min_el = thisbin[0]
max_el = thisbin[-1]
thisaad, _ = np.histogram(dem_data, bins=thisbin, range=(min_el, max_el))
bins.append(thisbin[:-1]) # remove last element.
aads.append(thisaad * np.abs(DEM.dx) * np.abs(DEM.dy)) # make it an area!
return bins, aads
[docs]def nmad(data, nfact=1.4826):
"""
Calculate the normalized median absolute deviation (NMAD) of an array.
:param data: input data
:param nfact: normalization factor for the data; default is 1.4826
:type data: array-like
:type nfact: float
:returns nmad: (normalized) median absolute deviation of data.
"""
m = np.nanmedian(data)
return nfact * np.nanmedian(np.abs(data - m))
[docs]def get_bins(DEM, glacier_mask=None, bin_width=None):
"""
Get elevation bins for a DEM, given an optional glacier mask and an optional width. If unspecified,
bin_width is calculated as the minimum of 50 units, or 10% of the DEM (or glacier, if mask provided) elevation
range. Bin values represent the lower bound of the elevation band, and are rounded to be a multiple of the bin
width.
:param DEM: The DEM to get elevation bins for.
:param glacier_mask: mask representing glacier outline, or region of interest.
:param bin_width: width of bins to calculate.
:type DEM: array-like
:type glacier_mask: array-like
:type bin_width: float
:returns bins: the elevation bins.
"""
if glacier_mask is not None:
dem_data = DEM[np.logical_and(np.isfinite(DEM), glacier_mask)]
else:
dem_data = DEM[np.isfinite(DEM)]
zmax = np.nanmax(dem_data)
zmin = np.nanmin(dem_data)
zrange = zmax - zmin
if bin_width is None:
bin_width = min(50, int(zrange / 10))
min_el = zmin - (zmin % bin_width)
max_el = zmax + (bin_width - (zmax % bin_width))
bins = np.arange(min_el, max_el+1, bin_width)
return bins
[docs]def get_elev_curve(DEM, dDEM, glacier_mask=None, bins=None, mode='mean', outlier=False, fill=False, poly_order=3):
"""
Get a dh(z) curve for a glacier/region of interest, given a DEM and a difference DEM (dDEM). Available modes are
'mean'/'median', calculating the mean(median) of each elevation bin, or poly, fitting a polynomial (default
third-order) to the means of each elevation bin.
:param DEM: DEM to determine z in dh(z)
:param dDEM: difference DEM to determine dh in dh(z)
:param glacier_mask: mask representing glacier outline
:param bins: values representing the lower edge of elevation bins
:param mode: how to determine the dh(z) relationship
:param outlier: filter outliers using an iterative 3-sigma filter
:param fill: fill missing bins using a polynomial fit (default third order)
:param poly_order: order for any polynomial fitting
:type DEM: array-like
:type dDEM: array-like
:type glacier_mask: array-like
:type bins: array-like
:type mode: str
:type outlier: bool
:type fill: bool
:type poly_order: int
:returns bins, curve, bin_areas: elevation bins, dh(z) curve, and number of pixels per elevation bin.
"""
assert mode in ['mean', 'median', 'poly'], "mode not recognized: {}".format(mode)
if glacier_mask is None:
valid = np.logical_and(np.isfinite(DEM), np.isfinite(dDEM))
else:
valid = np.logical_and(glacier_mask, np.logical_and(np.isfinite(DEM), np.isfinite(dDEM)))
dem_data = DEM[valid]
ddem_data = dDEM[valid]
if bins is None:
bins = get_bins(DEM, glacier_mask)
if outlier:
# ddem_data = outlier_removal(bins, dem_data, ddem_data)
ddem_data = nmad_outlier_removal(bins, dem_data, ddem_data)
if mode in ['mean', 'median']:
curve, bin_areas = bin_data(bins, ddem_data, dem_data, mode=mode, nbinned=True)
if fill:
_bins = bins[np.isfinite(curve)]
_curve = bins[np.isfinite(curve)]
p = polyfit(bins, curve, poly_order)
fill_ = polyval(bins, p)
curve[np.isnan(curve)] = fill_[np.isnan(curve)]
else:
_mean, bin_areas = bin_data(bins, ddem_data, dem_data, mode='mean', nbinned=True)
p = polyfit(bins, _mean, poly_order)
curve = polyval(bins, p)
return bins, curve, bin_areas
def nmad_outlier_removal(bins, DEM, dDEM, nfact=3):
new_ddem = np.zeros(dDEM.size)
digitized = np.digitize(DEM, bins)
for i, _ in enumerate(bins):
this_bindata = dDEM[digitized == i]
this_nmad = nmad(this_bindata)
this_bindata[np.abs(this_bindata) > nfact * this_nmad] = np.nan
new_ddem[digitized == i] = this_bindata
return new_ddem
[docs]def outlier_removal(bins, DEM, dDEM, nsig=3):
"""
Iteratively remove outliers in an elevation bin using a 3-sigma filter.
:param bins: lower bound of elevation bins to use
:param DEM: DEM to determine grouping for outlier values
:param dDEM: elevation differences to filter outliers from
:param nsig: number of standard deviations before a value is considered an outlier.
:type bins: array-like
:type DEM: array-like
:type dDEM: array-like
:type nsig: float
:returns new_ddem: ddem with outliers removed (set to NaN)
"""
new_ddem = np.zeros(dDEM.size)
digitized = np.digitize(DEM, bins)
for i, _ in enumerate(bins):
this_bindata = dDEM[digitized == i]
nout = 1
old_mean = np.nanmean(this_bindata)
old_std = np.nanstd(this_bindata)
while nout > 1:
thresh_up = old_mean + nsig * old_std
thresh_dn = old_mean - nsig * old_std
isout = np.logical_or(this_bindata > thresh_up, this_bindata < thresh_dn)
nout = np.count_nonzero(isout)
this_bindata[isout] = np.nan
old_mean = np.nanmean(this_bindata)
old_std = np.nanstd(this_bindata)
new_ddem[digitized == i] = this_bindata
return new_ddem
[docs]def calculate_dV_map(dDEM, ind_glac_mask, ind_glac_vals):
"""
Calculate glacier volume changes from a dDEM using a map of glacier outlines.
:param dDEM: difference DEM to get elevation changes from
:param ind_glac_mask: glacier mask with different values for each glacier
:param ind_glac_vals: list of unique index values in glacier mask for which to calculate volume changes.
:type dDEM: pybob.GeoImg
:type ind_glac_mask: array-like
:type ind_glac_vals: array-like
:returns ind_glac_vals, ind_vol_chgs: index and volume changes for the input indices.
"""
ind_glac_vals = np.array(ind_glac_vals)
ind_vol_chgs = np.nan * np.zeros(ind_glac_vals.shape)
for i, ind in enumerate(ind_glac_vals):
glac_inds = np.where(ind_glac_mask == ind)
if glac_inds[0].size == 0:
continue
glac_chgs = dDEM.img[glac_inds]
# get the volume change by summing dh/dt, multiplying by cell
ind_vol_chgs[i] = np.nansum(glac_chgs) * np.abs(dDEM.dx) * np.abs(dDEM.dy)
return ind_glac_vals, ind_vol_chgs
[docs]def calculate_dV_curve(aad, dh_curve):
"""
Given a dh(z) curve and AAD values, calculate volume change.
:param aad: Area-Altitude Distribution/Hypsometry with which to calculate volume change.
:param dh_curve: dh(z) curve to use.
:type aad: np.array
:type dh_curve: np.array
:returns dV: glacier volume change given input curves.
"""
assert aad.size == dh_curve.size, "AAD and dh(z) curve must have same size."
return np.nansum(aad * dh_curve)
def name_parser(fname):
splitname = fname.split('_')
if splitname[0] == 'AST':
datestr = splitname[2][3:11]
yy = int(datestr[4:])
mm = int(datestr[0:2])
dd = int(datestr[2:4])
elif splitname[0] in ['SETSM', 'SDMI', 'SPOT', 'SRTM', 'Map']:
datestr = splitname[2]
yy = int(datestr[0:4])
mm = int(datestr[4:6])
dd = int(datestr[6:])
elif splitname[0] == 'HMA':
datestr = splitname[1]
else:
raise ValueError("Could not parse {} based on sensor name. {} is not one of \
AST, SETSM, SDMI, SPOT, SRTM, Map, HMA.".format(fname, fname))
yy = int(datestr[0:4])
mm = int(datestr[4:6])
dd = int(datestr[6:])
return dt.date(yy, mm, dd)
[docs]def nice_split(fname):
"""
Given a filename of the form dH_DEM1_DEM2, return DEM1, DEM2.
:param fname: filename to split
:type fname: str
:returns name1, name2: DEM names parsed from input filename.
"""
sname = fname.rsplit('.tif', 1)[0].split('_')
sname.remove('dH')
splitloc = [i+1 for i, x in enumerate(sname[1:]) if x in sensor_names][0]
name1 = '_'.join(sname[0:splitloc])
name2 = '_'.join(sname[splitloc:])
return name1, name2