Source code for gcm_filters.filter

"""Main Filter class."""
import enum
import warnings

from dataclasses import dataclass, field
from itertools import chain, zip_longest
from typing import Iterable, NamedTuple

import numpy as np
import xarray as xr

from scipy import interpolate

from .gpu_compat import get_array_module
from .kernels import (
    ALL_KERNELS,
    AreaWeightedMixin,
    BaseScalarLaplacian,
    BaseVectorLaplacian,
    GridType,
)


FilterShape = enum.Enum("FilterShape", ["GAUSSIAN", "TAPER"])


# These parameters are used to set the default n_steps
filter_params = {
    FilterShape.GAUSSIAN: {
        1: {"offset": 0.8, "factor": 0.0, "exponent": 1},
        2: {"offset": 1.1, "factor": 0.0, "exponent": 1},
    },
    FilterShape.TAPER: {
        1: {"offset": 2.2, "factor": 0.6, "exponent": 2.5},
        2: {"offset": 3.2, "factor": 0.7, "exponent": 2.7},
    },
}


class TargetSpec(NamedTuple):
    s_max: float
    filter_scale: float
    transition_width: float


# these functions return functions
def _gaussian_target(target_spec: TargetSpec):
    return lambda t: np.exp(
        -(target_spec.s_max * (t + 1) / 2) * (target_spec.filter_scale) ** 2 / 24
    )


def _taper_target(target_spec: TargetSpec):
    FK = interpolate.PchipInterpolator(
        np.array(
            [
                0,
                2 * np.pi / (target_spec.transition_width * target_spec.filter_scale),
                2 * np.pi / target_spec.filter_scale,
                8 * np.sqrt(target_spec.s_max),
            ]
        ),
        np.array([1, 1, 0, 0]),
    )
    return lambda t: FK(np.sqrt((t + 1) * (target_spec.s_max / 2)))


_target_function = {
    FilterShape.GAUSSIAN: _gaussian_target,
    FilterShape.TAPER: _taper_target,
}


class FilterSpec(NamedTuple):
    n_steps: int
    s_max: float
    p: Iterable[float]
    dx_min_sq: float


def _compute_filter_spec(
    filter_scale,
    dx_min,
    filter_shape,
    transition_width=np.pi,
    ndim=2,
    n_steps=0,
):

    # Set up the mass matrix for the Galerkin basis from Shen (SISC95)
    M = (np.pi / 2) * (
        2 * np.eye(n_steps - 1)
        - np.diag(np.ones(n_steps - 3), 2)
        - np.diag(np.ones(n_steps - 3), -2)
    )
    M[0, 0] = 3 * np.pi / 2

    # The range of wavenumbers is 0<=|k|<=sqrt(ndim)*pi/dxMin.
    # However, our 2nd order laplacians only get to sqrt(ndim)*2/dxMin at most.
    # Caveat: Not sure what is a good max wavenumber for the C-grid vector Laplacian
    # Per the paper, define s=k^2.
    # Need to rescale to t in [-1,1]: t = (2/sMax)*s -1; s = sMax*(t+1)/2
    s_max = ndim * (2 / dx_min) ** 2

    target_spec = TargetSpec(s_max, filter_scale, transition_width)
    F = _target_function[filter_shape](target_spec)

    # Compute inner products of Galerkin basis with target
    b = np.zeros(n_steps - 1)
    points, weights = np.polynomial.chebyshev.chebgauss(n_steps + 1)
    for i in range(n_steps - 1):
        tmp = np.zeros(n_steps + 1)
        tmp[i] = 1
        tmp[i + 2] = -1
        phi = np.polynomial.chebyshev.chebval(points, tmp)
        b[i] = np.sum(
            weights * phi * (F(points) - ((1 - points) / 2 + F(1) * (points + 1) / 2))
        )

    # Get polynomial coefficients in Galerkin basis
    c_hat = np.linalg.solve(M, b)
    # Convert back to Chebyshev basis coefficients
    p = np.zeros(n_steps + 1)
    p[0] = c_hat[0] + (1 + F(1)) / 2
    p[1] = c_hat[1] - (1 - F(1)) / 2
    for i in range(2, n_steps - 1):
        p[i] = c_hat[i] - c_hat[i - 2]
    p[n_steps - 1] = -c_hat[n_steps - 3]
    p[n_steps] = -c_hat[n_steps - 2]

    dx_min_sq = dx_min**2  # For nondimensional Laplacians

    return FilterSpec(n_steps, s_max, p, dx_min_sq)


