# Source code for mesmer.likelihood

""" This module contains the main likelihood object of the package.
:class:`mesmer.likelihood.LogProb`. This object is initialized by
providing a set of observed data and its covariance, and specifying
a model.
"""
import yaml
import h5py
import numpy as np
from .seds import FMatrix

__all__ = ["LogProb"]


class LogProb(object):
    """Class calculating the marginalized posterior of spectral parameters
    given some observed data, its covariance, and a model for the emission
    of different components.

    This class links together the functions required to calculate the
    terms in Equation (A7) of 1608.00551.
    """

    def __init__(self, data=None, covariance=None, frequencies=None, model=None):
        """ The setup of this class requires the observed multifrequency
        sky maps, their pixel covariance (assumed diagonal in pixel space),
        the observation frequencies, and a model specification dictionary
        (parsed by :meth:`model_setup`, which builds the SED matrix and the
        fixed-parameter and prior dictionaries).

        Parameters
        ----------
        data: ndarray (optional, default=None)
            Array of shape (Nfreq, Npol, Npix) containing the observed
            multi-frequency data.
        covariance: ndarray (optional, default=None)
            Array of shape (Nfreq, Npol, Npix) containing the pixel
            covariance of the array `data`.
        frequencies: ndarray (optional, default=None)
            Frequencies at which `data` was observed.
        model: dict (optional, default=None)
            Dictionary defining the sky model. Keys are component names,
            values are dictionaries with `fixed` and `varied` entries;
            see :meth:`model_setup`.
        """
        # Data attributes are only initialized when all three pieces of
        # information are available; otherwise they can be loaded later
        # (e.g. via `load_data_from_hdf5`).
        if (
            (data is not None)
            and (covariance is not None)
            and (frequencies is not None)
        ):
            self.data_setup(data, covariance, frequencies)
        if model is not None:
            self.model_setup(model)

    def __str__(self):
        """ Magic method for conveniently printing object summary.
        """
        msg = r"""
        Data
        ----
        Number of frequencies: {:d}
        Number of polarization channels: {:d}
        Number of pixels: {:d}

        Model
        -----
        Components: {:s}
        Free parameters: {:s}
        Fixed parameters: {:s}
        """.format(
            self.nfreq,
            self.npol,
            self.npix,
            " ".join(self._components),
            " ".join(self.free_parameters),
            " ".join(self._fixed_parameters),
        )
        return msg
