#!/usr/bin/env python
"""
This module defines the ``SageHdf5Data`` class. This class interfaces with the
:py:class:`~sage_analysis.model.Model` class to read in binary data written by **SAGE**.
The value of :py:attr:`~sage_analysis.model.Model.sage_output_format` is generally
``sage_hdf5`` if it is to be read with this class.
We refer to :doc:`../user/custom_data_classes` for more information about adding your
own Data Class to ingest data.
Author: Jacob Seiler.
"""
import warnings
from sage_analysis.model import Model
from sage_analysis.utils import read_generic_sage_params
from typing import Dict, Any, Optional
import logging
import numpy as np
import h5py
logger = logging.getLogger(__name__)
# DO NOT TOUCH #
sage_data_version = "1.00"
# DO NOT TOUCH #
[docs]class SageHdf5Data():
"""
Class intended to inteface with the :py:class:`~sage_analysis.model.Model` class to
ingest the data written by **SAGE**. It includes methods for reading the output
galaxies, setting cosmology etc. It is specifically written for when
:py:attr:`~sage_analysis.model.Model.sage_output_format` is ``sage_hdf5``.
"""
def __init__(self, model: Model, sage_file_to_read: str) -> None:
"""
Instantiates the Data Class for reading in **SAGE** HDF5 data. In particular,
opens up the file and ensures the data version matches the expected value.
Parameters
----------
model: :py:class:`~sage_analysis.model.Model` instance
The model that this data class is associated with; this class will read the
data for this model.
"""
logger.info("Reading using SAGE HDF5 output format.")
# Use the SAGE parameter file to generate a bunch of attributes.
sage_dict = self._read_sage_params(sage_file_to_read)
self.sage_model_dict = sage_dict
logger.info(f"The read SAGE parameters are {sage_dict}")
# The output data will be named via the parameter file with the ``.hdf5`` extension.
sage_data_path = f"{sage_dict['_base_sage_output_path']}.hdf5"
model._sage_data_path = sage_data_path
model._hdf5_file = h5py.File(sage_data_path, "r")
# Due to how attributes are created in C, they will need to be decoded to get cast to a string.
model.sage_version = model._hdf5_file["Header"]["Misc"].attrs["sage_version"].decode("ascii")
model.sage_data_version = model._hdf5_file["Header"]["Misc"].attrs["sage_data_version"].decode("ascii")
# Check that this module is current for the SAGE data version.
if model.sage_data_version != sage_data_version:
raise ValueError(
f"The 'sage_hdf5.py' module was written to be compatible with sage_data_version {sage_data_version}. "
f"Your version of SAGE HDF5 has data version {model.sage_data_version}. Please update your version of "
f"SAGE or submit an issue at https://github.com/sage-home/sage-model/issues"
)
# Finally, perform some checks to ensure that the data in ``model`` is compatible with our assumptions
# regarding the HDF5 file.
self._check_model_compatibility(model, sage_dict)
model._num_output_files = model._hdf5_file["Header"]["Misc"].attrs["num_cores"]
# The cores to analyze may have a default value. To allow for this, explicitly set these as if they were read
# from the parameter file.
self.sage_model_dict["_first_file_to_analyze"] = 0
self.sage_model_dict["_last_file_to_analyze"] = model._num_output_files - 1
[docs] def _check_model_compatibility(self, model: Model, sage_dict: Optional[Dict[str, Any]]) -> None:
"""
Ensures that the attributes in the :py:class:`~sage_analysis.model.Model` instance are compatible with the
variables read from the **SAGE** parameter file (if read at all).
Parameters
----------
model : :py:class:`~sage_analysis.model.Model` instance
The model that this data class is associated with.
sage_dict : optional, dict[str, Any]
A dictionary containing all of the fields read from the **SAGE** parameter file.
Warnings
--------
UserWarning
Raised if the user initialized ``Model`` with a value of
:py:attr:`~sage_analysis.model.Model.num_sage_output_files` that is different to the value specified in the
HDF5 file.
"""
if model._num_sage_output_files is not None:
logger.debug("It is not required to specify the number of SAGE output files when analysing HDF5 output.")
# Check to ensure that there isn't a mismatch in the number specified and the number in the file.
hdf5_num_files = model._hdf5_file["Header"]["Misc"].attrs["num_cores"]
if model._num_sage_output_files != hdf5_num_files:
warnings.warn(
f"The number of SAGE output files according to the master HDF5 file is {hdf5_num_files}."
f" However, ``analyze_sage_output`` was called with {model._num_sage_output_files}. "
f"Using the number of files from the HDF5 file as the correct value."
)
[docs] def determine_volume_analyzed(self, model: Model) -> float:
"""
Determines the volume analyzed. This can be smaller than the total simulation box.
Parameters
----------
model : :py:class:`~sage_analysis.model.Model` instance
The model that this data class is associated with.
Returns
-------
volume : float
The numeric volume being processed during this run of the code in (Mpc/h)^3.
"""
# Each output file may have processed a different fraction of the total volume. Hence to determine the total
# volume processed, loop through all of the files that we're analysing and use their volume fractions.
total_volume_frac_processed = 0.0
for core_idx in range(model._first_file_to_analyze, model._last_file_to_analyze + 1):
core_key = "Core_{0}".format(core_idx)
frac_processed = model._hdf5_file[core_key]["Header"]["Runtime"].attrs["frac_volume_processed"]
total_volume_frac_processed += frac_processed
logger.info(
f"{core_key} processed {frac_processed} fraction of the volume. Total is {total_volume_frac_processed}"
)
volume = pow(model._box_size, 3) * total_volume_frac_processed
logger.info(
f"Total fraction of volume processed is {total_volume_frac_processed}. Box size is "
f"{model._box_size}. The size of the volume processed is hence {volume} (Mpc/h)^3."
)
return volume
[docs] def _read_sage_params(self, sage_file_path: str) -> Dict[str, Any]:
"""
Read the **SAGE** parameter file.
Parameters
----------
sage_file_path: string
Path to the **SAGE** parameter file.
Returns
-------
model_dict: dict [str, var]
Dictionary containing the parameter names and their values.
"""
model_dict = read_generic_sage_params(sage_file_path)
return model_dict
[docs] def determine_num_gals(self, model: Model, snapshot: int, *args):
"""
Determines the number of galaxies in all cores for this model at the specified snapshot.
Parameters
----------
model: :py:class:`~sage_analysis.model.Model` class
The :py:class:`~sage_analysis.model.Model` we're reading data for.
snapshot : int
The snapshot we're analysing.
*args : Any
Extra arguments to allow other data class to pass extra arguments to their version of
``determine_num_gals``.
"""
ngals = 0
snap_key = f"Snap_{snapshot}"
for core_idx in range(model._first_file_to_analyze, model._last_file_to_analyze + 1):
core_key = f"Core_{core_idx}"
# Maybe this Snapshot didn't have any galaxies saved.
try:
ngals += model._hdf5_file[core_key][snap_key].attrs["num_gals"]
except KeyError:
ngals = 0
continue
model._num_gals_all_files = ngals
[docs] def read_gals(self, model, core_num, pbar=None, plot_galaxies=False, debug=False):
"""
Reads the galaxies of a single core at the specified
:py:attr:`~sage_analysis.model.Model.snapshot`.
Parameters
----------
model: :py:class:`~sage_analysis.model.Model` class
The :py:class:`~sage_analysis.model.Model` we're reading data for.
core_num: Integer
The core group we're reading.
pbar: ``tqdm`` class instance, optional
Bar showing the progress of galaxy reading. If ``None``, progress bar will
not show.
plot_galaxies : Boolean, optional
If set, plots and saves the 3D distribution of galaxies for this file.
debug : Boolean, optional
If set, prints out extra useful debug information.
Returns
-------
gals : ``h5py`` group
The galaxies for this file.
Notes
-----
``tqdm`` does not play nicely with printing to stdout. Hence we disable
the ``tqdm`` progress bar if ``debug=True``.
"""
core_key = "Core_{0}".format(core_num)
snap_key = "Snap_{0}".format(model.snapshot)
num_gals_read = model._hdf5_file[core_key][snap_key].attrs["num_gals"]
# If there aren't any galaxies, exit here.
if num_gals_read == 0:
return None
gals = model._hdf5_file[core_key][snap_key]
# If we're using the `tqdm` package, update the progress bar.
if pbar is not None:
pbar.set_postfix(file=core_key, refresh=False)
pbar.update(num_gals_read)
if debug:
print("")
print("Core {0}, Snapshot {1} contained {2} galaxies".format(core_num,
model.snapshot,
num_gals_read))
w = np.where(gals["StellarMass"][:] > 1.0)[0]
print("{0} of these galaxies have mass greater than 10^10Msun/h".format(len(w)))
if plot_galaxies:
# Show the distribution of galaxies in 3D.
from sage_analysis.plots import plot_spatial_3d
pos = np.empty((len(gals["Posx"]), 3), dtype=np.float32)
dim_name = ["x", "y", "z"]
for (dim_num, dim_name) in enumerate(dim_name):
key = "Pos{0}".format(dim_num)
pos[:, dim_num] = gals[key][:]
output_file = "./galaxies_{0}.{1}".format(core_num, model.plot_output_format)
plot_spatial_3d(pos, output_file, model.box_size)
return gals
[docs] def update_snapshot_and_data_path(self, model: Model, snapshot: int):
"""
Updates the :py:attr:`~sage_analysis.Model.snapshot` attribute to ``snapshot``. As the HDF5 file contains all
snapshot information, we do **not** need to update the path to the output data. However, ensure that the file
itself is still open.
"""
model._snapshot = snapshot
# If the file was closed, then ``__bool__()`` will return False.
if not model._hdf5_file.__bool__():
model._hdf5_file = h5py.File(model.sage_data_path, "r")
[docs] def close_file(self, model):
"""
Closes the open HDF5 file.
"""
model._hdf5_file.close()