"""Provide utilities to create SWIFT files usable as initial conditions."""
import unyt
import h5py
import numpy as np
from warnings import warn
from typing import Callable
from sympy import Symbol
from functools import reduce
from swiftsimio import metadata
from swiftsimio.metadata.writer.unit_systems import cosmo_units
from swiftsimio.objects import cosmo_array
def _ptype_str_to_int(ptype_str: str) -> int:
"""
Convert a string like ``"PartType0"`` to an integer (in this example, ``0``).
Parameters
----------
ptype_str : str
The particle type string.
Returns
-------
int
The corresponding integer.
"""
return int(ptype_str.strip("PartType")) if "PartType" in ptype_str else ptype_str
class __SWIFTWriterParticleDataset(object):
"""
A particle dataset for _writing_ with.
This is explicitly different to the one used for reading
(:class:`~swiftsimio.reader.__SWIFTGroupDataset`), as it requires a very different
feature set.
Parameters
----------
writer : ~swiftsimio.snapshot_writer.SWIFTSnapshotWriter
A reference to the writer containing this particle dataset writer.
unit_system : unyt.UnitSystem or str
Either be a string (e.g. "cgs"), or a UnitSystem as defined by unyt
specifying the units to be used. Users may wish to use the cosmological unit
system provided as
``from swiftsimio.metadata.writer.unit_systems import cosmo_units``.
particle_type : int
The particle type of the dataset. Numbering convention is the same as SWIFT, with
``0`` corresponding to gas, etc., as usual.
"""
def __init__(
self,
writer: "SWIFTSnapshotWriter",
unit_system: unyt.UnitSystem | str,
particle_type: int,
) -> None:
self.writer = writer
self.unit_system = unit_system
self.particle_type = particle_type
self.particle_name = metadata.particle_types.particle_name_underscores[
self.particle_type
]
self.generate_empty_properties()
return
def generate_empty_properties(self) -> None:
"""
Generate the empty properties accessed by setters and getters.
Initially all of the ``_{name}`` values are set to ``None``. Note that we only
generate required properties.
"""
for name in getattr(metadata.required_fields, self.particle_name).keys():
setattr(self, f"_{name}", None)
return
def check_empty(self) -> bool:
"""
Check if all required datasets are empty.
Returns
-------
bool
``True`` if all datasets are empty.
"""
for name in getattr(metadata.required_fields, self.particle_name).keys():
if getattr(self, f"_{name}") is not None:
return False
return True
def check_consistent(self) -> bool:
"""
Perform consistency checks on dataset.
Checks the following:
+ that all required fields (apart from ``particle_ids``) are not ``None``,
+ that all required fields have the same length.
If those are true,
+ ``self.n_part`` is set with the total number of particles of this type
+ ``self.requires_particle_ids_before_write`` is set to a boolean.
Returns
-------
bool
``True`` if above listed criteria are satisfied.
"""
self.requires_particle_ids_before_write = False
sizes = []
for name in getattr(metadata.required_fields, self.particle_name).keys():
if getattr(self, f"_{name}") is None:
if name == "particle_ids":
self.requires_particle_ids_before_write = True
else:
raise AttributeError(f"Required dataset {name} is None.")
else:
sizes.append(getattr(self, f"_{name}").shape[0])
# Now we figure out if everyone's the same (without numpy...)
assert reduce(lambda x, y: x and y, [sizes[0] == x for x in sizes]), (
f"Arrays passed to {self.particle_name} dataset are not of the same size."
)
# Make sure positions and velocities have the same shapes
if getattr(self, "coordinates").shape != getattr(self, "velocities").shape:
raise AttributeError(
f"{self.particle_name} coordinates and velocities have unequal shapes"
)
self.n_part = sizes[0]
return True
def generate_smoothing_lengths(self) -> None:
"""
Generate smoothing lengths as 2 times the mean interparticle spacing.
This only works for a boxsize that has the same length in all dimensions.
"""
if "smoothing_lengths" not in getattr(
metadata.required_fields, self.particle_name
):
raise RuntimeError(
"Cannot generate smoothing lengths for particle types that don't require "
"them."
)
if not (np.diff(self.writer.boxsize) == 0).all():
raise ValueError(
"To generate smoothing lengths box side lengths must all be equal."
)
dimension = self.writer.dimension
boxlength = self.writer.boxsize[0]
n_part = self.coordinates.shape[0]
mips = boxlength / (n_part ** (1.0 / dimension))
smoothing_lengths = 2 * mips * np.ones(n_part, dtype=float)
self.smoothing_lengths = smoothing_lengths
return
def write_particle_group(self, file_handle: h5py.File, compress: bool) -> None:
"""
Write the particle group's required properties to file.
Parameters
----------
file_handle : h5py.File
File handle to write to.
compress : bool
Flag to indicate whether to turn on gzip compression.
"""
particle_group = file_handle.create_group(self.particle_type)
if compress:
compression = "gzip"
else:
compression = None
for name, required_field_info in getattr(
metadata.required_fields, self.particle_name
).items():
output_handle = required_field_info["handle"]
particle_group.create_dataset(
output_handle, data=getattr(self, name), compression=compression
)
return
def write_particle_group_metadata(
self, file_handle: h5py.File, dset_attributes: dict
) -> None:
"""
Write the relevant metadata for a particle group.
Parameters
----------
file_handle : h5py.File
HDF5 output file to write to.
dset_attributes : dict
Dictionary containg metadata to attach to group.
"""
for name, required_field_info in getattr(
metadata.required_fields, self.particle_name
).items():
output_handle = required_field_info["handle"]
obj = file_handle[f"/{self.particle_type}/{output_handle}"]
for attr_name, attr_value in dset_attributes[output_handle].items():
obj.attrs.create(attr_name, attr_value)
return
def get_attributes(self) -> dict:
"""
Return a dictionary containing the attributes to attach to the dataset.
Returns
-------
dict
Dictionary containg the attributes applying to the dataset.
"""
# annoyingly unyt calls "current" "current_mks", but not a big deal
from unyt.dimensions import mass, length, temperature, time, current_mks
attributes_dict = {}
for name, required_field_info in getattr(
metadata.required_fields, self.particle_name
).items():
output_handle = required_field_info["handle"]
field = getattr(self, name)
if not isinstance(field, cosmo_array):
raise ValueError(
f"Provide {name} data to swiftsimio.Writer as"
" swiftsimio.cosmo_array's (i.e. including both unit and cosmology"
" information)."
)
# Find the exponents for each of the dimensions
dim_exponents = get_dimensions(field.units.dimensions)
attributes_dict[output_handle] = {
"Conversion factor to CGS (not including cosmological corrections)": [
field.unit_quantity.in_cgs()
],
"Conversion factor to physical CGS "
"(including cosmological corrections)": [
field.unit_quantity.in_cgs() * field.cosmo_factor.a_factor
],
"U_I exponent": [dim_exponents[current_mks]],
"U_L exponent": [dim_exponents[length]],
"U_M exponent": [dim_exponents[mass]],
"U_T exponent": [dim_exponents[temperature]],
"U_t exponent": [dim_exponents[time]],
"a-scale exponent": [
float(getattr(field.cosmo_factor.expr, "exp", 1.0))
],
"h-scale exponent": [0.0],
}
return attributes_dict
[docs]
def get_dimensions(dimension: unyt.dimensions) -> dict:
"""
Return exponents corresponding to base dimensions for given :mod:`unyt.dimensions`.
Parameters
----------
dimension : unyt.dimensions
Dimension for which we're identifying the exponents.
Returns
-------
dict
Dictionary of exponents corresponding to each base dimension.
Examples
--------
.. code-block:: python
>>> get_dimensions(unyt.dimensions.velocity)
{(mass): 0.0, (length): 1.0, (time): -1.0, (temperature): 0.0, (angle): 0.0,
(current_mks): 0.0, 1: 0.0, (luminous_intensity): 0.0, (logarithmic): 0.0}
"""
# create the return dict defaulting to exponent of 0
exp_dict = {dimension: 0.0 for dimension in unyt.dimensions.base_dimensions}
# extract the exponent for each of the dimensions
exp_dict.update(
(k, float(v))
for k, v in (d.as_base_exp() for d in dimension.as_ordered_factors())
)
return exp_dict
[docs]
def generate_getter(
name: str,
) -> Callable[[__SWIFTWriterParticleDataset], cosmo_array]:
"""
Generate a getter for the :class:`~swiftsimio.objects.cosmo_array` for ``name``.
Parameters
----------
name : str
Name of data field.
Returns
-------
Callable
Getter function that returns :class:`~swiftsimio.objects.cosmo_array` for
``name``.
"""
def getter(self: __SWIFTWriterParticleDataset) -> cosmo_array:
"""
Get the value of the dataset from the private name attribute.
Parameters
----------
self : __SWIFTWriterParticleDataset
The dataset the attribute is attached to.
Returns
-------
cosmo_array
The value of the named data field.
"""
return getattr(self, f"_{name}")
return getter
[docs]
def generate_setter(
name: str, dimensions: unyt.dimensions, unit_system: unyt.UnitSystem | str
) -> Callable[[__SWIFTWriterParticleDataset, cosmo_array], None]:
"""
Generate a function that sets ``self._<name>`` to the value that is passed to it.
Parameters
----------
name : str
String to set ``self._name`` to.
dimensions : unyt.dimensions
Physical dimension of ``self._name`` (for consistency check).
unit_system : unyt.UnitSystem or str
Unit system of ``self._name``.
Returns
-------
Callable
Function to set ``self._name``.
"""
def setter(self: __SWIFTWriterParticleDataset, value: cosmo_array) -> None:
"""
Set the named dataset (private attribute) to a value.
Makes consistency checks on the input:
+ the given units have the expected dimensions for this field
+ that values are unique and >=1 for ``particle_ids``
+ that the scale factor attached to :class:`~swiftsimio.objects.cosmo_array`
inputs matches the one for the top-level output metadata.
Parameters
----------
self : __SWIFTWriterParticleDataset
The dataset that the attribute is attached to.
value : cosmo_array
The value to set the attribute to.
"""
if hasattr(value, "cosmo_factor"):
assert value.cosmo_factor.scale_factor == self.writer.scale_factor, (
f"The scale factor of {name} ({value.cosmo_factor.scale_factor}) does not"
f" match the scale factor of the Writer ({self.writer.scale_factor})."
)
if dimensions != 1:
if isinstance(value, cosmo_array):
if value.units.dimensions == dimensions:
value.convert_to_base(unit_system)
value.convert_to_comoving()
setattr(self, f"_{name}", value)
else:
raise unyt.exceptions.InvalidUnitEquivalence(
f"Convert to {name}", value.units.dimensions, dimensions
)
else:
raise TypeError(f"Provide {name} as swiftsimio.cosmo_array.")
else:
if name == "particle_ids":
if any(value <= 0):
raise ValueError(f"{self.particle_name}.particle_ids must be >= 1.")
if np.unique(value).size != value.size:
raise ValueError(
f"{self.particle_name}.particle_ids must not have repeated IDs."
)
setattr(
self,
f"_{name}",
cosmo_array(
value,
unyt.dimensionless,
comoving=True,
scale_factor=self.writer.scale_factor,
scale_exponent=0.0,
),
)
return
return setter
[docs]
def generate_deleter(name: str) -> Callable[[__SWIFTWriterParticleDataset], None]:
"""
Generate a function that destroys ``self._<name>`` (sets it back to ``None``).
Parameters
----------
name : str
Name of object to be destroyed.
Returns
-------
Callable
Function to delete ``self._<name>``.
"""
def deleter(self: __SWIFTWriterParticleDataset) -> None:
"""
Delete the named (private attribute) dataset.
Parameters
----------
self : __SWIFTWriterParticleDataset
The dataset the attribute is attached to.
"""
current_value = getattr(self, f"_{name}")
del current_value
setattr(self, f"_{name}", None)
return
return deleter
[docs]
def generate_dataset(
writer: "SWIFTSnapshotWriter",
unit_system: unyt.UnitSystem | str,
particle_type: int,
) -> __SWIFTWriterParticleDataset:
"""
Generate a :class:`~swiftsimio.snapshot_writer.SWIFTWriterParticleDataset`.
We _must_ do the following _outside_ of the class itself, as one can assign properties
to a _class_ but not _within_ a class dynamically.
Here we loop through all of the possible properties in the metadata file. We then use
the builtin property() function and some generators to create setters and getters for
those properties. This will allow them to be accessed from outside by using
``SWIFTWriterParticleDataset(...).<name>``, where the name is, for example,
``coordinates``.
Parameters
----------
writer : ~swiftsimio.snapshot_writer.SWIFTSnapshotWriter
A reference to the writer that will contain the generated particle dataset writer.
unit_system : unyt.UnitSystem or str
Unit system of the dataset.
particle_type : int
The particle type of the dataset. Numbering convention is the same as
SWIFT, with ``0`` corresponding to gas, etc. as usual.
Returns
-------
SWIFTWriterParticleDataset
An empty dataset for the given particle type.
"""
particle_name = metadata.particle_types.particle_name_underscores[particle_type]
particle_nice_name = metadata.particle_types.particle_name_class[particle_type]
this_dataset_bases = (__SWIFTWriterParticleDataset, object)
this_dataset_dict = {}
for name, required_field_info in getattr(
metadata.required_fields, particle_name
).items():
dimensions = required_field_info["dimensions"]
this_dataset_dict[name] = property(
generate_getter(name),
generate_setter(name, dimensions, unit_system),
generate_deleter(name),
)
ThisDataset = type(
f"{particle_nice_name}WriterDataset", this_dataset_bases, this_dataset_dict
)
empty_dataset = ThisDataset(writer, unit_system, particle_type)
return empty_dataset
[docs]
class SWIFTSnapshotWriter(object):
"""
The SWIFT dataset writer.
This is used to store all particle arrays and do some extra consistency checks and
processing before writing a HDF5 file containing:
+ Fully consistent unit system
+ All required arrays for SWIFT to start
+ Required metadata (all automatic, apart from those required by ``__init__``)
The written output can be read back in using :mod:`swiftsimio`, or used as initial
conditions for a SWIFT simulation.
Any of the usual datasets (``gas``, ``dark_matter``, ``stars``, ``black_holes``,
``sinks``, ``neutrinos``, ``boundary``) can be populated with data. If a dataset is
populated, it must have all ``required_fields`` filled in. The required fields can be
viewed, for example, with ``swiftsimio.metadata.writer.required_fields.gas.keys()``.
The only exception is ``particle_ids``, that will be generated automatically if
missing. Smoothing length estimates for uniform-density gas in a periodic box can
also be auto-generated, but this has to be done explicitly (see example below).
To fill in a required dataset, provide the data with a
:class:`~swiftsimio.objects.cosmo_array`. For example:
.. code-block:: python
w.gas.masses = cosmo_array(
np.ones(n_p, dtype=float) * 1e6,
u.solMass,
comoving=True,
scale_factor=w.scale_factor,
scale_exponent=0,
)
Parameters
----------
unit_system : unyt.UnitSystem or str
Unit system for dataset.
boxsize : cosmo_array
Size of simulation box and associated units.
dimension : int, optional
Dimensions (length of coordinate vector) of simulation.
compress : bool, optional
Flag to turn on gzip compression.
extra_header : dict, optional
Dictionary containing extra fields to write to the HDF5 ``Header`` group.
scale_factor : np.float32
Scale factor associated with dataset.
Examples
--------
An example showing populating all required fields for the ``gas`` dataset:
.. code-block:: python
import numpy as np
import unyt as u
from swiftsimio import Writer, cosmo_array
from swiftsimio.metadata.writer.unit_systems import cosmo_unit_system
# number of gas particles
n_p = 1000
# scale factor of 1.0
a = 1.0
# Box is 100 Mpc
lbox = 100
boxsize = cosmo_array(
[lbox, lbox, lbox],
u.Mpc,
comoving=True,
scale_factor=a,
scale_exponent=1,
)
# Create the Writer object. cosmo_unit_system corresponds to default Gadget-like
# units of 10^10 Msun, Mpc, and km/s
w = Writer(unit_system=cosmo_unit_system, boxsize=boxsize, scale_factor=a)
# Randomly spaced coordinates from 0 to lbox Mpc in each direction
w.gas.coordinates = cosmo_array(
np.random.rand(n_p, 3) * lbox,
u.Mpc,
comoving=True,
scale_factor=w.scale_factor,
scale_exponent=1,
)
# Random velocities from 0 to 1 km/s
w.gas.velocities = cosmo_array(
np.random.rand(n_p, 3),
u.km / u.s,
comoving=True,
scale_factor=w.scale_factor,
scale_exponent=1,
)
# Generate uniform masses as 10^6 solar masses for each particle
w.gas.masses = cosmo_array(
np.ones(n_p, dtype=float) * 1e6,
u.solMass,
comoving=True,
scale_factor=w.scale_factor,
scale_exponent=0,
)
# Generate internal energy corresponding to 10^4 K
w.gas.internal_energy = cosmo_array(
np.ones(n_p, dtype=float) * 1e4 / 1e6,
u.kb * u.K / u.solMass,
comoving=True,
scale_factor=w.scale_factor,
scale_exponent=-2,
)
# Generate initial guess for smoothing lengths based on mean inter-particle spacing
w.gas.generate_smoothing_lengths()
# w.gas.particle_ids can optionally be set, otherwise they are auto-generated
# write the initial conditions out to a file
w.write("ics.hdf5")
"""
def __init__(
self,
*,
unit_system: unyt.UnitSystem | str = cosmo_units,
boxsize: cosmo_array,
dimension: int = 3,
compress: bool = True,
extra_header: dict | None = None,
scale_factor: np.float32 = 1.0,
) -> None:
if isinstance(unit_system, str):
self.unit_system = unyt.unit_systems.unit_system_registry[unit_system]
else:
self.unit_system = unit_system
try:
if len(boxsize) != dimension:
raise ValueError(
f"boxsize must have length equal to number of dimensions, {boxsize=},"
f" {dimension=}."
)
except TypeError: # len() of unsized object
raise ValueError(
"boxsize must be a cosmo_array of length equal to spatial dimensions."
)
self.boxsize = np.copy(boxsize, subok=True)
self.boxsize.convert_to_base(self.unit_system)
self.dimension = dimension
self.compress = compress
self.extra_header = extra_header
self.create_particle_datasets()
self.scale_factor = scale_factor
return
[docs]
def create_particle_datasets(self) -> None:
"""Create particle dataset for each particle type in the metadata."""
for number, name in metadata.particle_types.particle_name_underscores.items():
setattr(
self,
name,
generate_dataset(self, self.unit_system, number),
)
return
def _generate_ids(self, names_to_write: list) -> None:
"""
(Re-)generate all particle IDs for groups with names in ``names_to_write``.
Parameters
----------
names_to_write : list
List of groups to (re-)generate ids for.
"""
# Start particle ID's at 1. When running with hydro + DM, partID = 0
# is a no-no because the ID's are used as offsets in arrays. The code
# will most likely crash at some point, and "part_verify_links()" will
# complain about it if SWIFT is run with debugging checks on.
already_used = 1
for name in names_to_write:
if getattr(self, name)._particle_ids is not None:
warn(
f"Overwriting {name}.particle_ids, to prevent this provide particle "
"IDs for all particle types, or none.",
RuntimeWarning,
)
getattr(self, name).particle_ids = cosmo_array(
np.arange(already_used, getattr(self, name).n_part + already_used),
unyt.dimensionless,
comoving=True,
scale_factor=self.scale_factor,
scale_exponent=0.0,
)
already_used += getattr(self, name).n_part
return
def _write_metadata(self, handle: h5py.File, names_to_write: list) -> None:
"""
Write metadata to file.
Metadata written is based on the information passed to the object and the
information in the particle groups.
Parameters
----------
handle : h5py.File
HDF5 file handle to write to.
names_to_write : list
List of metadata fields to write.
"""
part_types = (
max(
[
_ptype_str_to_int(k)
for k in metadata.particle.particle_name_underscores.keys()
]
)
+ 1
)
number_of_particles = [0] * part_types
mass_table = [0.0] * part_types
for number, name in metadata.particle_types.particle_name_underscores.items():
if name in names_to_write:
number_of_particles[_ptype_str_to_int(number)] = getattr(
self, name
).n_part
mass_table[_ptype_str_to_int(number)] = getattr(self, name).masses[0]
attrs = {
"BoxSize": self.boxsize,
"NumPart_Total": number_of_particles,
"NumPart_Total_HighWord": [0] * 6,
"Flag_Entropy_ICs": 0,
"Dimension": np.array([self.dimension]),
"Scale-factor": [self.scale_factor],
# LEGACY but required for Gadget readers
"NumFilesPerSnapshot": 1,
"NumPart_ThisFile": number_of_particles,
"MassTable": mass_table,
}
if self.extra_header is not None:
attrs = {**attrs, **self.extra_header}
header = handle.create_group("Header")
for name, value in attrs.items():
header.attrs.create(name, value)
return
def _write_units(self, handle: h5py.File) -> None:
"""
Write the unit information to file.
.. note::
We do not have support for units of current yet.
Parameters
----------
handle : h5py.File
HDF5 file to write units to.
"""
dim = unyt.dimensions
cgs_base = unyt.unit_systems.cgs_unit_system.base_units
base = self.unit_system.base_units
def get_conversion(unit_type: Symbol) -> np.ndarray[float]:
"""
Find the conversion factor from base to cgs units.
Parameters
----------
unit_type : sympy.Symbol
The type of unit to convert (mass, length, etc.).
Returns
-------
np.ndarray[float]
The conversion factor.
"""
# We need to find the correct unit (which is now stored as a sympy value,
# why?!) and convert it to an unyt unit.
our_unit = unyt.unit_object.Unit(base[unit_type])
cgs_unit = unyt.unit_object.Unit(cgs_base[unit_type])
conversion_factor = our_unit.get_conversion_factor(cgs_unit)[0]
# We use the array because this is how swift outputs it, as a length
# 1 array (rather than as a single float).
return np.array([conversion_factor])
attrs = {
"Unit mass in cgs (U_M)": get_conversion(dim.mass),
"Unit length in cgs (U_L)": get_conversion(dim.length),
"Unit time in cgs (U_t)": get_conversion(dim.time),
"Unit current in cgs (U_I)": np.array([1.0]),
"Unit temperature in cgs (U_T)": get_conversion(dim.temperature),
}
units = handle.create_group("Units")
for name, value in attrs.items():
units.attrs.create(name, value)
return
[docs]
def write(self, filename: str) -> None:
"""
Write the information in the dataset to file.
Parameters
----------
filename : str
File to write to.
"""
names_to_write = []
generate_ids = False
for name in metadata.particle_types.particle_name_underscores.values():
this_dataset = getattr(self, name)
if not this_dataset.check_empty():
if this_dataset.check_consistent():
names_to_write.append(name)
generate_ids = (
generate_ids or this_dataset.requires_particle_ids_before_write
)
if generate_ids:
self._generate_ids(names_to_write)
else:
all_ids = np.concatenate(
[getattr(self, name).particle_ids for name in names_to_write]
)
if np.unique(all_ids).size != all_ids.size:
raise ValueError(
"Particle IDs must be unique across all particle types."
)
with h5py.File(filename, "w") as handle:
self._write_metadata(handle, names_to_write)
self._write_units(handle)
for name in names_to_write:
getattr(self, name).write_particle_group(handle, compress=self.compress)
attrs = getattr(self, name).get_attributes()
getattr(self, name).write_particle_group_metadata(handle, attrs)
return