"""
Main objects for reading SWIFT datasets.
These include:
+ SWIFTUnits, which is a unit system that can be queried for units (and converts arrays
to relevant unyt arrays when read from the HDF5 file)
+ SWIFTMetadata, which contains all of the metadata from the file
+ __SWIFTGroupDataset, which contains particle information but should never be
directly accessed. Use ``_generate_datasets`` to create one of these. The reasoning
here is that properties can only be added to the class afterwards, and not
directly in an _instance_ of the class.
+ SWIFTDataset, a container class for all of the above.
"""
from swiftsimio.accelerated import read_ranges_from_file
from swiftsimio.objects import cosmo_array, cosmo_quantity
from swiftsimio.masks import SWIFTMask
from swiftsimio.metadata.objects import (
_metadata_discriminator,
SWIFTUnits,
SWIFTGroupMetadata,
)
from swiftsimio.metadata.field.attr_reader import (
load_field_units as _load_field_units,
load_field_description as _load_field_description,
load_field_compression as _load_field_compression,
load_field_cosmo_factor as _load_field_cosmo_factor,
load_field_physical as _load_field_physical,
load_field_valid_transform as _load_field_valid_transform,
)
from swiftsimio._handle_provider import HandleProvider
import h5py
import numpy as np
from typing import Callable
from pathlib import Path
def _generate_getter(
    name: str,
    *,
    filename: str,
    field: str,
    mask: np.ndarray | None,
    mask_size: int,
    column_index: int | None = None,
) -> Callable[["__SWIFTGroupDataset"], cosmo_array]:
    """
    Generate a function that retrieves data from file if not already in memory.

    The generated getter works as follows:

    a) If ``self._name`` is not ``None``, return it.
    b) Otherwise open the file through ``self.open_file()``.
    c) Read the field attributes (units, cosmo factor, description, ...) and
       the data stored at ``field``.
    d) Store the resulting ``cosmo_array`` in ``self._name``.
    e) Return ``self._name``.

    Parameters
    ----------
    name : str
        Output name (snake_case) of the field.
    filename : str
        Filename of the HDF5 file. Kept for interface compatibility; the getter
        itself obtains its file handle from ``self.open_file()``.
    field : str
        Full path of field, including e.g. particle type. Examples include
        ``/PartType0/Velocities``.
    mask : np.ndarray, optional
        Mask to be used with ``accelerated.read_ranges_from_file``, i.e. an array of
        integers that describe ranges to be read from the file.
    mask_size : int
        Size of the mask if present.
    column_index : int, optional
        Index specifying which column to read from the dataset.
        If not provided, we read all columns and return an n-dimensional array.

    Returns
    -------
    Callable
        A getter: callable object that gets the value of the array that has been saved to
        ``_name``. This function takes only ``self`` from the ``__SWIFTGroupDataset``
        class. The getter returns ``None`` (after printing a message) if a
        ``KeyError`` is raised while reading.

    Notes
    -----
    The major use of this function is for its side effect of setting ``_name`` as
    a member of the class on first read. When the attribute is accessed, it will
    be dynamically read from the file, to keep initial memory usage as minimal
    as possible.

    If the resultant array is modified, it will not be re-read from the file.

    The array's ``name`` is the field description; for single-column reads
    (``column_index`` given) it is suffixed with ``[Column <index>, <name>]``
    regardless of whether a mask is in use.
    """
    # Must do this _outside_ getter because of weird locality issues with the
    # use of None as the default.
    # Here, we need to ensure that in the cases where we're using columns,
    # during a partial read, that we respect the single-column dataset nature.
    use_columns = column_index is not None
    columns = np.s_[column_index] if use_columns else np.s_[:]
    private_name = f"_{name}"

    def getter(self: __SWIFTGroupDataset) -> cosmo_array:
        """
        Get the data for this dataset, reading from disk if it's not in memory.

        Parameters
        ----------
        self : __SWIFTGroupDataset
            The containing dataset class that this getter is assigned to.

        Returns
        -------
        cosmo_array
            The dataset.
        """
        cached = getattr(self, private_name)
        if cached is not None:
            return cached

        with self.open_file() as handle:
            try:
                # Look the dataset up once; a missing path raises KeyError here.
                dataset = handle[field]
                attributes = dataset.attrs
                unit = _load_field_units(attributes, self.metadata.units)
                cf = _load_field_cosmo_factor(attributes, self.metadata)
                description = _load_field_description(attributes)
                compression = _load_field_compression(attributes)
                physical = _load_field_physical(attributes)
                valid_transform = _load_field_valid_transform(attributes)

                # Only single-column reads carry the column label, and they
                # carry it for masked and unmasked reads alike.
                if use_columns:
                    description = f"{description} [Column {column_index}, {name}]"

                if mask is not None:
                    # A full multi-column read needs a 2D output buffer; a
                    # single-column (or 1D) read is flat.
                    output_shape = (
                        (mask_size, dataset.shape[1])
                        if dataset.ndim > 1 and not use_columns
                        else mask_size
                    )
                    values = read_ranges_from_file(
                        dataset,
                        mask,
                        output_shape=output_shape,
                        output_type=dataset.dtype,
                        columns=columns,
                    )
                elif dataset.ndim > 1:
                    values = dataset[:, columns]
                else:
                    # Only use column data if array is multidimensional,
                    # otherwise we will crash here
                    values = dataset[:]

                setattr(
                    self,
                    private_name,
                    cosmo_array(
                        values,
                        unit,
                        cosmo_factor=cf,
                        name=description,
                        compression=compression,
                        comoving=not physical,
                        valid_transform=valid_transform,
                    ),
                )
            except KeyError:
                # Best-effort read: report the failure and leave the cache empty.
                print(f"Could not read {field}")
                return None

        return getattr(self, private_name)

    return getter
