"""
Module storing the TabularAsciiReader, a class providing a memory-efficient
algorithm for reading a very large ascii file that stores tabular data
of a data type that is known in advance.
"""
import os
import gzip
import collections
from time import time
import numpy as np
from ..utils.python_string_comparisons import _passively_decode_string
__all__ = ("TabularAsciiReader",)
class TabularAsciiReader(object):
"""
Class providing a memory-efficient algorithm for
reading a very large ascii file that stores tabular data
of a data type that is known in advance.
When reading ASCII data with
`~halotools.sim_manager.TabularAsciiReader.read_ascii`, user-defined
cuts on columns are applied on-the-fly using a python generator
to yield only those columns whose indices appear in the
input ``columns_to_keep_dict``.
As the file is read, the data is generated in chunks,
and a customizable mask is applied to each newly generated chunk.
The only aggregated data from each chunk are those rows
passing all requested cuts, so that the
`~halotools.sim_manager.TabularAsciiReader`
only requires you to have enough RAM to store the *cut* catalog,
not the entire ASCII file.
The primary method of the class is
`~halotools.sim_manager.TabularAsciiReader.read_ascii`.
The output of this method is a structured Numpy array,
which can then be stored in your preferred binary format
using the built-in Numpy methods, h5py, etc. If you wish to
store the catalog in the Halotools cache, you should instead
use the `~halotools.sim_manager.RockstarHlistReader` class.

    The algorithm assumes that data of a known, unchanging type is
    arranged in a consecutive sequence of lines within the ascii file,
    that the data stream begins with the first line that does not begin
    with the ``header_char``, and that the first subsequent appearance
    of an empty line demarcates the end of the data stream.
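
    As a minimal sketch of end-to-end usage (``halo_table.dat`` and the
    column layout below are hypothetical), the cut catalog can be read
    and then stored to disk with ordinary Numpy tools:

    >>> cols = {'mass': (2, 'f4'), 'upid': (6, 'i8')} # doctest: +SKIP
    >>> reader = TabularAsciiReader('halo_table.dat', cols, row_cut_neq_dict={'upid': -1}) # doctest: +SKIP
    >>> arr = reader.read_ascii() # doctest: +SKIP
    >>> np.save('cut_catalog.npy', arr) # doctest: +SKIP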
"""

    def __init__(
        self,
        input_fname,
        columns_to_keep_dict,
        header_char="#",
        row_cut_min_dict={},
        row_cut_max_dict={},
        row_cut_eq_dict={},
        row_cut_neq_dict={},
        num_lines_header=None,
    ):
"""
Parameters
-----------
input_fname : string
Absolute path to the file storing the ASCII data.
columns_to_keep_dict : dict
Dictionary used to define which columns
of the tabular ASCII data will be kept.
Each key of the dictionary will be the name of the
column in the returned data table. The value bound to
each key is a two-element tuple.
The first tuple entry is an integer providing
the *index* of the column to be kept, starting from 0.
The second tuple entry is a string defining the Numpy dtype
of the data in that column,
e.g., 'f4' for a float, 'f8' for a double,
or 'i8' for a long.
Thus an example ``columns_to_keep_dict`` could be
{'x': (1, 'f4'), 'y': (0, 'i8'), 'z': (9, 'f4')}.
In this case, the structured array returned by the `read_ascii` method
would have three keys:
``x`` storing a float for the data in
the second column of the ASCII file,
``y`` storing a long integer for the data in
the first column of the ASCII file, and
``z`` storing a float for the data in
the tenth column of the ASCII file.

        header_char : str, optional
            String to be interpreted as a header line
            at the beginning of the ascii file.
            Default is '#'.
            Can alternatively use the ``num_lines_header`` optional argument.

        num_lines_header : int, optional
            Number of lines in the header. Default is None, in which case
            the header length will be determined by the ``header_char`` argument.

        row_cut_min_dict : dict, optional
            Dictionary used to place a lower-bound cut on the rows
            of the tabular ASCII data.

            Each key of the dictionary must also
            be a key of the input ``columns_to_keep_dict``;
            for purposes of good bookkeeping, you are not permitted
            to place a cut on a column that you do not keep. The value
            bound to each key serves as the lower bound on the data stored
            in that column. A row with a smaller value than this lower bound for the
            corresponding column will not appear in the returned data table.

            For example, if row_cut_min_dict = {'mass': 1e10}, then all rows of the
            returned data table will have a mass greater than 1e10.

        row_cut_max_dict : dict, optional
            Dictionary used to place an upper-bound cut on the rows
            of the tabular ASCII data.

            Each key of the dictionary must also
            be a key of the input ``columns_to_keep_dict``;
            for purposes of good bookkeeping, you are not permitted
            to place a cut on a column that you do not keep. The value
            bound to each key serves as the upper bound on the data stored
            in that column. A row with a larger value than this upper bound for the
            corresponding column will not appear in the returned data table.

            For example, if row_cut_max_dict = {'mass': 1e15}, then all rows of the
            returned data table will have a mass less than 1e15.

        row_cut_eq_dict : dict, optional
            Dictionary used to place an equality cut on the rows
            of the tabular ASCII data.

            Each key of the dictionary must also
            be a key of the input ``columns_to_keep_dict``;
            for purposes of good bookkeeping, you are not permitted
            to place a cut on a column that you do not keep. The value
            bound to each key serves as the required value for the data stored
            in that column. Only rows with a value equal to this required value for the
            corresponding column will appear in the returned data table.

            For example, if row_cut_eq_dict = {'upid': -1}, then *all* rows of the
            returned data table will have an upid of -1.

        row_cut_neq_dict : dict, optional
            Dictionary used to place an inequality cut on the rows
            of the tabular ASCII data.

            Each key of the dictionary must also
            be a key of the input ``columns_to_keep_dict``;
            for purposes of good bookkeeping, you are not permitted
            to place a cut on a column that you do not keep. The value
            bound to each key serves as a forbidden value for the data stored
            in that column. Rows with a value equal to this forbidden value for the
            corresponding column will not appear in the returned data table.

            For example, if row_cut_neq_dict = {'upid': -1}, then *no* rows of the
            returned data table will have an upid of -1.

        Examples
        ---------
        Suppose you are only interested in reading the tenth and fifth
        columns of data of your ascii file, and that these columns store
        a float variable you want to call *mass*, and a long integer variable
        you want to call *id*, respectively. If you want a Numpy structured array
        storing *all* rows of these two columns:

        >>> cols = {'mass': (9, 'f4'), 'id': (4, 'i8')}
        >>> reader = TabularAsciiReader(fname, cols) # doctest: +SKIP
        >>> arr = reader.read_ascii() # doctest: +SKIP

        If you are only interested in rows where *mass* exceeds 1e10:

        >>> row_cut_min_dict = {'mass': 1e10}
        >>> reader = TabularAsciiReader(fname, cols, row_cut_min_dict=row_cut_min_dict) # doctest: +SKIP
        >>> arr = reader.read_ascii() # doctest: +SKIP

        Finally, suppose the fortieth column stores an integer called *resolved*,
        and in addition to the above mass cut, you do not wish to store
        any rows for which the *resolved* column value equals zero.
        As described above, you are not permitted to make a row-cut on a column
        that you do not keep, so in addition to defining the new row cut,
        you must also include the *resolved* column in your *columns_to_keep_dict*:

        >>> cols = {'mass': (9, 'f4'), 'id': (4, 'i8'), 'resolved': (39, 'i4')}
        >>> row_cut_neq_dict = {'resolved': 0}
        >>> reader = TabularAsciiReader(fname, cols, row_cut_neq_dict=row_cut_neq_dict, row_cut_min_dict=row_cut_min_dict) # doctest: +SKIP
        >>> arr = reader.read_ascii() # doctest: +SKIP
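
        If your file's header is not marked by a comment character, you can
        instead declare the header length explicitly via ``num_lines_header``
        (the value 16 below is hypothetical):

        >>> reader = TabularAsciiReader(fname, cols, num_lines_header=16) # doctest: +SKIP
        >>> arr = reader.read_ascii() # doctest: +SKIP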
"""
        self.input_fname = _passively_decode_string(self._get_fname(input_fname))
        self.header_char = self._get_header_char(header_char)
        self.num_lines_header = num_lines_header
        self._determine_compression_safe_file_opener()
        self._process_columns_to_keep(columns_to_keep_dict)

        self.row_cut_min_dict = row_cut_min_dict
        self.row_cut_max_dict = row_cut_max_dict
        self.row_cut_eq_dict = row_cut_eq_dict
        self.row_cut_neq_dict = row_cut_neq_dict

        self._verify_input_row_cuts_keys()
        self._verify_min_max_consistency()
        self._verify_eq_neq_consistency()
        self._enforce_no_repeated_columns()

    def _verify_input_row_cuts_keys(self, **kwargs):
        """Require all columns upon which a row-cut is placed to also appear in
        the input ``columns_to_keep_dict``. For purposes of good bookkeeping,
        you are not permitted to place a cut on a column that you do not keep.
        """
        potential_row_cuts = (
            "row_cut_min_dict",
            "row_cut_max_dict",
            "row_cut_eq_dict",
            "row_cut_neq_dict",
        )
        for row_cut_key in potential_row_cuts:
            row_cut_dict = getattr(self, row_cut_key)
            for key in row_cut_dict:
                try:
                    assert key in list(self.columns_to_keep_dict.keys())
                except AssertionError:
                    msg = (
                        "\nThe ``" + key + "`` key does not appear in the input \n"
                        "``columns_to_keep_dict``, but it does appear in the "
                        "input ``" + row_cut_key + "``. \n"
                        "It is not permissible to place a cut "
                        "on a column that you do not keep.\n"
                    )
                    raise KeyError(msg)

    def _verify_min_max_consistency(self, **kwargs):
        """Verify that no min_cut column has a value greater than or equal to
        the corresponding max_cut. Such a choice would only result, after much
        wasted effort, in a final catalog with zero entries.
        """
        for row_cut_min_key, row_cut_min in self.row_cut_min_dict.items():
            try:
                row_cut_max = self.row_cut_max_dict[row_cut_min_key]
                if row_cut_max <= row_cut_min:
                    msg = (
                        "\nFor the ``" + row_cut_min_key + "`` column, \n"
                        "you set the value of the input ``row_cut_min_dict`` to "
                        + str(row_cut_min)
                        + "\nand the value of the input "
                        "``row_cut_max_dict`` to " + str(row_cut_max) + "\n"
                        "This will result in zero selected rows and is not permissible.\n"
                    )
                    raise ValueError(msg)
            except KeyError:
                pass

        for row_cut_max_key, row_cut_max in self.row_cut_max_dict.items():
            try:
                row_cut_min = self.row_cut_min_dict[row_cut_max_key]
                if row_cut_min >= row_cut_max:
                    msg = (
                        "\nFor the ``" + row_cut_max_key + "`` column, \n"
                        "you set the value of the input ``row_cut_max_dict`` to "
                        + str(row_cut_max)
                        + "\nand the value of the input "
                        "``row_cut_min_dict`` to " + str(row_cut_min) + "\n"
                        "This will result in zero selected rows and is not permissible.\n"
                    )
                    raise ValueError(msg)
            except KeyError:
                pass

    def _verify_eq_neq_consistency(self, **kwargs):
        """Verify that no neq_cut column has a value equal to the corresponding
        eq_cut. Such a choice would only result, after much wasted effort,
        in a final catalog with zero entries.
        """
        for row_cut_eq_key, row_cut_eq in self.row_cut_eq_dict.items():
            try:
                row_cut_neq = self.row_cut_neq_dict[row_cut_eq_key]
                if row_cut_neq == row_cut_eq:
                    msg = (
                        "\nFor the ``" + row_cut_eq_key + "`` column, \n"
                        "you set the value of the input ``row_cut_eq_dict`` to "
                        + str(row_cut_eq)
                        + "\nand the value of the input "
                        "``row_cut_neq_dict`` to " + str(row_cut_neq) + "\n"
                        "This will result in zero selected rows and is not permissible.\n"
                    )
                    raise ValueError(msg)
            except KeyError:
                pass

        for row_cut_neq_key, row_cut_neq in self.row_cut_neq_dict.items():
            try:
                row_cut_eq = self.row_cut_eq_dict[row_cut_neq_key]
                if row_cut_eq == row_cut_neq:
                    msg = (
                        "\nFor the ``" + row_cut_neq_key + "`` column, \n"
                        "you set the value of the input ``row_cut_neq_dict`` to "
                        + str(row_cut_neq)
                        + "\nand the value of the input "
                        "``row_cut_eq_dict`` to " + str(row_cut_eq) + "\n"
                        "This will result in zero selected rows and is not permissible.\n"
                    )
                    raise ValueError(msg)
            except KeyError:
                pass

    def _process_columns_to_keep(self, columns_to_keep_dict):
        """Private method that performs sanity checks on the input
        ``columns_to_keep_dict`` and uses it to define two attributes
        used for future bookkeeping,
        ``self.column_indices_to_keep`` and ``self.dt``.
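
        For example (a sketch; the dictionary below is hypothetical),
        passing ``columns_to_keep_dict = {'x': (1, 'f4'), 'y': (0, 'i8')}``
        to the constructor yields

        >>> reader.column_indices_to_keep # doctest: +SKIP
        [1, 0]
        >>> reader.dt # doctest: +SKIP
        dtype([('x', '<f4'), ('y', '<i8')])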
"""
        for key, value in columns_to_keep_dict.items():

            try:
                assert type(value) == tuple
                assert len(value) == 2
            except AssertionError:
                msg = (
                    "\nThe value bound to every key of the input ``columns_to_keep_dict``\n"
                    "must be a two-element tuple.\n"
                    "The ``" + key + "`` key is not bound to the required type.\n"
                )
                raise TypeError(msg)

            column_index, dtype = value

            try:
                assert type(column_index) == int
            except AssertionError:
                msg = (
                    "\nThe first element of the two-element tuple bound to every key of \n"
                    "the input ``columns_to_keep_dict`` must be an integer.\n"
                    "The first element of the ``"
                    + key
                    + "`` tuple is not the required type.\n"
                )
                raise TypeError(msg)

            try:
                np.dtype(dtype)
            except Exception:
                msg = (
                    "\nThe second element of the two-element tuple bound to every key of \n"
                    "the input ``columns_to_keep_dict`` must be a string recognized by Numpy\n"
                    "as a data type, e.g., 'f4' or 'i8'.\n"
                    "The second element of the ``"
                    + key
                    + "`` tuple is not the required type.\n"
                )
                raise TypeError(msg)

        self.columns_to_keep_dict = columns_to_keep_dict

        # Create a hard copy of the dict keys to ensure that
        # self.column_indices_to_keep and self.dt are defined
        # according to the same sequence
        column_key_list = list(columns_to_keep_dict.keys())

        # Only data columns with indices in self.column_indices_to_keep
        # will be yielded by the data_chunk_generator
        self.column_indices_to_keep = list(
            [columns_to_keep_dict[key][0] for key in column_key_list]
        )

        # The rows of data yielded by the data_chunk_generator
        # will be assumed to be the following Numpy dtype
        self.dt = np.dtype(
            [(key, columns_to_keep_dict[key][1]) for key in column_key_list]
        )

    def _get_fname(self, input_fname):
        """Verify that the input fname exists on disk."""
        # Check whether input_fname exists.
        if not os.path.isfile(input_fname):
            # Check whether the uncompressed version is available instead
            # (input_fname[:-3] strips a trailing three-character suffix such as ".gz")
            if not os.path.isfile(input_fname[:-3]):
                msg = "Input filename %s is not a file"
                raise IOError(msg % input_fname)
            else:
                msg = (
                    "Input filename ``%s`` is not a file. \n"
                    "However, ``%s`` exists, so change your input_fname accordingly."
                )
                raise IOError(msg % (input_fname, input_fname[:-3]))

        return _passively_decode_string(os.path.abspath(input_fname))

    def _enforce_no_repeated_columns(self):
        """Raise an error if the same column index appears more than once
        in the input ``columns_to_keep_dict``.
        """
        duplicates = list(
            k
            for k, v in list(collections.Counter(self.column_indices_to_keep).items())
            if v > 1
        )
        if len(duplicates) > 0:
            example_repeated_column_index = str(duplicates[0])
            msg = (
                "\nColumn number "
                + example_repeated_column_index
                + " appears more than once in your ``columns_to_keep_dict``."
            )
            raise ValueError(msg)

    def _get_header_char(self, header_char):
        """Verify that the input header_char is
        a one-character string or bytes variable.
        """
        try:
            assert (type(header_char) == str) or (type(header_char) == bytes)
            assert len(header_char) == 1
        except AssertionError:
            msg = (
                "\nThe input ``header_char`` must be a single string/bytes character.\n"
            )
            raise TypeError(msg)
        return header_char

    def _determine_compression_safe_file_opener(self):
        """Determine whether to use *open* or *gzip.open* to read
        the input file, depending on whether or not the file is compressed.
        """
        # Attempting to read even a single byte of an uncompressed file
        # with gzip raises an IOError; use that to detect the file type.
        f = gzip.open(self.input_fname, "r")
        try:
            f.read(1)
            self._compression_safe_file_opener = gzip.open
        except IOError:
            self._compression_safe_file_opener = open
        finally:
            f.close()
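
    # NOTE: ``read_ascii`` below calls ``self.header_len``, which does not
    # appear in this listing. The following reconstruction is a sketch
    # consistent with the ``num_lines_header`` and ``header_char`` logic
    # documented in the constructor.
    def header_len(self):
        """Number of rows in the header of the ASCII file.

        Returns
        --------
        Nheader : int
            Number of rows in the header.

        Notes
        -------
        If ``num_lines_header`` was not passed to the constructor, the header
        is taken to be the initial sequence of lines that either begin with
        ``self.header_char`` or are empty.
        """
        if self.num_lines_header is not None:
            return self.num_lines_header
        Nheader = 0
        with self._compression_safe_file_opener(self.input_fname, "r") as f:
            for line in f:
                if (line[0:len(self.header_char)] == self.header_char) or (line == "\n"):
                    Nheader += 1
                else:
                    break
        return Nheader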

    def data_len(self):
        """
        Number of rows of data in the input ASCII file.

        Returns
        --------
        Nrows_data : int
            Total number of rows of data.

        Notes
        -------
        The returned value is computed as the number of lines
        between the returned value of `header_len` and
        the next appearance of an empty line.

        The `data_len` method is the particular section of code
        where the following assumptions are made:

        1. The data begins with the first appearance of a non-empty line that does not begin with the character defined by ``self.header_char``.

        2. The data ends with the next appearance of an empty line.
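
        Examples
        ---------
        A sketch of typical usage (``fname`` and ``cols`` as in the
        class-level examples):

        >>> reader = TabularAsciiReader(fname, cols) # doctest: +SKIP
        >>> num_data_rows = reader.data_len() # doctest: +SKIP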
"""
        Nrows_data = 0
        with self._compression_safe_file_opener(self.input_fname, "r") as f:
            for i, l in enumerate(f):
                if (l[0 : len(self.header_char)] != self.header_char) and (l != "\n"):
                    Nrows_data += 1
        return Nrows_data

    def data_chunk_generator(self, chunk_size, f):
        """
        Python generator that uses f.readline() to march
        through an open file object to yield
        a chunk of data with length equal to the input ``chunk_size``.
        The generator only yields columns that were included
        in the ``columns_to_keep_dict`` passed to the constructor.

        Parameters
        -----------
        chunk_size : int
            Number of rows of data in the chunk being generated

        f : File
            Open file object being read

        Yields
        --------
        chunk : tuple
            Tuple of data from the ascii.
            Only data from ``column_indices_to_keep`` are yielded.
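
        Examples
        ---------
        A sketch of how `read_ascii` consumes this generator
        (``reader`` is a hypothetical instance and ``f`` an open file
        object already advanced past the header):

        >>> rows = reader.data_chunk_generator(1000, f) # doctest: +SKIP
        >>> chunk_array = np.array(list(rows), dtype=reader.dt) # doctest: +SKIP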
"""
        cur = 0
        while cur < chunk_size:
            line = f.readline()
            parsed_line = line.strip().split()
            yield tuple(parsed_line[i] for i in self.column_indices_to_keep)
            cur += 1

    def apply_row_cut(self, array_chunk):
        """Method applies a boolean mask to the input array
        based on the row-cuts determined by the
        dictionaries passed to the constructor.

        Parameters
        -----------
        array_chunk : Numpy array
            Structured array whose named columns include every column
            appearing in the row-cut dictionaries.

        Returns
        --------
        cut_array : Numpy array
            Subset of the input ``array_chunk`` whose rows pass all cuts.
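
        Examples
        ---------
        The cuts reduce to ordinary Numpy boolean indexing; for example,
        a lower-bound cut alone behaves as in this sketch (the chunk and
        its ``mass`` column are hypothetical):

        >>> chunk = np.array([(5.,), (15.,)], dtype=[('mass', 'f4')]) # doctest: +SKIP
        >>> chunk[chunk['mass'] > 10.]['mass'] # doctest: +SKIP
        array([15.], dtype=float32)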
"""
        mask = np.ones(len(array_chunk), dtype=bool)

        # Multiplying boolean arrays acts as a logical AND,
        # so the mask accumulates every requested cut.
        for colname, lower_bound in self.row_cut_min_dict.items():
            mask *= array_chunk[colname] > lower_bound

        for colname, upper_bound in self.row_cut_max_dict.items():
            mask *= array_chunk[colname] < upper_bound

        for colname, equality_condition in self.row_cut_eq_dict.items():
            mask *= array_chunk[colname] == equality_condition

        for colname, inequality_condition in self.row_cut_neq_dict.items():
            mask *= array_chunk[colname] != inequality_condition

        return array_chunk[mask]

    def read_ascii(self, chunk_memory_size=500):
        """Method reads the input ascii and returns
        a structured Numpy array of the data
        that passes the row- and column-cuts.

        Parameters
        ----------
        chunk_memory_size : int, optional
            Determines the approximate number of megabytes of memory
            that will be processed in chunks. This variable
            must be smaller than the amount of RAM on your machine;
            choosing larger values typically improves performance.
            Default is 500 MB.

        Returns
        --------
        full_array : array_like
            Structured Numpy array storing the rows and columns
            that pass the input cuts. The columns of this array
            are those selected by the ``columns_to_keep_dict``
            argument passed to the constructor.

        See also
        ----------
        data_chunk_generator
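
        Examples
        ---------
        A sketch of typical usage (``fname`` and ``cols`` as in the
        class-level examples; the 250 MB chunk size is arbitrary):

        >>> reader = TabularAsciiReader(fname, cols) # doctest: +SKIP
        >>> arr = reader.read_ascii(chunk_memory_size=250) # doctest: +SKIP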
"""
print(("\n...Processing ASCII data of file: \n%s\n " % self.input_fname))
start = time()
file_size = os.path.getsize(self.input_fname)
# convert to bytes to match units of file_size
chunk_memory_size *= 1e6
num_data_rows = int(self.data_len())
print(("Total number of rows in detected data = %i" % num_data_rows))
# Set the number of chunks to be filesize/chunk_memory,
# but enforcing that 0 < Nchunks <= num_data_rows
try:
Nchunks = int(max(1, min(file_size / chunk_memory_size, num_data_rows)))
except ZeroDivisionError:
msg = "\nMust choose non-zero size for input " "``chunk_memory_size``"
raise ValueError(msg)
num_rows_in_chunk = int(num_data_rows // Nchunks)
num_full_chunks = int(num_data_rows // num_rows_in_chunk)
num_rows_in_chunk_remainder = num_data_rows - num_rows_in_chunk * Nchunks
header_length = int(self.header_len())
print(("Number of rows in detected header = %i \n" % header_length))
chunklist = []
with self._compression_safe_file_opener(self.input_fname, "r") as f:
for skip_header_row in range(header_length):
_s = f.readline()
for _i in range(num_full_chunks):
print(
("... working on chunk " + str(_i) + " of " + str(num_full_chunks))
)
chunk_array = np.array(
list(self.data_chunk_generator(num_rows_in_chunk, f)), dtype=self.dt
)
cut_chunk = self.apply_row_cut(chunk_array)
chunklist.append(cut_chunk)
# Now for the remainder chunk
chunk_array = np.array(
list(self.data_chunk_generator(num_rows_in_chunk_remainder, f)),
dtype=self.dt,
)
cut_chunk = self.apply_row_cut(chunk_array)
chunklist.append(cut_chunk)
full_array = np.concatenate(chunklist)
end = time()
runtime = end - start
if runtime > 60:
runtime = runtime / 60.0
msg = "Total runtime to read in ASCII = %.1f minutes\n"
else:
msg = "Total runtime to read in ASCII = %.2f seconds\n"
print((msg % runtime))
print("\a")
return full_array