Skip to content

Commit

Permalink
Merge pull request #1024 from simonsobs/dpkg-revamp
Browse files Browse the repository at this point in the history
Data Packaging Revamp
  • Loading branch information
kmharrington authored Dec 19, 2024
2 parents e5ecdd2 + ad293d8 commit d6e6aa2
Show file tree
Hide file tree
Showing 10 changed files with 1,651 additions and 371 deletions.
190 changes: 173 additions & 17 deletions sotodlib/io/bookbinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
import logging
import sys
import shutil
import yaml
import datetime as dt
from zipfile import ZipFile
import sotodlib
from sotodlib.site_pipeline.util import init_logger
from .datapkg_utils import walk_files


log = logging.getLogger('bookbinder')
Expand All @@ -28,6 +33,10 @@ class NoScanFrames(Exception):
"""Exception raised when we try and bind a book but the SMuRF file contains not Scan frames (so no detector data)"""
pass

class NoHKFiles(Exception):
    """Raised when no HK data can be located around the book's time range."""

class NoMountData(Exception):
    """Raised when mount data cannot be found."""
Expand Down Expand Up @@ -68,14 +77,12 @@ def setup_logger(logfile=None):

return log


def get_frame_iter(files):
    """Return one continuous iterator over the frames of all `files`.

    Every file is opened up front with ``core.G3File``; the resulting
    per-file frame streams are then chained end to end.
    """
    readers = [core.G3File(path) for path in files]
    return itertools.chain.from_iterable(readers)


def close_writer(writer):
"""
Closes out a G3FileWriter with an end-processing frame. If None is passed,
Expand All @@ -85,7 +92,6 @@ def close_writer(writer):
return
writer(core.G3Frame(core.G3FrameType.EndProcessing))


def next_scan(it):
"""
Returns the next Scan frame, along with any intermediate frames for an
Expand All @@ -98,7 +104,6 @@ def next_scan(it):
interm_frames.append(frame)
return None, interm_frames


