Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 216 additions & 1 deletion python/packages/isce3/atmosphere/ionosphere_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
label,
find_objects,
gaussian_filter,
generic_filter)
generic_filter,
maximum_filter,
uniform_filter,
)

import isce3
from isce3.core.block_param_generator import block_param_generator
Expand Down Expand Up @@ -1178,3 +1181,215 @@ def remove_small_components(image, min_cluster_pixels):
cleaned_image[slice_tuple][current_object] = np.nan

return cleaned_image



def build_mask(
phase_image,
*,
coherence_image=None,
conncomp_image=None,
subswath_mask_image=None,
coh_threshold=None,
gradient_threshold=None,
gradient_window_size=25,
gradient_method='mean',
gradient_mask_percentile=None,
water_mask_image=None,
mask_type=None,
fill_value=0.0,
):
"""
Apply a mask to one native-grid phase image.
"""
if phase_image is None:
return None

if mask_type is None:
mask_type = []

mask = np.isfinite(phase_image) & (phase_image != 0)

if "coherence" in mask_type and coherence_image is not None:
mask &= np.isfinite(coherence_image)
mask &= (coherence_image >= coh_threshold)

if "connected_components" in mask_type and conncomp_image is not None:
mask &= np.isfinite(conncomp_image)
mask &= (conncomp_image > 0)

if "subswath_mask" in mask_type and subswath_mask_image is not None:
mask &= np.isfinite(subswath_mask_image)
mask &= (subswath_mask_image > 0)

if "water" in mask_type and water_mask_image is not None:
mask &= water_mask_image

if "gradient" in mask_type:
_, _, grad_mask = detect_high_phase_gradient_local(
phase_image,
gradient_threshold,
window_size=gradient_window_size,
mask=mask,
method=gradient_method,
percentile=gradient_mask_percentile)
mask &= grad_mask

out = phase_image.copy()
out[~mask] = fill_value
return out, mask


def detect_high_phase_gradient_local(
unwrapped_phase,
threshold,
window_size=5,
mask=None,
method="mean",
percentile=90,
return_gradient_components=False,
):
"""
Detect high-gradient areas from an unwrapped phase image using
a local windowed gradient score.
Parameters
----------
unwrapped_phase : np.ndarray
2D unwrapped phase array in radians.
threshold : float
Threshold applied to the local gradient score.
Unit: radians/pixel.
window_size : int, optional
Size of the moving window used to summarize gradient magnitude.
Must be >= 1. Typical values: 3, 5, 7.
mask : np.ndarray or None, optional
Boolean valid-data mask with same shape as input.
True means valid pixel. If None, finite values are treated as valid.
method : str, optional
Local summary method:
- "mean": local mean of gradient magnitude
- "max": local max of gradient magnitude
- "percentile": local percentile of gradient magnitude
percentile : float, optional
Percentile used when method="percentile".
return_gradient_components : bool, optional
If True, also return gx and gy.
Returns
-------
grad_mag : np.ndarray
Pixelwise gradient magnitude in radians/pixel.
local_grad_score : np.ndarray
Local windowed summary of grad_mag in radians/pixel.
high_gradient_mask : np.ndarray
Boolean mask where local_grad_score > threshold.
gx, gy : np.ndarray, optional
Gradient components if return_gradient_components=True.
Notes
-----
For unwrapped phase, gradients are computed using central differences
for interior pixels and first differences at the boundaries.
"""
phase = np.asarray(unwrapped_phase, dtype=np.float64)

if phase.ndim != 2:
raise ValueError("unwrapped_phase must be a 2D array")

if window_size < 1 or int(window_size) != window_size:
raise ValueError("window_size must be a positive integer")

if method not in {"mean", "max", "percentile"}:
raise ValueError("method must be 'mean', 'max', or 'percentile'")

if mask is None:
valid = np.isfinite(phase)
else:
valid = np.asarray(mask, dtype=bool) & np.isfinite(phase)

# Require valid neighbors for safe gradient estimation
# We will compute gradients first on a filled array, then invalidate
# locations influenced by invalid pixels.
phase_filled = phase.copy()
phase_filled[~valid] = 0.0

# Gradient in y (rows) and x (cols)
gy = np.full_like(phase, np.nan, dtype=np.float64)
gx = np.full_like(phase, np.nan, dtype=np.float64)

# Central differences for interior
gy[1:-1, :] = (phase_filled[2:, :] - phase_filled[:-2, :]) / 2.0
gx[:, 1:-1] = (phase_filled[:, 2:] - phase_filled[:, :-2]) / 2.0

# Forward/backward difference at borders
gy[0, :] = phase_filled[1, :] - phase_filled[0, :]
gy[-1, :] = phase_filled[-1, :] - phase_filled[-2, :]

gx[:, 0] = phase_filled[:, 1] - phase_filled[:, 0]
gx[:, -1] = phase_filled[:, -1] - phase_filled[:, -2]

# Build validity masks for gradients
valid_gx = np.zeros_like(valid, dtype=bool)
valid_gy = np.zeros_like(valid, dtype=bool)

# gx validity
valid_gx[:, 1:-1] = valid[:, 2:] & valid[:, :-2] & valid[:, 1:-1]
valid_gx[:, 0] = valid[:, 0] & valid[:, 1]
valid_gx[:, -1] = valid[:, -1] & valid[:, -2]

# gy validity
valid_gy[1:-1, :] = valid[2:, :] & valid[:-2, :] & valid[1:-1, :]
valid_gy[0, :] = valid[0, :] & valid[1, :]
valid_gy[-1, :] = valid[-1, :] & valid[-2, :]

gx[~valid_gx] = np.nan
gy[~valid_gy] = np.nan