[docs] def __call__(self, theta, ret_neg=False): """ This method computes the log probability at a point in parameter space specified by `pars` and any additional `kwargs` that may overwrite members of the `pars` dictionary. Parameters ---------- theta: ndarray Array containing the parameters that are being varied. These should match the parameters given as priors when instantiating this object. ret_neg: bool (optional, default=False) If True, return the negative log likelihood. Else return the positive. Returns ------- float The log probability at this point in parameter space. """ try: assert len(theta) == len(self.free_parameters) except AssertionError: raise AssertionError("Pass argument for each varied parameter.") lnprior = self._lnprior(theta) F = self._F(theta) N_T_inv = self._N_T_inv(theta, F=F) T_bar = self._T_bar(theta, F=F, N_T_inv=N_T_inv) lnP = _lnP(lnprior, T_bar, N_T_inv) # if ret_neg, return negative loglik1elihood. Convenient for use # with minimization functions to find ML. if ret_neg: return -lnP return lnP
[docs] def data_setup(self, data, covariance, frequencies): self.nfreq, self.npol, self.npix = data.shape self.frequencies = frequencies self.N_inv_d = (data, covariance)
[docs] def model_setup(self, model): """ Function to parse the model definition dictionary. The first level of the dictionary has keys corresponding to the different components, and must match one of the functions in `mesmer.seds`. The values of the keys are dictionaries with `fixed` and `varied` keywords. These contain further dictionaries with fixed parameter name / value pairs, and free parameter prior name / pairs. Parameters ---------- model: dict Dictionary defining model. """ self._components = list(model.keys()) self._fmatrix = FMatrix(self._components) self._priors = {} self._fixed_parameters = {} for component in model.values(): # get list of free parameters for this component prior = component.get("varied", None) # if there are not free parameters prior is None if prior is not None: self._priors.update(prior) # do the same for the fixed parameters fix_par = component.get("fixed", None) if fix_par is not None: self._fixed_parameters.update(fix_par) self.free_parameters = sorted(self._priors.keys())
[docs] def load_data_from_hdf5(self, fpath, mc=0): """ Method to instantiate data attributes using data saved in an HDF5 archive. Parameters ---------- fpath: str, Path Path to the hdf5 archive. record: str Record in which the data is stored. This could be the name of the group in which the datasets are saved. """ with h5py.File(fpath, "r") as f: maps = f["maps"] data = maps["data_mc{:04d}".format(mc)][...] cov = maps["cov"][...] frequencies = maps.attrs["frequencies"] self.data_setup(data, cov, frequencies)
[docs] @classmethod def load_data_from_hdf5_batch(cls, fpath, model=None): """ Method to instantiate data attrib utes using data saved in an HDF5 archive. Note that a simpler structure for this method would place the for loop within a single opening of the hdf5 file. However, this hogs the resource, and we want to be able to write to this file from other processes. Parameters ---------- fpath: str, Path Path to the hdf5 archive. model: Path (optional, default=None) If specified, this is a path to a yaml file containing an SED model specification. Returns ------- :class:`mesmer.likelihood.LogProb` Instantiated :class:`mesmer.likelihood.LogProb` object. """ # get metadata from the simulation with h5py.File(fpath, "r") as f: maps = f["maps"] nmc = maps.attrs["monte_carlo"] frequencies = maps.attrs1["frequencies"] # loop over monte carlo realizations and return an initialized # `LogProb` object. Each of these objects will still need to # have a model loaded, unless the model keyword is defined. for i in range(nmc): with h5py.File(fpath, "r") as f: maps = f["maps"] cov = maps["cov"][...] data = maps["data_mc{:04d}".format(i)][...] if model is None: yield cls(data=data, covariance=cov, frequencies=frequencies) else: with open(model) as f: cfg = yaml.load(f, Loader=yaml.FullLoader) yield cls(data=data, covariance=cov, frequencies=frequencies, model=cfg)
[docs] @classmethod def load_model_from_yaml(cls, fpath): """ Method to load a model configuration from a ``yaml`` file. """ with open(fpath) as f: model = yaml.load(f, Loader=yaml.FullLoader) return model["identifier"], cls(model=model["model"])
[docs] def get_amplitude_expectation(self, theta, component=None): """ Convenience function to return the component-separated expected amplitudes, `T_bar`, taking care of the relevant reshaping. """ T_bar = self._T_bar(theta) T_bar = np.moveaxis(T_bar.reshape(self.npol, self.npix, -1), 2, 0) if component is None: return T_bar assert component in self._components idx = self._components.index(component) return T_bar[idx]
[docs] def get_amplitdue_covariance(self, theta, component=None): """ Convenience function to return the component covariances, `N_T_inv`, for a given set of spectral parameters. """ N_T_inv = self._N_T_inv(theta) ncomp = len(self._components) N_T = np.linalg.inv(N_T_inv.reshape(self.npol, self.npix, ncomp, ncomp)) if component is None: return N_T assert component in self._components idx = self._components.index(component) return N_T[:, :, idx, idx]
    @property
    def free_parameters(self):
        """List of names of the parameters allowed to vary; assigned
        sorted by :meth:`model_setup`."""
        return self.__free_parameters

    @free_parameters.setter
    def free_parameters(self, val):
        self.__free_parameters = val

    @property
    def N_inv(self):
        """Inverse noise covariance, shape (Npix * Npol, Nfreq); computed
        by the `N_inv_d` setter."""
        return self.__N_inv

    @N_inv.setter
    def N_inv(self, val):
        self.__N_inv = val

    @property
    def N_inv_d(self):
        """Inverse-variance-weighted data, shape (Npix * Npol, Nfreq)."""
        return self.__N_inv_d

    @N_inv_d.setter
    def N_inv_d(self, val):
        """ Setter method for the inverse-variance weighted data.

        Parameters
        ----------
        val: tuple(ndarray)
            Tuple containing the data and covariance arrays. Arrays must
            have the same shape, and are expected in shape
            (Nfreq, Npol, Npix).
        """
        (data, cov) = val
        try:
            assert data.ndim == 3
            assert cov.ndim == 3
        except AssertionError:
            raise AssertionError("Data must have three dimensions, Nfreq, Npol, Npix")
        # NOTE(review): relies on self.npix / self.npol / self.nfreq
        # having been set already — `data_setup` assigns the shape
        # attributes before assigning this property.
        shape = [self.npix * self.npol, self.nfreq]
        data = _reorder_reshape_inputs(data, shape)
        # Covariance is treated as diagonal, so an elementwise reciprocal
        # gives the inverse; assigning triggers the `N_inv` setter.
        self.N_inv = 1.0 / _reorder_reshape_inputs(cov, shape)
        self.__N_inv_d = data * self.N_inv
