Source code for swiftsimio.snapshot_writer

"""Provide utilities to create SWIFT files usable as initial conditions."""

import unyt
import h5py
import numpy as np
from warnings import warn

from typing import Callable
from sympy import Symbol
from functools import reduce

from swiftsimio import metadata
from swiftsimio.metadata.writer.unit_systems import cosmo_units
from swiftsimio.objects import cosmo_array


def _ptype_str_to_int(ptype_str: str) -> int:
    """
    Convert a string like ``"PartType0"`` to an integer (in this example, ``0``).

    Parameters
    ----------
    ptype_str : str
        The particle type string.

    Returns
    -------
    int
        The corresponding integer.
    """
    return int(ptype_str.strip("PartType")) if "PartType" in ptype_str else ptype_str


class __SWIFTWriterParticleDataset(object):
    """
    A particle dataset for _writing_ with.

    This is explicitly different to the one used for reading
    (:class:`~swiftsimio.reader.__SWIFTGroupDataset`), as it requires a very different
    feature set.

    Parameters
    ----------
    writer : ~swiftsimio.snapshot_writer.SWIFTSnapshotWriter
        A reference to the writer containing this particle dataset writer.

    unit_system : unyt.UnitSystem or str
        Either be a string (e.g. "cgs"), or a UnitSystem as defined by unyt
        specifying the units to be used. Users may wish to use the cosmological unit
        system provided as
        ``from swiftsimio.metadata.writer.unit_systems import cosmo_units``.

    particle_type : int
        The particle type of the dataset. Numbering convention is the same as SWIFT, with
        ``0`` corresponding to gas, etc., as usual.
    """

    def __init__(
        self,
        writer: "SWIFTSnapshotWriter",
        unit_system: unyt.UnitSystem | str,
        particle_type: int,
    ) -> None:
        self.writer = writer
        self.unit_system = unit_system
        self.particle_type = particle_type

        self.particle_name = metadata.particle_types.particle_name_underscores[
            self.particle_type
        ]

        self.generate_empty_properties()

        return

    def generate_empty_properties(self) -> None:
        """
        Generate the empty properties accessed by setters and getters.

        Initially all of the ``_{name}`` values are set to ``None``. Note that we only
        generate required properties.
        """
        for name in getattr(metadata.required_fields, self.particle_name).keys():
            setattr(self, f"_{name}", None)

        return

    def check_empty(self) -> bool:
        """
        Check if all required datasets are empty.

        Returns
        -------
        bool
            ``True`` if all datasets are empty.
        """
        for name in getattr(metadata.required_fields, self.particle_name).keys():
            if getattr(self, f"_{name}") is not None:
                return False

        return True

    def check_consistent(self) -> bool:
        """
        Perform consistency checks on dataset.

        Checks the following:

        + that all required fields (apart from ``particle_ids``) are not ``None``,
        + that all required fields have the same length.

        If those are true,

        + ``self.n_part`` is set with the total number of particles of this type
        + ``self.requires_particle_ids_before_write`` is set to a boolean.

        Returns
        -------
        bool
            ``True`` if above listed criteria are satisfied.
        """
        self.requires_particle_ids_before_write = False

        sizes = []

        for name in getattr(metadata.required_fields, self.particle_name).keys():
            if getattr(self, f"_{name}") is None:
                if name == "particle_ids":
                    self.requires_particle_ids_before_write = True
                else:
                    raise AttributeError(f"Required dataset {name} is None.")
            else:
                sizes.append(getattr(self, f"_{name}").shape[0])

        # Now we figure out if everyone's the same (without numpy...)
        assert reduce(lambda x, y: x and y, [sizes[0] == x for x in sizes]), (
            f"Arrays passed to {self.particle_name} dataset are not of the same size."
        )

        # Make sure positions and velocities have the same shapes
        if getattr(self, "coordinates").shape != getattr(self, "velocities").shape:
            raise AttributeError(
                f"{self.particle_name} coordinates and velocities have unequal shapes"
            )

        self.n_part = sizes[0]

        return True

    def generate_smoothing_lengths(self) -> None:
        """
        Generate smoothing lengths as 2 times the mean interparticle spacing.

        This only works for a boxsize that has the same length in all dimensions.
        """
        if "smoothing_lengths" not in getattr(
            metadata.required_fields, self.particle_name
        ):
            raise RuntimeError(
                "Cannot generate smoothing lengths for particle types that don't require "
                "them."
            )

        if not (np.diff(self.writer.boxsize) == 0).all():
            raise ValueError(
                "To generate smoothing lengths box side lengths must all be equal."
            )
        dimension = self.writer.dimension
        boxlength = self.writer.boxsize[0]

        n_part = self.coordinates.shape[0]
        mips = boxlength / (n_part ** (1.0 / dimension))

        smoothing_lengths = 2 * mips * np.ones(n_part, dtype=float)

        self.smoothing_lengths = smoothing_lengths

        return

    def write_particle_group(self, file_handle: h5py.File, compress: bool) -> None:
        """
        Write the particle group's required properties to file.

        Parameters
        ----------
        file_handle : h5py.File
            File handle to write to.

        compress : bool
            Flag to indicate whether to turn on gzip compression.
        """
        particle_group = file_handle.create_group(self.particle_type)

        if compress:
            compression = "gzip"
        else:
            compression = None

        for name, required_field_info in getattr(
            metadata.required_fields, self.particle_name
        ).items():
            output_handle = required_field_info["handle"]
            particle_group.create_dataset(
                output_handle, data=getattr(self, name), compression=compression
            )

        return

    def write_particle_group_metadata(
        self, file_handle: h5py.File, dset_attributes: dict
    ) -> None:
        """
        Write the relevant metadata for a particle group.

        Parameters
        ----------
        file_handle : h5py.File
            HDF5 output file to write to.

        dset_attributes : dict
            Dictionary containg metadata to attach to group.
        """
        for name, required_field_info in getattr(
            metadata.required_fields, self.particle_name
        ).items():
            output_handle = required_field_info["handle"]
            obj = file_handle[f"/{self.particle_type}/{output_handle}"]
            for attr_name, attr_value in dset_attributes[output_handle].items():
                obj.attrs.create(attr_name, attr_value)

        return

    def get_attributes(self) -> dict:
        """
        Return a dictionary containing the attributes to attach to the dataset.

        Returns
        -------
        dict
            Dictionary containg the attributes applying to the dataset.
        """
        # annoyingly unyt calls "current" "current_mks", but not a big deal
        from unyt.dimensions import mass, length, temperature, time, current_mks

        attributes_dict = {}

        for name, required_field_info in getattr(
            metadata.required_fields, self.particle_name
        ).items():
            output_handle = required_field_info["handle"]
            field = getattr(self, name)
            if not isinstance(field, cosmo_array):
                raise ValueError(
                    f"Provide {name} data to swiftsimio.Writer as"
                    " swiftsimio.cosmo_array's (i.e. including both unit and cosmology"
                    " information)."
                )

            # Find the exponents for each of the dimensions
            dim_exponents = get_dimensions(field.units.dimensions)
            attributes_dict[output_handle] = {
                "Conversion factor to CGS (not including cosmological corrections)": [
                    field.unit_quantity.in_cgs()
                ],
                "Conversion factor to physical CGS "
                "(including cosmological corrections)": [
                    field.unit_quantity.in_cgs() * field.cosmo_factor.a_factor
                ],
                "U_I exponent": [dim_exponents[current_mks]],
                "U_L exponent": [dim_exponents[length]],
                "U_M exponent": [dim_exponents[mass]],
                "U_T exponent": [dim_exponents[temperature]],
                "U_t exponent": [dim_exponents[time]],
                "a-scale exponent": [
                    float(getattr(field.cosmo_factor.expr, "exp", 1.0))
                ],
                "h-scale exponent": [0.0],
            }

        return attributes_dict


