diff --git a/dc1/noise_sim/compress_hdf5.py b/dc1/noise_sim/compress_hdf5.py new file mode 100755 index 00000000..24761424 --- /dev/null +++ b/dc1/noise_sim/compress_hdf5.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 + +""" +This script loads v0 uncompressed TOAST observations and writes v1 compressed files. +""" + +import os +import sys +import shutil +import re +import glob + +import datetime + +import argparse + +import numpy as np + +from astropy import units as u + +import toast +import toast.ops + +from toast.timing import gather_timers, dump, Timer + +from toast.observation import default_values as defaults + + +def parse_arguments(): + """ + Defines and parses the arguments for the script. + """ + parser = argparse.ArgumentParser( + description="Compress CMB-S4 simulation data" + ) + + parser.add_argument( + "--verify", + required=False, + action="store_true", + default=False, + help="Re-load the converted data and verify consistency", + ) + + parser.add_argument( + "--obs", + type=str, + required=False, + nargs="+", + help="One or more observation files", + ) + + # The operators we want to configure from the command line or a parameter file. + operators = list() + + # Parse all of the operator configuration + config, args, jobargs = toast.parse_config(parser, operators=operators) + + return config, args, jobargs + + +def main(): + env = toast.utils.Environment.get() + log = toast.utils.Logger.get() + env.enable_function_timers() + global_timer = toast.timing.GlobalTimers.get() + global_timer.start("compress HDF5 (total)") + + config, args, jobargs = parse_arguments() + + # Default group size + comm = toast.Comm() + + # Process each observation + for obs_path in args.obs: + obs_dir = os.path.dirname(obs_path) + file_root = os.path.splitext(obs_path)[0] + if comm.world_rank == 0: + print(f"Working on {obs_path}:") + backup = f"{file_root}_uncompressed.h5" + timer = Timer() + timer.start() + obs = toast.io.load_hdf5( + obs_path, + comm, + process_rows=comm.group_size, + meta=None, + detdata=None, + shared=None, + intervals=None, + detectors=None, + force_serial=False, + ) + + if comm.comm_world is not None: + comm.comm_world.barrier() + timer.stop() + if comm.world_rank == 0: + print(f" Load {obs_path} in {timer.seconds()} s", flush=True) + + if comm.world_rank == 0: + os.rename(obs_path, backup) + + if comm.comm_world is not None: + comm.comm_world.barrier() + + timer.start() + obf = toast.io.save_hdf5( + obs, + obs_dir, + meta=None, + detdata=[ + (defaults.det_data, {"type": "flac"}), + (defaults.det_flags, {"type": "gzip"}), + ], + shared=None, + intervals=None, + config=None, + times=defaults.times, + force_serial=False, + ) + if comm.comm_world is not None: + comm.comm_world.barrier() + timer.stop() + if comm.world_rank == 0: + print(f" Save {obs_path} in {timer.seconds()} s", flush=True) + + if obf != obs_path: + msg = f"Generated HDF5 ({obf}) does not match original " + msg += f"file name ({obs_path})" + raise RuntimeError(msg) + + if args.verify: + timer.start() + compare = toast.io.load_hdf5( + obs_path, + comm, + process_rows=comm.group_size, + ) + if comm.comm_world is not None: + comm.comm_world.barrier() + timer.stop() + if comm.world_rank == 0: + print( + f" Re-load {obs_path} for verification in {timer.seconds()} s", + flush=True + ) + + if compare != obs: + msg = f"Observation HDF5 verify failed:\n" + msg += f"Input = {obs}\n" + msg += f"Loaded = {compare}" + log.error(msg) + raise RuntimeError(msg) + elif comm.world_rank == 0: + print(f" Verification PASS", flush=True) + else: + if comm.world_rank == 0: + print(f" Skipping verification", flush=True) + + # Dump all the timing information to the output dir + + global_timer.stop("compress HDF5 (total)") + alltimers = gather_timers(comm=comm.comm_world) + if comm.world_rank == 0: + out = os.path.join(".", "timing") + dump(alltimers, out) + + +if __name__ == "__main__": + world, procs, rank = toast.mpi.get_world() + with toast.mpi.exception_guard(comm=world): + main() diff --git a/dc1/noise_sim/compress_hdf5.slurm b/dc1/noise_sim/compress_hdf5.slurm new file mode 100644 index 00000000..58c2ff25 --- /dev/null +++ b/dc1/noise_sim/compress_hdf5.slurm @@ -0,0 +1,68 @@ +#!/bin/bash +#SBATCH --qos=regular +#SBATCH --time=05:00:00 +#SBATCH --nodes=1 +#SBATCH --job-name=CMBS4_todcompress +#SBATCH --licenses=SCRATCH +#SBATCH --constraint=cpu +#SBATCH --account=mp107 + +# set parent directory corresponding to one frequency band +PARENT_DIRECTORY="/global/cfs/cdirs/cmbs4/dc/dc0/staging/noise_sim/outputs_rk/LAT0_CHLAT/f090" + +# set range of observations to compress by this script (inclusive) +START_INDEX=1 +END_INDEX=30 + +echo "Listing all observations in $PARENT_DIRECTORY" + +# list all observations in parent directory and save to a variable +SUBDIR_LIST=$(find "$PARENT_DIRECTORY" -mindepth 1 -maxdepth 1 -type d | sort) +# extract observations names for printing to console +SUBDIR_NAMES=$(echo "$SUBDIR_LIST" | xargs -I{} basename {}) +echo "Observations found: " +echo "$SUBDIR_NAMES" + +echo "Proceeding to compress observations indexed in range: $START_INDEX-$END_INDEX" +# select subset of observations (subdirectories) based on range and save to a variable +SELECTED_SUBDIRS=$(echo "$SUBDIR_LIST" | sed -n "${START_INDEX},${END_INDEX}p") +# extract selected observations names for printing to console +SELECTED_SUBDIR_NAMES=$(echo "$SELECTED_SUBDIRS" | xargs -I{} basename {}) +echo "Selected observations: " +echo "$SELECTED_SUBDIR_NAMES" + +# loop through selected subdirectories and process each one +for subdir in $SELECTED_SUBDIRS; do + echo "Processing observation: $(basename $subdir)" + # search for files with the expected starting keywords : 'RISING' or 'SETTING' + FILE_LIST=$(find "$subdir" -type f \( -name "RISING*" -o -name "SETTING*" \) -printf "%p ") + # extract file names for printing to console + echo "Files to compress: " + for filename in $FILE_LIST; do + echo $(basename $filename) + done + + # call compression script on the list of files + date + echo "Calling flac compression script ..." + srun -n 128 python compress_hdf5.py --verify --obs $FILE_LIST > "log_$(basename $subdir).txt" 2>&1 + + # if python script runs without error, delete backup files + if [ $? -eq 0 ]; then + echo "FLAC compression script ran successfully. Deleting backup files..." + if find "$subdir" -type f -name "*uncompressed.h5" -delete; then + echo "Backup files deleted successfully." + date + else + # If backup files deletion fails for some reason we stop the everything to avoid any risk of running out of disk memory. + echo "Error deleting backup files. Exiting loop over observations." + date + break + fi + else + echo "FLAC compression script encountered an error. Not deleting any files." + date + fi +done + +echo "Observation batch $START_INDEX-$END_INDEX processing in $(basename $PARENT_DIRECTORY) band done. Please verify log files." \ No newline at end of file diff --git a/dc1/noise_sim/convert_hdf5_to_spt3g.py b/dc1/noise_sim/convert_hdf5_to_spt3g.py new file mode 100755 index 00000000..388d6527 --- /dev/null +++ b/dc1/noise_sim/convert_hdf5_to_spt3g.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 + +""" +This script loads HDF5 format data and exports to SPT3G format data. +""" + +import os +import sys +import shutil +import re +import glob + +import datetime + +import argparse + +import numpy as np + +from astropy import units as u + +import toast +import toast.ops + +from toast.timing import gather_timers, dump + +from toast import spt3g as t3g +from spt3g import core as c3g + +from toast.observation import default_values as defaults + + +def parse_arguments(): + """ + Defines and parses the arguments for the script. + """ + parser = argparse.ArgumentParser( + description="Convert CMB-S4 simulated HDF5 data to SPT3G format" + ) + + parser.add_argument( + "--obs", + type=str, + required=False, + nargs="+", + help="One or more observation directories.", + ) + + # The operators we want to configure from the command line or a parameter file. + operators = list() + + # Parse all of the operator configuration + config, args, jobargs = toast.parse_config(parser, operators=operators) + + return config, args, jobargs + + +def main(): + env = toast.utils.Environment.get() + log = toast.utils.Logger.get() + env.enable_function_timers() + global_timer = toast.timing.GlobalTimers.get() + global_timer.start("convert_hdf5_to_spt3g (total)") + + config, args, jobargs = parse_arguments() + + # Use a group size of one + comm = toast.Comm(groupsize=jobargs.group_size) + + # Create the (initially empty) data + data = toast.Data(comm=comm) + + # Set up temporary symlinks and output + nowstr = datetime.datetime.now(datetime.timezone.utc).isoformat() + input = f"hdf5_{nowstr}" + output = f"spt3g_{nowstr}" + + if comm.world_rank == 0: + os.makedirs(input) + os.makedirs(output) + + obsloc = dict() + + for odir in args.obs: + obsdir = os.path.basename(os.path.normpath(odir)) + # Look for the data file + obsdata = glob.glob(os.path.join(odir, "RISING*.h5")) + if len(obsdata) == 0: + obsdata = glob.glob(os.path.join(odir, "SETTING*.h5")) + if len(obsdata) == 0: + obsdata = glob.glob(os.path.join(odir, "POLE*.h5")) + if len(obsdata) != 1: + raise RuntimeError(f"Did not find exactly one data file for {odir}") + obsdata = os.path.abspath(obsdata[0]) + obsdatafile = os.path.basename(obsdata) + obsname = re.sub(r"_\d+\.h5", "", obsdatafile) + obsloc[obsname] = obsdir + log.info_rank( + f"Found observation data {obsdatafile}", + comm=comm.comm_world + ) + if comm.world_rank == 0: + os.symlink(obsdata, os.path.join(input, obsdatafile)) + + # Load the data. + loader = toast.ops.LoadHDF5( + volume=input, + process_rows=1, + ) + log.info_rank( + f"Loading HDF5 data from {loader.volume}", comm=comm.comm_world + ) + loader.apply(data) + + # Build up the lists of objects to export from the first observation + + noise_models = list() + meta_arrays = list() + shared = list() + detdata = list() + intervals = list() + + msg = "Exporting observation fields:" + + ob = data.obs[0] + for k, v in ob.shared.items(): + g3name = f"shared_{k}" + if re.match(r".*boresight.*", k) is not None: + # These are quaternions + msg += f"\n (shared): {k} (quaternions)" + shared.append((k, g3name, c3g.G3VectorQuat)) + elif k == defaults.times: + # Timestamps are handled separately + continue + else: + msg += f"\n (shared): {k}" + shared.append((k, g3name, None)) + for k, v in ob.detdata.items(): + msg += f"\n (detdata): {k}" + if k == "signal": + # We are going to convert this below. Append + # the new name to the list. + detdata.append(("signal_f32", k, None)) + else: + detdata.append((k, k, None)) + for k, v in ob.intervals.items(): + msg += f"\n (intervals): {k}" + intervals.append((k, k)) + for k, v in ob.items(): + if isinstance(v, toast.noise.Noise): + msg += f"\n (noise): {k}" + noise_models.append((k, k)) + elif isinstance(v, np.ndarray) and len(v.shape) > 0: + if isinstance(v, u.Quantity): + raise NotImplementedError("Writing array quantities not yet supported") + msg += f"\n (meta arr): {k}" + meta_arrays.append((k, k)) + else: + msg += f"\n (meta): {k}" + + log.info_rank(msg, comm=comm.comm_world) + + # Convert the 8-byte detector data to 4-byte. + + log.info_rank("Converting signal to 4-byte floats", comm=comm.comm_world) + for ob in data.obs: + ob.detdata.create("signal_f32", dtype=np.float32) + for d in ob.local_detectors: + ob.detdata["signal_f32"][d, :] = ob.detdata["signal"][d, :] + del ob.detdata["signal"] + + # Export the data + + meta_exporter = t3g.export_obs_meta( + noise_models=noise_models, + meta_arrays=meta_arrays, + ) + data_exporter = t3g.export_obs_data( + shared_names=shared, + det_names=detdata, + interval_names=intervals, + compress=True, + ) + exporter = t3g.export_obs( + meta_export=meta_exporter, + data_export=data_exporter, + export_rank=0, + ) + + save3g = toast.ops.SaveSpt3g( + directory=output, + framefile_mb=1000, + obs_export=exporter, + purge=True, + ) + + log.info_rank( + f"Exporting SPT3G data to {save3g.directory}", comm=comm.comm_world + ) + save3g.apply(data) + + if comm.world_rank == 0: + # Move spt3g outputs to the observation directories + for out3g in obsloc.keys(): + os.rename(os.path.join(output, out3g), os.path.join(obsloc[out3g], "spt3g")) + # Remove the input symlinks + shutil.rmtree(input) + + # Dump all the timing information to the output dir + + global_timer.stop("convert_hdf5_to_spt3g (total)") + alltimers = gather_timers(comm=comm.comm_world) + if comm.world_rank == 0: + out = os.path.join(output, "timing") + dump(alltimers, out) + + +if __name__ == "__main__": + world, procs, rank = toast.mpi.get_world() + with toast.mpi.exception_guard(comm=world): + main()