"""
"""
from __future__ import division, print_function, absolute_import, unicode_literals
import numpy as np
from astropy.utils.misc import NumpyRNGContext
__all__ = ('calculate_first_idx_unique_array_vals',
'calculate_last_idx_unique_array_vals', 'sum_in_bins',
'random_indices_within_bin', 'calculate_entry_multiplicity')
[docs]
def calculate_first_idx_unique_array_vals(sorted_array, testing_mode=False):
""" Given an integer array with possibly repeated entries in ascending order,
return the indices of the first appearance of each unique value.
Parameters
----------
sorted_array : array
Integer array of host halo IDs, sorted in ascending order
testing_mode : bool, optional
Boolean specifying whether input arrays will be tested to see if they
satisfy the assumptions required by the algorithm.
Setting ``testing_mode`` to True is useful for unit-testing purposes,
while setting it to False improves performance.
Default is False.
If this function raises an unexpected exception, try setting ``testing_mode``
to True to identify which specific assumption about the inputs is not being met.
Returns
--------
idx_unique_array_vals : array
Integer array storing the indices of the first appearance of
each unique entry in sorted_array
Notes
------
By construction, the first element of `calculate_first_idx_unique_array_vals`
will always be zero.
Examples
--------
>>> sorted_array = np.array((0, 0, 1, 1, 4, 8, 8, 10))
>>> result = calculate_first_idx_unique_array_vals(sorted_array)
>>> assert np.all(result == (0, 2, 4, 5, 7))
"""
if testing_mode is True:
try:
assert np.all(np.diff(sorted_array) >= 0)
except AssertionError:
msg = "Input ``sorted_array`` array must be sorted in ascending order"
raise ValueError(msg)
return np.concatenate(([0], np.flatnonzero(np.diff(sorted_array)) + 1))
[docs]
def calculate_last_idx_unique_array_vals(sorted_array, testing_mode=False):
""" Given an integer array with possibly repeated entries in ascending order,
return the indices of the last appearance of each unique value.
Parameters
----------
sorted_array : array
Integer array of host halo IDs, sorted in ascending order
testing_mode : bool, optional
Boolean specifying whether input arrays will be tested to see if they
satisfy the assumptions required by the algorithm.
Setting ``testing_mode`` to True is useful for unit-testing purposes,
while setting it to False improves performance.
Default is False.
If this function raises an unexpected exception, try setting ``testing_mode``
to True to identify which specific assumption about the inputs is not being met.
Returns
--------
idx_unique_array_vals : array
Integer array storing the indices of the last appearance of
each unique entry in sorted_array
Notes
------
By construction, the first element of `calculate_first_idx_unique_array_vals`
will always be len(sorted_array)-1.
Examples
--------
>>> sorted_array = np.array((0, 0, 1, 1, 4, 8, 8, 10))
>>> result = calculate_last_idx_unique_array_vals(sorted_array)
>>> assert np.all(result == (1, 3, 4, 6, 7))
"""
if testing_mode is True:
try:
assert np.all(np.diff(sorted_array) >= 0)
except AssertionError:
msg = "Input ``sorted_array`` array must be sorted in ascending order"
raise ValueError(msg)
return np.append(np.flatnonzero(np.diff(sorted_array)), len(sorted_array)-1)
[docs]
def sum_in_bins(arr, sorted_bin_numbers, testing_mode=False):
""" Given an array of values ``arr`` and another equal-length array
``sorted_bin_numbers`` storing how these values have been binned into *Nbins*,
calculate the sum of the values in each bin.
Parameters
-----------
arr : array
Array of length *Nvals* storing the quantity to be summed
in the bins defined by ``sorted_bin_numbers``.
sorted_bin_numbers : array
Integer array of length *Nvals* storing the bin numbers
of each entry of the input ``arr``,
e.g., the result of np.digitize(arr, bins).
The ``sorted_bin_numbers`` array may have repeated entries but must
be in ascending order. That is, the subhalos whose property is
stored in array ``arr`` will be presumed to be pre-grouped according to,
for example, host halo mass, with lowest halo masses first,
and higher halo masses at higher indices, in monotonic fashion.
testing_mode : bool, optional
Boolean specifying whether input arrays will be tested to see if they
satisfy the assumptions required by the algorithm.
Setting ``testing_mode`` to True is useful for unit-testing purposes,
while setting it to False improves performance.
Default is False.
Returns
--------
binned_sum : array
Array of length-*Nbins* storing the sum of ``arr`` values
within the bins defined by ``sorted_bin_numbers``.
Examples
--------
>>> Nvals = 5
>>> arr = np.arange(Nvals)
>>> sorted_bin_numbers = np.array((1, 2, 2, 6, 7))
>>> result = sum_in_bins(arr, sorted_bin_numbers)
"""
try:
assert len(arr) == len(sorted_bin_numbers)
except AssertionError:
raise ValueError("Input ``arr`` and ``sorted_bin_numbers`` must have same length")
if testing_mode is True:
try:
assert np.all(np.diff(sorted_bin_numbers) >= 0)
except AssertionError:
msg = ("Input ``sorted_bin_numbers`` array must be sorted in ascending order")
raise ValueError(msg)
last_idx = calculate_last_idx_unique_array_vals(sorted_bin_numbers)
first_entry = arr[:last_idx[0]+1].sum()
binned_sum = np.diff(np.cumsum(arr)[last_idx])
binned_sum = np.concatenate(([first_entry], binned_sum))
return binned_sum
[docs]
def random_indices_within_bin(binned_multiplicity, desired_binned_occupations,
seed=None, min_required_entries_per_bin=None):
""" Given two equal-length arrays, with ``desired_binned_occupations``
defining the number of desired random draws per bin,
and ``binned_multiplicity`` defining the number of indices in each bin
that are available to be randomly drawn,
return a set of indices such that
only the appropriate indices will be drawn for each bin, and the total
number of such random draws is in accord with the input ``desired_binned_occupations``.
The ``random_indices_within_bin`` function is the kernel of the calculation in which
satellites are assigned to host halos that do not have enough subhalos
to serve as satellites. The algorithm implemented here enables, for example,
the random selection of a subhalo that resides in a host of a nearby mass.
Parameters
-----------
binned_multiplicity : array
Array of length-*Nbins* storing how many total items
reside in each bin.
All entries of ``binned_multiplicity`` must be at least as large
as ``min_required_entries_per_bin``, enforcing a user-specified requirement
that in each bin, you must have "enough" entries to draw from.
desired_binned_occupations : array
Array of length-*Nbins* of non-negative integers storing
the number of times to draw from each bin.
seed : integer, optional
Random number seed used when drawing random numbers with `numpy.random`.
Useful when deterministic results are desired, such as during unit-testing.
Default is None, producing stochastic results.
min_required_entries_per_bin : int, optional
Minimum requirement on the number of entries in each bin. Default is 1.
This requirement is only applied for bins with non-zero
values of ``desired_binned_occupations``.
Returns
-------
indices : array
Integer array of length equal to desired_binned_occupations.sum()
whose values can be used to index the appropriate entries of the subhalo table.
Examples
---------
>>> binned_multiplicity = np.array([1, 2, 2, 1, 3])
>>> desired_binned_occupations = np.array([2, 1, 3, 0, 2])
>>> idx = random_indices_within_bin(binned_multiplicity, desired_binned_occupations)
The ``idx`` array has *desired_binned_occupations.sum()* total entries,
with each entry storing the index of the subhalo table that will serve as a
randomly selected satellite.
"""
if min_required_entries_per_bin is None:
min_required_entries_per_bin = 1
try:
assert np.all(desired_binned_occupations >= 0)
except AssertionError:
msg = ("All entries of input ``desired_binned_occupations``\n"
"must be non-negative integers.\n")
raise ValueError(msg)
num_draws = desired_binned_occupations.sum()
if num_draws == 0:
return np.array([], dtype=int)
try:
assert np.all(binned_multiplicity[desired_binned_occupations > 0] >= min_required_entries_per_bin)
except AssertionError:
msg = ("Input ``binned_multiplicity`` array must contain at least \n"
"min_required_entries_per_bin = {0} entries. \nThis indicates that "
"the host halo mass bins should be broader.\n".format(min_required_entries_per_bin))
raise ValueError(msg)
with NumpyRNGContext(seed):
uniform_random = np.random.rand(num_draws)
num_available_subs = np.repeat(binned_multiplicity.astype(int),
desired_binned_occupations.astype(int))
intra_bin_indices = np.floor(uniform_random*num_available_subs)
first_bin_indices = np.concatenate(([0], np.cumsum(binned_multiplicity)[:-1]))
repeated_first_bin_indices = np.repeat(first_bin_indices,
desired_binned_occupations.astype(int))
absolute_indices = intra_bin_indices + repeated_first_bin_indices
return absolute_indices
[docs]
def calculate_entry_multiplicity(sorted_repeated_hostids, unique_possible_hostids,
testing_mode=False):
""" Given an array of possible hostids, and a sorted array of
(possibly repeated) hostids, return the number of appearances of each hostid.
This function can serve as the kernel, for example, for the calculation of
the number of subhalos in each host halo.
Parameters
----------
sorted_repeated_hostids : array
Length-*num_entries* integer array storing a collection of hostids.
The entries of ``sorted_repeated_hostids`` may be repeated,
but must be in ascending order.
Each entry of ``sorted_repeated_hostids`` must appear in the
``unique_possible_hostids``.
For halo analysis applications, this would be the ``halo_hostid`` column
of some set of subhalos.
unique_possible_hostids : array
Length-*num_hostids* integer array storing
the set of all available values for hostid.
All entries must be unique.
An entry of ``unique_possible_hostids`` need not necessarily appear in
``sorted_repeated_hostids``.
The ``unique_possible_hostids`` array can be sorted in any order.
For halo analysis applications, this would be the ``halo_id`` column
of the complete set of *host* halos.
testing_mode : bool, optional
Boolean specifying whether input arrays will be tested to see if they
satisfy the assumptions required by the algorithm.
Setting ``testing_mode`` to True is useful for unit-testing purposes,
while setting it to False improves performance.
Default is False.
If this function raises an unexpected exception, try setting ``testing_mode``
to True to identify which specific assumption about the inputs is not being met.
Returns
---------
entry_multiplicity : array
Length-*num_hostids* integer array storing the number of
times each entry of ``unique_possible_hostids`` appears
in ``sorted_repeated_hostids``.
Examples
--------
>>> sorted_repeated_hostids = np.array((1, 1, 2, 2, 2, 4, 5, 6, 6))
>>> unique_possible_hostids = np.arange(7)
>>> entry_multiplicity = calculate_entry_multiplicity(sorted_repeated_hostids, unique_possible_hostids)
>>> assert np.all(entry_multiplicity == (0, 2, 3, 0, 1, 1, 2))
"""
if testing_mode:
try:
assert np.all(np.diff(sorted_repeated_hostids) >= 0)
except AssertionError:
msg = "Input ``sorted_repeated_hostids`` array is not sorted in ascending order"
raise ValueError(msg)
s1 = set(unique_possible_hostids)
try:
assert len(s1) == len(unique_possible_hostids)
except AssertionError:
msg = "All entries of ``unique_possible_hostids`` must be unique"
raise ValueError(msg)
s2 = set(sorted_repeated_hostids)
try:
unmatched_entries = s2 - s1
assert len(unmatched_entries) == 0
except AssertionError:
example_unmatched_entry = list(unmatched_entries)[0]
msg = ("Each entry of sorted_repeated_hostids "
"must appear in unique_possible_hostids.\n"
"The following entry appears in ``sorted_repeated_hostids`` "
"but not in ``unique_possible_hostids``:\n\n{0}\n\n".format(example_unmatched_entry))
raise ValueError(msg)
unique_appearances_of_hostid, unique_entry_multiplicity = (
np.unique(sorted_repeated_hostids, return_counts=True))
hostid_has_match = np.in1d(unique_possible_hostids, unique_appearances_of_hostid,
assume_unique=True)
entry_multiplicity = np.zeros_like(unique_possible_hostids)
entry_multiplicity[hostid_has_match] = unique_entry_multiplicity
return entry_multiplicity