Mix

src.geostat.kernel.Mix

Bases: Kernel

Mix kernel class for combining multiple Gaussian Process (GP) kernels.

The Mix class defines a kernel that allows combining multiple input kernels, either using specified weights or by directly mixing the component kernels. This provides a flexible way to create complex covariance structures by blending the properties of different kernels.

Parameters:

  • inputs (list of Kernel objects) –

    A list of kernel objects to be combined.

  • weights (matrix, default: None) –

    A matrix specifying how the input kernels should be combined. If not provided, the kernels are combined without weighting.

Examples:

Combining multiple kernels with specified weights:

import numpy as np
from geostat.kernel import Mix, SquaredExponential, Noise

# Create individual kernels
kernel1 = SquaredExponential(sill=1.0, range=2.0)
kernel2 = Noise(nugget=0.1)

# Combine kernels using the Mix class
mixed_kernel = Mix(inputs=[kernel1, kernel2], weights=[[0.6, 0.4], [0.4, 0.6]])

locs1 = np.array([[0.0], [1.0], [2.0]])
locs2 = np.array([[0.0], [1.0], [2.0]])
covariance_matrix = mixed_kernel({'locs1': locs1, 'locs2': locs2, 'weights': [[0.6, 0.4], [0.4, 0.6]]})

Using the Mix kernel without weights:

mixed_kernel_no_weights = Mix(inputs=[kernel1, kernel2])

Notes:

  • The call method computes the covariance matrix by either using the specified weights to combine the input kernels or directly combining them when weights are not provided.
  • The vars method gathers the parameters from all input kernels, allowing for easy access and manipulation of their coefficients.
  • The Mix kernel is useful for creating complex, multi-faceted covariance structures by blending different types of kernels, providing enhanced modeling flexibility.
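With weights, the combined covariance is C[a, b] = Σ_c w[cat1[a], c] · w[cat2[b], c] · K_c[a, b], where K_c are the component covariance matrices and cat1/cat2 assign a category to each location. A minimal NumPy sketch of that einsum combination (toy data and shapes are illustrative assumptions, not geostat's API):

```python
import numpy as np

# Toy setup: 3 points, 2 component kernels (hypothetical values).
n = 3
C = np.stack([np.eye(n), np.ones((n, n))], axis=-1)  # [n, n, k] component covariances
weights = np.array([[0.6, 0.4], [0.4, 0.6]])         # [ncats, k] mixing matrix
cats1 = np.array([0, 1, 0])                          # category of each row point
cats2 = np.array([0, 1, 0])                          # category of each column point

A1 = weights[cats1]                          # [n, k], like tf.gather(weights, cats1)
A2 = weights[cats2]                          # [n, k]
outer = np.einsum('ac,bc->abc', A1, A2)      # [n, n, k] pairwise weight products
mixed = np.einsum('abc,abc->ab', C, outer)   # [n, n] mixed covariance
```

This mirrors the two `tf.einsum` calls in `call`: the first forms the outer product of per-point weight rows, the second contracts it against the stacked component covariances.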
Source code in src/geostat/kernel.py
class Mix(Kernel):
    """
    Mix kernel class for combining multiple Gaussian Process (GP) kernels.

    The `Mix` class defines a kernel that allows combining multiple input kernels, either using 
    specified weights or by directly mixing the component kernels. This provides a flexible way to 
    create complex covariance structures by blending the properties of different kernels.

    Parameters:
        inputs (list of Kernel objects):
            A list of kernel objects to be combined.
        weights (matrix, optional):
            A matrix specifying how the input kernels should be combined. If not provided, 
            the kernels are combined without weighting.

    Examples:
        Combining multiple kernels with specified weights:

        ```python
        import numpy as np
        from geostat.kernel import Mix, SquaredExponential, Noise

        # Create individual kernels
        kernel1 = SquaredExponential(sill=1.0, range=2.0)
        kernel2 = Noise(nugget=0.1)

        # Combine kernels using the Mix class
        mixed_kernel = Mix(inputs=[kernel1, kernel2], weights=[[0.6, 0.4], [0.4, 0.6]])

        locs1 = np.array([[0.0], [1.0], [2.0]])
        locs2 = np.array([[0.0], [1.0], [2.0]])
        covariance_matrix = mixed_kernel({'locs1': locs1, 'locs2': locs2, 'weights': [[0.6, 0.4], [0.4, 0.6]]})
        ```

        Using the `Mix` kernel without weights:

        ```python
        mixed_kernel_no_weights = Mix(inputs=[kernel1, kernel2])
        ```

    Notes:
        - The `call` method computes the covariance matrix by either using the specified weights 
            to combine the input kernels or directly combining them when weights are not provided.
        - The `vars` method gathers the parameters from all input kernels, allowing for easy 
            access and manipulation of their coefficients.
        - The `Mix` kernel is useful for creating complex, multi-faceted covariance structures 
            by blending different types of kernels, providing enhanced modeling flexibility.
    """

    def __init__(self, inputs, weights=None):
        self.inputs = inputs
        fa = {}
        ai = dict(cats1='cats1', cats2='cats2')

        # When weights is given, pass it through and treat the
        # component kernels as autoinputs.
        if weights is not None:
            fa['weights'] = weights
            ai['inputs'] = inputs

        super().__init__(fa, ai)

    def gather_vars(self, cache=None):
        """Specialized version of gather_vars: variables are gathered
           from `inputs` even when it is not listed in autoinputs."""
        vv = super().gather_vars(cache)
        for iput in self.inputs:
            cache[id(self)] |= iput.gather_vars(cache)
        return cache[id(self)]

    def vars(self):
        if 'weights' in self.fa:
            return {k: p for row in self.fa['weights']
                      for k, p in get_trend_coefs(row).items()}
        else:
            return {}

    def call(self, e):
        if 'weights' in e:
            weights = []
            for row in e['weights']:
                # Stack list/tuple rows into tensors; keep tensor rows as-is.
                if isinstance(row, (tuple, list)):
                    row = tf.stack(row)
                weights.append(row)
            weights = tf.stack(weights)
            C = tf.stack(e['inputs'], axis=-1) # [locs, locs, numinputs].
            Aaug1 = tf.gather(weights, e['cats1']) # [locs, numinputs].
            Aaug2 = tf.gather(weights, e['cats2']) # [locs, numinputs].
            outer = tf.einsum('ac,bc->abc', Aaug1, Aaug2) # [locs, locs, numinputs].
            C = tf.einsum('abc,abc->ab', C, outer) # [locs, locs].
            return C
        else:
            # When weights is not given, exploit the fact that we don't have
            # to compute every element in component covariance matrices.
            N = len(self.inputs)
            catcounts1 = tf.math.bincount(e['cats1'], minlength=N, maxlength=N)
            catcounts2 = tf.math.bincount(e['cats2'], minlength=N, maxlength=N)
            catindices1 = tf.math.cumsum(catcounts1, exclusive=True)
            catindices2 = tf.math.cumsum(catcounts2, exclusive=True)
            catdiffs = tf.unstack(catindices2 - catindices1, num=N)
            locsegs1 = tf.split(e['locs1'], catcounts1, num=N)
            locsegs2 = tf.split(e['locs2'], catcounts2, num=N)

            # TODO: Check that the below is still correct.
            CC = [] # Observation noise submatrices.
            for sublocs1, sublocs2, catdiff, iput in zip(locsegs1, locsegs2, catdiffs, self.inputs):
                cache = dict(
                    offset = e['offset'] + catdiff,
                    locs1 = sublocs1,
                    locs2 = sublocs2)
                cache['per_axis_dist2'] = PerAxisDist2().run(cache)
                cache['euclidean'] = Euclidean().run(cache)
                Csub = iput.run(cache)
                CC.append(Csub)

            return block_diag(CC)
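When `weights` is omitted, `call` evaluates each component kernel only on the locations belonging to its category and assembles the per-category submatrices into one block-diagonal covariance. A minimal sketch of that assembly step (plain NumPy; `block_diag_np` is a hypothetical stand-in for geostat's `block_diag` helper, and the submatrices are toy values):

```python
import numpy as np

def block_diag_np(blocks):
    """Assemble square blocks into one block-diagonal matrix."""
    n = sum(b.shape[0] for b in blocks)
    out = np.zeros((n, n))
    i = 0
    for b in blocks:
        m = b.shape[0]
        out[i:i + m, i:i + m] = b
        i += m
    return out

# Per-category covariance submatrices, as produced by each input kernel.
C0 = np.array([[1.0, 0.5],
               [0.5, 1.0]])  # category 0: 2 points
C1 = np.array([[2.0]])       # category 1: 1 point
C = block_diag_np([C0, C1])  # [3, 3] combined covariance
```

Cross-category entries are zero by construction, which is why this branch can skip computing them entirely; that is the efficiency the source comment about not computing every element refers to.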