# code: utf-8
# author: Xudong Zheng
# email: z786909151@163.com
"""Build and modify VIC domain files.
Public functions
----------------
``buildDomain``
Create ``domain.nc`` with mask, area, fraction, and grid-length variables.
``cal_mask_frac_area_length``
Compute domain mask/fraction/area/length arrays from grid and basin geometry.
``modifyDomain_for_pourpoint``
Force the domain mask at the nearest grid cell to a pour-point location.
``addElevIntoDomain``
Append elevation from level-1 parameters into the domain file.
"""
from copy import deepcopy
import matplotlib.pyplot as plt
import numpy as np
from netCDF4 import Dataset
from tqdm import *
import geopandas as gpd
from . import logger
from .tools.decoractors import clock_decorator
from .tools.dpc_func.basin_grid_func import grids_array_coord_map, createStand_grids_lat_lon_from_gridshp, gridshp_index_to_grid_array_index
from .tools.geo_func.search_grids import *
# UTM_proj_map = {
# "UTM Zone 10N": {"lon_min": -126, "lon_max": -120, "crs_code": "EPSG:32610"},
# "UTM Zone 11N": {"lon_min": -120, "lon_max": -114, "crs_code": "EPSG:32611"},
# "UTM Zone 12N": {"lon_min": -114, "lon_max": -108, "crs_code": "EPSG:32612"},
# "UTM Zone 13N": {"lon_min": -108, "lon_max": -102, "crs_code": "EPSG:32613"},
# "UTM Zone 14N": {"lon_min": -102, "lon_max": -96, "crs_code": "EPSG:32614"},
# "UTM Zone 15N": {"lon_min": -96, "lon_max": -90, "crs_code": "EPSG:32615"},
# "UTM Zone 16N": {"lon_min": -90, "lon_max": -84, "crs_code": "EPSG:32616"},
# "UTM Zone 17N": {"lon_min": -84, "lon_max": -78, "crs_code": "EPSG:32617"},
# "UTM Zone 18N": {"lon_min": -78, "lon_max": -72, "crs_code": "EPSG:32618"},
# "UTM Zone 19N": {"lon_min": -72, "lon_max": -66, "crs_code": "EPSG:32619"},
# }
[docs]
def generate_utm_proj_map() -> dict:
"""Generate a global UTM zone dictionary (Zone 1-60, N/S)"""
utm_proj_map = {}
for zone in range(1, 61):
# Calculate the longitude range of each zone (each zone is 6 degrees wide)
lon_min = -180 + (zone - 1) * 6
lon_max = lon_min + 6
# Northern Hemisphere (N) - EPSG:326XX
utm_proj_map[f"UTM Zone {zone}N"] = {
"lon_min": lon_min,
"lon_max": lon_max,
"crs_code": f"EPSG:326{zone:02d}" # Zero-padded, e.g., 1 -> 01
}
# Southern Hemisphere (S) - EPSG:327XX
utm_proj_map[f"UTM Zone {zone}S"] = {
"lon_min": lon_min,
"lon_max": lon_max,
"crs_code": f"EPSG:327{zone:02d}"
}
return utm_proj_map
UTM_proj_map = generate_utm_proj_map()
[docs]
@clock_decorator(print_arg_ret=False)
def buildDomain(
evb_dir, dpc_VIC,
reverse_lat=True, basin_shp=None,
prefine_mask=None
):
"""
Build ``domain.nc`` for the current case.
Parameters
----------
evb_dir : Evb_dir
Case directory manager. Output is written to ``evb_dir.domainFile_path``.
dpc_VIC : object
DPC object that provides at least ``grid_shp`` (and optionally ``basin_shp``
when ``basin_shp`` is not passed explicitly).
reverse_lat : bool, optional
Whether latitude axis is arranged north-to-south.
basin_shp : geopandas.GeoDataFrame, optional
Basin polygon used to compute active-grid fractions. If ``None``, it is
taken from ``dpc_VIC`` cache.
prefine_mask : numpy.ndarray, optional
If provided, this array is written to ``mask`` instead of computed mask.
Returns
-------
None
"""
# ====================== build Domain ======================
logger.info("Starting to build domain file... ...")
# create domain file
logger.debug(f"open {evb_dir.domainFile_path} file for saving as domain file")
with Dataset(evb_dir.domainFile_path, "w", format="NETCDF4") as dst_dataset:
# get lon/lat
logger.debug(f"get lon_list and lat_list from the dpc")
grid_shp = dpc_VIC.get_data_from_cache("grid_shp")[0]
lon_list, lat_list, lon_map_index_level0, lat_map_index_level0 = (
grids_array_coord_map(grid_shp, reverse_lat=reverse_lat)
)
logger.debug(f"define dimension and variables in the domain file")
# Define dimensions
lat = dst_dataset.createDimension("lat", len(lat_list))
lon = dst_dataset.createDimension("lon", len(lon_list))
# Create variables for latitudes, longitudes, and other grid data
lat_v = dst_dataset.createVariable("lat", "f8", ("lat",))
lon_v = dst_dataset.createVariable("lon", "f8", ("lon",))
lats = dst_dataset.createVariable(
"lats",
"f8",
(
"lat",
"lon",
),
) # 2D array
lons = dst_dataset.createVariable(
"lons",
"f8",
(
"lat",
"lon",
),
) # 2D array
mask = dst_dataset.createVariable(
"mask",
"i4",
(
"lat",
"lon",
),
)
area = dst_dataset.createVariable(
"area",
"f8",
(
"lat",
"lon",
),
)
frac = dst_dataset.createVariable(
"frac",
"f8",
(
"lat",
"lon",
),
)
frac_full_one = dst_dataset.createVariable(
"frac_full_one",
"f8",
(
"lat",
"lon",
),
)
x_length = dst_dataset.createVariable(
"x_length",
"f8",
(
"lat",
"lon",
),
)
y_length = dst_dataset.createVariable(
"y_length",
"f8",
(
"lat",
"lon",
),
)
# Assign values to variables
lat_v[:] = np.array(lat_list)
lon_v[:] = np.array(lon_list)
grid_array_lons, grid_array_lats = np.meshgrid(lon_v[:], lat_v[:]) # 2D array
lons[:, :] = grid_array_lons
lats[:, :] = grid_array_lats
(
mask_array,
frac_grid_in_basin_array,
frac_full_one_array,
area_array,
x_length_array,
y_length_array,
) = cal_mask_frac_area_length(
dpc_VIC,
reverse_lat=reverse_lat,
plot=False,
basin_shp=basin_shp,
)
mask[:, :] = mask_array if prefine_mask is None else prefine_mask
area[:, :] = area_array
frac[:, :] = frac_grid_in_basin_array
frac_full_one[:, :] = frac_full_one_array
x_length[:, :] = x_length_array
y_length[:, :] = y_length_array
# Add attributes to variables
lat_v.standard_name = "latitude"
lat_v.long_name = "latitude of grid cell center"
lat_v.units = "degrees_north"
lat_v.axis = "Y"
lon_v.standard_name = "longitude"
lon_v.long_name = "longitude of grid cell center"
lon_v.units = "degrees_east"
lon_v.axis = "X"
lats.long_name = "lats 2D"
lats.description = "Latitude of grid cell 2D"
lats.units = "degrees"
lons.long_name = "lons 2D"
lons.description = "longitude of grid cell 2D"
lons.units = "degrees"
mask.long_name = "domain mask"
mask.comment = "1=inside domain, 0=outside"
mask.unit = "binary"
area.standard_name = "area"
area.long_name = "area"
area.description = "area of grid cell"
area.units = "m2"
frac.long_name = "frac"
frac.description = "fraction of grid cell that is active"
frac.units = "fraction"
frac_full_one.long_name = "frac_full_one"
frac_full_one.description = "all value set to 1"
frac_full_one.units = "fraction"
# Global attributes
dst_dataset.title = "VIC5 image domain dataset"
dst_dataset.note = (
"domain dataset generated by XudongZheng, zhengxd@sehemodel.club"
)
dst_dataset.Conventions = "CF-1.6"
logger.info(
f"Building domain sucessfully, domain file has been saved to {evb_dir.domainFile_path}"
)
[docs]
def cal_mask_frac_area_length(
dpc_VIC,
reverse_lat=True,
plot=False,
basin_shp=None,
):
"""
Compute mask/fraction/area/length arrays for domain generation.
Parameters
----------
dpc_VIC : object
DPC object that provides cached ``grid_shp`` and optionally ``basin_shp``.
reverse_lat : bool, optional
Whether latitude axis is arranged north-to-south.
plot : bool, optional
If ``True``, create a quick diagnostic plot.
basin_shp : geopandas.GeoDataFrame, optional
Basin polygon for intersection area calculation.
Returns
-------
tuple
``(mask, frac_grid_in_basin, frac_full_one, area, x_length, y_length)``.
"""
logger.info("Starting to cal_mask_frac_area_length... ...")
# get grid_shp and basin_shp from the dpc_VIC
grid_shp = dpc_VIC.get_data_from_cache("grid_shp")[0]
basin_shp = dpc_VIC.get_data_from_cache("basin_shp")[0] if basin_shp is None else basin_shp
# Determine the UTM CRS based on the longitude of the basin center
try:
lon_cen = basin_shp["lon_cen"].values[0]
except:
lon_cen = basin_shp.centroid.x[0]
for k in UTM_proj_map.keys():
if (
lon_cen >= UTM_proj_map[k]["lon_min"]
and lon_cen <= UTM_proj_map[k]["lon_max"]
):
proj_crs = UTM_proj_map[k]["crs_code"]
# Convert grid shapefile to the chosen projection
grid_shp_projection = deepcopy(grid_shp)
grid_shp_projection = grid_shp_projection.to_crs(proj_crs)
basin_shp_projection = deepcopy(basin_shp)
basin_shp_projection = basin_shp_projection.to_crs(proj_crs)
# lon/lat grid map into index to construct array
# lon_list, lat_list, lon_map_index, lat_map_index = grids_array_coord_map(
# grid_shp, reverse_lat=reverse_lat
# )
stand_grids_lat, stand_grids_lon = createStand_grids_lat_lon_from_gridshp(grid_shp, reverse_lat=reverse_lat)
rows_index, cols_index = gridshp_index_to_grid_array_index(
grid_shp, stand_grids_lat, stand_grids_lon
)
# Initialize arrays for mask, frac, and frac_grid_in_basin
# mask = np.empty((len(lat_list), len(lon_list)), dtype=int)
# frac_full_one = np.full((len(lat_list), len(lon_list)), fill_value=1.0, dtype=float)
# frac_grid_in_basin = np.empty((len(lat_list), len(lon_list)), dtype=float)
mask = np.empty((len(stand_grids_lat), len(stand_grids_lon)), dtype=int)
frac_full_one = np.full((len(stand_grids_lat), len(stand_grids_lon)), fill_value=1.0, dtype=float)
frac_grid_in_basin = np.empty((len(stand_grids_lat), len(stand_grids_lon)), dtype=float)
area = np.empty((len(stand_grids_lat), len(stand_grids_lon)), dtype=float)
x_length = np.empty((len(stand_grids_lat), len(stand_grids_lon)), dtype=float)
y_length = np.empty((len(stand_grids_lat), len(stand_grids_lon)), dtype=float)
logger.debug("Calculating variables for grid cells...")
# overlay
inter = gpd.overlay(
grid_shp_projection.reset_index().rename(columns={'index': 'grid_idx'}),
basin_shp_projection,
how='intersection'
)
# cal variables
grid_area = grid_shp_projection.geometry.area.values
bounds = grid_shp_projection.geometry.bounds
area_overlay = np.zeros((len(grid_shp_projection),), dtype=float)
for grid_id, group in inter.groupby('grid_idx'):
area_overlay[grid_id] = group.geometry.area.sum()
frac_matrix = area_overlay / grid_area
frac_matrix[frac_matrix > 1.0] = 1.0
frac_matrix[frac_matrix <= 0.0] = np.nan
mask[rows_index, cols_index] = frac_matrix > 0
frac_grid_in_basin[rows_index, cols_index] = frac_matrix
area[rows_index, cols_index] = grid_area
x_length[rows_index, cols_index] = (bounds['maxx'] - bounds['minx']).to_numpy()
y_length[rows_index, cols_index] = (bounds['maxy'] - bounds['miny']).to_numpy()
# for i in tqdm(
# grid_shp.index, colour="green", desc="loop for grids to cal mask, frac"
# ):
# center = grid_shp.loc[i, "point_geometry"]
# cen_lon = center.x
# cen_lat = center.y
# # Get the grid at the current index
# grid_i = grid_shp.loc[[i], :]
# # fig, ax = plt.subplots() # plot for testing
# # grid_i.plot(ax=ax)
# # basin_shp.plot(ax=ax, alpha=0.5)
# # intersection
# overlay_gdf = grid_i.overlay(basin_shp, how="intersection")
# # Update mask and fraction based on intersection
# if len(overlay_gdf) == 0:
# mask[lat_map_index[cen_lat], lon_map_index[cen_lon]] = 0
# frac_grid_in_basin[lat_map_index[cen_lat], lon_map_index[cen_lon]] = np.nan # 0
# frac_full_one[lat_map_index[cen_lat], lon_map_index[cen_lon]] = np.nan # 0
# else:
# mask[lat_map_index[cen_lat], lon_map_index[cen_lon]] = 1
# frac_grid_in_basin[lat_map_index[cen_lat], lon_map_index[cen_lon]] = (
# overlay_gdf.area.values[0] / grid_i.area.values[0]
# )
logger.debug("Calculating mask and fraction successfully")
# Initialize arrays for area and grid cell dimensions
# area = np.empty((len(lat_list), len(lon_list)), dtype=float)
# x_length = np.empty((len(lat_list), len(lon_list)), dtype=float)
# y_length = np.empty((len(lat_list), len(lon_list)), dtype=float)
# logger.debug("Calculating area and grid dimensions...")
# for i in tqdm(
# grid_shp_projection.index,
# colour="green",
# desc="loop for grids to cal area, x(y)_length",
# ):
# center = grid_shp_projection.loc[i, "point_geometry"]
# cen_lon = center.x
# cen_lat = center.y
# area[lat_map_index[cen_lat], lon_map_index[cen_lon]] = grid_shp_projection.loc[
# i, "geometry"
# ].area
# x_length[lat_map_index[cen_lat], lon_map_index[cen_lon]] = (
# grid_shp_projection.loc[i, "geometry"].bounds[2]
# - grid_shp_projection.loc[i, "geometry"].bounds[0]
# )
# y_length[lat_map_index[cen_lat], lon_map_index[cen_lon]] = (
# grid_shp_projection.loc[i, "geometry"].bounds[3]
# - grid_shp_projection.loc[i, "geometry"].bounds[1]
# )
# Optionally flip arrays based on latitude orientation
if not reverse_lat:
mask_flip = np.flip(mask, axis=0)
frac_grid_in_basin_flip = np.flip(frac_grid_in_basin, axis=0)
area_flip = np.flip(area, axis=0)
# extent = [lon_list[0], lon_list[-1], lat_list[0], lat_list[-1]]
extent = [stand_grids_lon[0], stand_grids_lon[-1], stand_grids_lat[0], stand_grids_lat[-1]]
else:
mask_flip = mask
frac_grid_in_basin_flip = frac_grid_in_basin
area_flip = area
# extent = [lon_list[0], lon_list[-1], lat_list[-1], lat_list[0]]
extent = [stand_grids_lon[0], stand_grids_lon[-1], stand_grids_lat[0], stand_grids_lat[-1]]
# Plot the results if requested
if plot:
fig, axes = plt.subplots(2, 2)
dpc_VIC.plot(
fig=fig,
ax=axes[0, 0],
)
axes[0, 0].set_xlim([extent[0], extent[1]])
axes[0, 0].set_ylim([extent[2], extent[3]])
axes[0, 1].imshow(mask_flip, extent=extent)
axes[1, 0].imshow(frac_grid_in_basin_flip, extent=extent)
axes[1, 1].imshow(area_flip, extent=extent)
axes[0, 0].set_title("dpc_VIC")
axes[0, 1].set_title("mask")
axes[1, 0].set_title("frac_grid_in_basin")
axes[1, 1].set_title("area")
logger.info("cal_mask_frac_area_length successfully")
return mask, frac_grid_in_basin, frac_full_one, area, x_length, y_length
[docs]
def modifyDomain_for_pourpoint(evb_dir, pourpoint_lon, pourpoint_lat):
"""
Set domain mask to ``1`` at the grid nearest to the pour point.
Parameters
----------
evb_dir : Evb_dir
Case directory manager.
pourpoint_lon : float
Pour-point longitude.
pourpoint_lat : float
Pour-point latitude.
Returns
-------
None
"""
logger.info(
f"Starting to modify domain for pour point at ({pourpoint_lat}, {pourpoint_lon})... ..."
)
# Open the existing domain file in append mode
with Dataset(evb_dir.domainFile_path, "a", format="NETCDF4") as src_dataset:
# get lat, lon
lat = src_dataset.variables["lat"][:]
lon = src_dataset.variables["lon"][:]
# Search for the grid cell closest to the provided pour point coordinates
searched_grid_index = search_grids_nearest(
[pourpoint_lat], [pourpoint_lon], lat, lon, search_num=1
)[0]
# Log the found grid index for the pour point
logger.debug(f"Found nearest grid index for pour point: {searched_grid_index}")
# Update the mask at the nearest grid cell to 1, indicating the pour point
src_dataset.variables["mask"][
searched_grid_index[0][0], searched_grid_index[1][0]
] = 1
# Log the successful update of the mask
logger.debug(
f"Mask updated to 1 at grid ({searched_grid_index[0][0]}, {searched_grid_index[1][0]}) for the pour point."
)
logger.info(f"Modifying domain sucessfully")
[docs]
def addElevIntoDomain(evb_dir, params_dataset_level1):
"""Append ``elev`` variable into an existing domain file.
Parameters
----------
evb_dir : Evb_dir
Case directory manager.
params_dataset_level1 : netCDF4.Dataset
Parameter dataset that contains ``elev`` with shape ``(lat, lon)``.
Returns
-------
None
"""
logger.info(
f"Starting to add elev (from param_level1) into domain... ..."
)
# Open the existing domain file in append mode
with Dataset(evb_dir.domainFile_path, "a", format="NETCDF4") as src_dataset:
# Create variables for elev
elev = src_dataset.createVariable("elev", "f8", ("lat", "lon"))
# Add attributes to variables
elev.standard_name = "elevation"
elev.long_name = "elevation of grid cell"
elev.units = "m"
# Assign values to elev variable
assert params_dataset_level1["elev"].shape == elev.shape, "Shape of elev from params_dataset_level1 does not match domain shape"
elev[:, :] = params_dataset_level1.variables["elev"][:, :]
logger.info(f"Adding elev into domain sucessfully")
[docs]
def centers_to_edges(centers):
"""Compute grid edges from center coordinates."""
centers = np.array(centers)
diffs = np.diff(centers)
left = centers[0] - diffs[0] / 2
right = centers[-1] + diffs[-1] / 2
mid = centers[:-1] + diffs / 2
edges = np.concatenate(([left], mid, [right]))
return edges
[docs]
def remap_level1_to_level0_mask(lon_level1, lat_level1, mask_level1,
lon_level0, lat_level0):
"""
Map level-1 mask to level-0 grid by cell-center lookup.
Each level-0 grid cell inherits the mask value of the level-1 cell
in which its center falls.
Returns
-------
numpy.ndarray
Remapped mask with ``(len(lat_level0), len(lon_level0))`` shape.
"""
lon_level1 = np.array(lon_level1)
lat_level1 = np.array(lat_level1)
lon_level0 = np.array(lon_level0)
lat_level0 = np.array(lat_level0)
lon_edges = centers_to_edges(lon_level1)
lat_edges = centers_to_edges(lat_level1)
# flip if descending
if lon_edges[1] < lon_edges[0]:
lon_edges = lon_edges[::-1]
lon_level1 = lon_level1[::-1]
mask_level1 = mask_level1[:, ::-1]
if lat_edges[1] < lat_edges[0]:
lat_edges = lat_edges[::-1]
lat_level1 = lat_level1[::-1]
mask_level1 = mask_level1[::-1, :]
# For each level0 cell, locate its level1 parent cell
xi = np.searchsorted(lon_edges, lon_level0) - 1
yi = np.searchsorted(lat_edges, lat_level0) - 1
xi = np.clip(xi, 0, mask_level1.shape[1] - 1)
yi = np.clip(yi, 0, mask_level1.shape[0] - 1)
XI, YI = np.meshgrid(xi, yi)
mask_level0 = mask_level1[YI, XI]
return mask_level0