# Gradient magnitude
grad_mag = np.sqrt(gx**2 + gy**2)

# Valid where both phase and grad_mag are valid
valid_grad = np.isfinite(grad_mag) & valid

# Local summary of gradient magnitude
if method == "mean":
data = np.where(valid_grad, grad_mag, 0.0)
weights = valid_grad.astype(np.float64)

local_sum = uniform_filter(data, size=window_size,
mode="constant", cval=0.0) * (window_size ** 2)
local_count = uniform_filter(weights, size=window_size, mode="constant", cval=0.0) * (window_size ** 2)

with np.errstate(invalid="ignore", divide="ignore"):
local_grad_score = local_sum / local_count

local_grad_score[local_count == 0] = np.nan

elif method == "max":
data = np.where(valid_grad, grad_mag, -np.inf)
local_grad_score = maximum_filter(data, size=window_size, mode="constant", cval=-np.inf)
local_grad_score[~valid] = np.nan
local_grad_score[~np.isfinite(local_grad_score)] = np.nan

elif method == "percentile":
def nanpercentile_func(values):
vals = values[np.isfinite(values)]
if vals.size == 0:
return np.nan
return np.percentile(vals, percentile)

data = np.where(valid_grad, grad_mag, np.nan)
local_grad_score = generic_filter(
data,
function=nanpercentile_func,
size=window_size,
mode="constant",
cval=np.nan,
)

# Final mask
high_gradient_mask = np.isfinite(local_grad_score) & (local_grad_score < threshold)
high_gradient_mask[~valid] = False

if return_gradient_components:
return grad_mag, local_grad_score, high_gradient_mask, gx, gy

return grad_mag, local_grad_score, high_gradient_mask
112 changes: 112 additions & 0 deletions python/packages/isce3/atmosphere/main_band_estimation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import numpy as np

from .ionosphere_estimation import IonosphereEstimation
from isce3.atmosphere.ionosphere_filter import (
detect_high_phase_gradient_local)
from isce3.signal.interpolate_by_range import decimate_freq_a_array
from isce3.unwrap.preprocess import interpret_subswath_mask

Expand Down Expand Up @@ -367,6 +369,116 @@ def get_valid_area(

return mask_array

def get_gradient_mask(
self,
main_array=None,
side_array=None,
diff_ms_array=None,
low_band_array=None,
high_band_array=None,
diff_low_high_band_array=None,
slant_main=None,
slant_side=None,
mask=None,
window=None,
threshold_first=None,
threshold_second=None,
method='mean',
percentile=None):
"""
Generate a boolean mask identifying pixels with consistently high phase
gradient based on one or two input interferometric arrays.
This function detects high phase gradient regions using
`detect_high_phase_gradient_local` and combines two gradient masks
(first and second) using a logical AND operation. The resulting mask
highlights pixels that simultaneously exceed gradient thresholds in
both inputs. Optionally, frequency A/B alignment (decimation) is applied
when side-band data is involved.
The function supports two modes:
1. Using `main_array` and `diff_low_high_band_array`
2. Using `low_band_array` and `high_band_array`
Small isolated pixels are removed from the final mask.
Parameters
----------
main_array : numpy.ndarray, optional
Interferometric phase (or coherence) array for the main band.
Used when `diff_low_high_band_array` is provided.
side_array : numpy.ndarray, optional
Interferometric array for the side band. If provided, low/high band
arrays are decimated to match the geometry of the main band.
diff_ms_array : numpy.ndarray, optional
Difference array between main and side bands (currently unused).
low_band_array : numpy.ndarray, optional
Interferometric array for the low-frequency sub-band.
high_band_array : numpy.ndarray, optional
Interferometric array for the high-frequency sub-band.
diff_low_high_band_array : numpy.ndarray, optional
Difference array between high and low sub-band interferograms.
If provided, this is used instead of separate low/high arrays.
slant_main : numpy.ndarray, optional
Slant range coordinates for the main (frequency A) band.
slant_side : numpy.ndarray, optional
Slant range coordinates for the side (frequency B) band.
window : int, optional
Window size used for local gradient estimation.
threshold_first : float
Threshold for the first gradient mask (e.g., main or low band).
threshold_second : float
Threshold for the second gradient mask (e.g., diff or high band).
method : str, optional
Method used for gradient aggregation within the local window.
Options depend on `detect_high_phase_gradient_local`
(e.g., 'mean', 'max', 'percentile).
percentile : float, optional
Percentile value used when method requires percentile-based
thresholding.
Returns
-------
mask_array : numpy.ndarray (bool)
2D boolean mask where:
- True (1): pixels with high phase gradient in both inputs
- False (0): pixels not meeting the criteria
The mask is post-processed to remove isolated single pixels.
"""
# decimate coherence or connected components
# when side array is also used.
if side_array is not None or diff_ms_array is not None:
main_array = decimate_freq_a_array(
slant_main,
slant_side,
main_array)

_, _, grad_mask_first = detect_high_phase_gradient_local(
main_array,
threshold_first,
window_size=window,
mask=mask,
method=method,
percentile=percentile)

if diff_ms_array is not None:
_, _, grad_mask_second = detect_high_phase_gradient_local(
diff_ms_array,
threshold_second,
window_size=window,
mask=mask,
method=method,
percentile=percentile)
elif side_array is not None:
_, _, grad_mask_second = detect_high_phase_gradient_local(
side_array,
threshold_second,
window_size=window,
mask=mask,
method=method,
percentile=percentile)

mask_array = grad_mask_first & grad_mask_second
mask_array = self.remove_single_pixels(mask_array)

mask_array = np.array(mask_array, dtype=bool)
return mask_array

def get_subswath_mask_array(
self,
main_array=None,
Expand Down
Loading
Loading