def _generate_setter(name: str) -> Callable[["__SWIFTGroupDataset", cosmo_array], None]:
    """
    Build a setter that stores the value it is given in ``self._name``.

    Parameters
    ----------
    name : str
        The public name of the attribute; the value is stored under the
        underscore-prefixed private name.

    Returns
    -------
    Callable
        A callable taking ``(self, value)`` that assigns ``value`` to the private
        attribute derived from ``name``.
    """
    private_name = f"_{name}"

    def setter(self: __SWIFTGroupDataset, value: cosmo_array) -> None:
        """
        Store ``value`` as the (private attribute) data for this dataset.

        Parameters
        ----------
        self : __SWIFTGroupDataset
            The containing dataset class that this setter is assigned to.
        value : cosmo_array
            The data values.
        """
        setattr(self, private_name, value)

    return setter
def _generate_group_attr_getter(
    name: str,
    *,
    group: str,
    attr_name: str,
) -> Callable[["__SWIFTGroupDataset"], cosmo_array | cosmo_quantity]:
    """
    Build a lazy-loading getter for a group-level HDF5 attribute.

    Parameters
    ----------
    name : str
        Public output name (snake_case) of the attribute. Also used to look up
        units, comoving flag and cosmo factor on the dataset's metadata.
    group : str
        Group path in the HDF5 file (e.g. ``LOS_0000``).
    attr_name : str
        HDF5 attribute name in the source file.

    Returns
    -------
    Callable
        A getter callable that lazy-loads ``group.attrs[attr_name]`` and caches
        the result in ``self._name``.
    """
    private_name = f"_{name}"

    def getter(
        self: __SWIFTGroupDataset,
    ) -> cosmo_array | cosmo_quantity:
        """
        Return the group-level attribute, reading it from disk on first access.

        Parameters
        ----------
        self : __SWIFTGroupDataset
            The containing dataset class that this getter is assigned to.

        Returns
        -------
        cosmo_array or cosmo_quantity
            Group attribute with cosmology metadata attached.
        """
        cached = getattr(self, private_name)
        if cached is not None:
            return cached

        with self.open_file() as handle:
            raw_value = handle[group].attrs[attr_name]
            # The metadata hands back a loader that resolves the units against
            # the file's unit system.
            make_unit = self.metadata.get_group_attribute_units(name)
            unit = make_unit(self.metadata.units)
            comoving = self.metadata.get_group_attribute_comoving(name)
            cf = self.metadata.get_group_attribute_cosmo_factor(name)
            # Promote to at least 1D for construction, then drop length-1 axes
            # again.
            wrapped = cosmo_array(
                np.atleast_1d(raw_value), unit, comoving=comoving, cosmo_factor=cf
            )
            parsed = wrapped.squeeze()

        setattr(self, private_name, parsed)
        return parsed

    return getter
