Source code for halotools.empirical_models.assembias_models.heaviside_assembias

"""
This module contains the `~halotools.empirical_models.HeavisideAssembias` class.
The purpose of this class is to introduce step function-type assembly bias into
any method of any component model, as in
`Hearin et al 2015 decorated HODs <http://arxiv.org/abs/1512.03050>`_.
"""

import numpy as np
from warnings import warn
from astropy.utils.misc import NumpyRNGContext

from .. import model_defaults, model_helpers

from ...utils.array_utils import custom_len
from ...custom_exceptions import HalotoolsError
from ...utils.table_utils import compute_conditional_percentiles
import collections

__all__ = ('HeavisideAssembias', 'PreservingNgalHeavisideAssembias')
__author__ = ('Andrew Hearin', )


[docs]class HeavisideAssembias(object): """ Class used as an orthogonal mix-in to introduce step function-style assembly-biased behavior into any component model. """ def __init__(self, **kwargs): """ No positional arguments accepted; all argument are strictly keyword arguments. Parameters ---------- method_name_to_decorate : string Name of the method in the primary class whose behavior is being decorated lower_assembias_bound : float lower bound on the method being decorated with assembly bias upper_assembias_bound : float upper bound on the method being decorated with assembly bias sec_haloprop_key : string, optional String giving the column name of the secondary halo property governing the assembly bias. Must be a key in the table passed to the methods of `HeavisideAssembiasComponent`. Default value is specified in the `~halotools.empirical_models.model_defaults` module. split : float or list, optional Fraction or list of fractions between 0 and 1 defining how we split halos into two groupings based on their conditional secondary percentiles. Default is 0.5 for a constant 50/50 split. split_abscissa : list, optional Values of the primary halo property at which the halos are split as described above in the ``split`` argument. If ``loginterp`` is set to True (the default behavior), the interpolation will be done in the logarithm of the primary halo property. Default is to assume a constant 50/50 split. splitting_model : object, optional Model instance with a method called ``splitting_method_name`` used to split the input halos into two types. splitting_method_name : string, optional Name of method bound to ``splitting_model`` used to split the input halos into two types. halo_type_tuple : tuple, optional Tuple providing the information about how elements of the input ``table`` have been pre-divided into types. The first tuple entry must be a string giving the column name of the input ``table`` that provides the halo-typing. The second entry gives the value that will be stored in this column for halos that are above the percentile split, the third entry gives the value for halos below the split. If provided, you must ensure that the splitting of the ``table`` was self-consistently performed with the input ``split``, or ``split_abscissa`` and ``split_ordinates``, or ``split_func`` keyword arguments. assembias_strength : float or list, optional Fraction or sequence of fractions between -1 and 1 defining the assembly bias correlation strength. Default is 0.5. assembias_strength_abscissa : list, optional Values of the primary halo property at which the assembly bias strength is specified. Default is to assume a constant strength of 0.5. If passing a list, the strength will interpreted at the input ``assembias_strength_abscissa``. Default is to assume a constant strength of 0.5. loginterp : bool, optional If set to True, the interpolation will be done in the logarithm of the primary halo property, rather than linearly. Default is True. """ self._interpret_constructor_inputs(**kwargs) self._decorate_baseline_method() self._methods_to_inherit.extend(['assembias_strength']) try: self.publications.append('arXiv:1512.03050') except: self.publications = ['arXiv:1512.03050'] def _interpret_constructor_inputs(self, loginterp=True, sec_haloprop_key=model_defaults.sec_haloprop_key, **kwargs): """ """ self._loginterp = loginterp self.sec_haloprop_key = sec_haloprop_key required_attr_list = ['prim_haloprop_key', 'gal_type'] for attr in required_attr_list: if not hasattr(self, attr): msg = ("In order to use the HeavisideAssembias class " "to decorate your model component with assembly bias, \n" "the component instance must have a %s attribute") raise HalotoolsError(msg % attr) try: self._method_name_to_decorate = kwargs['method_name_to_decorate'] except KeyError: msg = ("The constructor to the HeavisideAssembiasComponent class " "must be called with the following keyword arguments:\n" "``%s``") raise HalotoolsError(msg % ('_method_name_to_decorate')) try: lower_bound = float(kwargs['lower_assembias_bound']) lower_bound_key = 'lower_bound_' + self._method_name_to_decorate + '_' + self.gal_type setattr(self, lower_bound_key, lower_bound) upper_bound = float(kwargs['upper_assembias_bound']) upper_bound_key = 'upper_bound_' + self._method_name_to_decorate + '_' + self.gal_type setattr(self, upper_bound_key, upper_bound) except KeyError: msg = ("The constructor to the HeavisideAssembiasComponent class " "must be called with the following keyword arguments:\n" "``%s``, ``%s``") raise HalotoolsError(msg % ('lower_assembias_bound', 'upper_assembias_bound')) self._set_percentile_splitting(**kwargs) self._initialize_assembias_param_dict(**kwargs) if 'halo_type_tuple' in kwargs: self.halo_type_tuple = kwargs['halo_type_tuple'] def _set_percentile_splitting(self, split=0.5, **kwargs): """ Method interprets the arguments passed to the constructor and sets up the interpolation scheme for how halos will be divided into two types as a function of the primary halo property. """ if 'splitting_model' in kwargs: self.splitting_model = kwargs['splitting_model'] func = getattr(self.splitting_model, kwargs['splitting_method_name']) if isinstance(func, collections.abc.Callable): self._input_split_func = func else: raise HalotoolsError("Input ``splitting_model`` must have a callable function " "named ``%s``" % kwargs['splitting_method_name']) elif 'split_abscissa' in list(kwargs.keys()): if custom_len(kwargs['split_abscissa']) != custom_len(split): raise HalotoolsError("``split`` and ``split_abscissa`` must have the same length") self._split_abscissa = kwargs['split_abscissa'] self._split_ordinates = split else: try: self._split_abscissa = [2] self._split_ordinates = [split] except KeyError: msg = ("The _set_percentile_splitting method must at least be called with a ``split``" "keyword argument, or alternatively ``split`` and ``split_abscissa`` arguments.") raise HalotoolsError(msg) def _initialize_assembias_param_dict(self, assembias_strength=0.5, **kwargs): """ """ if not hasattr(self, 'param_dict'): self.param_dict = {} # Make sure the code behaves properly whether or not we were passed an iterable strength = assembias_strength try: iterator = iter(strength) strength = list(strength) except TypeError: strength = [strength] if 'assembias_strength_abscissa' in kwargs: abscissa = kwargs['assembias_strength_abscissa'] try: iterator = iter(abscissa) abscissa = list(abscissa) except TypeError: abscissa = [abscissa] else: abscissa = [2] if custom_len(abscissa) != custom_len(strength): raise HalotoolsError("``assembias_strength`` and ``assembias_strength_abscissa`` " "must have the same length") self._assembias_strength_abscissa = abscissa for ipar, val in enumerate(strength): self.param_dict[self._get_assembias_param_dict_key(ipar)] = val def _decorate_baseline_method(self): """ """ try: baseline_method = getattr(self, self._method_name_to_decorate) setattr(self, 'baseline_'+self._method_name_to_decorate, baseline_method) decorated_method = self.assembias_decorator(baseline_method) setattr(self, self._method_name_to_decorate, decorated_method) except AttributeError: msg = ("The baseline model constructor must be called before " "calling the HeavisideAssembias constructor, \n" "and the baseline model must have a method named ``%s``") raise HalotoolsError(msg % self._method_name_to_decorate)
[docs] def percentile_splitting_function(self, prim_haloprop): """ Method returns the fraction of halos that are ``type-2`` as a function of the input primary halo property. Parameters ----------- prim_haloprop : array_like Array storing the primary halo property. Returns ------- split : float Fraction of ``type2`` halos at the input primary halo property. """ if hasattr(self, '_input_split_func'): result = self._input_split_func(prim_haloprop=prim_haloprop) if np.any(result < 0): msg = ("The input split_func passed to the HeavisideAssembias class" "must not return negative values") raise HalotoolsError(msg) if np.any(result > 1): msg = ("The input split_func passed to the HeavisideAssembias class" "must not return values exceeding unity") raise HalotoolsError(msg) return result elif self._loginterp is True: spline_function = model_helpers.custom_spline( np.log10(self._split_abscissa), self._split_ordinates, k=3) result = spline_function(np.log10(prim_haloprop)) else: spline_function = model_helpers.custom_spline( self._split_abscissa, self._split_ordinates, k=3) result = spline_function(prim_haloprop) result = np.where(result < 0, 0., result) result = np.where(result > 1, 1., result) return result
[docs] def assembias_strength(self, prim_haloprop): """ Method returns the strength of assembly bias as a function of the primary halo property. Parameters ---------- prim_haloprop : array_like Array storing the primary halo property. Returns ------- strength : array_like Strength of assembly bias as a function of the input halo property. """ model_ordinates = (self.param_dict[self._get_assembias_param_dict_key(ipar)] for ipar in range(len(self._assembias_strength_abscissa))) spline_function = model_helpers.custom_spline( self._assembias_strength_abscissa, list(model_ordinates), k=3) if self._loginterp is True: result = spline_function(np.log10(prim_haloprop)) else: result = spline_function(prim_haloprop) result = np.where(result < -1, -1., result) result = np.where(result > 1, 1., result) return result
def _get_assembias_param_dict_key(self, ipar): """ """ return self._method_name_to_decorate + '_' + self.gal_type + '_assembias_param' + str(ipar+1) def _galprop_perturbation(self, **kwargs): """ Method determines how much to boost the baseline function according to the strength of assembly bias and the min/max boost allowable by the requirement that the all-halo baseline function be preserved. The returned perturbation applies to type-1 halos. """ lower_bound_key = 'lower_bound_' + self._method_name_to_decorate + '_' + self.gal_type baseline_lower_bound = getattr(self, lower_bound_key) upper_bound_key = 'upper_bound_' + self._method_name_to_decorate + '_' + self.gal_type baseline_upper_bound = getattr(self, upper_bound_key) try: baseline_result = kwargs['baseline_result'] prim_haloprop = kwargs['prim_haloprop'] splitting_result = kwargs['splitting_result'] except KeyError: msg = ("Must call _galprop_perturbation method of the" "HeavisideAssembias class with the following keyword arguments:\n" "``baseline_result``, ``splitting_result`` and ``prim_haloprop``") raise HalotoolsError(msg) result = np.zeros(len(prim_haloprop)) strength = self.assembias_strength(prim_haloprop) positive_strength_idx = strength > 0 negative_strength_idx = strength < 0 if len(baseline_result[positive_strength_idx]) > 0: base_pos = baseline_result[positive_strength_idx] split_pos = splitting_result[positive_strength_idx] type1_frac_pos = 1 - split_pos strength_pos = strength[positive_strength_idx] upper_bound1 = baseline_upper_bound - base_pos upper_bound2 = ((1 - type1_frac_pos)/type1_frac_pos)*(base_pos - baseline_lower_bound) upper_bound = np.minimum(upper_bound1, upper_bound2) result[positive_strength_idx] = strength_pos*upper_bound if len(baseline_result[negative_strength_idx]) > 0: base_neg = baseline_result[negative_strength_idx] split_neg = splitting_result[negative_strength_idx] type1_frac_neg = 1 - split_neg strength_neg = strength[negative_strength_idx] lower_bound1 = baseline_lower_bound - base_neg lower_bound2 = (1 - type1_frac_neg)/type1_frac_neg*(base_neg - baseline_upper_bound) lower_bound = np.maximum(lower_bound1, lower_bound2) result[negative_strength_idx] = np.abs(strength_neg)*lower_bound return result
[docs] def assembias_decorator(self, func): """ Primary behavior of the `HeavisideAssembias` class. This method is used to introduce a boost/decrement of the baseline function in a manner that preserves the all-halo result. Any function with a semi-bounded range can be decorated with `assembias_decorator`. The baseline behavior can be anything whatsoever, such as mean star formation rate or mean halo occupation, provided it has a semi-bounded range. Parameters ----------- func : function object Baseline function whose behavior is being decorated with assembly bias. Returns ------- wrapper : function object Decorated function that includes assembly bias effects. """ lower_bound_key = 'lower_bound_' + self._method_name_to_decorate + '_' + self.gal_type baseline_lower_bound = getattr(self, lower_bound_key) upper_bound_key = 'upper_bound_' + self._method_name_to_decorate + '_' + self.gal_type baseline_upper_bound = getattr(self, upper_bound_key) def wrapper(*args, **kwargs): ################################################################################# # Retrieve the arrays storing prim_haloprop and sec_haloprop # The control flow below is what permits accepting an input # table or a directly inputting prim_haloprop and sec_haloprop arrays _HAS_table = False if 'table' in kwargs: try: table = kwargs['table'] prim_haloprop = table[self.prim_haloprop_key] sec_haloprop = table[self.sec_haloprop_key] _HAS_table = True except KeyError: msg = ("When passing an input ``table`` to the " " ``assembias_decorator`` method,\n" "the input table must have a column with name ``%s``" "and a column with name ``%s``.\n") raise HalotoolsError(msg % (self.prim_haloprop_key), self.sec_haloprop_key) else: try: prim_haloprop = np.atleast_1d(kwargs['prim_haloprop']) except KeyError: msg = ("\nIf not passing an input ``table`` to the " "``assembias_decorator`` method,\n" "you must pass ``prim_haloprop`` argument.\n") raise HalotoolsError(msg) try: sec_haloprop = np.atleast_1d(kwargs['sec_haloprop']) except KeyError: if 'sec_haloprop_percentile' not in kwargs: msg = ("\nIf not passing an input ``table`` to the " "``assembias_decorator`` method,\n" "you must pass either a ``sec_haloprop`` or " "``sec_haloprop_percentile`` argument.\n") raise HalotoolsError(msg) ################################################################################# # Compute the fraction of type-2 halos as a function of the input prim_haloprop split = self.percentile_splitting_function(prim_haloprop) # Compute the baseline, undecorated result result = func(*args, **kwargs) # We will only decorate values that are not edge cases, # so first compute the mask for non-edge cases no_edge_mask = ( (split > 0) & (split < 1) & (result > baseline_lower_bound) & (result < baseline_upper_bound) ) # Now create convenient references to the non-edge-case sub-arrays no_edge_result = result[no_edge_mask] no_edge_split = split[no_edge_mask] ################################################################################# # Compute the array type1_mask # This array will serve as a boolean mask that divides the halo sample into two subsamples # There are several possible ways that the type1_mask can be computed, depending on # what the decorator was passed as input if _HAS_table is True: # we were passed halo_type_tuple: if hasattr(self, 'halo_type_tuple'): halo_type_key = self.halo_type_tuple[0] halo_type1_val = self.halo_type_tuple[1] type1_mask = table[halo_type_key][no_edge_mask] == halo_type1_val # the value of sec_haloprop_percentile is already stored as a column of the table elif self.sec_haloprop_key + '_percentile' in list(table.keys()): no_edge_percentiles = table[self.sec_haloprop_key + '_percentile'][no_edge_mask] type1_mask = no_edge_percentiles > no_edge_split else: # the value of sec_haloprop_percentile will be computed from scratch percentiles = compute_conditional_percentiles( prim_haloprop=prim_haloprop, sec_haloprop=sec_haloprop ) no_edge_percentiles = percentiles[no_edge_mask] type1_mask = no_edge_percentiles > no_edge_split else: try: percentiles = kwargs['sec_haloprop_percentile'] if custom_len(percentiles) == 1: percentiles = np.zeros(custom_len(prim_haloprop)) + percentiles except KeyError: percentiles = compute_conditional_percentiles( prim_haloprop=prim_haloprop, sec_haloprop=sec_haloprop ) no_edge_percentiles = percentiles[no_edge_mask] type1_mask = no_edge_percentiles > no_edge_split # type1_mask has now been computed for all possible branchings ################################################################################# perturbation = self._galprop_perturbation( prim_haloprop=prim_haloprop[no_edge_mask], baseline_result=no_edge_result, splitting_result=no_edge_split) frac_type1 = 1 - no_edge_split frac_type2 = 1 - frac_type1 perturbation[~type1_mask] *= (-frac_type1[~type1_mask] / (frac_type2[~type1_mask])) no_edge_result += perturbation result[no_edge_mask] = no_edge_result return result return wrapper
[docs]class PreservingNgalHeavisideAssembias(HeavisideAssembias):
[docs] def assembias_mc_occupation(self, seed=None, **kwargs): first_occupation_moment_orig = self.mean_occupation_orig(**kwargs) first_occupation_moment = self.mean_occupation(**kwargs) if self._upper_occupation_bound == 1: with NumpyRNGContext(seed): score = np.random.rand(custom_len(first_occupation_moment_orig)) total = np.count_nonzero(first_occupation_moment_orig > score) result = np.where(first_occupation_moment > score, 1, 0) diff = result.sum() - total if diff < 0: x = (first_occupation_moment / score) result.fill(0) result[x.argsort()[-total:]] = 1 elif diff > 0: x = (1.0-first_occupation_moment) / (1.0-score) result.fill(0) result[x.argsort()[:total]] = 1 elif self._upper_occupation_bound == float("inf"): total = self._poisson_distribution(first_occupation_moment_orig.sum(), seed=seed) if seed is not None: seed += 1 with NumpyRNGContext(seed): score = np.random.rand(total) score.sort() x = first_occupation_moment.cumsum(dtype=np.float64) x /= x[-1] result = np.ediff1d(np.insert(np.searchsorted(score, x), 0, 0)) else: msg = ("\nYou have chosen to set ``_upper_occupation_bound`` to some value \n" "besides 1 or infinity. In such cases, you must also \n" "write your own ``mc_occupation`` method that overrides the method in the \n" "OccupationComponent super-class\n") raise HalotoolsError(msg) if 'table' in kwargs: kwargs['table']['halo_num_'+self.gal_type] = result return result
def _decorate_baseline_method(self): self.mean_occupation_orig = self.mean_occupation self.mc_occupation = self.assembias_mc_occupation super(PreservingNgalHeavisideAssembias, self)._decorate_baseline_method()