[docs] def theta_0(self, npoints=None, seed=7837): """ Function to generate a starting guess for optimization or sampling by drawing a random point from the prior. Parameters ---------- npoints: int (optional, default=None) If npoints is not None, function returns an array of draws from the prior of dimension (npoints, ndim). This can be useful for initializing a set of optimization runs, or samplers. seed: int (optional, default=7837) Seed for the PRNG key used by `jax`. `jax` has a subtly different approach to random number generation to numpy. Worth reading about this before setting this number. Returns ------- ndarray Array of length the number of free parameters. """ if npoints is not None: shape = (npoints, len(self.free_parameters)) else: shape = (len(self.free_parameters),) out = np.random.normal(*shape, dtype=np.float32) means = [] stds = [] for par in self.free_parameters: mean, std = self._priors[par] means.append(mean) stds.append(std) means = np.array(means) stds = np.array(stds) return means + out * stds
def _F(self, theta): free_parameters = dict(zip(self.free_parameters, theta)) params = {**free_parameters, **self._fixed_parameters, "nu": self.frequencies} return self._fmatrix(**params) def _N_T_inv(self, theta, F=None): if F is None: F = self._F(theta) return _N_T_inv(F, self.N_inv) def _T_bar(self, theta, F=None, N_T_inv=None): if F is None: F = self._F(theta) if N_T_inv is None: N_T_inv = self._N_T_inv(theta, F=F) return _T_bar(F, N_T_inv, self.N_inv_d) def _lnprior(self, theta): logprior = 0 for arg, par in zip(theta, self.free_parameters): mean, std = self._priors[par] logprior += _log_gaussian(arg, mean, std) return logprior
def _N_T_inv(F: np.ndarray, N_inv: np.ndarray) -> np.ndarray: """Function to calculate the inverse covariance of component amplitudes, `N_T_inv. This is an implementation of Equation (A4) in 1608.00551. See also Equation (A10) for interpretation. Parameters ---------- F: ndarray SED matrix N_inv: ndarray Inverse noise covariance. Returns ------- ndarray N_T_inv, the inverse covariance of component amplitude. """ Fprod = F[:, None, :] * F[None, :, :] return np.sum(Fprod[None, :, :, :] * N_inv[:, None, None, :], axis=3) def _T_bar(F: np.ndarray, N_T_inv: np.ndarray, N_inv_d: np.ndarray) -> np.ndarray: """Function to calculate the expected component amplitudes, `T_bar`. This is an implementation of Equation (A4) in 1608.00551. See also Equation (A10) for interpretation. Parameters ---------- F: ndarray SED matrix N_T_inv: ndarray Inverse component covariance. N_inv_d: ndarray Inverse covariance-weighted data. Returns ------- ndarray T_bar, the expected component amplitude. """ y = np.sum(F[None, :, :] * N_inv_d[:, None, :], axis=2) return np.linalg.solve(N_T_inv, y) def _lnP(lnprior, T_bar, N_T_inv): """ Function to calculate the posterior marginalized over amplitude parameters. This function calcualtes Equation (A7), with the inclusion of a Jeffrey's prior of 1608.00551: .. math:: p(b|d) \\propto \\exp\\left[\\frac{1}{2}\\bar{T}^T N_T^{-1} \\bar{T} \\right]p_p(b) Parameters ---------- lnprior: float Prior evaluated at the given point in parameter space T_bar: ndarray Array containing the component amplitude means calculated at this point in parameter space. N_T_inv: ndarray Component amplitude covariance calculated at this point in parameter space. Returns ------- float Log likelihood of this set of spectral parameters. """ return lnprior + 0.5 * np.einsum("ij,ijk,ik->", T_bar, N_T_inv, T_bar) def _log_gaussian(par, mean, std): """ Function used to calculate a Gaussian prior. 
""" return -0.5 * ((par - mean) / std) ** 2 def _reorder_reshape_inputs(arr, shape): """ Function to reorder axes and reshape dimensions of input data. This takes input data, assumed to be of shape: (Nfreq, Npol, Npix) and converts to shape (Npix * Npol, Nfreq), which is easier to work with in the likelihood. Parameters ---------- arr: ndarray Numpy array with three dimensions. Returns ------- ndarray Numpy array with two dimensions. """ return np.moveaxis(arr, (0, 1, 2), (2, 0, 1)).reshape(shape).astype(np.float32)