"""
Define functions that can be accelerated by numba.
Numba does not use classes, unfortunately.
"""
import numpy as np
from h5py._hl.dataset import Dataset
from .optional_packages import jit, prange, NUM_THREADS
from ._ordered_slices import OrderedSlices
__all__ = [
"jit",
"prange",
"NUM_THREADS",
"ranges_from_array",
"read_ranges_from_file_unchunked",
"index_dataset",
"get_chunk_ranges",
"expand_ranges",
"extract_ranges_from_chunks",
"read_ranges_from_file_chunked",
"read_ranges_from_file",
"read_ranges_from_hdfstream",
"list_of_strings_to_arrays",
"to_native_byteorder_inplace",
]
[docs]
def to_native_byteorder_inplace(arr: np.ndarray) -> None:
"""
Ensure that arr is native endian without making a copy.
Parameters
----------
arr : np.ndarray
Array to convert to native endian.
"""
if arr.dtype.isnative:
return
if arr.dtype.names is None: # standard check for not a structured or recarray
arr.byteswap(inplace=True)
else:
for name in arr.dtype.names:
if not arr[name].dtype.isnative:
arr[name].byteswap(inplace=True)
arr.dtype = arr.dtype.newbyteorder("native")
[docs]
@jit(nopython=True)
def ranges_from_array(array: np.array) -> np.ndarray:
"""
Find contiguous ranges of IDs in sorted list of IDs.
Parameters
----------
array : np.array of int
Sorted list of IDs.
Returns
-------
np.ndarray
List of length two arrays corresponding to contiguous
ranges of IDs (inclusive) in the input array.
Examples
--------
The array:
.. code-block::
[0, 1, 2, 3, 5, 6, 7, 9, 11, 12, 13]
would return:
.. code-block::
[[0, 4], [5, 8], [9, 10], [11, 14]]
"""
output = []
if len(array) == 0:
# the "empty" mask, from 0 to 0 gets 0 elements
return np.array([[0, 0]])
start = array[0]
stop = array[0]
for value in array[1:]:
if value != stop + 1:
output.append([start, stop + 1])
start = value
stop = value
else:
stop = value
output.append([start, stop + 1])
return np.array(output)
[docs]
def read_ranges_from_file_unchunked(
handle: Dataset,
ranges: np.ndarray,
output_shape: tuple,
output_type: type = np.float64,
columns: slice = np.s_[:],
) -> np.array:
"""
Read only a selection of index ranges from a dataset that is not chunked.
Takes a hdf5 dataset, and the set of ranges from
ranges_from_array, and reads only those ranges from the file.
Unfortunately this functionality is not built into HDF5.
Parameters
----------
handle : Dataset
HDF5 dataset to slice data from.
ranges : np.ndarray
Array of ranges (see :func:`ranges_from_array`).
output_shape : tuple
Resultant shape of output.
output_type : type, optional
:class:`numpy.dtype` of output elements. If not supplied, we assume :py:attr:`numpy.float64`.
columns : slice, optional
Selector for columns if using a multi-dimensional array. If the array is only
a single dimension this is not used.
Returns
-------
np.ndarray
Result from reading only the relevant values from ``handle``.
"""
output = np.empty(output_shape, dtype=output_type)
already_read = 0
handle_multidim = handle.ndim > 1
for read_start, read_end in ranges:
if read_end == read_start:
continue
# Because we read inclusively
size_of_range = read_end - read_start
# Construct selectors so we can use read_direct to prevent creating
# copies of data from the hdf5 file.
hdf5_read_sel = (
np.s_[read_start:read_end, columns]
if handle_multidim
else np.s_[read_start:read_end]
)
output_dest_sel = np.s_[already_read : size_of_range + already_read]
handle.read_direct(output, source_sel=hdf5_read_sel, dest_sel=output_dest_sel)
already_read += size_of_range
# Ensure the result is a native endian type
to_native_byteorder_inplace(output)
return output
[docs]
def index_dataset(handle: Dataset, mask_array: np.array) -> np.array:
"""
Index the dataset using the mask array.
This is not currently a feature of h5py. (March 2019)
Parameters
----------
handle : Dataset
Data to be indexed.
mask_array : np.array
Mask used to index data.
Returns
-------
np.ndarray
Subset of the data specified by the mask.
"""
output_type = handle[0].dtype
output_size = mask_array.size
ranges = ranges_from_array(mask_array)
return read_ranges_from_file(handle, ranges, output_size, output_type)
[docs]
@jit(nopython=True, fastmath=True)
def get_chunk_ranges(
ranges: np.ndarray, chunk_size: np.ndarray, array_length: int
) -> np.ndarray:
"""
Return indices indicating which hdf5 chunk each range from ``ranges`` belongs to.
Parameters
----------
ranges : np.ndarray
Array of ranges (see :func:`ranges_from_array`).
chunk_size : int
Size of the hdf5 dataset chunks.
array_length : int
Size of the dataset.
Returns
-------
np.ndarray
Two dimensional array of bounds for the chunks that contain each range from
``ranges``.
"""
chunk_ranges = []
for bounds in ranges:
lower = (bounds[0] // chunk_size) * chunk_size
upper = min(-((-bounds[1]) // chunk_size) * chunk_size, array_length)
# Before appending new chunk range we need to check
# that it doesn't already exist or overlap with an
# existing one. The only way overlap can happen is
# if the current lower index is less than or equal
# to the previous upper one. In that case simply
# update the previous upper to cover current chunk
if len(chunk_ranges) > 0:
if lower > chunk_ranges[-1][1]:
chunk_ranges.append([lower, upper])
elif lower <= chunk_ranges[-1][1]:
chunk_ranges[-1][1] = upper
# If chunk_ranges is empty, don't do any checks
else:
chunk_ranges.append([lower, upper])
return np.array(chunk_ranges)
[docs]
@jit(nopython=True, fastmath=True)
def expand_ranges(ranges: np.ndarray) -> np.array:
"""
Return an array of indices that are within the specified ranges.
Parameters
----------
ranges : np.ndarray
Array of ranges (see :func:`ranges_from_array`).
Returns
-------
np.ndarray
1D array of indices that fall within each range specified in `ranges`.
"""
length = np.asarray([bounds[1] - bounds[0] for bounds in ranges]).sum()
output = np.zeros(length, dtype=np.int64)
i = 0
for bounds in ranges:
lower = bounds[0]
upper = bounds[1]
bound_length = upper - lower
for j in range(bound_length):
output[i + j] = lower + j
i += bound_length
return output
[docs]
def read_ranges_from_file_chunked(
handle: Dataset,
ranges: np.ndarray,
output_shape: tuple,
output_type: type = np.float64,
columns: slice = np.s_[:],
) -> np.array:
"""
Read only a selection of index ranges from a dataset that is chunked.
Takes a hdf5 dataset, and the set of ranges from
ranges_from_array, and reads only those ranges from the file.
Unfortunately this functionality is not built into HDF5.
Parameters
----------
handle : Dataset
HDF5 dataset to slice data from.
ranges : np.ndarray
Array of ranges (see :func:`ranges_from_array`).
output_shape : tuple
Resultant shape of output.
output_type : type, optional
:class:`numpy.dtype` of output elements. If not supplied, we assume :py:attr:`numpy.float64`.
columns : slice, optional
Selector for columns if using a multi-dimensional array. If the array is only
a single dimension this is not used.
Returns
-------
np.ndarray
Result from reading only the relevant values from ``handle``.
"""
chunk_size = handle.chunks[0]
# Make array of chunk ranges
chunk_ranges = get_chunk_ranges(ranges, chunk_size, handle.shape[0])
chunk_range_size = np.diff(chunk_ranges).sum()
try:
output_shape = (chunk_range_size, output_shape[1])
except (TypeError, IndexError):
# Output shape is just a number, we have a 1D array.
output_shape = chunk_range_size
output = np.empty(output_shape, dtype=output_type)
already_read = 0
handle_multidim = handle.ndim > 1
for bounds in chunk_ranges:
read_start, read_end = bounds
if read_end == read_start:
continue
# Because we read inclusively
size_of_range = read_end - read_start
# Construct selectors so we can use read_direct to prevent creating
# copies of data from the hdf5 file.
hdf5_read_sel = (
np.s_[read_start:read_end, columns]
if handle_multidim
else np.s_[read_start:read_end]
)
output_dest_sel = np.s_[already_read : size_of_range + already_read]
handle.read_direct(output, source_sel=hdf5_read_sel, dest_sel=output_dest_sel)
already_read += size_of_range
if handle.chunks is not None:
output = extract_ranges_from_chunks(output, chunk_ranges, ranges)
# Ensure the result is a native endian type
to_native_byteorder_inplace(output)
return output
[docs]
def read_ranges_from_hdfstream(
handle: Dataset,
ranges: np.ndarray,
output_shape: tuple,
output_type: type = np.float64,
columns: slice = np.s_[:],
) -> np.array:
"""
Request the specified ranges from the hdfstream server.
Takes a hdfstream remote dataset, and the set of ranges from
ranges_from_array, and sends a http request for those ranges.
Parameters
----------
handle : Dataset
HDF5 dataset to slice data from.
ranges : np.ndarray
Array of ranges (see :func:`ranges_from_array`).
output_shape : Tuple
Resultant shape of output.
output_type : type, optional
:class:`numpy.dtype` of output elements. If not supplied, we assume :py:attr:`numpy.float64`.
columns : slice, optional
Selector for columns if using a multi-dimensional array. If the array is only
a single dimension this is not used.
Returns
-------
np.ndarray
Result from reading only the relevant values from ``handle``.
"""
# Get a sorted list of slices to request from the server
ordered_slices = OrderedSlices(ranges, handle.ndim, columns)
# Request the slice data as a single ndarray. Here we read into an existing
# buffer so that we should get an exception if the data type or shape is
# not what we expected.
output = np.empty(output_shape, dtype=output_type)
if len(ordered_slices.slices) > 0:
handle.request_slices(ordered_slices.slices, dest=output)
# Put the result into the same order as the input ranges array, if it isn't already
output = ordered_slices.reorder_result(output)
# Ensure the result is a native endian type
to_native_byteorder_inplace(output)
return output
[docs]
def read_ranges_from_file(
handle: Dataset,
ranges: np.ndarray,
output_shape: tuple,
output_type: type = np.float64,
columns: slice = np.s_[:],
) -> np.array:
"""
Correctly select which version of ``read_ranges_from_file`` should be used.
Parameters
----------
handle : Dataset
HDF5 dataset to slice data from.
ranges : np.ndarray
Array of ranges (see :func:`ranges_from_array`).
output_shape : tuple
Resultant shape of output.
output_type : type, optional
:class:`numpy.dtype` of output elements. If not supplied, we assume :py:attr:`numpy.float64`.
columns : slice, optional
Selector for columns if using a multi-dimensional array. If the array is only
a single dimension this is not used.
Returns
-------
np.ndarray
Result from reading only the relevant values from ``handle``.
See Also
--------
read_ranges_from_file_chunked
Reads data ranges for chunked hdf5 file.
read_ranges_from_file_unchunked
Reads data ranges for unchunked hdf5 file.
"""
# It was found that the range size for which read_ranges_from_file_chunked was
# faster than unchunked was approximately 5e5. For ranges larger than this the
# overheads associated with read_ranges_from_file_chunked caused slightly worse
# performance than read_ranges_from_file_unchunked
cross_over_range_size = 5e5
average_range_size = np.diff(ranges).mean()
read_ranges = (
read_ranges_from_file_chunked
if handle.chunks is not None and average_range_size < cross_over_range_size
else read_ranges_from_file_unchunked
)
return (
read_ranges_from_hdfstream if hasattr(handle, "request_slices") else read_ranges
)(handle, ranges, output_shape, output_type, columns)
[docs]
def list_of_strings_to_arrays(lines: list[str]) -> np.array:
"""
Convert a list of space-delimited values to arrays.
Parameters
----------
lines : list[str]
List of strings containing numbers separated by a set of spaces.
Returns
-------
list[np.array]
List of numpy arrays, one per column.
Notes
-----
Currently not suitable for :mod:`numba` acceleration due to mixed datatype usage.
"""
# Calculate types and set up arrays.
arrays = []
dtypes = []
number_of_lines = len(lines)
for item in lines[0].split():
if "." in item or "e" in item:
dtype = np.float64
else:
dtype = np.int64
dtypes.append(dtype)
arrays.append(np.zeros(number_of_lines, dtype=dtype))
for index, line in enumerate(lines):
for dtype, (array, value) in zip(dtypes, enumerate(line.split())):
arrays[array][index] = dtype(value)
return arrays