Source code for pybob.bob_tools

"""
pybob.bob_tools is a collection of the tools that didn't really fit other places.
"""
from __future__ import print_function
import os
import random
import errno
import datetime as dt
from osgeo import ogr, osr
import numpy as np
from shapely.geometry import Point


[docs]def mkdir_p(out_dir): """ Add bash mkdir -p functionality to os.makedirs. :param out_dir: directory to create. """ try: os.makedirs(out_dir) except OSError as exc: # Python >2.5 if exc.errno == errno.EEXIST and os.path.isdir(out_dir): pass else: raise
[docs]def standard_landsat(instring): """ Given a string of a landsat scenename, make a standard (pre-Collection) filename, of the form LSSPPPRRRYYYYDDDXXX01. """ strsplit = instring.split('_') if len(strsplit) < 3: # if we only have 1 (or fewer) underscores, it's already fine return strsplit[0] else: outstring = 'L' sensor = strsplit[0][1] # figure out what sensor we're using, and generate the appropriate string if sensor == '5': outstring += 'T5' elif sensor == '4': outstring += 'T4' elif sensor == '7': outstring += 'E7' elif sensor == '8': # chances are, we won't ever see this one. outstring += 'C8' outstring += strsplit[0][2:] # this is the path/row information # now it gets fun: getting the date right. year = strsplit[1][3:7] month = strsplit[1][7:9] day = strsplit[1][9:] # make sure we have 3 digits here doy = str(dt.datetime(int(year), int(month), int(day)).timetuple().tm_yday).zfill(3) outstring += year + doy # now, spoof the last bits so it's got the right size: outstring += 'XXX01' return outstring
[docs]def doy2mmdd(year, doy, string_out=True, outform='%Y/%m/%d'): """ Return a string or a datetime object given a year and a day of year. :param year: Year of the input date, e.g., ``2018``. :param doy: Day of the year of the input date, e.g., ``1`` for 1 Jan. :param string_out: Return a string (True) or a datetime object (False). Default is True. :param outform: Format for string to return. Default is `%Y/%m/%d`, or 2018/01/01 for 1 January 2018. :type year: int :type doy: int :type string_out: bool :type outform: str :returns date: datetime.datetime or string representation of the input date. >>> bt.doy2mmdd(2018, 1, string_out=True) `2018/01/01` >>> bt.doy2mmdd(2018, 1, string_out=False) datetime.datetime(2018, 1, 1, 0, 0) """ datestruct = dt.datetime(year, 1, 1) + dt.timedelta(doy-1) if string_out: return datestruct.strftime(outform) else: return datestruct
[docs]def mmdd2doy(year, month, day, string_out=True): """ Return a string or an int representing a day of year, given a date. :param year: Year of the input date, e.g., ``2018``. :param mm: Month of the year of the input date, e.g., ``1`` for Jan. :param dd: Day of the month of the input date. :param string_out: Return a string (True) or an int (False). Default is True. :type year: int :type mm: int :type dd: int :type string_out: bool :returns doy: day of year representation of year, month, day >>> bt.mmdd2doy(2018, 1, 1, string_out=True) `1` """ doy = dt.datetime(year, month, day).timetuple().tm_yday if string_out: return str(doy) else: return doy
def mmdd2dec(year, month, day): yearstart = dt.datetime(year, 1, 1).toordinal() nextyearstart = dt.datetime(year+1, 1, 1).toordinal() datestruct = dt.datetime(year, month, day) datefrac = float(datestruct.toordinal() + 0.5 - yearstart) / (nextyearstart - yearstart) return year + datefrac def doy2dec(year, doy): pass def dec2mmdd(decdate): year = int(np.floor(decdate)) datefrac = decdate - year yearstart = dt.datetime(year, 1, 1).toordinal() nextyearstart = dt.datetime(year+1, 1, 1).toordinal() days = np.round((nextyearstart - yearstart) * datefrac - 0.5) return dt.datetime(year, 1, 1) + dt.timedelta(days) def dec2doy(decdate): year = int(np.floor(decdate)) datefrac = decdate - year yearstart = dt.datetime(year, 1, 1).toordinal() nextyearstart = dt.datetime(year+1, 1, 1).toordinal() days = np.round((nextyearstart - yearstart) * datefrac - 0.5) return year, days+1 pass def parse_lsat_scene(scenename, string_out=True): sensor = scenename[0:3] path = int(scenename[3:6]) row = int(scenename[6:9]) year = int(scenename[9:13]) doy = int(scenename[13:16]) if string_out: return sensor, str(path), str(row), str(year), str(doy) else: return sensor, path, row, year, doy
[docs]def bin_data(bins, data2bin, bindata, mode='mean', nbinned=False): """ Place data into bins based on a secondary dataset, and calculate statistics on them. :param bins: array-like structure indicating the bins into which data should be placed. :param data2bin: data that should be binned. :param bindata: secondary dataset that decides how data2bin should be binned. Should have same size/shape as data2bin. :param mode: How to calculate statistics of binned data. One of 'mean', 'median', 'std', 'max', or 'min'. :param nbinned: Return a second array, nbinned, with number of data points that fit into each bin. Default is False. :type bins: array-like :type data2bin: array-like :type bindata: array-like :type mode: str :type nbinned: bool :returns binned, nbinned: calculated, binned data with same size as bins input. If nbinned is True, returns a second array with the number of inputs for each bin. """ assert mode in ['mean', 'median', 'std', 'max', 'min'], "mode not recognized: {}".format(mode) digitized = np.digitize(bindata, bins) binned = np.zeros(len(bins)) * np.nan if nbinned: numbinned = np.zeros(len(bins)) if mode == 'mean': for i, _ in enumerate(bins): binned[i] = np.nanmean(data2bin[np.logical_and(np.isfinite(bindata), digitized == i+1)]) if nbinned: numbinned[i] = np.count_nonzero(np.logical_and(np.isfinite(data2bin), digitized == i+1)) elif mode == 'median': for i, _ in enumerate(bins): binned[i] = np.nanmedian(data2bin[np.logical_and(np.isfinite(bindata), digitized == i+1)]) if nbinned: numbinned[i] = np.count_nonzero(np.logical_and(np.isfinite(data2bin), digitized == i+1)) elif mode == 'std': for i, _ in enumerate(bins): binned[i] = np.nanstd(data2bin[np.logical_and(np.isfinite(bindata), digitized == i+1)]) if nbinned: numbinned[i] = np.count_nonzero(np.logical_and(np.isfinite(data2bin), digitized == i+1)) elif mode == 'max': for i, _ in enumerate(bins): binned[i] = np.nanmax(data2bin[np.logical_and(np.isfinite(bindata), digitized == i+1)]) if nbinned: numbinned[i] = np.count_nonzero(np.logical_and(np.isfinite(data2bin), digitized == i+1)) elif mode == 'min': for i, _ in enumerate(bins): binned[i] = np.nanmin(data2bin[np.logical_and(np.isfinite(bindata), digitized == i+1)]) if nbinned: numbinned[i] = np.count_nonzero(np.logical_and(np.isfinite(data2bin), digitized == i+1)) else: raise ValueError('mode must be mean, median, std, max, or min') if nbinned: return np.array(binned), np.array(numbinned) else: return np.array(binned)
def random_points_in_polygon(poly, npts=1): xmin, ymin, xmax, ymax = poly.bounds rand_points = [] for pt in range(npts): thisptin = False while not thisptin: rand_point = Point(xmin + (random.random() * (xmax-xmin)), ymin + (random.random() * (ymax-ymin))) thisptin = poly.contains(rand_point) rand_points.append(rand_point) return rand_points def reproject_layer(inLayer, targetSRS): srcSRS = inLayer.GetSpatialRef() coordTrans = osr.CoordinateTransformation(srcSRS, targetSRS) outLayer = ogr.GetDriverByName('Memory').CreateDataSource('').CreateLayer(inLayer.GetName(), geom_type=inLayer.GetGeomType()) inLayerDefn = inLayer.GetLayerDefn() for i in range(0, inLayerDefn.GetFieldCount()): fieldDefn = inLayerDefn.GetFieldDefn(i) outLayer.CreateField(fieldDefn) outLayerDefn = outLayer.GetLayerDefn() inFeature = inLayer.GetNextFeature() while inFeature: geom = inFeature.GetGeometryRef() geom.Transform(coordTrans) outFeature = ogr.Feature(outLayerDefn) outFeature.SetGeometry(geom) for i in range(0, outLayerDefn.GetFieldCount()): outFeature.SetField(outLayerDefn.GetFieldDefn(i).GetNameRef(), inFeature.GetField(i)) outLayer.CreateFeature(outFeature) outFeature.Destroy() inFeature.Destroy() inFeature = inLayer.GetNextFeature() return outLayer def round_down(num, divisor): return num - (num % divisor)