def _generate_deleter(name: str) -> Callable[["__SWIFTGroupDataset"], None]:
    """
    Build a deleter that resets ``self._name`` back to ``None``.

    Parameters
    ----------
    name : str
        The name of the field to be deleted.

    Returns
    -------
    Callable
        Callable that clears the cached value of the ``name`` field.
    """
    private_name = f"_{name}"

    def deleter(self: __SWIFTGroupDataset) -> None:
        """
        Drop the cached data for this dataset.

        Parameters
        ----------
        self : __SWIFTGroupDataset
            The containing dataset class that this deleter is assigned to.
        """
        # Look the attribute up first so that a field that was never
        # initialised raises AttributeError rather than being created here.
        getattr(self, private_name)
        setattr(self, private_name, None)

    return deleter
class __SWIFTGroupDataset(HandleProvider):
    """
    Create empty property fields.

    Do not use this class alone; it is essentially completely empty. It is filled
    with properties by `_generate_datasets`.

    On initialization we call the generate_empty_properties function to ensure that
    defaults are set correctly.

    Parameters
    ----------
    filename : Path
        The filename to read metadata. Only used as the file location when no
        ``handle`` is supplied; otherwise ``handle.filename`` is used.
    group_metadata : SWIFTGroupMetadata
        The metadata used to generate empty properties.
    handle : h5py.File, optional
        File handle to read from. May be ``None``.
    """

    filename: Path

    def __init__(
        self,
        filename: Path,
        group_metadata: SWIFTGroupMetadata,
        handle: h5py.File | None,
    ) -> None:
        # ``_generate_datasets`` may forward ``handle=None``; dereferencing
        # ``handle.filename`` in that case would raise, so fall back to the
        # explicit filename.
        source = filename if handle is None else handle.filename
        super().__init__(source, handle=handle)
        self.units = group_metadata.units
        self.group = group_metadata.group
        self.group_name = group_metadata.group_name
        self.group_metadata = group_metadata
        self.metadata = group_metadata.metadata
        self.generate_empty_properties()
        self._close_handle_if_manager()

    def generate_empty_properties(self) -> None:
        """
        Generate empty properties that will be accessed through the setters and getters.

        Initially set all of the _{name} values to None, both for the fields and
        for the group-level attributes listed in the group metadata.

        Raises
        ------
        AttributeError
            If a field path listed in the metadata is not present in the file.
        """
        for field_name, field_path in zip(
            self.group_metadata.field_names, self.group_metadata.field_paths
        ):
            if field_path not in self.handle:
                raise AttributeError(
                    (
                        f"Cannot find attribute {field_path} in file although"
                        " it was present when searching initially."
                    )
                )
            setattr(self, f"_{field_name}", None)

        for group_attribute_name in self.group_metadata.group_attributes:
            setattr(self, f"_{group_attribute_name}", None)

    def __str__(self) -> str:
        """
        Print out available fields, not just the memory location.

        Returns
        -------
        str
            The file location and available fields.
        """
        field_names = ", ".join(self.group_metadata.field_names)
        return f"SWIFT dataset at {self.filename}. \nAvailable fields: {field_names}"

    def __repr__(self) -> str:
        """
        Print out available fields, not just the memory location.

        Returns
        -------
        str
            The file location and available fields.
        """
        return self.__str__()