def _create_filter_func(
    filter_spec: FilterSpec,
    Laplacian: BaseScalarLaplacian,
):
    """Returns a function whose first argument is the field to be filtered
    and whose subsequent arguments are the required grid variables
    """

    def shifted_laplacian(
        field,
        s_max,
        laplacian,
        dx_min_sq,
    ):
        # This function computes -(field + (2/s_max) * laplacian(field))
        output = laplacian(field)
        if laplacian.is_dimensional:
            output = -field - (2 / s_max) * output
        else:
            output = -field - (2 / (s_max * dx_min_sq)) * output

        return output

    def filter_func(field, *args):
        # these next steps are a kind of hack we have to turn keyword arugments into regular arguments
        # the reason for doing this is that Xarray's apply_ufunc machinery works a lot better
        # with regular arguments
        assert len(args) == len(Laplacian.required_grid_args())
        grid_vars = {k: v for k, v in zip(Laplacian.required_grid_args(), args)}
        laplacian = Laplacian(**grid_vars)
        np = get_array_module(field)
        field_bar = field.copy()  # Initalize the filtering process

        # prepare field for filtering (this multiplies by area for simple fixed factor
        # filters, and does nothing for all other filters)
        field_bar = laplacian.prepare(field_bar)

        T_minus_2 = field_bar.copy()
        T_minus_1 = shifted_laplacian(
            field_bar, filter_spec.s_max, laplacian, filter_spec.dx_min_sq
        )
        field_bar = filter_spec.p[0] * T_minus_2 + filter_spec.p[1] * T_minus_1
        for i in range(2, filter_spec.n_steps + 1):
            T_minus_0 = (
                2
                * shifted_laplacian(
                    T_minus_1, filter_spec.s_max, laplacian, filter_spec.dx_min_sq
                )
                - T_minus_2
            )
            field_bar += filter_spec.p[i] * T_minus_0
            T_minus_2 = T_minus_1.copy()
            T_minus_1 = T_minus_0.copy()

        # finalize filtering (this divides by area for simple fixed factor filters,
        # and does nothing for all other filters)
        field_bar = laplacian.finalize(field_bar)

        return field_bar

    return filter_func


def _create_filter_func_vec(
    filter_spec: FilterSpec,
    Laplacian: BaseVectorLaplacian,
):
    """Returns a function whose first two arguments are the vector components of the field to be filtered
    and whose subsequent arguments are the require grid variables
    """

    def shifted_laplacian_vec(
        ufield,
        vfield,
        s_max,
        laplacian,
        dx_min_sq,
    ):
        # This function computes -(field + (2/s_max) * laplacian(field))
        (u_output, v_output) = laplacian(ufield, vfield)
        if laplacian.is_dimensional:
            u_output = -ufield - (2 / s_max) * u_output
            v_output = -vfield - (2 / s_max) * v_output
        else:
            u_output = -ufield - (2 / (s_max * dx_min_sq)) * u_output
            v_output = -vfield - (2 / (s_max * dx_min_sq)) * v_output
        return (u_output, v_output)

    def filter_func_vec(ufield, vfield, *args):
        # these next steps are a kind of hack we have to turn keyword arugments into regular arguments
        # the reason for doing this is that Xarray's apply_ufunc machinery works a lot better
        # with regular arguments
        assert len(args) == len(Laplacian.required_grid_args())
        grid_vars = {k: v for k, v in zip(Laplacian.required_grid_args(), args)}
        laplacian = Laplacian(**grid_vars)
        np = get_array_module(ufield)
        ufield_bar = ufield.copy()  # Initalize the filtering process
        vfield_bar = vfield.copy()  # Initalize the filtering process

        # prepare field for filtering (this multiplies by area for simple fixed factor
        # filters, and does nothing for all other filters)
        (ufield_bar, vfield_bar) = laplacian.prepare(ufield_bar, vfield_bar)

        uT_minus_2 = ufield_bar.copy()
        vT_minus_2 = vfield_bar.copy()
        (uT_minus_1, vT_minus_1) = shifted_laplacian_vec(
            ufield_bar,
            vfield_bar,
            filter_spec.s_max,
            laplacian,
            filter_spec.dx_min_sq,
        )
        ufield_bar = filter_spec.p[0] * uT_minus_2 + filter_spec.p[1] * uT_minus_1
        vfield_bar = filter_spec.p[0] * vT_minus_2 + filter_spec.p[1] * vT_minus_1
        for i in range(2, filter_spec.n_steps + 1):
            (uT_minus_0, vT_minus_0) = shifted_laplacian_vec(
                uT_minus_1,
                vT_minus_1,
                filter_spec.s_max,
                laplacian,
                filter_spec.dx_min_sq,
            )
            uT_minus_0 = 2 * uT_minus_0 - uT_minus_2
            vT_minus_0 = 2 * vT_minus_0 - vT_minus_2
            ufield_bar += filter_spec.p[i] * uT_minus_0
            vfield_bar += filter_spec.p[i] * vT_minus_0
            uT_minus_2 = uT_minus_1.copy()
            uT_minus_1 = uT_minus_0.copy()
            vT_minus_2 = vT_minus_1.copy()
            vT_minus_1 = vT_minus_0.copy()

        # finalize filtering (this divides by area for simple fixed factor filters,
        # and does nothing for all other filters)
        (ufield_bar, vfield_bar) = laplacian.finalize(ufield_bar, vfield_bar)

        return (ufield_bar, vfield_bar)

    return filter_func_vec