[docs] def get_dimensions(dimension: unyt.dimensions) -> dict: """ Return exponents corresponding to base dimensions for given :mod:`unyt.dimensions`. Parameters ---------- dimension : unyt.dimensions Dimension for which we're identifying the exponents. Returns ------- dict Dictionary of exponents corresponding to each base dimension. Examples -------- .. code-block:: python >>> get_dimensions(unyt.dimensions.velocity) {(mass): 0.0, (length): 1.0, (time): -1.0, (temperature): 0.0, (angle): 0.0, (current_mks): 0.0, 1: 0.0, (luminous_intensity): 0.0, (logarithmic): 0.0} """ # create the return dict defaulting to exponent of 0 exp_dict = {dimension: 0.0 for dimension in unyt.dimensions.base_dimensions} # extract the exponent for each of the dimensions exp_dict.update( (k, float(v)) for k, v in (d.as_base_exp() for d in dimension.as_ordered_factors()) ) return exp_dict
[docs] def generate_getter( name: str, ) -> Callable[[__SWIFTWriterParticleDataset], cosmo_array]: """ Generate a getter for the :class:`~swiftsimio.objects.cosmo_array` for ``name``. Parameters ---------- name : str Name of data field. Returns ------- Callable Getter function that returns :class:`~swiftsimio.objects.cosmo_array` for ``name``. """ def getter(self: __SWIFTWriterParticleDataset) -> cosmo_array: """ Get the value of the dataset from the private name attribute. Parameters ---------- self : __SWIFTWriterParticleDataset The dataset the attribute is attached to. Returns ------- cosmo_array The value of the named data field. """ return getattr(self, f"_{name}") return getter
[docs] def generate_setter( name: str, dimensions: unyt.dimensions, unit_system: unyt.UnitSystem | str ) -> Callable[[__SWIFTWriterParticleDataset, cosmo_array], None]: """ Generate a function that sets ``self._<name>`` to the value that is passed to it. Parameters ---------- name : str String to set ``self._name`` to. dimensions : unyt.dimensions Physical dimension of ``self._name`` (for consistency check). unit_system : unyt.UnitSystem or str Unit system of ``self._name``. Returns ------- Callable Function to set ``self._name``. """ def setter(self: __SWIFTWriterParticleDataset, value: cosmo_array) -> None: """ Set the named dataset (private attribute) to a value. Makes consistency checks on the input: + the given units have the expected dimensions for this field + that values are unique and >=1 for ``particle_ids`` + that the scale factor attached to :class:`~swiftsimio.objects.cosmo_array` inputs matches the one for the top-level output metadata. Parameters ---------- self : __SWIFTWriterParticleDataset The dataset that the attribute is attached to. value : cosmo_array The value to set the attribute to. """ if hasattr(value, "cosmo_factor"): assert value.cosmo_factor.scale_factor == self.writer.scale_factor, ( f"The scale factor of {name} ({value.cosmo_factor.scale_factor}) does not" f" match the scale factor of the Writer ({self.writer.scale_factor})." ) if dimensions != 1: if isinstance(value, cosmo_array): if value.units.dimensions == dimensions: value.convert_to_base(unit_system) value.convert_to_comoving() setattr(self, f"_{name}", value) else: raise unyt.exceptions.InvalidUnitEquivalence( f"Convert to {name}", value.units.dimensions, dimensions ) else: raise TypeError(f"Provide {name} as swiftsimio.cosmo_array.") else: if name == "particle_ids": if any(value <= 0): raise ValueError(f"{self.particle_name}.particle_ids must be >= 1.") if np.unique(value).size != value.size: raise ValueError( f"{self.particle_name}.particle_ids must not have repeated IDs." ) setattr( self, f"_{name}", cosmo_array( value, unyt.dimensionless, comoving=True, scale_factor=self.writer.scale_factor, scale_exponent=0.0, ), ) return return setter
[docs] def generate_deleter(name: str) -> Callable[[__SWIFTWriterParticleDataset], None]: """ Generate a function that destroys ``self._<name>`` (sets it back to ``None``). Parameters ---------- name : str Name of object to be destroyed. Returns ------- Callable Function to delete ``self._<name>``. """ def deleter(self: __SWIFTWriterParticleDataset) -> None: """ Delete the named (private attribute) dataset. Parameters ---------- self : __SWIFTWriterParticleDataset The dataset the attribute is attached to. """ current_value = getattr(self, f"_{name}") del current_value setattr(self, f"_{name}", None) return return deleter
[docs] def generate_dataset( writer: "SWIFTSnapshotWriter", unit_system: unyt.UnitSystem | str, particle_type: int, ) -> __SWIFTWriterParticleDataset: """ Generate a :class:`~swiftsimio.snapshot_writer.SWIFTWriterParticleDataset`. We _must_ do the following _outside_ of the class itself, as one can assign properties to a _class_ but not _within_ a class dynamically. Here we loop through all of the possible properties in the metadata file. We then use the builtin property() function and some generators to create setters and getters for those properties. This will allow them to be accessed from outside by using ``SWIFTWriterParticleDataset(...).<name>``, where the name is, for example, ``coordinates``. Parameters ---------- writer : ~swiftsimio.snapshot_writer.SWIFTSnapshotWriter A reference to the writer that will contain the generated particle dataset writer. unit_system : unyt.UnitSystem or str Unit system of the dataset. particle_type : int The particle type of the dataset. Numbering convention is the same as SWIFT, with ``0`` corresponding to gas, etc. as usual. Returns ------- SWIFTWriterParticleDataset An empty dataset for the given particle type. """ particle_name = metadata.particle_types.particle_name_underscores[particle_type] particle_nice_name = metadata.particle_types.particle_name_class[particle_type] this_dataset_bases = (__SWIFTWriterParticleDataset, object) this_dataset_dict = {} for name, required_field_info in getattr( metadata.required_fields, particle_name ).items(): dimensions = required_field_info["dimensions"] this_dataset_dict[name] = property( generate_getter(name), generate_setter(name, dimensions, unit_system), generate_deleter(name), ) ThisDataset = type( f"{particle_nice_name}WriterDataset", this_dataset_bases, this_dataset_dict ) empty_dataset = ThisDataset(writer, unit_system, particle_type) return empty_dataset
[docs] class SWIFTSnapshotWriter(object): """ The SWIFT dataset writer. This is used to store all particle arrays and do some extra consistency checks and processing before writing a HDF5 file containing: + Fully consistent unit system + All required arrays for SWIFT to start + Required metadata (all automatic, apart from those required by ``__init__``) The written output can be read back in using :mod:`swiftsimio`, or used as initial conditions for a SWIFT simulation. Any of the usual datasets (``gas``, ``dark_matter``, ``stars``, ``black_holes``, ``sinks``, ``neutrinos``, ``boundary``) can be populated with data. If a dataset is populated, it must have all ``required_fields`` filled in. The required fields can be viewed, for example, with ``swiftsimio.metadata.writer.required_fields.gas.keys()``. The only exception is ``particle_ids``, that will be generated automatically if missing. Smoothing length estimates for uniform-density gas in a periodic box can also be auto-generated, but this has to be done explicitly (see example below). To fill in a required dataset, provide the data with a :class:`~swiftsimio.objects.cosmo_array`. For example: .. code-block:: python w.gas.masses = cosmo_array( np.ones(n_p, dtype=float) * 1e6, u.solMass, comoving=True, scale_factor=w.scale_factor, scale_exponent=0, ) Parameters ---------- unit_system : unyt.UnitSystem or str Unit system for dataset. boxsize : cosmo_array Size of simulation box and associated units. dimension : int, optional Dimensions (length of coordinate vector) of simulation. compress : bool, optional Flag to turn on gzip compression. extra_header : dict, optional Dictionary containing extra fields to write to the HDF5 ``Header`` group. scale_factor : np.float32 Scale factor associated with dataset. Examples -------- An example showing populating all required fields for the ``gas`` dataset: .. code-block:: python import numpy as np import unyt as u from swiftsimio import Writer, cosmo_array from swiftsimio.metadata.writer.unit_systems import cosmo_unit_system # number of gas particles n_p = 1000 # scale factor of 1.0 a = 1.0 # Box is 100 Mpc lbox = 100 boxsize = cosmo_array( [lbox, lbox, lbox], u.Mpc, comoving=True, scale_factor=a, scale_exponent=1, ) # Create the Writer object. cosmo_unit_system corresponds to default Gadget-like # units of 10^10 Msun, Mpc, and km/s w = Writer(unit_system=cosmo_unit_system, boxsize=boxsize, scale_factor=a) # Randomly spaced coordinates from 0 to lbox Mpc in each direction w.gas.coordinates = cosmo_array( np.random.rand(n_p, 3) * lbox, u.Mpc, comoving=True, scale_factor=w.scale_factor, scale_exponent=1, ) # Random velocities from 0 to 1 km/s w.gas.velocities = cosmo_array( np.random.rand(n_p, 3), u.km / u.s, comoving=True, scale_factor=w.scale_factor, scale_exponent=1, ) # Generate uniform masses as 10^6 solar masses for each particle w.gas.masses = cosmo_array( np.ones(n_p, dtype=float) * 1e6, u.solMass, comoving=True, scale_factor=w.scale_factor, scale_exponent=0, ) # Generate internal energy corresponding to 10^4 K w.gas.internal_energy = cosmo_array( np.ones(n_p, dtype=float) * 1e4 / 1e6, u.kb * u.K / u.solMass, comoving=True, scale_factor=w.scale_factor, scale_exponent=-2, ) # Generate initial guess for smoothing lengths based on mean inter-particle spacing w.gas.generate_smoothing_lengths() # w.gas.particle_ids can optionally be set, otherwise they are auto-generated # write the initial conditions out to a file w.write("ics.hdf5") """ def __init__( self, *, unit_system: unyt.UnitSystem | str = cosmo_units, boxsize: cosmo_array, dimension: int = 3, compress: bool = True, extra_header: dict | None = None, scale_factor: np.float32 = 1.0, ) -> None: if isinstance(unit_system, str): self.unit_system = unyt.unit_systems.unit_system_registry[unit_system] else: self.unit_system = unit_system try: if len(boxsize) != dimension: raise ValueError( f"boxsize must have length equal to number of dimensions, {boxsize=}," f" {dimension=}." ) except TypeError: # len() of unsized object raise ValueError( "boxsize must be a cosmo_array of length equal to spatial dimensions." ) self.boxsize = np.copy(boxsize, subok=True) self.boxsize.convert_to_base(self.unit_system) self.dimension = dimension self.compress = compress self.extra_header = extra_header self.create_particle_datasets() self.scale_factor = scale_factor return
[docs] def create_particle_datasets(self) -> None: """Create particle dataset for each particle type in the metadata.""" for number, name in metadata.particle_types.particle_name_underscores.items(): setattr( self, name, generate_dataset(self, self.unit_system, number), ) return
def _generate_ids(self, names_to_write: list) -> None: """ (Re-)generate all particle IDs for groups with names in ``names_to_write``. Parameters ---------- names_to_write : list List of groups to (re-)generate ids for. """ # Start particle ID's at 1. When running with hydro + DM, partID = 0 # is a no-no because the ID's are used as offsets in arrays. The code # will most likely crash at some point, and "part_verify_links()" will # complain about it if SWIFT is run with debugging checks on. already_used = 1 for name in names_to_write: if getattr(self, name)._particle_ids is not None: warn( f"Overwriting {name}.particle_ids, to prevent this provide particle " "IDs for all particle types, or none.", RuntimeWarning, ) getattr(self, name).particle_ids = cosmo_array( np.arange(already_used, getattr(self, name).n_part + already_used), unyt.dimensionless, comoving=True, scale_factor=self.scale_factor, scale_exponent=0.0, ) already_used += getattr(self, name).n_part return def _write_metadata(self, handle: h5py.File, names_to_write: list) -> None: """ Write metadata to file. Metadata written is based on the information passed to the object and the information in the particle groups. Parameters ---------- handle : h5py.File HDF5 file handle to write to. names_to_write : list List of metadata fields to write. """ part_types = ( max( [ _ptype_str_to_int(k) for k in metadata.particle.particle_name_underscores.keys() ] ) + 1 ) number_of_particles = [0] * part_types mass_table = [0.0] * part_types for number, name in metadata.particle_types.particle_name_underscores.items(): if name in names_to_write: number_of_particles[_ptype_str_to_int(number)] = getattr( self, name ).n_part mass_table[_ptype_str_to_int(number)] = getattr(self, name).masses[0] attrs = { "BoxSize": self.boxsize, "NumPart_Total": number_of_particles, "NumPart_Total_HighWord": [0] * 6, "Flag_Entropy_ICs": 0, "Dimension": np.array([self.dimension]), "Scale-factor": [self.scale_factor], # LEGACY but required for Gadget readers "NumFilesPerSnapshot": 1, "NumPart_ThisFile": number_of_particles, "MassTable": mass_table, } if self.extra_header is not None: attrs = {**attrs, **self.extra_header} header = handle.create_group("Header") for name, value in attrs.items(): header.attrs.create(name, value) return def _write_units(self, handle: h5py.File) -> None: """ Write the unit information to file. .. note:: We do not have support for units of current yet. Parameters ---------- handle : h5py.File HDF5 file to write units to. """ dim = unyt.dimensions cgs_base = unyt.unit_systems.cgs_unit_system.base_units base = self.unit_system.base_units def get_conversion(unit_type: Symbol) -> np.ndarray[float]: """ Find the conversion factor from base to cgs units. Parameters ---------- unit_type : sympy.Symbol The type of unit to convert (mass, length, etc.). Returns ------- np.ndarray[float] The conversion factor. """ # We need to find the correct unit (which is now stored as a sympy value, # why?!) and convert it to an unyt unit. our_unit = unyt.unit_object.Unit(base[unit_type]) cgs_unit = unyt.unit_object.Unit(cgs_base[unit_type]) conversion_factor = our_unit.get_conversion_factor(cgs_unit)[0] # We use the array because this is how swift outputs it, as a length # 1 array (rather than as a single float). return np.array([conversion_factor]) attrs = { "Unit mass in cgs (U_M)": get_conversion(dim.mass), "Unit length in cgs (U_L)": get_conversion(dim.length), "Unit time in cgs (U_t)": get_conversion(dim.time), "Unit current in cgs (U_I)": np.array([1.0]), "Unit temperature in cgs (U_T)": get_conversion(dim.temperature), } units = handle.create_group("Units") for name, value in attrs.items(): units.attrs.create(name, value) return
[docs] def write(self, filename: str) -> None: """ Write the information in the dataset to file. Parameters ---------- filename : str File to write to. """ names_to_write = [] generate_ids = False for name in metadata.particle_types.particle_name_underscores.values(): this_dataset = getattr(self, name) if not this_dataset.check_empty(): if this_dataset.check_consistent(): names_to_write.append(name) generate_ids = ( generate_ids or this_dataset.requires_particle_ids_before_write ) if generate_ids: self._generate_ids(names_to_write) else: all_ids = np.concatenate( [getattr(self, name).particle_ids for name in names_to_write] ) if np.unique(all_ids).size != all_ids.size: raise ValueError( "Particle IDs must be unique across all particle types." ) with h5py.File(filename, "w") as handle: self._write_metadata(handle, names_to_write) self._write_units(handle) for name in names_to_write: getattr(self, name).write_particle_group(handle, compress=self.compress) attrs = getattr(self, name).get_attributes() getattr(self, name).write_particle_group_metadata(handle, attrs) return