import sys
import rioxarray # NOQA - always import before xarray...
import xarray as xr
import dask.array as da
import datashader as ds
import geopandas as gpd
import spatialpandas
from xrspatial.utils import height_implied_by_aspect_ratio
# Web Mercator (EPSG:3857) — spherical Mercator used by web tile maps.
wb_proj_str = ('+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0'
               ' +units=m +nadgrids=@null +wktext +no_defs')
# WGS84 geographic (lon/lat) coordinates (EPSG:4326).
wgs84_proj_str = '+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs'
# US National Atlas Equal Area (EPSG:2163), Lambert azimuthal equal-area.
us_national_equal_area_str = (
    '+proj=laea +lat_0=45 +lon_0=-100 +x_0=0 +y_0=0 +a=6370997 '
    '+b=6370997 +units=m +no_defs'
)
# EPSG code -> PROJ string for the projections supported by reproject_raster().
projections = {
    3857: wb_proj_str,
    4326: wgs84_proj_str,
    2163: us_national_equal_area_str
}
def reproject_raster(arr: xr.DataArray, epsg=3857):
    """
    Reproject raster data to one of the supported coordinate systems.

    Parameters
    ----------
    arr : xarray.DataArray
        The raster data.
    epsg : int, default=3857
        The coordinate system's EPSG code; must be a key of the
        module-level ``projections`` mapping.

    Returns
    -------
    reproj_data : xarray.DataArray
        The reprojected data.

    Raises
    ------
    ValueError
        If ``epsg`` is not a supported code.
    """
    # Look up the PROJ string first, with a minimal try body, so a
    # KeyError raised inside rio.reproject() cannot be misreported as
    # an unknown EPSG code.  (No `global` needed for a read-only lookup.)
    try:
        proj_str = projections[epsg]
    except KeyError:
        raise ValueError(f'Raster Projection Error: Invalid EPSG {epsg}') from None
    return arr.rio.reproject(proj_str)
def reproject_vector(gdf: gpd.GeoDataFrame, epsg=3857):
    """
    Reproject vector data into the coordinate system named by ``epsg``.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        The vector data.
    epsg : int, default=3857
        The target coordinate system's EPSG code.

    Returns
    -------
    reproj_data : geopandas.GeoDataFrame
        The reprojected data.
    """
    reprojected = gdf.to_crs(epsg=epsg)
    return reprojected
def flip_coords(arr, dim):
    """
    Reverse the value order of one coordinate dimension.

    Parameters
    ----------
    arr : xarray.DataArray
        The data source.
    dim : str
        Name of the coordinate to reverse.

    Returns
    -------
    flipped_data : xarray.DataArray
        Array whose ``dim`` coordinate runs in the opposite order.
    """
    reversed_values = list(reversed(arr.coords[dim]))
    return arr.assign_coords(**{dim: reversed_values})
def to_spatialpandas(gdf: gpd.GeoDataFrame, geometry_field='geometry'):
    """
    Convert a ``geopandas.GeoDataFrame`` to ``spatialpandas.GeoDataFrame``.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        The data source.
    geometry_field : str, default=geometry
        Name of the geometry column on the GeoDataFrame.

    Returns
    -------
    spatial_gdf : spatialpandas.GeoDataFrame
    """
    spatial_gdf = spatialpandas.GeoDataFrame(gdf, geometry=geometry_field)
    return spatial_gdf
def squeeze(arr, dim):
    """
    Return a new ``xarray.DataArray`` with length-1 dimensions squeezed
    out and ``dim`` dropped.

    Parameters
    ----------
    arr : xarray.DataArray
        The data source.
    dim : str
        Name of the field to drop after squeezing.

    Returns
    -------
    squeezed_data : xarray.DataArray
        The squeezed data.
    """
    squeezed = arr.squeeze()
    # NOTE(review): DataArray.drop is deprecated in newer xarray in favour
    # of drop_vars; kept as-is to preserve behavior on the pinned version.
    return squeezed.drop(dim)
def cast(arr, dtype):
    """
    Cast the array's values to ``dtype``.

    Note: mutates ``arr.data`` in place and returns the same object.

    Parameters
    ----------
    arr : xarray.DataArray
        The data source.
    dtype : str
        Target data type name.

    Returns
    -------
    casted_data : xarray.DataArray
        The same array, with converted values.
    """
    converted = arr.data.astype(dtype)
    arr.data = converted
    return arr
def orient_array(arr):
    """
    Reorient the array to a canonical orientation based on the sign of
    its x and y resolutions (delegates to ``datashader.utils``).

    Note: mutates ``arr.data`` in place and returns the same object.

    Parameters
    ----------
    arr : xarray.DataArray
        Data to be reoriented.

    Returns
    -------
    reoriented_data : xarray.DataArray
        The same array, with reoriented values.
    """
    oriented = ds.utils.orient_array(arr)
    arr.data = oriented
    return arr