[docs]@dataclass class Filter: """A class for applying diffusion-based smoothing filters to gridded data. Parameters ---------- filter_scale : float The filter scale, which has different meaning depending on filter shape dx_min : float The smallest grid spacing. Should have same units as ``filter_scale`` n_steps : int, optional Number of total steps in the filter ``n_steps == 0`` means the number of steps is chosen automatically filter_shape : FilterShape - ``FilterShape.GAUSSIAN``: The target filter has shape :math:`e^{-(k filter_scale)^2/24}` - ``FilterShape.TAPER``: The target filter has target grid scale Lf. Smaller scales are zeroed out. Scales larger than ``pi * filter_scale / 2`` are left as-is. In between is a smooth transition. transition_width : float, optional Width of the transition region in the "Taper" filter. This is a nondimensional parameter. Theoretical minimum is 1; not recommended. ndim : int, optional Laplacian is applied on a grid of dimension ndim grid_type : GridType what sort of grid we are dealing with grid_vars : dict dictionary of extra parameters used to initialize the grid Laplacian Attributes ---------- filter_spec: FilterSpec """ filter_scale: float dx_min: float filter_shape: FilterShape = FilterShape.GAUSSIAN transition_width: float = np.pi ndim: int = 2 n_steps: int = 0 grid_type: GridType = GridType.REGULAR grid_vars: dict = field(default_factory=dict, repr=False) def __post_init__(self): self.Laplacian = ALL_KERNELS[self.grid_type] # Determine whether this is simple fixed factor filter; in that case we need dx_min = 1 if issubclass(self.Laplacian, AreaWeightedMixin): if self.dx_min != 1: raise ValueError( f"Provided Laplacian is for simple fixed factor filtering, " "where transformed field is filtered on a regular grid with dx = dy = 1. " "dx_min must be set to 1." ) # Check if transition_width is <=1 if self.transition_width <= 1: raise ValueError(f"Transition width must be > 1.") # Get default number of steps filter_factor = self.filter_scale / self.dx_min if self.ndim > 2: if self.n_steps < 3: raise ValueError(f"When ndim > 2, you must set n_steps manually") else: n_steps_default = self.n_steps # For ndim>2 we don't have a default else: n_steps_factor = filter_params[self.filter_shape][self.ndim][ "offset" ] + filter_params[self.filter_shape][self.ndim]["factor"] * ( (np.pi / self.transition_width) ** filter_params[self.filter_shape][self.ndim]["exponent"] ) n_steps_default = np.ceil(n_steps_factor * filter_factor).astype(int) # Set n_steps if needed and issue n_step warning, if needed if self.n_steps < 3: self.n_steps = n_steps_default if self.n_steps < n_steps_default: warnings.warn( "You have set n_steps below the default. Results might not be accurate.", stacklevel=2, ) self.filter_spec = _compute_filter_spec( self.filter_scale, self.dx_min, self.filter_shape, self.transition_width, self.ndim, self.n_steps, ) # check that we have all the required grid aguments if not set(self.Laplacian.required_grid_args()) == set(self.grid_vars): raise ValueError( f"Provided `grid_vars` {list(self.grid_vars)} do not match expected " f"{list(self.Laplacian.required_grid_args())}" ) self.grid_ds = xr.Dataset({name: da for name, da in self.grid_vars.items()})
[docs] def plot_shape(self, ax=None): """Plot the shape of the target filter and approximation.""" import matplotlib.pyplot as plt # Plot the target filter and the approximate filter s_max = self.filter_spec.s_max target_spec = TargetSpec(s_max, self.filter_scale, self.transition_width) F = _target_function[self.filter_shape](target_spec) x = np.linspace(-1, 1, 10001) k = np.sqrt(s_max * (x + 1) / 2) if ax is None: fig, ax = plt.subplots() ax.plot(k, F(x), "g", label="target filter", linewidth=4) ax.plot( k, np.polynomial.chebyshev.chebval(x, self.filter_spec.p), "m", label="approximation", linewidth=4, ) ax.axvline( 2 * np.pi / self.filter_scale, color="k", label="filter cutoff wavenumber", linewidth=2, ) ax.set_xlim(left=0) if self.filter_scale / self.dx_min > 10: ax.set_xlim(right=4 * np.pi / self.filter_scale) ax.set_ylim(bottom=-0.1) ax.set_ylim(top=1.1) ax.set_xlabel("Wavenumber k", fontsize=18) ax.grid(True) ax.legend()
[docs] def apply(self, ds, dims): """Filter an `xarray.DataArray` or `xarray.Dataset` with a scalar Laplacian across the dimensions specified by `dims`. Parameters ---------- ds : xarray.DataArray or xarray.Dataset The data to be filtered. If Dataset, filter will be applied to all data variables. dims : sequence of str The names of the dimensions over which to apply the filter. Usually this is two spatial dimensions, e.g. ``('lat', 'lon')`` or ``('y', 'x')``. .. warning:: The dimension order matters! Since some filters deal with anisotropic grids, the latitude dimension must appear first in order to obtain the correct result. """ if issubclass(self.Laplacian, BaseVectorLaplacian): raise ValueError( f"Provided Laplacian {self.Laplacian} is a vector Laplacian. " f"The ``.apply`` method is only suitable for scalar Laplacians." ) if isinstance(ds, xr.Dataset): filtered = ds.copy(deep=True) any_filtered = False for key, var in filtered.variables.items(): if all(dim in var.dims for dim in dims): filtered[key] = self._apply_to_dataarray(var, dims=dims) any_filtered = True if not any_filtered: warnings.warn( f"No variables in the dataset had all of the given " f"dimensions ({dims}), so nothing was filtered.", stacklevel=2, ) return filtered else: return self._apply_to_dataarray(ds, dims=dims)
def _apply_to_dataarray(self, field, dims): """Filter an `xarray.DataArray` field with scalar Laplacian across the dimensions specified by dims.""" filter_func = _create_filter_func(self.filter_spec, self.Laplacian) grid_args = [self.grid_ds[name] for name in self.Laplacian.required_grid_args()] assert len(dims) == 2 n_args = 1 + len(grid_args) field_smooth = xr.apply_ufunc( filter_func, field, *grid_args, input_core_dims=n_args * [dims], output_core_dims=[dims], output_dtypes=[field.dtype], dask="parallelized", ) return field_smooth
[docs] def apply_to_vector(self, ufield, vfield, dims): """Filter a vector field with vector Laplacian across the dimensions specified by dims. Parameters ---------- ufield : xarray.DataArray The zonal component of the data to be filtered. vfield : xarray.DataArray The meridional component of the data to be filtered. dims : sequence of str The names of the dimensions over which to apply the filter. Usually this is two spatial dimensions, e.g. ``('lat', 'lon')`` or ``('y', 'x')``. .. warning:: The dimension order matters! Since some filters deal with anisotropic grids, the latitude dimension must appear first in order to obtain the correct result. """ if not issubclass(self.Laplacian, BaseVectorLaplacian): raise ValueError( f"Provided Laplacian {self.Laplacian} is a scalar Laplacian. " f"The ``.apply_to_vector`` method is only suitable for vector Laplacians." ) filter_func_vec = _create_filter_func_vec(self.filter_spec, self.Laplacian) grid_args = [self.grid_ds[name] for name in self.Laplacian.required_grid_args()] assert len(dims) == 2 n_args = 2 + len(grid_args) (ufield_smooth, vfield_smooth) = xr.apply_ufunc( filter_func_vec, ufield, vfield, *grid_args, input_core_dims=n_args * [dims], output_core_dims=2 * [dims], output_dtypes=[ufield.dtype, vfield.dtype], dask="parallelized", ) return (ufield_smooth, vfield_smooth)