class HkDataField:
"""
Class containing HK Data for a single field.
Expand Down Expand Up @@ -271,6 +276,13 @@ def __init__(self, files, book_id, hk_fields: Dict,
else:
self.log = log

if len(self.files) == 0:
if self.require_acu or self.require_hwp:
raise NoHKFiles("No HK files specified for book")
self.log.warning("No HK files found for book")
for fld in ['az', 'el', 'boresight', 'corotator_enc','az_mode', 'hwp_freq']:
setattr(self.hkdata, fld, None)

if self.require_acu and self.hkdata.az is None:
self.log.warning("No ACU data specified in hk_fields!")

Expand Down Expand Up @@ -480,7 +492,6 @@ def add_acu_summary_info(self, frame, t0, t1):
if k not in frame:
frame[k] = np.nan


class SmurfStreamProcessor:
def __init__(self, obs_id, files, book_id, readout_ids,
log=None, allow_bad_timing=False):
Expand Down Expand Up @@ -757,10 +768,10 @@ def bind(self, outdir, times, frame_idxs, file_idxs, pbar=False, ancil=None,
if pbar.n >= pbar.total:
pbar.close()


class BookBinder:
"""
Class for combining smurf and hk L2 data to create books.
Class for combining smurf and hk L2 data to create books containing detector
timestreams.
Parameters
----------
Expand Down Expand Up @@ -820,12 +831,12 @@ def __init__(self, book, obsdb, filedb, data_root, readout_ids, outdir, hk_field
self.data_root = data_root
self.hk_root = os.path.join(data_root, 'hk')
self.meta_root = os.path.join(data_root, 'smurf')
self.hkfiles = get_hk_files(self.hk_root,
book.start.timestamp(),
book.stop.timestamp())

self.obsdb = obsdb
self.outdir = outdir

assert book.schema==0, "obs/oper books only have schema=0"

self.max_samps_per_frame = max_samps_per_frame
self.max_file_size = max_file_size
self.ignore_tags = ignore_tags
Expand All @@ -851,6 +862,24 @@ def __init__(self, book, obsdb, filedb, data_root, readout_ids, outdir, hk_field
logfile = os.path.join(outdir, 'Z_bookbinder_log.txt')
self.log = setup_logger(logfile)

try:
self.hkfiles = get_hk_files(
self.hk_root,
book.start.timestamp(),
book.stop.timestamp()
)
except NoHKFiles as e:
if require_hwp or require_acu:
self.log.error(
"HK files are required if we require ACU or HWP data"
)
raise e
self.log.warning(
"Found no HK files during book time, binding anyway because "
"require_acu and require_hwp are False"
)
self.hkfiles = []

self.ancil = AncilProcessor(
self.hkfiles,
book.bid,
Expand Down Expand Up @@ -967,6 +996,34 @@ def copy_smurf_files_to_book(self):

self.meta_files = meta_files

def write_M_files(self, telescope, tube_config):
    """Write the book-level metadata files into the output directory.

    Produces ``M_book.yaml`` (book identity plus bookbinder provenance)
    and ``M_index.yaml`` (the dict returned by ``self.get_metadata``).

    Parameters
    ----------
    telescope : str
        Telescope name, forwarded to ``get_metadata``.
    tube_config : dict
        Optics tube configuration, forwarded to ``get_metadata``.
    """
    # NOTE(review): dt.datetime.utcnow() is deprecated (py3.12+) and is
    # naive; a tz-aware replacement would change the serialized string,
    # so it is left as-is here.
    book_meta = {
        "book": {
            "type": self.book.type,
            "schema_version": self.book.schema,
            "book_id": self.book.bid,
            "finalized_at": dt.datetime.utcnow().isoformat(),
        },
        "bookbinder": {
            "codebase": sotodlib.__file__,
            "version": sotodlib.__version__,
            # leaving this in but KH doesn't know what it's supposed to be for
            "context": "unknown",
        },
    }
    with open(os.path.join(self.outdir, "M_book.yaml"), "w") as f:
        yaml.dump(book_meta, f)

    index = self.get_metadata(telescope=telescope, tube_config=tube_config)
    with open(os.path.join(self.outdir, "M_index.yaml"), "w") as f:
        yaml.dump(index, f)

def get_metadata(self, telescope=None, tube_config={}):
"""
Returns metadata dict for the book
Expand Down Expand Up @@ -1091,6 +1148,108 @@ def bind(self, pbar=False):
self.log.info("Finished binding data. Exiting.")
return True

class TimeCodeBinder:
    """Builder for timecode-based books (smurf, stray, and hk).

    These books are produced primarily by copying the specified files
    from their level 2 locations into new level 2 locations. Smurf
    books with schema > 0 are written into a zip archive instead of a
    plain directory tree.
    """

    def __init__(self, book, timecode, indir, outdir, file_list=None,
                 ignore_pattern=None):
        self.book = book
        self.timecode = timecode
        self.indir = indir
        self.outdir = outdir
        self.file_list = file_list
        self.ignore_pattern = ignore_pattern if ignore_pattern is not None else []
        # Only smurf books with a newer schema get zipped on output.
        self.compress_output = book.type == 'smurf' and book.schema > 0

    def get_metadata(self, telescope=None, tube_config={}):
        """Return the M_index metadata dict for this book."""
        tc = float(self.timecode)
        return {
            "book_id": self.book.bid,
            # Timecode books have no real observation bounds, so the
            # timecode itself is used to synthesize dummy start/stop times.
            "start_time": tc * 1e5,
            "stop_time": (tc + 1) * 1e5,
            "telescope": telescope,
            "type": self.book.type,
        }

    def write_M_files(self, telescope, tube_config):
        """Write M_book.yaml and M_index.yaml into the output book
        (appended into the zip archive when output is compressed)."""
        book_meta = {
            "book": {
                "type": self.book.type,
                "schema_version": self.book.schema,
                "book_id": self.book.bid,
                "finalized_at": dt.datetime.utcnow().isoformat(),
            },
            "bookbinder": {
                "codebase": sotodlib.__file__,
                "version": sotodlib.__version__,
                # leaving this in but KH doesn't know what it's supposed to be for
                "context": "unknown",
            },
        }
        if self.compress_output:
            with ZipFile(self.outdir, mode='a') as zf:
                zf.writestr("M_book.yaml", yaml.dump(book_meta))
        else:
            with open(os.path.join(self.outdir, "M_book.yaml"), "w") as f:
                yaml.dump(book_meta, f)

        index = self.get_metadata(telescope=telescope, tube_config=tube_config)
        if self.compress_output:
            with ZipFile(self.outdir, mode='a') as zf:
                zf.writestr("M_index.yaml", yaml.dump(index))
        else:
            with open(os.path.join(self.outdir, "M_index.yaml"), "w") as f:
                yaml.dump(index, f)

    def bind(self, pbar=False):
        """Copy (or zip) the level 2 files into the output book.

        ``pbar`` is accepted for interface parity with BookBinder.bind
        but is not used here.
        """
        if self.compress_output:
            files = self.file_list
            if files is None:
                files = walk_files(self.indir, include_suprsync=True)
            # shutil.ignore_patterns gives a (dir, names) -> ignored-set
            # callable; apply it once to drop unwanted entries.
            matcher = shutil.ignore_patterns(*self.ignore_pattern)
            skip = matcher("", files)
            self.file_list = sorted(f for f in files if f not in skip)
            with ZipFile(self.outdir, mode='x') as zf:
                for f in self.file_list:
                    zf.write(f, arcname=os.path.relpath(f, self.indir))
        elif self.file_list is None:
            # No explicit file list: mirror the whole input tree.
            shutil.copytree(
                self.indir,
                self.outdir,
                ignore=shutil.ignore_patterns(*self.ignore_pattern),
            )
        else:
            # Explicit file list: copy each file, preserving its path
            # relative to the input directory.
            os.makedirs(self.outdir, exist_ok=True)
            for f in self.file_list:
                dest = os.path.join(self.outdir, os.path.relpath(f, self.indir))
                os.makedirs(os.path.dirname(dest), exist_ok=True)
                shutil.copy(f, dest)

def fill_time_gaps(ts):
"""
Expand Down Expand Up @@ -1132,7 +1291,6 @@ def fill_time_gaps(ts):

return new_ts, ~m


_primary_idx_map = {}
def get_frame_times(frame, allow_bad_timing=False):
"""
Expand Down Expand Up @@ -1172,7 +1330,6 @@ def get_frame_times(frame, allow_bad_timing=False):
## don't change this error message. used in Imprinter CLI
raise TimingSystemOff("Timing counters not incrementing")


def split_ts_bits(c):
"""
Split up 64 bit to 2x32 bit
Expand All @@ -1183,7 +1340,6 @@ def split_ts_bits(c):
b = c & MAXINT
return a, b


def counters_to_timestamps(c0, c2):
s, ns = split_ts_bits(c2)

Expand All @@ -1193,7 +1349,6 @@ def counters_to_timestamps(c0, c2):
ts = np.round(c2 - (c0 / 480000) ) + c0 / 480000
return ts


def find_ref_idxs(refs, vs):
"""
Creates a mapping from a list of timestamps (vs) to a list of reference
Expand Down Expand Up @@ -1251,8 +1406,10 @@ def get_hk_files(hkdir, start, stop, tbuff=10*60):
m = (start-tbuff <= file_times) & (file_times < stop+tbuff)
if not np.any(m):
check = np.where( file_times <= start )
if len(check) < 1:
raise ValueError("Cannot find HK files we need")
if len(check) < 1 or len(check[0]) < 1:
raise NoHKFiles(
f"Cannot find HK files between {start} and {stop}"
)
fidxs = [check[0][-1]]
m[fidxs] = 1
else:
Expand Down Expand Up @@ -1371,7 +1528,6 @@ def find_frame_splits(ancil, t0=None, t1=None):
idxs = locate_scan_events(az.times[msk], az.data[msk], filter_window=100)
return az.times[msk][idxs]


def get_smurf_files(obs, meta_path, all_files=False):
"""
Returns a list of smurf files that should be copied into a book.
Expand Down
Loading

0 comments on commit d6e6aa2

Please sign in to comment.