def get_data_array_extent(dataarray):
    """
    Get the lower-left and upper-right corner coordinates in map units.

    Parameters
    ----------
    dataarray : xarray.DataArray
        The input datasource; must carry ``x`` and ``y`` coordinates.

    Returns
    -------
    extent : tuple
        ``(xmin, ymin, xmax, ymax)`` values.
    """
    xs = dataarray.coords['x']
    ys = dataarray.coords['y']
    return (xs.min().item(), ys.min().item(),
            xs.max().item(), ys.max().item())
def canvas_like(dataarray):
    """
    Build a ``datashader.Canvas`` matching a ``xarray.DataArray``'s
    extent and pixel dimensions.

    Parameters
    ----------
    dataarray : xarray.DataArray
        The input datasource.

    Returns
    -------
    canvas : datashader.Canvas
        An abstract canvas representing the space in which to bin.

    Raises
    ------
    TypeError
        If ``dataarray`` is not an ``xarray.DataArray``.
    """
    if not isinstance(dataarray, xr.DataArray):
        raise TypeError('like object must be DataArray')
    xmin, ymin, xmax, ymax = get_data_array_extent(dataarray)
    height = len(dataarray.coords['y'])
    width = len(dataarray.coords['x'])
    return ds.Canvas(plot_width=width, plot_height=height,
                     x_range=(xmin, xmax), y_range=(ymin, ymax))
def build_vector_overviews(gdf, levels, geometry_field='geometry'):
    """
    Build reduced-resolution (simplified) versions of vector data.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        The vector data.
    levels : dict
        Mapping of overview level -> simplify tolerance.
    geometry_field : str, default=geometry
        The geometry field name.

    Returns
    -------
    overviews : dict
        Mapping of int level -> simplified GeoDataFrame.  Levels that
        share a tolerance share the same simplified object.
    """
    by_tolerance = {}
    overviews = {}
    for level, simplify_tol in levels.items():
        msg = f'Generating Vector Overview level {level} at {simplify_tol} simplify tolerance'
        print(msg, file=sys.stdout)
        # Reuse the result when another level already used this tolerance.
        if simplify_tol in by_tolerance:
            overviews[int(level)] = by_tolerance[simplify_tol]
            continue
        simplified = gdf.copy()
        simplified[geometry_field] = simplified[geometry_field].simplify(simplify_tol)
        by_tolerance[simplify_tol] = simplified
        overviews[int(level)] = simplified
    return overviews
def build_raster_overviews(arr, levels, interpolate='linear'):
    """
    Build reduced-resolution versions of raster data.

    Parameters
    ----------
    arr : xarray.DataArray
        The raster data.
    levels : dict
        Mapping of overview level -> target pixel width.
    interpolate : str, default=linear
        Resampling mode when upsampling raster.
        Options include: nearest, linear.

    Returns
    -------
    overviews : dict
        Mapping of int level -> reduced-resolution ``xarray.DataArray``.
        Levels that share a resolution share the same aggregation.
    """
    values = {}
    overviews = {}
    for level, resolution in levels.items():
        print(f'Generating Raster Overview level {level} at {resolution} pixel width',
              file=sys.stdout)
        # Reuse a previously computed aggregation for duplicate resolutions.
        if resolution in values:
            overviews[int(level)] = values[resolution]
            continue
        cvs = canvas_like(arr)
        # Keep the aspect ratio: derive height from the requested width.
        height = height_implied_by_aspect_ratio(resolution, cvs.x_range, cvs.y_range)
        cvs.plot_height = height
        cvs.plot_width = resolution
        # BUG FIX: `.chunk(512, 512)` passed the second 512 as dask's
        # name_prefix (keyword-only / TypeError on modern xarray).  The
        # intent — 512x512 chunks — needs a single chunks argument.
        agg = (cvs.raster(arr, interpolate=interpolate)
               .compute()
               .chunk((512, 512))
               .persist())
        overviews[int(level)] = agg
        values[resolution] = agg
    return overviews
def add_xy_fields(gdf, geometry_field='geometry', x_field_name='X', y_field_name='Y'):
    """
    Extract x and y values from point geometries into two new columns.

    Note: mutates ``gdf`` in place and returns the same object.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        The vector data; ``geometry_field`` must hold point geometries
        exposing ``.x`` and ``.y``.
    geometry_field : str, default=geometry
        The geometry field name.
    x_field_name, y_field_name : str
        Names of the columns to create.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        The input frame with the two new columns added.
    """
    for column, attribute in ((x_field_name, 'x'), (y_field_name, 'y')):
        gdf[column] = gdf[geometry_field].apply(
            lambda geom, attr=attribute: getattr(geom, attr))
    return gdf