class __SWIFTNamedColumnDataset(HandleProvider):
    r"""
    Holder class for individual named datasets.

    Very similar to :class:`~swiftsimio.reader.__SWIFTGroupDataset` but much simpler.

    Parameters
    ----------
    field_path : str
        Path to field within hdf5 snapshot.
    group_metadata : SWIFTGroupMetadata
        The metadata for the group that this named column dataset belongs to.
    named_columns : list of str
        List of categories for the variable ``name``.
    name : str
        The variable of interest.
    handle : h5py.File, optional
        File handle to read from. May be ``None``, in which case the file
        location is taken from ``group_metadata.filename``.

    Examples
    --------
    For a gas particle we might be interested in the mass fractions for a number
    of elements (e.g. hydrogen, helium, carbon, etc). In a SWIFT snapshot these
    would be found in ``field_path`` = /PartType0/ElementMassFractions. The
    ``named_columns`` would be the list of elements (["hydrogen", ...]) and the
    variable we're interested in is the mass fraction ``name`` = element_mass_fraction.
    Thus,

    .. code-block:: python

        data.gas = __SWIFTNamedColumnDataset(
            "/PartType0/ElementMassFractions",
            group_metadata,
            ["hydrogen", "helium"],
            "element_mass_fraction",
            handle
        )

    would make visible:

    .. code-block:: python

        data.gas.element_mass_fraction.hydrogen
        data.gas.element_mass_fraction.helium
    """

    def __init__(
        self,
        field_path: str,
        group_metadata: SWIFTGroupMetadata,
        named_columns: list[str],
        name: str,
        handle: h5py.File | None,
    ) -> None:
        # The handle is allowed to be None (see the note below), so do not
        # dereference it unconditionally to find the file location.
        source = group_metadata.filename if handle is None else handle.filename
        super().__init__(source, handle=handle)
        self.field_path = field_path
        self.group_metadata = group_metadata
        self.metadata = group_metadata.metadata
        self.named_columns = named_columns
        self.name = name

        # Need to initialise for the getter() call.
        for column in named_columns:
            setattr(self, f"_{column}", None)

        # Call to self._close_handle_if_manager() is not needed here:
        # either handle is None and we never opened anything, or it's a file
        # which we will not close because it's managed by a parent object.

    def __str__(self) -> str:
        """
        Print the available column names for this dataset.

        Returns
        -------
        str
            Formatted list of column names.
        """
        return (
            f"Named columns instance with {self.named_columns} available "
            f'for "{self.name}"'
        )

    def __repr__(self) -> str:
        """
        Print the available column names for this dataset.

        Returns
        -------
        str
            Formatted list of column names.
        """
        return self.__str__()

    def __len__(self) -> int:
        """
        Get the column count.

        Returns
        -------
        int
            The number of columns.
        """
        return len(self.named_columns)

    def __eq__(self, other: object) -> bool:
        """
        Check if the dataset name and available columns match another's.

        Parameters
        ----------
        other : object
            The other dataset to compare to.

        Returns
        -------
        bool
            ``True`` if the datasets match, ``False`` otherwise. Objects that do
            not expose ``named_columns`` and ``name`` yield ``NotImplemented`` so
            that Python falls back to its default comparison.
        """
        # Duck-typed on purpose: the class name cannot be referenced from inside
        # the class body because its double leading underscore is name-mangled.
        try:
            return (
                self.named_columns == other.named_columns
                and self.name == other.name
            )
        except AttributeError:
            return NotImplemented
def _generate_datasets(
    filename: Path,
    group_metadata: SWIFTGroupMetadata,
    mask: SWIFTMask,
    handle: h5py.File | None = None,
) -> __SWIFTGroupDataset | __SWIFTNamedColumnDataset:
    """
    Generate a SWIFTGroupDatasets _class_ for the given particle type.

    Properties have to be attached to a _class_, not to an instance, so a
    dedicated class is assembled here for each group and then instantiated
    once.

    Every field listed in the group metadata becomes a ``property`` built from
    the getter/setter/deleter generators, so that it can be accessed as e.g.
    ``dataset.coordinates``. Fields with named columns instead become an
    instance of a dedicated named-column class with one property per column.
    Group-level attributes are exposed as properties as well.

    Parameters
    ----------
    filename : Path
        File name to read metadata. Note that this argument is overwritten with
        ``group_metadata.filename`` before use.
    group_metadata : SWIFTGroupMetadata
        The metadata for the group.
    mask : SWIFTMask
        The mask object for the datasets.
    handle : h5py.File, optional
        File handle to read metadata.

    Returns
    -------
    __SWIFTGroupDataset or __SWIFTNamedColumnDataset
        The customized dataset object.
    """
    filename = group_metadata.filename
    group = group_metadata.group
    group_name = group_metadata.group_name
    group_nice_name = group_metadata.metadata.get_nice_name(group)

    # Mask is an object that contains all masks for all possible datasets.
    mask_array = getattr(mask, group_name, None)
    mask_size = getattr(mask, f"{group_name}_size", -1)

    def lazy_field_property(
        attribute: str, path: str, column_index: int | None = None
    ) -> property:
        """Bundle the lazy getter, setter and deleter for one field or column."""
        return property(
            _generate_getter(
                attribute,
                filename=filename,
                field=path,
                mask=mask_array,
                mask_size=mask_size,
                column_index=column_index,
            ),
            _generate_setter(attribute),
            _generate_deleter(attribute),
        )

    # Collect everything that should live on the generated class; building a
    # distinct class per group gives each particle type its own _type_.
    dataset_namespace = {}

    for field_path, field_name in zip(
        group_metadata.field_paths, group_metadata.field_names
    ):
        named_columns = group_metadata.named_columns[field_path]

        if named_columns is None:
            dataset_namespace[field_name] = lazy_field_property(
                field_name, field_path
            )
            continue

        # TODO: Handle this case with recursion.
        # Named columns get an extra middleman object so that access looks like
        # {ptype}.{field}.{column_name}.
        column_namespace = {
            column: lazy_field_property(column, field_path, index)
            for index, column in enumerate(named_columns)
        }
        ColumnDataset = type(
            f"{group_nice_name}{field_path.split('/')[-1]}Columns",
            (__SWIFTNamedColumnDataset, HandleProvider),
            column_namespace,
        )
        dataset_namespace[field_name] = ColumnDataset(
            handle=handle,
            group_metadata=group_metadata,
            field_path=field_path,
            named_columns=named_columns,
            name=field_name,
        )

    for public_name, hdf5_name in group_metadata.group_attributes.items():
        dataset_namespace[public_name] = property(
            _generate_group_attr_getter(
                public_name,
                group=group,
                attr_name=hdf5_name,
            ),
            _generate_setter(public_name),
            _generate_deleter(public_name),
        )

    GroupDataset = type(
        f"{group_nice_name}Dataset",
        (__SWIFTGroupDataset, object),
        dataset_namespace,
    )
    return GroupDataset(filename, group_metadata, handle=handle)
