# -*- coding: utf-8 -*-
"""
Groups the different stages of the processing described in the article :
"Bernhard Jenny (2021) Terrain generalization with line integral
convolution, Cartography and Geographic Information Science, 48:1, 78-92, DOI:
10.1080/15230406.2020.1833762
https://doi.org/10.1080/15230406.2020.1833762"
Including the preliminary processing of adaptive Gaussian blur applied to the DTM.
"""
# Extern imports
import numpy as np
import rasterio
from rasterio.windows import Window
from tqdm import tqdm
from scipy.ndimage import gaussian_filter
from functools import partial
# intern imports
from dem_lic.utils.morpho_dem import (
calculate_maximal_curvature,
fast_adaptive_gaussian_blur,
initialize_flat_steep_grid,
remove_small_flat_areas,
)
[docs]
def extended_lic_weighted_altitude_lengthModulated(
grid: np.ndarray,
cellsize: float,
f: np.ndarray,
num_steps: int,
sigma_modulated: bool = True
) -> np.ndarray:
"""Applies a Line Integral Convolution (LIC) with a dynamically modulated integration
length (based on f) and a variable Gaussian kernel that reduces the influence of
higher terrain along the line of integration.
Parameters
----------
grid : np.ndarray
A 2D array representing the Digital Elevation Model (DEM).
cellsize : float
The size of each raster cell in meters.
f : np.ndarray
A 2D array (same shape as `grid`) in [0, 1], controlling the fraction of
the maximum integration steps allowed for each pixel (e.g., in flat or steep areas).
num_steps : int
The maximum number of integration steps for each pixel.
sigma_modulated : bool, optional
If True, the sigma for the weighting in the LIC algorithm is modulated by the altitude.
If False, a fixed Gaussian sigma is used.
Returns
-------
np.ndarray
The DEM after the weighted LIC processing, where ridges are accentuated
by decreasing the Gaussian kernel's sigma for portions that lie above
the starting pixel's altitude.
"""
# --- 0) Initial copies and shapes
src = grid.copy()
H, W = src.shape
# The altitude of origin for each pixel (to compare with visited altitudes).
start_alt = src.copy()
# --- 1) Compute the local number of steps for each pixel from f
f = np.clip(f, 0.0, 1.0)
local_steps = np.round(f * num_steps).astype(int)
max_steps = np.max(local_steps)
# --- 2) Compute normalized gradients (vi, vj)
DS = 0.5 # Integration speed (distance in pixel units per step)
vi, vj = np.gradient(src, cellsize) # Partial derivatives
# Magnitude (avoid division by zero)
mag = np.hypot(vi, vj)
zero_mask = (mag == 0)
mag[zero_mask] = 1.0
mag *= (1.0 / DS)
# Normalize gradients
vi /= mag
vj /= mag
# --- 3) Pass 1: Compute range along each integration line
range_altitude = np.zeros_like(src, dtype=float)
for pass_id in range(2):
if pass_id == 1:
vi = -vi
vj = -vj
current_steps = local_steps.copy()
active_cells = np.ones((H, W), dtype=bool)
fi, fj = np.mgrid[:H, :W].astype(float)
min_alt = start_alt.copy()
max_alt = start_alt.copy()
for step_idx in range(max_steps):
if not np.any(active_cells):
break
fi[active_cells] += vi[active_cells]
fj[active_cells] += vj[active_cells]
pi = np.clip(fi.astype(int), 0, H - 1)
pj = np.clip(fj.astype(int), 0, W - 1)
# Update min and max altitude along the line
min_alt = np.minimum(min_alt, src[pi, pj])
max_alt = np.maximum(max_alt, src[pi, pj])
# Update range altitude for each pixel
range_altitude = np.maximum(range_altitude, max_alt - min_alt)
current_steps[active_cells] -= 1
active_cells = (current_steps > 0)
range_altitude = np.maximum(range_altitude, 1e-6) # Avoid division by zero
# --- 4) Pass 2: Apply LIC with range-based modulation
result = np.zeros_like(src, dtype=float)
weights_sum = np.zeros_like(src, dtype=float)
sigma_base = np.sqrt((num_steps**2 - 1) / 12.0)
for pass_id in range(2):
if pass_id == 1:
vi = -vi
vj = -vj
current_steps = local_steps.copy()
active_cells = np.ones((H, W), dtype=bool)
fi, fj = np.mgrid[:H, :W].astype(float)
for step_idx in range(max_steps):
if not np.any(active_cells):
break
fi[active_cells] += vi[active_cells]
fj[active_cells] += vj[active_cells]
pi = np.clip(fi.astype(int), 0, H - 1)
pj = np.clip(fj.astype(int), 0, W - 1)
dh = src[pi, pj] - start_alt # Compute altitude difference
# Normalize dh by range along the integration line
if sigma_modulated:
f_alt = 1.0 - (dh / range_altitude)
f_alt = np.clip(f_alt, 0.0, 1.0)
else:
f_alt = np.ones_like(dh)
# Adjust sigma accordingly
sigma_adjusted = np.maximum(sigma_base * f_alt, 1e-6)
# Compute Gaussian weight
distance = step_idx * DS
weight = np.exp(-(distance**2) / (2 * sigma_adjusted**2))
# Accumulate weighted values
result[active_cells] += weight[active_cells] * src[pi[active_cells], pj[active_cells]]
weights_sum[active_cells] += weight[active_cells]
current_steps[active_cells] -= 1
active_cells = (current_steps > 0)
# --- 5) Normalize final result
np.divide(result, np.maximum(weights_sum, 1e-12), out=result)
return result
def LIC_iterations(
grid: np.ndarray,
# local_range_altitude: np.ndarray,
cellsize: float,
f: np.ndarray,
num_steps: int,
sigma_modulated: bool,
n_iterations: int,
sigma: float,
k: float,
) -> np.ndarray:
"""Perform multiple iterations of the Line Integral Convolution (LIC) method to generalize a DEM
by combining smoothing and feature enhancement.
Parameters
----------
grid : numpy.ndarray
2D array representing the input Digital Elevation Model (DEM).
# local_range_altitude : numpy ndarray
# 2D array of the range of elevation in a windows centered on each point
cellsize : float
The spatial resolution of the DEM (in meters).
f : numpy.ndarray
Continuous grid used to modulate the integration length.
num_steps : int
Maximum length of the integration line.
sigma_modulated : bool, optional
If True, the sigma for the weighting in the LIC algorithm is modulated by the altitude.
If False, a fixed Gaussian sigma is used.
n_iterations : int
Number of iterations of the LIC algorithm.
sigma : float
Standard deviation for the Gaussian blur applied to the curvature.
k : float
Weighting factor for the combination of grids.
Returns
-------
numpy.ndarray
2D array representing the DEM after LIC iterations.
Notes
-----
This function performs the following steps for each iteration:
1. Computes a modulated LIC
2. Calculates the maximal curvature of the resulting grid.
3. Applies Gaussian smoothing to the curvature.
4. Updates the DEM using a weighted combination of the original DEM and the LIC result.
"""
for n in range(n_iterations):
print(f"Iteration {n+1}/{n_iterations}")
# Step 1: Compute a modulated LIC
filtered_grid = extended_lic_weighted_altitude_lengthModulated(
grid, cellsize, f, num_steps, sigma_modulated
)
# Step 2: Calculate maximal curvature of the filtered grid
filtered_grid_max_c = calculate_maximal_curvature(
filtered_grid, cellsize
)
# Step 3: Smooth the curvature using Gaussian filtering
filtered_grid_max_c_blured = gaussian_filter(
filtered_grid_max_c, sigma=sigma, mode="nearest"
)
# Step 4: Compute weighting factor from the smoothed curvature
weight = k * np.abs(filtered_grid_max_c_blured)
weight = np.nan_to_num(weight, nan=0.0, posinf=1.0, neginf=0.0)
# Step 5: Update the DEM using the weighted combination
grid = weight * grid + (1 - weight) * filtered_grid
return grid
def correct_flat_area_values(
b: np.ndarray,
c: np.ndarray,
sigma: float,
epsilon: float = 1e-6,
) -> np.ndarray:
"""Adjusts the values in flat areas of a continuous grid after Gaussian blurring
to improve the representation of transitions.
Parameters
----------
b : numpy.ndarray
2D binary array representing flat areas (0) and steep areas (1).
c : numpy.ndarray
2D continuous array, the result of applying Gaussian blur to the binary grid.
sigma : float
Gaussian blur parameter for processing the differences.
epsilon : float, optional
A small value to avoid division by zero, by default 1e-6.
Returns
-------
numpy.ndarray
2D corrected array after adjusting values in flat areas.
Notes
-----
The function modifies the continuous grid `c` by correcting values in flat areas (indicated by 0 in `b`).
The adjustment is based on a blurred version of the difference between `b` and `c`, scaled by a
computed factor `s`. This ensures smoother transitions while preserving flat and steep area characteristics.
"""
# Step 1: Compute the difference between the binary and continuous grids
diff = b - c
# Step 2: Apply Gaussian blur to the difference
blurred_diff = gaussian_filter(diff, sigma=sigma, mode="nearest")
# Step 3: Calculate the scaling factor `s`
max_diff = np.max(diff)
max_blurred_diff = np.max(blurred_diff)
s = max_diff / (max_blurred_diff + epsilon) # Avoid division by zero
# Step 4: Compute the corrected grid
f = c + s * blurred_diff
return f
[docs]
def process_geotiff_in_block_with_overlap(
MNT_input_path: str,
output_path: str,
processing_function,
block_size: int = 2000,
overlap: int = 20,
**kwargs
) -> None:
"""Processes a GeoTIFF file in overlapping blocks and applies a user-defined processing function.
Parameters
----------
MNT_input_path : str
Path to the input GeoTIFF file containing the Digital Elevation Model (DEM).
output_path : str
Path to the output GeoTIFF file.
processing_function : function
A function that processes a single block. It should accept a block (ndarray), cell size,
and any additional parameters.
block_size : int, optional
Size of non-overlapping blocks (in pixels) to process. Default is 2000.
overlap : int, optional
Size of the overlapping region (in pixels) between adjacent blocks. Default is 20.
**kwargs
Additional arguments passed to `processing_function`.
Returns
-------
None
The processed output is written directly to the specified GeoTIFF file.
"""
# Open the input GeoTIFF and read metadata
with rasterio.open(MNT_input_path) as MNT_src:
profile = MNT_src.profile.copy()
width, height = MNT_src.width, MNT_src.height
cellsize = MNT_src.transform[0]
# Update output profile
profile.update(dtype=np.float32, compress="lzw", bigtiff="YES")
print("Starting terrain generalization...")
# Compute total number of blocks for progress tracking
n_blocks_x = (width + block_size - 1) // block_size
n_blocks_y = (height + block_size - 1) // block_size
total_blocks = n_blocks_x * n_blocks_y
# Create output GeoTIFF
with rasterio.open(output_path, "w", **profile) as dst:
with tqdm(total=total_blocks, desc="Processing blocks", unit="block") as pbar:
for y in range(0, height, block_size):
for x in range(0, width, block_size):
# Define window with overlap
x_min, x_max = max(0, x - overlap), min(width, x + block_size + overlap)
y_min, y_max = max(0, y - overlap), min(height, y + block_size + overlap)
window = Window(x_min, y_min, x_max - x_min, y_max - y_min)
# Read the MNT block
MNT_block = MNT_src.read(1, window=window)
# Apply the processing function to the block
processed_block = processing_function(MNT_block, cellsize, **kwargs)
# Extract the central block (remove overlap)
if x == 0 and y != 0:
central_block = processed_block[overlap:block_size + overlap, 0:block_size]
elif y == 0 and x != 0:
central_block = processed_block[0:block_size, overlap:block_size + overlap]
elif y == 0 and x == 0:
central_block = processed_block[0:block_size, 0:block_size]
else:
central_block = processed_block[overlap:block_size + overlap, overlap:block_size + overlap]
# Define output window
output_window = Window(x, y, central_block.shape[1], central_block.shape[0])
# Write processed block to output file
dst.write(central_block.astype(np.float32), 1, window=output_window)
# Update progress bar
pbar.update(1)
print(f"Processing completed. Output written to {output_path}")
[docs]
def process_lic_extended(
MNT_block: np.ndarray,
cellsize: float,
sigma_max: float = 5.0,
slope_threshold: float = 6,
num_bins: int = 10,
min_area: int = 100,
num_steps: int = 5,
sigma_modulated: bool = True,
n_iterations: int = 5,
sigma_blur_maxcurv: float = 3.0,
k: float = 2.5,
) -> np.ndarray:
"""Processes a single block of a DEM with LIC and generalization techniques.
Parameters
----------
MNT_block : np.ndarray
The block of the Digital Elevation Model.
cellsize : float
Size of the raster cells in meters.
sigma_max : float
Maximum kernel width for the adaptive Gaussian blur.
slope_threshold : float
Threshold for slope to distinguish flat and steep areas.
num_bins : int
Number of bins for sigma approximation in the adaptive blur.
min_area : int
Minimum size (in pixels) for flat areas to be preserved.
num_steps : int
Maximum integration length for the LIC algorithm.
sigma_modulated : bool
If True, sigma is modulated by altitude in LIC processing.
n_iterations : int
Number of iterations for the LIC algorithm.
sigma_blur_maxcurv : float
Gaussian blur parameter for maximum curvature.
k : float
Weighting factor for combining LIC grids.
Returns
-------
np.ndarray
Processed DEM block.
"""
# Compute maximum curvature
curvature_block = calculate_maximal_curvature(MNT_block, cellsize)
# Apply adaptive Gaussian blur
MNT_blurred_block = fast_adaptive_gaussian_blur(
MNT_block, curvature_block, cellsize, sigma_max, num_bins
)
# Generate binary grid for flat/steep areas
flat_steep_grid_block = initialize_flat_steep_grid(MNT_blurred_block, slope_threshold, cellsize)
# Remove small flat areas
cleaned_grid_block = remove_small_flat_areas(flat_steep_grid_block, min_area)
# Smooth the binary grid
continuous_grid_block = gaussian_filter(cleaned_grid_block.astype(float), sigma=5.0, mode="nearest")
# Correct flat area values
corrected_f = correct_flat_area_values(cleaned_grid_block, continuous_grid_block, sigma=2.0)
# Perform multi-pass LIC processing
processed_block = LIC_iterations(
MNT_blurred_block,
cellsize,
corrected_f,
num_steps,
sigma_modulated,
n_iterations,
sigma_blur_maxcurv,
k,
)
return processed_block
lic_extended_partial = partial(
process_lic_extended,
sigma_max=5.0,
slope_threshold=6,
num_bins=10,
min_area=100,
num_steps=5,
sigma_modulated=True,
n_iterations=5,
sigma_blur_maxcurv=3.0,
k=2.5
)
if __name__ == "__main__":
"""Main script for processing a GeoTIFF file with Line Integral Convolution (LIC) and
adaptive Gaussian blur techniques for terrain generalization.
"""
# File paths
MNT_input_path = ""
LIC_complete_output_path = ""
# Call the processing function
process_geotiff_in_block_with_overlap(
MNT_input_path,
LIC_complete_output_path,
processing_function=lic_extended_partial,
sigma_max = 2.0,
num_steps = 5,
sigma_modulated= True,
n_iterations = 4,
sigma_blur_maxcurv = 2.9,
)