def add_projected_buffered_extent(gdf, buffer_distance, crs='4326', geometry_field='geometry'):
    """
    Project the geometry, buffer it, and join the buffered bounding box
    onto the attributes as four ``buffer_<dist>_<crs>_*`` columns.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        The vector data.
    buffer_distance : int or float
        Buffer distance, in the units of ``crs``.
    crs : str, default='4326'
        Coordinate system to project into before buffering.
    geometry_field : str, default=geometry
        The geometry field name.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        The input data joined with the buffered-extent columns.
    """
    prefix = f"buffer_{int(buffer_distance)}_{crs}"
    buffered = gdf.to_crs(crs)[geometry_field].buffer(buffer_distance)
    extent_cols = buffered.bounds.rename(columns={"minx": f"{prefix}_xmin",
                                                  "miny": f"{prefix}_ymin",
                                                  "maxx": f"{prefix}_xmax",
                                                  "maxy": f"{prefix}_ymax"})
    return gdf.join(extent_cols)
def select_by_attributes(gdf, field, value, operator='IN'):
    """
    Filter a ``geopandas.GeoDataFrame`` data.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        The vector data.
    field : str
        Column to be filtered.
    value : int, float, or iterable
        Value to compare when filtering (an iterable for IN / NOT IN).
    operator : str, default=IN
        Comparison operator to be used when filtering.
        Options include: IN, NOT IN, EQUALS, NOT EQUALS, LT, GT, LTE,
        and GTE.

    Returns
    -------
    filtered_data : geopandas.GeoDataFrame
        The filtered data.

    Raises
    ------
    ValueError
        If ``operator`` is not a supported name.  (The original
        if/elif chain silently returned ``None`` for unknown operators.)
    """
    # Dispatch table: operator name -> boolean mask over the column.
    comparisons = {
        'IN': lambda col: col.isin(value),
        'NOT IN': lambda col: ~col.isin(value),
        'EQUALS': lambda col: col == value,
        'NOT EQUALS': lambda col: col != value,
        'LT': lambda col: col < value,
        'GT': lambda col: col > value,
        'LTE': lambda col: col <= value,
        'GTE': lambda col: col >= value,
    }
    if operator not in comparisons:
        raise ValueError(f'Invalid operator: {operator}')
    return gdf[comparisons[operator](gdf[field])]
def polygon_to_line(gdf, geometry_field='geometry'):
    """
    Convert polygon geometries to their boundary lines.

    Note: mutates ``gdf`` in place and returns the same object.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        The vector data.
    geometry_field : str, default=geometry
        The geometry field name.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        The converted data.
    """
    boundaries = gdf[geometry_field].boundary
    gdf[geometry_field] = boundaries
    return gdf
def raster_to_categorical_points(arr, cats: dict, dim: str = 'data'):
    """
    Convert raster data to a table of categorical points.

    Parameters
    ----------
    arr : xarray.DataArray
        The raster data (may be dask-backed; it is computed first).
    cats : dict
        Mapping of raster value -> category label.
    dim : str, default=data
        The categorical column name.

    Returns
    -------
    df : pandas.DataFrame
        The converted data, capped at 1,000,000 rows.
    """
    if isinstance(arr, da.Array):
        df = arr.compute().to_dataframe()
    else:
        df = arr.to_dataframe()
    # Cap output size — to_dataframe() on a large raster can explode.
    df = df.head(1000000)
    df.reset_index(inplace=True)
    # BUG FIX: the original read `[dim].astype(...)` — a one-element list
    # literal (missing `df`), which always raised AttributeError.
    df[dim] = df[dim].astype('int')
    df[dim] = df[dim].astype('category')
    # Map raw values to labels.  Assigning to `.cat.categories` directly
    # is read-only on modern pandas; rename_categories is the supported way.
    df[dim] = df[dim].cat.rename_categories(
        [cats.get(s) for s in df[dim].cat.categories])
    return df
def load_in_memory(df):
    """
    Materialize a lazy dask collection into memory.

    Parameters
    ----------
    df : dask.dataframe or dask_geopandas.GeoDataFrame object

    Returns
    -------
    computed_df : pandas.DataFrame or geopandas.GeoDataFrame object
    """
    return df.compute()
# Registry of transform name -> implementation, covering every transform
# defined in this module.  NOTE(review): the leading underscore suggests
# module-private lookup (presumably by a pipeline driver that selects
# transforms by name) — confirm against callers.
_transforms = {
    'reproject_raster': reproject_raster,
    'reproject_vector': reproject_vector,
    'orient_array': orient_array,
    'cast': cast,
    'flip_coords': flip_coords,
    'build_raster_overviews': build_raster_overviews,
    'build_vector_overviews': build_vector_overviews,
    'squeeze': squeeze,
    'to_spatialpandas': to_spatialpandas,
    'add_xy_fields': add_xy_fields,
    'add_projected_buffered_extent': add_projected_buffered_extent,
    'select_by_attributes': select_by_attributes,
    'polygon_to_line': polygon_to_line,
    'raster_to_categorical_points': raster_to_categorical_points,
    'load_in_memory': load_in_memory,
}