class SWIFTDataset(HandleProvider):
    """
    A collection object for units, metadata and data objects.

    It contains:

    + a ``SWIFTUnits``,
    + a ``SWIFTMetadata``,
    + several ``SWIFTGroupDataset``

    This object, in essence, completely represents a SWIFT snapshot. You can access
    the different particles as follows:

    + SWIFTDataset.gas.particle_ids
    + SWIFTDataset.dark_matter.masses
    + SWIFTDataset.gas.smoothing_lengths

    These arrays all have units that are determined by the unit system in the file.
    The unit system is available as SWIFTDataset.units and the metadata as
    SWIFTDataset.metadata.

    Parameters
    ----------
    filename : Path
        Name of file containing snapshot.
    mask : ~swiftsimio.masks.SWIFTMask, optional
        Mask object containing dataset to selected particles.
    handle : h5py.File, optional
        File handle to read metadata.
    """

    filename: Path

    def __init__(
        self,
        filename: Path,
        mask: SWIFTMask | None = None,
        handle: h5py.File | None = None,
    ) -> None:
        super().__init__(filename, handle=handle)
        self.mask = mask
        if mask is not None:
            self.mask.convert_masks_to_ranges()
        self.get_units()
        # NOTE(review): ``get_metadata`` is not defined in the part of the class
        # visible here; presumably it is provided elsewhere - confirm.
        self.get_metadata()
        self.create_datasets()
        self._close_handle_if_manager()

    def __str__(self) -> str:
        """Print out some useful information, not just the memory location."""
        group_names = ", ".join(self.metadata.present_group_names)
        return f"SWIFT dataset at {self.filename}. \nAvailable groups: {group_names}"

    def __repr__(self) -> str:
        """Print out some useful information, not just the memory location."""
        return self.__str__()

    def get_units(self) -> None:
        """
        Load the units from the SWIFT snapshot.

        Ordinarily this happens automatically, but you can call
        this function again if you mess things up.

        Raises
        ------
        AssertionError
            If a mask is attached that neither shares this dataset's file handle
            nor refers to the same file.
        """
        if self.mask is None:
            self.units = SWIFTUnits(self.filename, handle=self.handle)
            return

        # The mask already holds the units, so we can save ourselves the trouble
        # of reading them again - provided it really belongs to this file.
        same_source = (self._handle is self.mask._handle) or self.filename.samefile(
            self.mask.filename
        )
        if not same_source:
            # Raised explicitly (rather than via ``assert``) so the check is
            # still enforced when Python runs with optimisations enabled.
            raise AssertionError(
                f"Mask is for {self.mask.filename} but dataset is for {self.filename}."
            )
        self.units = self.mask.units

    def create_datasets(self) -> None:
        """
        Create datasets for present groups.

        Present groups are specified in metadata.present_group_names.
        These can then be accessed using their underscore names, e.g. gas.
        """
        if not hasattr(self, "metadata"):
            self.get_metadata()

        for group_name in self.metadata.present_group_names:
            setattr(
                self,
                group_name,
                _generate_datasets(
                    self.filename,
                    getattr(self.metadata, f"{group_name}_properties"),
                    self.mask,
                    handle=self.handle,
                ),
            )