# Source code for halotools.utils.probabilistic_binning

```""" Functions used to probabilistically digitize an array
"""
import numpy as np
from astropy.utils.misc import NumpyRNGContext

__all__ = ('fuzzy_digitize', )

[docs]
def fuzzy_digitize(x, centroids, min_counts=2, seed=43):
""" Function assigns each element of the input array ``x`` to a centroid number.

Centroid-assignment is probabilistic. When a point in ``x`` is halfway between two centroids,
it is equally likely to be assigned to the centroid to its left or right;
when a point in ``x`` is coincident with a centroid,
it will be assigned to that centroid with unit probability; assignment probability
increases linearly as points approach a centroid.

The `fuzzy_digitize` function optionally enforces that elements of very sparsely
populated bins are remapped to the nearest bin with more than ``min_counts`` elements.

Parameters
----------
x : ndarray
Numpy array of shape (npts, ) storing the values to be binned

centroids : ndarray
Numpy array of shape (num_centroids, ). The values of ``centroids`` must
strictly encompass the range of values spanned by ``x`` and must also be
monotonically increasing.

min_counts : int, optional
Minimum required number of elements assigned to each centroid.
For those centroids not satisfying this requirement,
all their elements will be reassigned to the nearest sufficiently populated centroid.
Default is two.

seed : int, optional
Random number seed. Default is 43.

Returns
-------
centroid_indices : ndarray
Numpy integer array of shape (npts, ) storing the index of the centroid
to which elements of ``x`` are assigned. All integer values of ``centroid_indices``
will lie in the closed interval [0, num_centroids-1].

Examples
--------
>>> npts = int(1e5)
>>> xmin, xmax = 0, 8
>>> x = np.random.uniform(xmin, xmax, npts)
>>> epsilon, nbins = 0.001, 5
>>> xbin_edges = np.linspace(xmin-epsilon, xmax+epsilon, nbins)
>>> centroid_indices = fuzzy_digitize(x, xbin_edges)

.. image:: /_static/fuzzy_binning_example.png

"""
assert np.all(np.diff(centroids) > 0), "centroids must be monotonically increasing"
assert centroids[0] < x.min(), "smallest bin must be less than smallest element in x"
assert centroids[-1] > x.max(), "largest bin must be less than largest element in x"

npts_x = len(x)
num_centroids = len(centroids)
centroid_indices = np.zeros_like(x).astype(int)-999

with NumpyRNGContext(seed):
uran = np.random.rand(npts_x)

for i, low, high in zip(np.arange(num_centroids).astype(int), centroids[:-1], centroids[1:]):
bin_mask = (x >= low) & (x < high)

if npts_bin > 0:
dx_bin = high - low
x_in_bin_rescaled = (x_in_bin - low)/float(dx_bin)

bin_assignment = np.zeros(npts_bin).astype(int) + i
bin_assignment[high_bin_selection] = i + 1

centroid_indices[centroid_indices == -999] = 0

return enforce_bin_counts(centroid_indices, min_counts)

def enforce_bin_counts(centroid_indices, min_counts):
""" Function enforces that each entry of `centroid_indices` appears at least `min_counts` times.
For entries not satisfying this requirement, the nearest index of a sufficiently populated bin
will be used as a replacement.

Parameters
----------
centroid_indices : ndarray
Numpy integer array storing bin numbers

min_counts : int
Minimum acceptable number of elements per bin

Returns
-------
output_bin_inidices : ndarray
Numpy integer array storing bin numbers after enforcing the population requirement.

Examples
--------
>>> centroid_indices = np.random.randint(0, 1000, 1000)
>>> min_counts = 3
>>> output_centroid_indices = enforce_bin_counts(centroid_indices, min_counts)
"""
if min_counts == 0:
return centroid_indices
else:
output_centroid_indices = np.copy(centroid_indices)
unique_bin_numbers, counts = np.unique(centroid_indices, return_counts=True)
for i, bin_number, count in zip(np.arange(len(counts)), unique_bin_numbers, counts):
new_bin_number = _find_nearest_populated_bin_number(
counts, unique_bin_numbers, i, min_counts)
if new_bin_number != bin_number:
output_centroid_indices[centroid_indices==bin_number] = new_bin_number
return output_centroid_indices

def _find_nearest_populated_bin_number(counts, bin_numbers, bin_index, min_counts):
""" Helper function used by the `enforce_bin_counts` function.
"""
bin_numbers = np.atleast_1d(bin_numbers)
centroid_indices = np.arange(len(bin_numbers))
counts = np.atleast_1d(counts)
msg = "Must have at least one bin with greater than {0} elements"
assert np.any(counts >= min_counts), msg.format(min_counts)