From ad293d8c380c7c33a8ad4d359bfab668745be739 Mon Sep 17 00:00:00 2001
From: Katie Harrington
Date: Sun, 6 Oct 2024 15:54:48 +0000
Subject: [PATCH] Large revamp of the data packaging system to set up for automated deletion of level 2 and staged data

These updates are complete enough that they should be merged into main: we are now running this on-site, and the deployments for deletion are mostly ready to be automated.

Summary of the changes:

DataPackaging and cleanup_level2
- The new datapkg_completion module has a DataPackaging class that works with G3tSmurf, G3tHK, and Imprinter at the same time to run the checks needed to confirm we are ready to delete data from each timecode.
- Rewrite cleanup_level2 to use the DataPackaging functions and to run deletion in three phases:
  - Phase 1 - Completion: go through a timecode folder and make sure every single book that should exist does (planned for a 14 day lag).
  - Phase 2 - Staged: delete staged files as long as completion is True (planned for a 14 day lag).
  - Phase 3 - Level 2: delete level 2 files once one month has passed and there are at least two book copies (one on-site or not) with the same checksum (planned for a 28 day lag).

Bookbinder
- Add the ability to bind books without HK files if the flags are set correctly.
- Create a TimeCodeBinder in bookbinder.py to bind the timecode books. This replaces a smaller fake binder that was being used in imprinter. The new binder has some compression capability that is used for smurf books (but not enough to go through and change operations books).
- This required moving where some functions are defined / called for the book metadata. No changes were made to the output obs/oper book metadata.

Imprinter
- Update Imprinter to check with the librarian about the status of off-site books.
- Fix LAT incompletion querying. Closes #938 ("Incomplete observation finding doesn't work for LAT").
- New functionality for finding which books belong in stray books; the resulting books should still be the same.
- Add a new schema column to the book table to track the schema of the different books. This is now used for smurf books since we have a new compressed version that is schema=1.
- get_files_for_book now works for all book types.
- Add a function to find whether there are any level 2 observations that are not registered into books.

load_smurf
- Tweak how the file and observation deletion work.
- G3tSmurf now tracks its instance of G3tHK.
- Add functions to find files on disk that are not in the database, and files that are in the database but are not linked to level 2 observations.

G3tHK
- Significantly reduce the size of the g3thk databases: we still have file entries, but only the necessary smurf-related agents get added to hkagents and hkfields.
- Databases on-site were purged of all the non-UFM related fields. Closes #698 ("G3HKDb error handling").

Imprinter CLI
- Update autofix to include a time range.
- sqlite int fix and use in_ for queries.
- Add all_slots as a property. Closes #938.
- Add a method to check if the librarian has the book off-site.
- Add the lvl2 data folder to imprinter and generate smurf book file lists.
- Add many functions for checking completion of a time period.

Squashed commit of the following:
- Add DataPackaging class for going through, checking completion, and running time-code based deletion. HK and verification through the librarian are optional. (A usage sketch of the three phases follows below.)
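For reference, a minimal sketch of how the three cleanup phases could be driven with the new DataPackaging class. The platform name, log file name, and timecode handling are illustrative placeholders; the methods and their signatures are the ones added in sotodlib/io/datapkg_completion.py in this patch.

    # Hypothetical driver for the three cleanup phases described above.
    # "satp1" and the log file name are placeholders.
    from sotodlib.io.datapkg_completion import DataPackaging

    dpk = DataPackaging("satp1", log_filename="cleanup_satp1.log")
    tc = dpk.get_first_timecode_on_disk()

    # Phase 1 (~14 day lag): make sure every book that should exist does.
    complete, reason = dpk.make_timecode_complete(tc)

    # Phase 2 (~14 day lag): remove staged copies once the timecode is complete.
    if complete:
        dpk.delete_timecode_staged(tc)

    # Phase 3 (~28 day lag): remove level 2 data after verifying offsite copies.
    deletable, reason = dpk.verify_timecode_deletable(tc, verify_with_librarian=True)
    if deletable:
        dpk.delete_timecode_level2(tc, dry_run=False, verify_with_librarian=True)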
G3tHK Update: Trim Updates g3thk to only query fields for agents from the site-pipeline configs; i.e., pysmurf monitor and supersync to aid the data packaging tools from the level2 side, and there fore reduce g3thk db size actually push non hard-coded version g3thk trim add docstring and remove prints and commented lines push non hardcoded version for getting all iids and not the first one in the config file updates to allow for book binding without HK data update g3thk connection set new hk connection to be general account for hk databases without detector data add debug message fix debug message fix DataPackaging module to deal with imprinters with no detector data updated to start trying to scriptify the cleanup change logger name finish variable name changes fix typos and update logging levels more log updates move per-book checks into imprinter create compressed smurf books actually ignore the ignored files setup autofix to accept a time range and fix level2 delete dry run behavior include HK database checking completion fix logic to not accidentally skip a setup update deletes based on new planning replace self move level 2 checks to imprinter actually check if timecode is deletable use the correct list name fix error message smurf books with new schema need new delete method remove un-needed assertion remove old code, add more book counts to monitor without agents just look at file time check finalization time before registering timecode books raise logging value and update defaults change staged vs level 2 order --- sotodlib/io/bookbinder.py | 190 +++++- sotodlib/io/datapkg_completion.py | 715 +++++++++++++++++++++ sotodlib/io/datapkg_utils.py | 32 + sotodlib/io/g3thk_db.py | 61 +- sotodlib/io/imprinter.py | 579 +++++++++-------- sotodlib/io/imprinter_cli.py | 21 +- sotodlib/io/imprinter_utils.py | 44 +- sotodlib/io/load_smurf.py | 119 +++- sotodlib/site_pipeline/cleanup_level2.py | 219 ++++++- sotodlib/site_pipeline/update_book_plan.py | 42 +- 10 files changed, 1651 insertions(+), 371 deletions(-) create mode 100644 sotodlib/io/datapkg_completion.py diff --git a/sotodlib/io/bookbinder.py b/sotodlib/io/bookbinder.py index 41a2704d2..a1bdd1e63 100644 --- a/sotodlib/io/bookbinder.py +++ b/sotodlib/io/bookbinder.py @@ -13,7 +13,12 @@ import logging import sys import shutil +import yaml +import datetime as dt +from zipfile import ZipFile +import sotodlib from sotodlib.site_pipeline.util import init_logger +from .datapkg_utils import walk_files log = logging.getLogger('bookbinder') @@ -28,6 +33,10 @@ class NoScanFrames(Exception): """Exception raised when we try and bind a book but the SMuRF file contains not Scan frames (so no detector data)""" pass +class NoHKFiles(Exception): + """Exception raised when we cannot find any HK data around the book time""" + pass + class NoMountData(Exception): """Exception raised when we cannot find mount data""" pass @@ -68,14 +77,12 @@ def setup_logger(logfile=None): return log - def get_frame_iter(files): """ Returns a continuous iterator over frames for a list of files. """ return itertools.chain(*[core.G3File(f) for f in files]) - def close_writer(writer): """ Closes out a G3FileWriter with an end-processing frame. 
If None is passed, @@ -85,7 +92,6 @@ def close_writer(writer): return writer(core.G3Frame(core.G3FrameType.EndProcessing)) - def next_scan(it): """ Returns the next Scan frame, along with any intermediate frames for an @@ -98,7 +104,6 @@ def next_scan(it): interm_frames.append(frame) return None, interm_frames - class HkDataField: """ Class containing HK Data for a single field. @@ -271,6 +276,13 @@ def __init__(self, files, book_id, hk_fields: Dict, else: self.log = log + if len(self.files) == 0: + if self.require_acu or self.require_hwp: + raise NoHKFiles("No HK files specified for book") + self.log.warning("No HK files found for book") + for fld in ['az', 'el', 'boresight', 'corotator_enc','az_mode', 'hwp_freq']: + setattr(self.hkdata, fld, None) + if self.require_acu and self.hkdata.az is None: self.log.warning("No ACU data specified in hk_fields!") @@ -480,7 +492,6 @@ def add_acu_summary_info(self, frame, t0, t1): if k not in frame: frame[k] = np.nan - class SmurfStreamProcessor: def __init__(self, obs_id, files, book_id, readout_ids, log=None, allow_bad_timing=False): @@ -757,10 +768,10 @@ def bind(self, outdir, times, frame_idxs, file_idxs, pbar=False, ancil=None, if pbar.n >= pbar.total: pbar.close() - class BookBinder: """ - Class for combining smurf and hk L2 data to create books. + Class for combining smurf and hk L2 data to create books containing detector + timestreams. Parameters ---------- @@ -820,12 +831,12 @@ def __init__(self, book, obsdb, filedb, data_root, readout_ids, outdir, hk_field self.data_root = data_root self.hk_root = os.path.join(data_root, 'hk') self.meta_root = os.path.join(data_root, 'smurf') - self.hkfiles = get_hk_files(self.hk_root, - book.start.timestamp(), - book.stop.timestamp()) + self.obsdb = obsdb self.outdir = outdir + assert book.schema==0, "obs/oper books only have schema=0" + self.max_samps_per_frame = max_samps_per_frame self.max_file_size = max_file_size self.ignore_tags = ignore_tags @@ -851,6 +862,24 @@ def __init__(self, book, obsdb, filedb, data_root, readout_ids, outdir, hk_field logfile = os.path.join(outdir, 'Z_bookbinder_log.txt') self.log = setup_logger(logfile) + try: + self.hkfiles = get_hk_files( + self.hk_root, + book.start.timestamp(), + book.stop.timestamp() + ) + except NoHKFiles as e: + if require_hwp or require_acu: + self.log.error( + "HK files are required if we require ACU or HWP data" + ) + raise e + self.log.warning( + "Found no HK files during book time, binding anyway because " + "require_acu and require_hwp are False" + ) + self.hkfiles = [] + self.ancil = AncilProcessor( self.hkfiles, book.bid, @@ -967,6 +996,34 @@ def copy_smurf_files_to_book(self): self.meta_files = meta_files + def write_M_files(self, telescope, tube_config): + # write M_book file + m_book_file = os.path.join(self.outdir, "M_book.yaml") + book_meta = {} + book_meta["book"] = { + "type": self.book.type, + "schema_version": self.book.schema, + "book_id": self.book.bid, + "finalized_at": dt.datetime.utcnow().isoformat(), + } + book_meta["bookbinder"] = { + "codebase": sotodlib.__file__, + "version": sotodlib.__version__, + # leaving this in but KH doesn't know what it's supposed to be for + "context": "unknown", + } + with open(m_book_file, "w") as f: + yaml.dump(book_meta, f) + + mfile = os.path.join(self.outdir, "M_index.yaml") + with open(mfile, "w") as f: + yaml.dump( + self.get_metadata( + telescope=telescope, + tube_config=tube_config, + ), f + ) + def get_metadata(self, telescope=None, tube_config={}): """ Returns metadata dict for the book 
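For reference, a sketch of reading back the per-book metadata written by the new write_M_files() above. The book path is a placeholder; the keys are the ones populated in the code in this hunk.

    # Illustrative read-back of the metadata written by write_M_files().
    import os
    import yaml

    book_dir = "/path/to/staged/book"   # placeholder
    with open(os.path.join(book_dir, "M_book.yaml")) as f:
        m_book = yaml.safe_load(f)

    # Keys written above: book.type, book.schema_version, book.book_id,
    # book.finalized_at, plus the bookbinder codebase / version / context.
    assert m_book["book"]["schema_version"] == 0  # obs/oper books keep schema 0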
@@ -1091,6 +1148,108 @@ def bind(self, pbar=False): self.log.info("Finished binding data. Exiting.") return True +class TimeCodeBinder: + """Class for building the timecode based books, smurf, stray, and hk books. + These books are built primarily just by copying specified files from level + 2 locations to new locations at level 2. + """ + + def __init__( + self, book, timecode, indir, outdir, file_list=None, + ignore_pattern=None, + ): + self.book = book + self.timecode = timecode + self.indir = indir + self.outdir = outdir + self.file_list = file_list + if ignore_pattern is not None: + self.ignore_pattern = ignore_pattern + else: + self.ignore_pattern = [] + + if book.type == 'smurf' and book.schema > 0: + self.compress_output = True + else: + self.compress_output = False + + def get_metadata(self, telescope=None, tube_config={}): + return { + "book_id": self.book.bid, + # dummy start and stop times + "start_time": float(self.timecode) * 1e5, + "stop_time": (float(self.timecode) + 1) * 1e5, + "telescope": telescope, + "type": self.book.type, + } + + def write_M_files(self, telescope, tube_config): + # write M_book file + + book_meta = {} + book_meta["book"] = { + "type": self.book.type, + "schema_version": self.book.schema, + "book_id": self.book.bid, + "finalized_at": dt.datetime.utcnow().isoformat(), + } + book_meta["bookbinder"] = { + "codebase": sotodlib.__file__, + "version": sotodlib.__version__, + # leaving this in but KH doesn't know what it's supposed to be for + "context": "unknown", + } + if self.compress_output: + with ZipFile(self.outdir, mode='a') as zf: + zf.writestr("M_book.yaml", yaml.dump(book_meta)) + else: + m_book_file = os.path.join(self.outdir, "M_book.yaml") + with open(m_book_file, "w") as f: + yaml.dump(book_meta, f) + + index = self.get_metadata( + telescope=telescope, + tube_config=tube_config, + ) + if self.compress_output: + with ZipFile(self.outdir, mode='a') as zf: + zf.writestr("M_index.yaml", yaml.dump(index)) + else: + mfile = os.path.join(self.outdir, "M_index.yaml") + with open(mfile, "w") as f: + yaml.dump(index, f) + + def bind(self, pbar=False): + if self.compress_output: + if self.file_list is None: + self.file_list = walk_files(self.indir, include_suprsync=True) + ignore = shutil.ignore_patterns(*self.ignore_pattern) + to_ignore = ignore("", self.file_list) + self.file_list = sorted( + [f for f in self.file_list if f not in to_ignore] + ) + with ZipFile(self.outdir, mode='x') as zf: + for f in self.file_list: + relpath = os.path.relpath(f, self.indir) + zf.write(f, arcname=relpath) + elif self.file_list is None: + shutil.copytree( + self.indir, + self.outdir, + ignore=shutil.ignore_patterns( + *self.ignore_pattern, + ), + ) + else: + if not os.path.exists(self.outdir): + os.makedirs(self.outdir) + for f in self.file_list: + relpath = os.path.relpath(f, self.indir) + path = os.path.join(self.outdir, relpath) + base, _ = os.path.split(path) + if not os.path.exists(base): + os.makedirs(base) + shutil.copy(f, os.path.join(self.outdir, relpath)) def fill_time_gaps(ts): """ @@ -1132,7 +1291,6 @@ def fill_time_gaps(ts): return new_ts, ~m - _primary_idx_map = {} def get_frame_times(frame, allow_bad_timing=False): """ @@ -1172,7 +1330,6 @@ def get_frame_times(frame, allow_bad_timing=False): ## don't change this error message. 
used in Imprinter CLI raise TimingSystemOff("Timing counters not incrementing") - def split_ts_bits(c): """ Split up 64 bit to 2x32 bit @@ -1183,7 +1340,6 @@ def split_ts_bits(c): b = c & MAXINT return a, b - def counters_to_timestamps(c0, c2): s, ns = split_ts_bits(c2) @@ -1193,7 +1349,6 @@ def counters_to_timestamps(c0, c2): ts = np.round(c2 - (c0 / 480000) ) + c0 / 480000 return ts - def find_ref_idxs(refs, vs): """ Creates a mapping from a list of timestamps (vs) to a list of reference @@ -1251,8 +1406,10 @@ def get_hk_files(hkdir, start, stop, tbuff=10*60): m = (start-tbuff <= file_times) & (file_times < stop+tbuff) if not np.any(m): check = np.where( file_times <= start ) - if len(check) < 1: - raise ValueError("Cannot find HK files we need") + if len(check) < 1 or len(check[0]) < 1: + raise NoHKFiles( + f"Cannot find HK files between {start} and {stop}" + ) fidxs = [check[0][-1]] m[fidxs] = 1 else: @@ -1371,7 +1528,6 @@ def find_frame_splits(ancil, t0=None, t1=None): idxs = locate_scan_events(az.times[msk], az.data[msk], filter_window=100) return az.times[msk][idxs] - def get_smurf_files(obs, meta_path, all_files=False): """ Returns a list of smurf files that should be copied into a book. diff --git a/sotodlib/io/datapkg_completion.py b/sotodlib/io/datapkg_completion.py new file mode 100644 index 000000000..60292bbc7 --- /dev/null +++ b/sotodlib/io/datapkg_completion.py @@ -0,0 +1,715 @@ +import os +import yaml +import logging +import shutil +import numpy as np +import datetime as dt +from sqlalchemy import or_, and_, not_ +from collections import OrderedDict + +from .load_smurf import ( + TimeCodes, + SupRsyncType, + Finalize, + SmurfStatus, + logger as smurf_log +) +from .imprinter import ( + Books, + Imprinter, + BOUND, + UNBOUND, + UPLOADED, + FAILED, + WONT_BIND, + DONE, + SMURF_EXCLUDE_PATTERNS, +) +import sotodlib.io.imprinter_utils as utils +from .imprinter_cli import autofix_failed_books +from .datapkg_utils import walk_files, just_suprsync + +from .bookbinder import log as book_logger + +def combine_loggers(imprint, fname=None): + log_list = [imprint.logger, smurf_log, book_logger] + logger = logging.getLogger("DataPackaging") + logger.setLevel(logging.DEBUG) + + formatter = logging.Formatter( + '%(levelname)s - %(name)s - %(message)s' + ) + # Create a file handler + if fname is not None: + handler = logging.FileHandler(fname) + handler.setLevel(logging.DEBUG) + handler.setFormatter(formatter) + logger.addHandler(handler) + [l.addHandler(handler) for l in log_list] + + # Create a stream handler to print logs to the console + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) # You can set the desired log level for console output + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + return logger + +class DataPackaging: + def __init__(self, platform, log_filename=None): + self.platform = platform + self.imprint = Imprinter.for_platform(platform) + self.logger = combine_loggers(self.imprint, fname=log_filename) + self.session = self.imprint.get_session() + if self.imprint.build_det: + self.g3session, self.SMURF = self.imprint.get_g3tsmurf_session(return_archive=True) + else: + self.g3session = None + self.SMURF = None + self.HK = self.imprint.get_g3thk() + + def get_first_timecode_on_disk(self, include_hk=True): + tc = 50000 + if self.imprint.build_det: + tc = min([ + tc, + int(sorted(os.listdir(self.SMURF.meta_path))[0]), + int(sorted(os.listdir(self.SMURF.archive_path))[0]), + ]) + if include_hk: + tc = min([ + tc, 
+ int(sorted(os.listdir(self.HK.hkarchive_path))[0]) + ]) + if tc == 50000: + raise ValueError(f"Found no timecode folders for {self.platform}") + return tc + + def get_first_timecode_in_staged(self, include_hk=True): + q = self.session.query(Books).filter( + Books.status == UPLOADED, + ) + if not include_hk: + q = q.filter(Books.type != 'hk') + first = q.order_by(Books.start).first() + tc = int( first.start.timestamp() // 1e5) + return tc + + def all_files_in_timecode(self, timecode, include_hk=True): + flist = [] + if self.imprint.build_det: + stc = os.path.join(self.SMURF.meta_path, str(timecode)) + flist.extend(walk_files(stc, include_suprsync=True)) + ttc = os.path.join(self.SMURF.archive_path, str(timecode)) + flist.extend(walk_files(ttc, include_suprsync=True)) + if include_hk: + htc = os.path.join(self.HK.hkarchive_path, str(timecode)) + flist.extend(walk_files(htc, include_suprsync=True)) + return flist + + def get_suprsync_files(self, timecode): + if not self.imprint.build_det: + return [] + stc = os.path.join(self.SMURF.meta_path, str(timecode)) + ttc = os.path.join(self.SMURF.archive_path, str(timecode)) + flist = [] + + if not os.path.exists(stc) and not os.path.exists(ttc): + return flist + if os.path.exists(ttc) and 'suprsync' in os.listdir(ttc): + for root, _, files in os.walk(os.path.join(ttc, 'suprsync')): + for name in files: + flist.append(os.path.join(ttc, root, name)) + if os.path.exists(stc) and 'suprsync' in os.listdir(stc): + for root, _, files in os.walk(os.path.join(stc, 'suprsync')): + for name in files: + flist.append(os.path.join(stc, root, name)) + return flist + + def check_hk_registered(self, timecode, complete): + min_ctime = timecode*1e5 + max_ctime = (timecode+1)*1e5 + + self.HK.add_hkfiles( + min_ctime=min_ctime, max_ctime=max_ctime, + show_pb=False, update_last_file=False, + ) + self.imprint.register_hk_books( + min_ctime=min_ctime, + max_ctime=max_ctime, + ) + # check the hk book is registered + book = self.session.query(Books).filter( + Books.bid == f"hk_{timecode}_{self.platform}" + ).one_or_none() + if book is None: + complete[0] = False + complete[1] += f"HK book hk_{timecode}_{self.platform} missing\n" + elif book.status == UNBOUND: + try: + self.imprint.bind_book(book) + except: + self.logger.warning(f"Failed to bind {book.bid}") + if book.status < BOUND: + complete[0] = False + complete[1] += f"Book hk_{timecode}_{self.platform} not bound" + return complete + + def make_timecode_complete( + self, timecode, try_binding_books=True, try_single_obs=True, + include_hk=True, + ): + """ + Carefully go through an entire timecode and check that the data packaging as + complete as it can be. The verification will also try and fix any errors + found in the system. Updating databases, registering books, and binding + books if try_binding_books is True + + Arguments + ---------- + timecode: int + 5-digit ctime to check for completion + try_binding_books: bool + if true, go through and try to bind any newly registered books + try_single_obs: bool + if true, tries to register any missing observations as single wafer + observations if registering as multi-wafer observations fails. 
This + happens sometimes if the stream lengths are very close to the + minimum overlap time + include_hk: bool + if true, also checkes everything related to hk + """ + + complete = [True, ""] + min_ctime = timecode*1e5 + max_ctime = (timecode+1)*1e5 + + if not self.imprint.build_det: + ## no detector data tracked by imprinter + if include_hk: + return self.check_hk_registered(timecode, complete) + else: + self.logger.warning( + f"No detector data built for platform " + f"{self.imprint.daq_node} and not checking HK. Nothing to " + "check for completion" + ) + return complete + + has_smurf, has_timestreams = True, True + stc = os.path.join(self.SMURF.meta_path, str(timecode)) + ttc = os.path.join(self.SMURF.archive_path, str(timecode)) + + if not os.path.exists(stc): + self.logger.debug(f"TC {timecode}: No level 2 smurf folder") + has_smurf = False + if not os.path.exists(ttc): + self.logger.debug(f"TC {timecode}: No level 2 timestream folder") + has_timestreams = False + + if os.path.exists(ttc) and just_suprsync(ttc): + self.logger.info( + f"TC {timecode}: Level 2 timestreams is only suprsync" + ) + has_timestreams = False + if os.path.exists(stc) and just_suprsync(stc): + self.logger.info(f"TC {timecode}: Level 2 smurf is only suprsync") + has_smurf = False + + if not has_smurf and not has_timestreams: + return complete + if not has_smurf and has_timestreams: + self.logger.error(f"TC {timecode}: Has timestreams folder without smurf!") + + overall_final_ctime = self.SMURF.get_final_time( + self.imprint.all_slots, check_control=False + ) + tcode_limit = int(overall_final_ctime//1e5) + if timecode+1 > tcode_limit: + raise ValueError( + f"We cannot check files from {timecode} because finalization time " + f"is {overall_final_ctime}" + ) + + self.logger.info(f"Checking Timecode {timecode} for completion") + ## check for files on disk to be in database + missing_files = self.SMURF.find_missing_files( + timecode, session=self.g3session + ) + if len(missing_files) > 0: + self.logger.warning( + f"{len(missing_files)} files not in G3tSmurf" + ) + self.SMURF.index_metadata( + min_ctime=min_ctime, + max_ctime=max_ctime, + session=self.g3session + ) + self.SMURF.index_archive( + min_ctime=min_ctime, + max_ctime=max_ctime, + show_pb=False, + session=self.g3session + ) + self.SMURF.index_timecodes( + min_ctime=min_ctime, + max_ctime=max_ctime, + session=self.g3session + ) + still_missing = len( + self.SMURF.find_missing_files(timecode, session=self.g3session) + ) + if still_missing>0: + msg = f"{still_missing} file(s) were not able to be added to the " \ + "G3tSmurf database." + self.logger.error(msg) + complete[0] = False + complete[1] += msg+"\n" + else: + self.logger.debug("All files on disk are in G3tSmurf database") + + ## check for level 2 files to be assigned to level 2 observations + missing_obs = self.SMURF.find_missing_files_from_obs( + timecode, session=self.g3session + ) + if len(missing_obs) > 0: + msg = f"{len(missing_obs)} files not assigned lvl2 obs" + no_tags = 0 + for fpath in missing_obs: + if fpath[-6:] != "000.g3": + msg += f"\n{fpath} was not added to a larger observation." \ + " Will be fixed later if possible." + else: + status = SmurfStatus.from_file(fpath) + if len(status.tags)==0: + no_tags += 1 + else: + msg += f"\Trying to add {fpath} to database" + self.SMURF.add_file( + fpath, self.g3session, overwrite=True + ) + if no_tags > 0: + msg += f"\n{no_tags} of the files have no tags, so these should "\ + "not be observations." 
+ self.logger.warning(msg) + + ## if the stray book has already been bound then we cannot add + ## more detector books without causing problems + add_new_detector_books = True + stray_book = self.session.query(Books).filter( + Books.bid == f"stray_{timecode}_{self.platform}", + Books.status >= BOUND, + ).one_or_none() + if stray_book is not None: + add_new_detector_books = False + + ## check for incomplete observations + ## add time to max_ctime to account for observations on the edge + incomplete = self.imprint._find_incomplete( + min_ctime, max_ctime+24*2*3600 + ) + if incomplete.count() > 0: + ic_list = incomplete.all() + """Check if these are actually incomplete, imprinter incomplete checker + includes making sure the stop isn't beyond max ctime. + """ + obs_list = [] + for obs in ic_list: + if obs.stop is None or obs.timestamp <= max_ctime: + obs_list.append(obs) + + ## complete these no matter what for file tracking / deletion + self.logger.warning( + f"Found {len(obs_list)} incomplete observations. Fixing" + ) + for obs in obs_list: + self.logger.debug(f"Updating {obs}") + self.SMURF.update_observation_files( + obs, + self.g3session, + force=True, + ) + + ## make sure all obs / operation books from this period are registered + ## looks like short but overlapping observations are sometimes missed, + ## use `try_single_obs` flag to say if we want to try and clean those up + missing = self.imprint.find_missing_lvl2_obs_from_books( + min_ctime,max_ctime + ) + if add_new_detector_books and len(missing) > 0: + self.logger.info( + f"{len(missing)} lvl2 observations are not registered in books." + " Trying to register them" + ) + ## add time to max_ctime to account for observations on the edge + self.imprint.update_bookdb_from_g3tsmurf( + min_ctime=min_ctime, max_ctime=max_ctime+24*2*3600, + ) + still_missing = self.imprint.find_missing_lvl2_obs_from_books( + min_ctime,max_ctime + ) + if len(still_missing) > 0 and try_single_obs: + self.logger.warning("Trying single stream registration") + self.imprint.update_bookdb_from_g3tsmurf( + min_ctime=min_ctime, max_ctime=max_ctime+24*2*3600, + force_single_stream=True, + ) + still_missing = self.imprint.find_missing_lvl2_obs_from_books( + min_ctime,max_ctime + ) + if len(still_missing) > 0: + msg = f"Level 2 observations {still_missing} could not be " \ + "registered in books" + self.logger.error(msg) + complete[0] = False + complete[1] += msg+"\n" + elif not add_new_detector_books and len(missing)>0: + msg = f"Have level 2 observations missing but cannot add new " \ + f"detector books because {timecode} was already finalized " \ + " and stray exists. These files should be in stray" + self.logger.warning(msg) + + ## at this point, if an obs or oper book is going to be registered it is + if try_binding_books: + books = self.session.query(Books).filter( + Books.status == UNBOUND, + Books.start >= dt.datetime.utcfromtimestamp(min_ctime), + Books.start <= dt.datetime.utcfromtimestamp(max_ctime), + ).all() + self.logger.info(f"{len(books)} new books to bind") + for book in books: + try: + self.imprint.bind_book(book) + except: + self.logger.warning(f"Failed to bind {book.bid}") + + failed = self.session.query(Books).filter( + Books.status == FAILED, + Books.start >= dt.datetime.utcfromtimestamp(min_ctime), + Books.start <= dt.datetime.utcfromtimestamp(max_ctime), + ).all() + if len(failed) > 0: + self.logger.info( + f"{len(failed)} books failed to bind. 
trying to autofix" + ) + autofix_failed_books( + self.imprint, + min_ctime=min_ctime, + max_ctime=max_ctime, + ) + + is_final, reason = utils.get_timecode_final(self.imprint, timecode) + if not is_final: + self.logger.info( + f"Timecode {timecode} not counted as final: reason {reason}" + ) + meta_entries = self.g3session.query(TimeCodes).filter( + TimeCodes.timecode == timecode, + TimeCodes.suprsync_type == SupRsyncType.META.value, + ).count() + file_entries = self.g3session.query(TimeCodes).filter( + TimeCodes.timecode == timecode, + TimeCodes.suprsync_type == SupRsyncType.FILES.value, + ).count() + if ( + meta_entries == len(self.imprint.all_slots) and + file_entries == len(self.imprint.all_slots) + ): + self.logger.info( + f"{timecode} was part of the mixed up timecode agent entries" + ) + elif timecode < tcode_limit: + self.logger.info( + f"At least one server was likely off during timecode {timecode}" + ) + self.logger.info( + f"Setting timecode {timecode} to final in SMuRF database" + ) + utils.set_timecode_final(self.imprint, timecode) + + self.imprint.register_timecode_books( + min_ctime=min_ctime, + max_ctime=max_ctime, + ) + + if try_binding_books: + books = self.session.query(Books).filter( + Books.status == UNBOUND, + Books.start >= dt.datetime.utcfromtimestamp(min_ctime), + Books.start <= dt.datetime.utcfromtimestamp(max_ctime), + ).all() + self.logger.info(f"{len(books)} new to bind") + for book in books: + try: + self.imprint.bind_book(book) + except: + self.logger.warning(f"Failed to bind {book.bid}") + + # check the smurf book is registered + book = self.session.query(Books).filter( + Books.bid == f"smurf_{timecode}_{self.platform}" + ).one_or_none() + if book is None: + complete[0] = False + complete[1] += f"SMuRF book smurf_{timecode}_{self.platform} missing\n" + if include_hk: + complete = self.check_hk_registered(timecode, complete) + + # check if there's a stray book + stray = self.session.query(Books).filter( + Books.bid == f"stray_{timecode}_{self.platform}" + ).one_or_none() + if stray is None and try_binding_books: + # all files should be in obs/oper books + flist = self.imprint.get_files_for_stray_book( + min_ctime=min_ctime, + max_ctime=max_ctime, + ) + if len(flist) > 0: + complete[0] = False + complete[1] += f"Stray book stray_{timecode}_{self.platform} missing\n" + elif stray is None and not try_binding_books: + my_list = self.imprint.get_files_for_stray_book( + min_ctime=min_ctime, + max_ctime=max_ctime, + ) + if len(my_list) > 0: + self.logger.warning( + f"We expect {len(my_list)} books in a stray book but need " + "to bind books to verify" + ) + complete[0] = False + complete[1] += f"Stray book stray_{timecode}_{self.platform} missing\n" + else: + flist = self.imprint.get_files_for_book(stray) + my_list = self.imprint.get_files_for_stray_book( + min_ctime=min_ctime, + max_ctime=max_ctime, + ) + assert np.all( + sorted(flist) == sorted(my_list) + ), "logic error somewhere" + ## check that all books are bound + books = self.session.query(Books).filter( + or_(Books.status == UNBOUND, Books.status == FAILED), + Books.start >= dt.datetime.utcfromtimestamp(min_ctime), + Books.start <= dt.datetime.utcfromtimestamp(max_ctime), + ).count() + if books != 0: + complete[0] = False + complete[1] += f"Have {books} unbound or failed books in timecode \n" + return complete + + def books_in_timecode( + self, timecode, include_wont_fix=False, include_hk=True + ): + min_ctime = timecode*1e5 + max_ctime = (timecode+1)*1e5 + + q = self.session.query(Books).filter( + 
Books.start >= dt.datetime.utcfromtimestamp(min_ctime), + Books.start < dt.datetime.utcfromtimestamp(max_ctime), + ) + if not include_wont_fix: + q = q.filter(Books.status != WONT_BIND) + if not include_hk: + q = q.filter(Books.type != 'hk') + return q.all() + + def file_list_from_database( + self, timecode, deletable, verify_with_librarian, include_hk=True, + ): + file_list = [] + min_ctime = timecode*1e5 + max_ctime = (timecode+1)*1e5 + + q = self.session.query(Books).filter( + Books.start >= dt.datetime.utcfromtimestamp(min_ctime), + Books.start < dt.datetime.utcfromtimestamp(max_ctime), + ) + not_ready = q.filter( not_(or_( + Books.status == WONT_BIND, Books.status >= UPLOADED) + )).count() + if not_ready > 0: + self.logger.error( + f"There are {not_ready} non-uploaded books in this timecode" + ) + deletable[0] = False + deletable[1] += f"There are {not_ready} non-uploaded books in " \ + "this timecode\n" + if not include_hk: + q = q.filter(Books.type != 'hk') + book_list = q.filter(Books.status >= UPLOADED).all() + self.logger.debug( + f"Found {len(book_list)} books in time code {timecode}" + ) + + for book in book_list: + if book.lvl2_deleted: + continue + if verify_with_librarian: + in_lib = self.imprint.check_book_in_librarian( + book, n_copies=1, raise_on_error=False + ) + if not in_lib: + deletable[0] = False + deletable[1] += f"{book.bid} has not been uploaded to librarain\n" + + flist = self.imprint.get_files_for_book(book) + if isinstance(flist, OrderedDict): + x = [] + for k in flist: + x.extend(flist[k]) + flist=x + file_list.extend(flist) + # add suprsync files + file_list.extend( self.get_suprsync_files(timecode) ) + return file_list, deletable + + def verify_timecode_deletable( + self, timecode, verify_with_librarian=True, include_hk=True, + ): + """ + Checkes that all books in that timecode are uploaded to the librarian + and that there is a copy offsite (if verify_with_librarian=True) + + Steps for checking: + + 1. Walk the file system and build up a list of all files there + 2. Go book by book within timecode and build up the list of level 2 + files that went into it using the databases. Add any files in suprsync + folders into this list since they aren't book bound but we'd like them + to be deleted + 3. Compare the two lists and make sure they're the same. + """ + deletable = [True, ""] + + files_on_disk = self.all_files_in_timecode( + timecode, include_hk=include_hk + ) + if len(files_on_disk) == 0: + return deletable + # these are files that are in the smurf directory but we don't save in the + # smurf books. 
mostly watching out for .dat files + ignore = shutil.ignore_patterns(*SMURF_EXCLUDE_PATTERNS) + ignored_files = ignore("", files_on_disk) + self.logger.debug( + f"Timecode {timecode} has {len(ignored_files)} ignored files" + ) + files_in_database, deletable = self.file_list_from_database( + timecode, deletable, verify_with_librarian, include_hk=include_hk + ) + + missed_files = [] + extra_files = [] + for f in files_on_disk: + if f not in files_in_database and f not in ignored_files: + missed_files.append(f) + for f in files_in_database: + if f not in files_on_disk: + extra_files.append(f) + if len(missed_files) == 0 and len(extra_files) == 0: + self.logger.info(f"Timecode {timecode} has complete coverage") + if len(missed_files)>0: + msg = f"Files on disk but not in database {len(missed_files)}:\n" + for f in missed_files: + msg += f"\t{f}\n" + self.logger.warning(msg) + deletable[0] = False + deletable[1] += msg + if len(extra_files)>0: + msg = f"Files in database but not on disk: {extra_files}" + for f in missed_files: + msg += f"\t{f}\n" + self.logger.error(msg) + deletable[0] = False + deletable[1] += msg + return deletable + + def delete_timecode_level2( + self, timecode, dry_run=True, include_hk=True, + verify_with_librarian=True, + ): + book_list = self.books_in_timecode(timecode, include_hk=include_hk) + books_not_deleted = [] + + for book in book_list: + stat = self.imprint.delete_level2_files( + book, verify_with_librarian=verify_with_librarian, + n_copies_in_lib=2, dry_run=dry_run + ) + if stat > 0: + books_not_deleted.append(book) + + if len(books_not_deleted) > 0: + msg = "Could not delete level 2 for books:\n" + for book in books_not_deleted: + msg += f'\t{book.bid}\n' + self.logger.error(msg) + return False, "" + return True, "" + + + def delete_timecode_staged( + self, timecode, include_hk=True, verify_with_librarian=False, + check_level2=False, + ): + book_list = self.books_in_timecode(timecode, include_hk=include_hk) + books_not_deleted = [] + for book in book_list: + stat = self.imprint.delete_book_staged( + book, check_level2=check_level2, + verify_with_librarian=verify_with_librarian + ) + if stat > 0: + books_not_deleted.append(book) + # cleanup + for tube in self.imprint.tubes: + for btype in ['obs', 'oper']: + path = os.path.join( + self.imprint.output_root, tube, btype, str(timecode) + ) + if os.path.exists(path) and len(os.listdir(path))==0: + os.rmdir(path) + + if len(books_not_deleted) > 0: + msg = "Could not delete stages for books:\n" + for book in books_not_deleted: + msg += f'\t{book.bid}\n' + self.logger.error(msg) + return False, "msg" + return True, "" + + def check_and_delete_timecode( + self, timecode, include_hk=True, verify_with_librarian=True + ): + check = self.make_timecode_complete(timecode, include_hk=include_hk) + if not check[0]: + self.logger.error(f"Timecode {timecode} not complete") + self.logger.error(check[1]) + return check + check = self.verify_timecode_deletable( + timecode, include_hk=include_hk, + verify_with_librarian=False, + ) + if not check[0]: + self.logger.error(f"Timecode {timecode} not ready to delete") + self.logger.error(check[1]) + return check + + check = self.delete_timecode_level2( + timecode, dry_run=False, include_hk=include_hk, + verify_with_librarian=verify_with_librarian, + ) + + if not self.imprint.build_det: + return check + stc = os.path.join(self.SMURF.meta_path, str(timecode)) + ttc = os.path.join(self.SMURF.archive_path, str(timecode)) + + if os.path.exists(stc): + if len(os.listdir(stc)) == 0 or 
just_suprsync(stc): + shutil.rmtree(stc) + if os.path.exists(ttc): + if len(os.listdir(ttc)) == 0 or just_suprsync(ttc): + shutil.rmtree(ttc) + return check diff --git a/sotodlib/io/datapkg_utils.py b/sotodlib/io/datapkg_utils.py index ef26cc264..c28533bc8 100644 --- a/sotodlib/io/datapkg_utils.py +++ b/sotodlib/io/datapkg_utils.py @@ -63,3 +63,35 @@ def get_imprinter_config( platform, env_file=None, env_var="DATAPKG_ENV"): raise ValueError(f"configs not found in tags {tags}") return os.path.join( tags['configs'], platform, 'imprinter.yaml') + +def walk_files(path, include_suprsync=False): + """get a list of the files in a timecode folder, optional flag to ignore + suprsync files + + Arguments + ---------- + path: path to a level 2 timecode folder, either smurf or timestreams + include_suprsync: optional, bool + if true, includes the suprsync files in the returned list + + Returns + -------- + files (list): list of the absolute paths to all files in a timecode folder + """ + if not os.path.exists(path): + return [] + flist = [] + for root, _, files in os.walk(path): + if not include_suprsync and 'suprsync' in root: + continue + for f in files: + flist.append( os.path.join(path, root, f)) + return flist + +def just_suprsync(path): + """check if timecode folder only has suprsync folder in it + """ + flist = os.listdir( path ) + if len(flist) == 1 and flist[0] == "suprsync": + return True + return False diff --git a/sotodlib/io/g3thk_db.py b/sotodlib/io/g3thk_db.py index 743f28a69..08829da0d 100644 --- a/sotodlib/io/g3thk_db.py +++ b/sotodlib/io/g3thk_db.py @@ -13,6 +13,7 @@ import logging from .datapkg_utils import load_configs + logger = logging.getLogger(__name__) Base = declarative_base() @@ -126,7 +127,7 @@ class HKFields(Base): class G3tHk: - def __init__(self, hkarchive_path, db_path=None, echo=False): + def __init__(self, hkarchive_path, iids, db_path=None, echo=False): """ Class to manage a housekeeping data archive @@ -134,6 +135,8 @@ def __init__(self, hkarchive_path, db_path=None, echo=False): ____ hkarchive_path : path Path to the data directory + iids : list + List of agent instance ids db_path : path, optional Path to the sqlite file echo : bool, optional @@ -144,13 +147,15 @@ def __init__(self, hkarchive_path, db_path=None, echo=False): self.hkarchive_path = hkarchive_path self.db_path = db_path + self.iids = iids self.engine = db.create_engine(f"sqlite:///{db_path}", echo=echo) Session.configure(bind=self.engine) self.Session = sessionmaker(bind=self.engine) self.session = Session() Base.metadata.create_all(self.engine) - def load_fields(self, hk_path): + + def load_fields(self, hk_path, iids): """ Load fields from .g3 file and start and end time for each field. 
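A quick usage sketch for the new datapkg_utils helpers added above (walk_files and just_suprsync); the level 2 folder path is a placeholder.

    # Sketch of the new datapkg_utils helpers; the folder path is a placeholder.
    from sotodlib.io.datapkg_utils import walk_files, just_suprsync

    tc_dir = "/data/level2/timestreams/17123"   # placeholder timecode folder
    files = walk_files(tc_dir, include_suprsync=False)  # suprsync/ entries skipped
    if just_suprsync(tc_dir):
        print("only the suprsync folder remains in this timecode directory")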
@@ -167,14 +172,15 @@ def load_fields(self, hk_path): # enact HKArchiveScanner hkas = hk.HKArchiveScanner() hkas.process_file(hk_path) - + arc = hkas.finalize() # get fields from .g3 file fields, timelines = arc.get_fields() hkfs = [] for key in fields.keys(): - hkfs.append(key) + if any(iid in key for iid in iids): + hkfs.append(key) starts = [] stops = [] @@ -346,14 +352,14 @@ def add_agents_and_fields(self, path, overwrite=False): .one() ) - db_agents = db_file.agents + # line below may not be needed; is redundant + db_agents = [a for a in db_file.agents if a.instance_id in self.iids] db_fields = db_file.fields - agents = [] - - out = self.load_fields(db_file.path) + out = self.load_fields(db_file.path, self.iids) fields, starts, stops, medians, means, min_vals, max_vals, stds = out + agents = [] for field in fields: agent = field.split(".")[1] agents.append(agent) @@ -531,10 +537,12 @@ def get_last_update(self): .order_by(db.desc(HKFiles.global_start_time)) .first() ) + if len(last_file.agents) == 0: + return last_file.global_start_time return max([a.stop for a in last_file.agents]) @classmethod - def from_configs(cls, configs): + def from_configs(cls, configs, iids=None): """ Create a G3tHK instance from a configs dictionary @@ -545,12 +553,34 @@ def from_configs(cls, configs): if type(configs) == str: configs = load_configs(configs) + if iids is None: + iids = [] + if "finalization" in configs: + servers = configs["finalization"].get("servers", {}) + for server in servers: + for key in server.keys(): + # Append the value (iid) to the iids list + iids.append(server[key]) + else: + logger.debug( + "No finalization information in configuration, agents and " + "fields will not be added." + ) + return cls( - os.path.join(configs["data_prefix"], "hk"), - configs["g3thk_db"] + hkarchive_path = os.path.join(configs["data_prefix"], "hk"), + db_path = configs["g3thk_db"], + iids = iids ) - def delete_file(self, hkfile, dry_run=False, my_logger=None): + def batch_delete_files(self, file_list, dry_run=False, my_logger=None): + for f in file_list: + self.delete_file( + f, dry_run=dry_run, my_logger=my_logger, commit=False + ) + self.session.commit() + + def delete_file(self, hkfile, dry_run=False, my_logger=None, commit=True): """WARNING: Removes actual files from file system. 
Delete an hkfile instance, its on-disk file, and all associated agents @@ -565,13 +595,13 @@ def delete_file(self, hkfile, dry_run=False, my_logger=None): my_logger = logger # remove field info - my_logger.info(f"removing field entries for {hkfile.path} from database") + my_logger.debug(f"removing field entries for {hkfile.path} from database") if not dry_run: for f in hkfile.fields: self.session.delete(f) # remove agent info - my_logger.info(f"removing agent entries for {hkfile.path} from database") + my_logger.debug(f"removing agent entries for {hkfile.path} from database") if not dry_run: for a in hkfile.agents: self.session.delete(a) @@ -590,4 +620,5 @@ def delete_file(self, hkfile, dry_run=False, my_logger=None): my_logger.info(f"remove {hkfile.path} from database") if not dry_run: self.session.delete(hkfile) - self.session.commit() + if commit: + self.session.commit() diff --git a/sotodlib/io/imprinter.py b/sotodlib/io/imprinter.py index 22fb9f84c..0a5a29ecb 100644 --- a/sotodlib/io/imprinter.py +++ b/sotodlib/io/imprinter.py @@ -17,7 +17,7 @@ from spt3g import core import sotodlib -from .bookbinder import BookBinder +from .bookbinder import BookBinder, TimeCodeBinder from .load_smurf import ( G3tSmurf, Observations as G3tObservations, @@ -49,6 +49,8 @@ # tel tube, stream_id, slot mapping VALID_OBSTYPES = ["obs", "oper", "smurf", "hk", "stray", "misc"] +# file patterns excluded from smurf books +SMURF_EXCLUDE_PATTERNS = ["*.dat", "*_mask.txt", "*_freq.txt"] class BookExistsError(Exception): """Exception raised when a book already exists in the database""" @@ -132,6 +134,7 @@ class Books(Base): timing = db.Column(db.Boolean) path = db.Column(db.String) lvl2_deleted = db.Column(db.Boolean, default=False) + schema = db.Column(db.Integer, default=0) def __repr__(self): return f"" @@ -237,10 +240,12 @@ def __init__(self, im_config=None, db_args={}, logger=None, make_db=False): self.config = load_configs(im_config) self.db_path = self.config.get("db_path") - self.daq_node = self.config.get("daq_node") + self.daq_node = self.config.get("daq_node") self.output_root = self.config.get("output_root") self.g3tsmurf_config = self.config.get("g3tsmurf") - + g3tsmurf_cfg = load_configs(self.g3tsmurf_config) + self.lvl2_data_root = g3tsmurf_cfg["data_prefix"] + self.build_hk = self.config.get("build_hk") self.build_det = self.config.get("build_det") @@ -371,6 +376,7 @@ def register_book(self, obsset, bid=None, commit=True, session=None): if bid is None: bid = obsset.get_id() assert obsset.mode is not None + assert obsset.mode in ['obs','oper'] # check whether book exists in the database if self.book_exists(bid, session=session): raise BookExistsError(f"Book {bid} already exists in the database") @@ -406,6 +412,7 @@ def register_book(self, obsset, bid=None, commit=True, session=None): [s for s in obsset.slots if obsset.contains_stream(s)] ), # not worth having a extra table timing=timing_on, + schema=0, ) book.path = self.get_book_path(book) @@ -444,9 +451,6 @@ def register_hk_books(self, min_ctime=None, max_ctime=None, session=None): session = session or self.get_session() if not self.build_hk: return - - g3tsmurf_cfg = load_configs(self.g3tsmurf_config) - lvl2_data_root = g3tsmurf_cfg["data_prefix"] if min_ctime is None: min_ctime = 16000e5 @@ -454,7 +458,7 @@ def register_hk_books(self, min_ctime=None, max_ctime=None, session=None): max_ctime = 5e10 # all ctime dir except the last ctime dir will be considered complete - ctime_dirs = sorted(glob(op.join(lvl2_data_root, "hk", "*"))) + ctime_dirs = 
sorted(glob(op.join(self.lvl2_data_root, "hk", "*"))) for ctime_dir in ctime_dirs[:-1]: ctime = op.basename(ctime_dir) if int(ctime) < int(min_ctime//1e5): @@ -473,6 +477,7 @@ def register_hk_books(self, min_ctime=None, max_ctime=None, session=None): start=dt.datetime.utcfromtimestamp(int(ctime) * 1e5), stop=dt.datetime.utcfromtimestamp((int(ctime) + 1) * 1e5), tel_tube=self.daq_node, + schema=0, ) book.path = self.get_book_path(book) session.add(book) @@ -498,9 +503,8 @@ def register_timecode_books( smurf books are registered whenever all the relevant metadata timecode entries have been found. stray books are registered when metadata and - file timecode entries exist ASSUMING all obs/oper books in that time + file timecode entries exist AND all obs/oper books in that time range have been bound successfully. - """ if not self.build_det: @@ -508,6 +512,10 @@ def register_timecode_books( session = session or self.get_session() g3session, SMURF = self.get_g3tsmurf_session(return_archive=True) + final_time = SMURF.get_final_time( + self.all_slots, min_ctime, max_ctime, check_control=False + ) + final_tc = int(final_time//1e5) servers = SMURF.finalize["servers"] meta_agents = [s["smurf-suprsync"] for s in servers] files_agents = [s["timestream-suprsync"] for s in servers] @@ -523,6 +531,12 @@ def register_timecode_books( tcs = tcs.distinct().all() for (tc,) in tcs: + if tc >= final_tc: + self.logger.info( + f"Not ready to make timecode books for {tc} because final" + f" timecode is {final_tc}" + ) + continue q = g3session.query(TimeCodes).filter( TimeCodes.timecode == tc, ) @@ -556,6 +570,7 @@ def register_timecode_books( tel_tube=self.daq_node, start=book_start, stop=book_stop, + schema=1, ) smurf_book.path = self.get_book_path(smurf_book) session.add(smurf_book) @@ -580,21 +595,26 @@ def register_timecode_books( ) if q.count() > 0: self.logger.info( - f"Not ready to bind {book_id} due to unbound or " + f"Not ready to register {book_id} due to unbound or " "failed obs/oper books." 
) continue - stray_book = Books( - bid=book_id, - type="stray", - status=UNBOUND, - tel_tube=self.daq_node, - start=book_start, - stop=book_stop, + + flist = self.get_files_for_stray_book( + min_ctime= tc * 1e5, + max_ctime= (tc + 1) * 1e5 ) - stray_book.path = self.get_book_path(stray_book) - flist = self.get_files_for_book(stray_book) if len(flist) > 0: + stray_book = Books( + bid=book_id, + type="stray", + status=UNBOUND, + tel_tube=self.daq_node, + start=book_start, + stop=book_stop, + schema=0, + ) + stray_book.path = self.get_book_path(stray_book) self.logger.info(f"registering {book_id}") session.add(stray_book) session.commit() @@ -607,8 +627,6 @@ def get_book_abs_path(self, book): return os.path.join(self.output_root, book_path) def get_book_path(self, book): - g3tsmurf_cfg = load_configs(self.g3tsmurf_config) - lvl2_data_root = g3tsmurf_cfg["data_prefix"] if book.type in ["obs", "oper"]: session_id = book.bid.split("_")[1] @@ -617,11 +635,12 @@ def get_book_path(self, book): return os.path.join(odir, book.bid) elif book.type in ["hk", "smurf"]: # get source directory for hk book - root = op.join(lvl2_data_root, book.type) first5 = book.bid.split("_")[1] assert first5.isdigit(), f"first5 of {book.bid} is not a digit" - odir = op.join(book.tel_tube, book.type) - return os.path.join(odir, book.bid) + odir = op.join(book.tel_tube, book.type, book.bid) + if book.type == 'smurf' and book.schema > 0: + return odir + '.zip' + return odir elif book.type in ["stray"]: first5 = book.bid.split("_")[1] assert first5.isdigit(), f"first5 of {book.bid} is not a digit" @@ -641,8 +660,6 @@ def _get_binder_for_book(self, require_acu=True, ): """get the appropriate bookbinder for the book based on its type""" - g3tsmurf_cfg = load_configs(self.g3tsmurf_config) - lvl2_data_root = g3tsmurf_cfg["data_prefix"] if book.type in ["obs", "oper"]: book_path = self.get_book_abs_path(book) @@ -660,7 +677,7 @@ def _get_binder_for_book(self, # bind book using bookbinder library bookbinder = BookBinder( - book, obsdb, filedb, lvl2_data_root, readout_ids, book_path, hk_fields, + book, obsdb, filedb, self.lvl2_data_root, readout_ids, book_path, hk_fields, ignore_tags=ignore_tags, ancil_drop_duplicates=ancil_drop_duplicates, allow_bad_timing=allow_bad_timing, @@ -671,86 +688,43 @@ def _get_binder_for_book(self, elif book.type in ["hk", "smurf"]: # get source directory for hk book - root = op.join(lvl2_data_root, book.type) - first5 = book.bid.split("_")[1] - assert first5.isdigit(), f"first5 of {book.bid} is not a digit" - book_path_src = op.join(root, first5) + root = op.join(self.lvl2_data_root, book.type) + timecode = book.bid.split("_")[1] + assert timecode.isdigit(), f"timecode of {book.bid} is not a digit" + book_path_src = op.join(root, timecode) # get target directory for hk book - odir = op.join(self.output_root, book.tel_tube, book.type) + book_path_tgt = self.get_book_abs_path(book) + odir, _ = op.split(book_path_tgt) if not op.exists(odir): os.makedirs(odir) - book_path_tgt = os.path.join(odir, book.bid) - - class _FakeBinder: # dummy class to mimic baseline bookbinder - def __init__(self, indir, outdir): - self.indir = indir - self.outdir = outdir - - def get_metadata(self, telescope=None, tube_config={}): - return { - "book_id": book.bid, - # dummy start and stop times - "start_time": float(first5) * 1e5, - "stop_time": (float(first5) + 1) * 1e5, - "telescope": telescope, - "type": book.type, - } - - def bind(self, pbar=False): - shutil.copytree( - self.indir, - self.outdir, - 
ignore=shutil.ignore_patterns( - "*.dat", "*_mask.txt", "*_freq.txt" - ), - ) - - return _FakeBinder(book_path_src, book_path_tgt) + + bookbinder = TimeCodeBinder( + book, timecode, book_path_src, book_path_tgt, + ignore_pattern=SMURF_EXCLUDE_PATTERNS, + ) + return bookbinder elif book.type in ["stray"]: flist = self.get_files_for_book(book) # get source directory for stray book - root = op.join(lvl2_data_root, "timestreams") - first5 = book.bid.split("_")[1] - assert first5.isdigit(), f"first5 of {book.bid} is not a digit" - book_path_src = op.join(root, first5) - - # get target directory for hk book - odir = op.join(self.output_root, book.tel_tube, book.type) + root = op.join(self.lvl2_data_root, "timestreams") + timecode = book.bid.split("_")[1] + assert timecode.isdigit(), f"timecode of {book.bid} is not a digit" + book_path_src = op.join(root, timecode) + + # get target directory for book + book_path_tgt = self.get_book_abs_path(book) + odir, _ = op.split(book_path_tgt) if not op.exists(odir): os.makedirs(odir) - book_path_tgt = os.path.join(odir, book.bid) - - class _FakeBinder: # dummy class to mimic baseline bookbinder - def __init__(self, indir, outdir, file_list): - self.indir = indir - self.outdir = outdir - self.file_list = file_list - - def get_metadata(self, telescope=None, tube_config={}): - return { - "book_id": book.bid, - # dummy start and stop times - "start_time": float(first5) * 1e5, - "stop_time": (float(first5) + 1) * 1e5, - "telescope": telescope, - "type": book.type, - } - - def bind(self, pbar=False): - if not os.path.exists(self.outdir): - os.makedirs(self.outdir) - for f in self.file_list: - relpath = os.path.relpath(f, self.indir) - path = os.path.join(self.outdir, relpath) - base, _ = os.path.split(path) - if not os.path.exists(base): - os.makedirs(base) - shutil.copy(f, os.path.join(self.outdir, relpath)) - - return _FakeBinder(book_path_src, book_path_tgt, flist) + + bookbinder = TimeCodeBinder( + book, timecode, book_path_src, book_path_tgt, + file_list=flist, + ) + return bookbinder else: raise NotImplementedError( f"binder for book type {book.type} not implemented" @@ -828,38 +802,13 @@ def bind_book( require_hwp=require_hwp, ) binder.bind(pbar=pbar) - - # write M_book file - m_book_file = os.path.join(binder.outdir, "M_book.yaml") - book_meta = {} - book_meta["book"] = { - "type": book.type, - "schema_version": 0, - "book_id": book.bid, - "finalized_at": dt.datetime.utcnow().isoformat(), - } - book_meta["bookbinder"] = { - "codebase": sotodlib.__file__, - "version": sotodlib.__version__, - "context": self.config.get("context", "unknown"), - } - with open(m_book_file, "w") as f: - yaml.dump(book_meta, f) - + # write M_index file if book.type in ['obs', 'oper']: tc = self.tube_configs[book.tel_tube] else: tc = {} - - mfile = os.path.join(binder.outdir, "M_index.yaml") - with open(mfile, "w") as f: - yaml.dump( - binder.get_metadata( - telescope=self.daq_node, - tube_config = tc, - ), f - ) + binder.write_M_files(self.daq_node, tc) if book.type in ['obs', 'oper']: # check that detectors books were written out correctly @@ -944,65 +893,6 @@ def get_books_by_status(self, status, session=None): return session.query(Books).filter( Books.status == status ).order_by(Books.start).all() - - def get_level2_deleteable_books( - self, session=None, cleanup_delay=None, max_time=None - ): - """Get all bound books from database where we need to delete the level2 - data - - Parameters - ---------- - session: BookDB session - cleanup_delay: float - amount of time to delay 
book deletation relative to g3tsmurf finalization - time in units of days. - max_time: datetime - maxmimum time of book start to search. Overrides cleanup_delay if - earlier - - Returns - ------- - books: list of book objects - """ - raise NotImplementedError("This function hasn't been fixed yet") - if session is None: - session = self.get_session() - if cleanup_delay is None: - cleanup_delay = 0 - - base_filt = and_( - Books.status == BOUND, - Books.lvl2_deleted == False, - or_( ## not implementing smurf deletion just yet - Books.type == "obs", - Books.type == "oper", - Books.type == "stray", - Books.type == "hk", - ), - ) - sources = session.query( - Books.tel_tube - ).filter(base_filt).distinct().all() - - source_filt = [] - for source, in sources: - streams = self.tubes[source].get("slots") - _, SMURF = self.get_g3tsmurf_session(source, return_archive=True) - limit = SMURF.get_final_time(streams, check_control=False) - max_stop = dt.datetime.utcfromtimestamp(limit) - dt.timedelta(days=cleanup_delay) - - source_filt.append( and_(Books.tel_tube == source, Books.stop <= max_stop) ) - - q = session.query(Books).filter( - base_filt, - or_(*source_filt), - ) - - if max_time is not None: - q = q.filter(Books.stop <= max_time) - - return q.all() # some aliases for readability def get_unbound_books(self, session=None): @@ -1033,6 +923,20 @@ def get_bound_books(self, session=None): """ return self.get_books_by_status(BOUND, session) + def get_done_books(self, session=None): + """Get all "done" books from database. Done means staged files are deleted. + + Parameters + ---------- + session: BookDB session + + Returns + ------- + books: list of book objects + + """ + return self.get_books_by_status(DONE, session) + def get_failed_books(self, session=None): """Get all failed books from database @@ -1138,23 +1042,23 @@ def rollback(self, session=None): session = self.get_session() session.rollback() - def _find_incomplete(self, min_ctime, max_ctime, stream_filt=None): + @property + def all_slots(self): + return [x for xs in [ + t.get('slots') for (_,t) in self.tubes.items() + ] for x in xs] + + def _find_incomplete(self, min_ctime, max_ctime, streams=None): """return G3tSmurf session query for incomplete observations """ - if stream_filt is None: - streams = [] - streams.extend( - *[t.get("slots") for (_,t) in self.tubes.items()] - ) - stream_filt = or_( - *[G3tObservations.stream_id == s for s in streams] - ) + if streams is None: + streams = self.all_slots session = self.get_g3tsmurf_session() q = session.query(G3tObservations).filter( G3tObservations.timestamp >= min_ctime, G3tObservations.timestamp <= max_ctime, - stream_filt, + G3tObservations.stream_id.in_(streams), or_( G3tObservations.stop == None, G3tObservations.stop >= dt.datetime.utcfromtimestamp(max_ctime), @@ -1226,9 +1130,6 @@ def update_bookdb_from_g3tsmurf( streams = stream_ids self.logger.debug(f"Looking for observations from stream_ids {streams}") - # restrict to given stream ids (wafers) - stream_filt = or_(*[G3tObservations.stream_id == s for s in streams]) - # check data transfer finalization final_time = SMURF.get_final_time( streams, min_ctime, max_ctime, check_control=True @@ -1238,7 +1139,7 @@ def update_bookdb_from_g3tsmurf( self.logger.debug(f"Searching between {min_ctime} and {max_ctime}") # check for incomplete observations in time range - q_incomplete = self._find_incomplete(min_ctime, max_ctime, stream_filt) + q_incomplete = self._find_incomplete(min_ctime, max_ctime, streams) # if we have incomplete observations in 
our stream_id list we cannot # bookbind any observations overlapping the incomplete ones. @@ -1275,7 +1176,7 @@ def update_bookdb_from_g3tsmurf( obs_q = session.query(G3tObservations).filter( G3tObservations.timestamp >= min_ctime, G3tObservations.timestamp < max_ctime, - stream_filt, + G3tObservations.stream_id.in_(streams), G3tObservations.stop < max_stop, not_(G3tObservations.stop == None), G3tObservations.obs_id.not_in(already_registered), @@ -1311,7 +1212,7 @@ def add_to_output(obs_list, mode): # observations from other streams q = obs_q.filter( G3tObservations.stream_id != str_obs.stream_id, - stream_filt, + G3tObservations.stream_id.in_(streams), or_( and_( G3tObservations.start <= str_obs.start, @@ -1437,38 +1338,10 @@ def get_files_for_book(self, book): res[o.obs_id] = sorted([f.name for f in o.files]) return res elif book.type in ["stray"]: - session = self.get_session() - - ## build list of files already in bound books - book_list = session.query(Books).filter( - Books.start >= book.start, - Books.start < book.stop, - or_(Books.type == 'obs', Books.type == 'oper'), - Books.status != WONT_BIND, - ).all() - files_in_books = [] - for b in book_list: - flist = self.get_files_for_book(b) - for k in flist: - files_in_books.extend(flist[k]) - - g3session = self.get_g3tsmurf_session() - tcode = int(book.bid.split("_")[1]) - - files_in_tc = g3session.query(Files).filter( - Files.name.like(f"%/{tcode}/%"), - ).all() - files_in_tc = [f.name for f in files_in_tc] - - files_into_stray = [] - for f in files_in_tc: - if f in files_in_books: - continue - files_into_stray.append(f) - return files_into_stray + return self.get_files_for_stray_book(book) elif book.type == "hk": - HK = self.get_g3thk(book.tel_tube) + HK = self.get_g3thk() flist = ( HK.session.query(HKFiles) .filter( @@ -1478,12 +1351,82 @@ def get_files_for_book(self, book): .all() ) return [f.path for f in flist] + elif book.type == "smurf": + tcode = int(book.bid.split("_")[1]) + basepath = os.path.join( + self.lvl2_data_root, 'smurf', str(tcode) + ) + ignore = shutil.ignore_patterns(*SMURF_EXCLUDE_PATTERNS) + flist = [] + for root, _, files in os.walk(basepath): + to_ignore = ignore('', files) + flist.extend([ + os.path.join(basepath, root, f) + for f in files if f not in to_ignore + ]) + return flist else: raise NotImplementedError( - f"book type {book.type} not understood for" " file search" + f"book type {book.type} not understood for file search" ) + def get_files_for_stray_book( + self, book=None, min_ctime=None, max_ctime=None + ): + """generate list of files that are not in detector books and should + going into stray books. if book is None then we expect both min and max + ctime to be provided + + Arguments + ---------- + book: optional, book instance + min_ctime: optional, minimum ctime value to search + max_ctime: optional, maximum ctime value to search + + Returns + -------- + list of files that should go into a stray book + """ + if book is None: + assert min_ctime is not None and max_ctime is not None + start = dt.datetime.utcfromtimestamp(min_ctime) + stop = dt.datetime.utcfromtimestamp(max_ctime) + + tcode = int(min_ctime//1e5) + if max_ctime > (tcode+1)*1e5: + self.logger.error( + f"Max ctime {max_ctime} is higher than would be expected " + f"for a single stray book with min ctime {min_ctime}. 
Only"
+                    " checking the first timecode directory"
+                )
+        else:
+            assert book.type == 'stray'
+            start = book.start
+            stop = book.stop
+            tcode = int(book.bid.split("_")[1])
+
+        session = self.get_session()
+        g3session, SMURF = self.get_g3tsmurf_session(return_archive=True)
+        path = os.path.join(SMURF.archive_path, str(tcode))
+        registered_obs = [
+            x[0] for x in session.query(Observations.obs_id).join(Books).filter(
+                Books.start >= start,
+                Books.start < stop,
+                Books.status != WONT_BIND,
+            ).all()]
+        db_files = g3session.query(Files).filter(
+            Files.name.like(f"{path}%")
+        ).all()
+
+        stray_files = []
+        for f in db_files:
+            if f.obs_id is None or f.obs_id not in registered_obs:
+                stray_files.append(f.name)
+
+        return stray_files
+
+
     def get_readout_ids_for_book(self, book):
         """
         Get all readout IDs for a book
@@ -1611,6 +1554,19 @@ def get_g3tsmurf_obs_for_book(self, book):
         )
         return {o.obs_id: o for o in obs}
 
+    def _librarian_connect(self):
+        """
+        start connection to librarian
+        """
+        from hera_librarian import LibrarianClient
+        from hera_librarian.settings import client_settings
+        conn = client_settings.connections.get(
+            self.config.get("librarian_conn")
+        )
+        if conn is None:
+            raise ValueError(f"'librarian_conn' not in imprinter config")
+        self.librarian = LibrarianClient.from_info(conn)
+
     def upload_book_to_librarian(self, book, session=None, raise_on_error=True):
         """Upload bound book to the librarian
 
@@ -1626,14 +1582,7 @@ def upload_book_to_librarian(self, book, session=None, raise_on_error=True):
         if session is None:
             session = self.get_session()
         if self.librarian is None:
-            from hera_librarian import LibrarianClient
-            from hera_librarian.settings import client_settings
-            conn = client_settings.connections.get(
-                self.config.get("librarian_conn")
-            )
-            if conn is None:
-                raise ValueError(f"'librarian_conn' not in imprinter config")
-            self.librarian = LibrarianClient.from_info(conn)
+            self._librarian_connect()
 
         assert book.status == BOUND, "cannot upload unbound books"
 
@@ -1655,8 +1604,32 @@ def upload_book_to_librarian(self, book, session=None, raise_on_error=True):
             return False, e
 
         return True, None
-
-    def delete_level2_files(self, book, dry_run=True):
+
+    def check_book_in_librarian(self, book, n_copies=1, raise_on_error=True):
+        """have the librarian validate the book is stored offsite. Returns True
+        if at least n_copies are stored offsite.
+        """
+        if self.librarian is None:
+            self._librarian_connect()
+        try:
+            resp = self.librarian.validate_file(book.path)
+            in_lib = sum(
+                [(x.computed_same_checksum) for x in resp]
+            ) >= n_copies
+            if not in_lib:
+                self.logger.info(f"received response from librarian {resp}")
+        except Exception as e:
+            if raise_on_error:
+                raise e
+            else:
+                self.logger.warning(
+                    f"Failed to check librarian status for {book.bid}: {e}"
+                )
+                self.logger.warning(traceback.format_exc())
+                in_lib = False
+        return in_lib
+
+    def delete_level2_files(self, book, verify_with_librarian=True,
+            n_copies_in_lib=2, dry_run=True):
         """Delete level 2 data from already bound books
 
         Parameters
@@ -1665,13 +1638,31 @@ def delete_level2_files(self, book, dry_run=True):
         book: BookDB object
         dry_run: bool
             if true, just prints plans to self.logger.info
         """
-        if book.status != BOUND:
-            raise ValueError(f"Book must be bound to delete level 2 files")
-
+        if book.lvl2_deleted:
+            self.logger.debug(
+                f"Level 2 for {book.bid} has already been deleted"
+            )
+            return 0
+        if book.status < UPLOADED:
+            self.logger.warning(
+                f"Book {book.bid} is not uploaded, not deleting level 2"
+            )
+            return 1
+        if verify_with_librarian:
+            in_lib = self.check_book_in_librarian(
+                book, n_copies=n_copies_in_lib, raise_on_error=False
+            )
+            if not in_lib:
+                self.logger.warning(
+                    f"Book {book.bid} does not have {n_copies_in_lib} copies"
+                    " will not delete level 2"
+                )
+                return 2
+
+        self.logger.info(f"Removing level 2 files for {book.bid}")
         if book.type == "obs" or book.type == "oper":
             session, SMURF = self.get_g3tsmurf_session(
-                book.tel_tube, return_archive=True
+                return_archive=True
             )
 
             odic = self.get_g3tsmurf_obs_for_book(book)
@@ -1681,7 +1672,7 @@ def delete_level2_files(self, book, dry_run=True):
             )
         elif book.type == "stray":
             session, SMURF = self.get_g3tsmurf_session(
-                book.tel_tube, return_archive=True
+                return_archive=True
            )
             flist = self.get_files_for_book(book)
             for f in flist:
@@ -1689,12 +1680,25 @@ def delete_level2_files(self, book, dry_run=True):
                 SMURF.delete_file(
                     db_file, session, dry_run=dry_run, my_logger=self.logger
                 )
+        elif book.type == "smurf":
+            tcode = int(book.bid.split("_")[1])
+            basepath = os.path.join(
+                self.lvl2_data_root, 'smurf', str(tcode)
+            )
+            if not dry_run:
+                shutil.rmtree(basepath)
+
         elif book.type == "hk":
-            HK = self.get_g3thk(book.tel_tube)
+            HK = self.get_g3thk()
             flist = self.get_files_for_book(book)
-            for f in flist:
-                hkfile = HK.session.query(HKFiles).filter(HKFiles.path == f).one()
-                HK.delete_file(hkfile, dry_run=dry_run, my_logger=self.logger)
+            hkf_list = [
+                HK.session.query(HKFiles).filter(
+                    HKFiles.path == f
+                ).one() for f in flist
+            ]
+            HK.batch_delete_files(
+                hkf_list, dry_run=dry_run, my_logger=self.logger
+            )
         else:
             raise NotImplementedError(
                 f"Do not know how to delete level 2 files"
@@ -1703,8 +1707,10 @@ def delete_level2_files(self, book, dry_run=True):
         if not dry_run:
             book.lvl2_deleted = True
             self.session.commit()
+        return 0
 
-    def delete_book_files(self, book):
+    def delete_book_staged(self, book, check_level2=False,
+            verify_with_librarian=False, n_copies_in_lib=1, override=False):
         """Delete all files associated with a book
 
         Parameters
@@ -1712,27 +1718,80 @@ def delete_book_files(self, book):
         book: Book object
         """
+        if book.status == DONE:
+            self.logger.debug(
+                f"Book {book.bid} has already had staged files deleted"
+            )
+            return 0
+        if not override:
+            if book.status < UPLOADED:
+                self.logger.warning(
+                    "Cannot delete non-uploaded books without override"
+                )
+                return 1
+            if check_level2 and not book.lvl2_deleted:
+                
self.logger.warning( + f"Level 2 data not deleted for {book.bid}, not deleting " + "staged" + ) + return 2 + if verify_with_librarian: + in_lib = self.check_book_in_librarian( + book, n_copies=n_copies_in_lib, raise_on_error=False + ) + if not in_lib: + self.logger.warning( + f"Book {book.bid} does not have {n_copies_in_lib} copies" + " will not delete staged" + ) + return 3 + # remove all files within the book book_path = self.get_book_abs_path(book) try: - shutil.rmtree( book_path ) + self.logger.info( + f"Removing {book.bid} from staged" + ) + if book.type == 'smurf' and book.schema == 1: + os.remove(book_path) + else: + shutil.rmtree( book_path ) except Exception as e: self.logger.warning(f"Failed to remove {book_path}: {e}") self.logger.error(traceback.format_exc()) + book.status = DONE + self.session.commit() + return 0 + def find_missing_lvl2_obs_from_books( + self, min_ctime, max_ctime + ): + """create a list of level 2 observation IDs that are not registered in + the imprinter database + + Arguments + ---------- + min_ctime: minimum ctime value to search + max_ctime: maximum ctime value to search - def all_bound_until(self): - """report a datetime object to indicate that all books are bound - by this datetime. + Returns + -------- + list of level 2 observation ids not in books """ session = self.get_session() - # sort by start time and find the start time by which - # all books are bound - books = session.query(Books).order_by(Books.start).all() - for book in books: - if book.status < BOUND: - return book.start - return book.start # last book + g3session, SMURF = self.get_g3tsmurf_session(return_archive=True) + registered_obs = [ + x[0] for x in session.query(Observations.obs_id).join(Books).filter( + Books.start >= dt.datetime.utcfromtimestamp(min_ctime), + Books.start < dt.datetime.utcfromtimestamp(max_ctime), + ).all()] + missing_obs = g3session.query(G3tObservations).filter( + G3tObservations.timestamp >= min_ctime, + G3tObservations.timestamp < max_ctime, + G3tObservations.stream_id.in_(self.all_slots), + G3tObservations.obs_id.not_in(registered_obs) + ).all() + return missing_obs ##################### # Utility functions # diff --git a/sotodlib/io/imprinter_cli.py b/sotodlib/io/imprinter_cli.py index 50209f1f1..3c8cf5f54 100644 --- a/sotodlib/io/imprinter_cli.py +++ b/sotodlib/io/imprinter_cli.py @@ -13,9 +13,10 @@ import os import argparse +import datetime as dt from typing import Optional -from sotodlib.io.imprinter import Imprinter, Books +from sotodlib.io.imprinter import Imprinter, Books, FAILED import sotodlib.io.imprinter_utils as utils def main(): @@ -110,9 +111,21 @@ def _last_line(book): if len(s) > 0: return s -def autofix_failed_books(imprint:Imprinter, test_mode=False): - fail_list = imprint.get_failed_books() - for book in fail_list: +def autofix_failed_books( + imprint:Imprinter, test_mode=False, min_ctime=None, max_ctime=None, +): + session = imprint.get_session() + failed = session.query(Books).filter(Books.status == FAILED) + if min_ctime is not None: + failed = failed.filter( + Books.start >= dt.datetime.utcfromtimestamp(min_ctime), + ) + if max_ctime is not None: + failed = failed.filter( + Books.start <= dt.datetime.utcfromtimestamp(max_ctime), + ) + failed = failed.all() + for book in failed: print("-----------------------------------------------------") print(f"On book {book.bid}. 
Has error:\n{_last_line(book)}")
         if 'SECOND-FAIL' in book.message:
diff --git a/sotodlib/io/imprinter_utils.py b/sotodlib/io/imprinter_utils.py
index 4ec18f80f..1ed0be33a 100644
--- a/sotodlib/io/imprinter_utils.py
+++ b/sotodlib/io/imprinter_utils.py
@@ -80,7 +80,12 @@ def set_book_rebind(imprint, book, update_level2=False):
 
     if op.exists(book_dir):
         print(f"Removing all files from {book_dir}")
-        shutil.rmtree(book_dir)
+        if os.path.isfile(book_dir):
+            os.remove(book_dir)
+        elif os.path.isdir(book_dir):
+            shutil.rmtree(book_dir)
+        else:
+            print(f"{book_dir} is neither a file nor a directory")
     else:
         print(f"Found no files in {book_dir} to remove")
 
@@ -160,6 +165,21 @@ def block_set_rebind(imprint, update_level2=False):
         imprint.logger.info(f"Setting book {book.bid} for rebinding")
         set_book_rebind(imprint, book, update_level2=update_level2)
 
+def block_fix_bad_timing(imprint):
+    """Run through and try rebinding all books with bad timing"""
+    failed_books = imprint.get_failed_books()
+    fix_list = []
+    for book in failed_books:
+        if "TimingSystemOff" in book.message:
+            fix_list.append(book)
+    for book in fix_list:
+        imprint.logger.info(f"Setting book {book.bid} for rebinding")
+        set_book_rebind(imprint, book)
+        imprint.logger.info(
+            f"Binding book {book.bid} while accepting bad timing"
+        )
+        imprint.bind_book(book, allow_bad_timing=True)
+
 def get_timecode_final(imprint, time_code, type='all'):
     """Check if all required entries in the g3tsmurf database are present for
     smurf or stray book regisitration.
@@ -186,16 +206,17 @@ def get_timecode_final(imprint, time_code, type='all'):
     g3session, SMURF = imprint.get_g3tsmurf_session(return_archive=True)
     session = imprint.get_session()
 
+    # this is another place I was reminded sqlite does not accept
+    # numpy int32s or numpy int64s
+    time_code = int(time_code)
+
     servers = SMURF.finalize["servers"]
     meta_agents = [s["smurf-suprsync"] for s in servers]
     files_agents = [s["timestream-suprsync"] for s in servers]
 
-    meta_query = or_(*[TimeCodes.agent == a for a in meta_agents])
-    files_query = or_(*[TimeCodes.agent == a for a in files_agents])
-
     tcm = g3session.query(TimeCodes.agent).filter(
         TimeCodes.timecode==time_code,
-        meta_query,
+        TimeCodes.agent.in_(meta_agents),
         TimeCodes.suprsync_type == SupRsyncType.META.value,
     ).distinct().all()
 
@@ -209,8 +230,8 @@ def get_timecode_final(imprint, time_code, type='all'):
         return False, 1
 
     tcf = g3session.query(TimeCodes.agent).filter(
-        TimeCodes.timecode==time_code,
-        files_query,
+        TimeCodes.timecode == time_code,
+        TimeCodes.agent.in_(files_agents),
         TimeCodes.suprsync_type == SupRsyncType.FILES.value,
     ).distinct().all()
 
@@ -244,8 +265,11 @@ def set_timecode_final(imprint, time_code):
     """
     g3session, SMURF = imprint.get_g3tsmurf_session(return_archive=True)
 
-    servers = SMURF.finalize["servers"]
+    # this is another place I was reminded sqlite does not accept
+    # numpy int32s or numpy int64s
+    time_code = int(time_code)
+    servers = SMURF.finalize["servers"]
 
     for server in servers:
         tcf = g3session.query(TimeCodes).filter(
@@ -261,6 +285,7 @@ def set_timecode_final(imprint, time_code):
             agent=server["timestream-suprsync"],
         )
         g3session.add(tcf)
+        g3session.commit()
 
     tcm = g3session.query(TimeCodes).filter(
         TimeCodes.timecode==time_code,
@@ -274,5 +299,5 @@ def set_timecode_final(imprint, time_code):
         timecode=time_code,
         agent=server["smurf-suprsync"],
     )
-    g3session.add(tcm)
-    g3session.commit()
\ No newline at end of file
+    g3session.add(tcm)
+    g3session.commit()
\ No newline at end of file
diff --git a/sotodlib/io/load_smurf.py b/sotodlib/io/load_smurf.py
index 
0bafb1cf6..b0a8cb415 100644 --- a/sotodlib/io/load_smurf.py +++ b/sotodlib/io/load_smurf.py @@ -20,7 +20,7 @@ from .. import core from . import load as io_load -from .datapkg_utils import load_configs +from .datapkg_utils import load_configs, walk_files, just_suprsync from .g3thk_db import G3tHk, HKFiles, HKAgents, HKFields from .g3thk_utils import pysmurf_monitor_control_list @@ -194,6 +194,7 @@ def __init__( self.meta_path = meta_path self.db_path = db_path self.hk_db_path = hk_db_path + self.HK = None self.finalize = finalize if os.path.exists(self.db_path): @@ -606,13 +607,13 @@ def delete_file(self, db_file, session=None, dry_run=False, my_logger=None): my_logger = logger db_frames = db_file.frames - my_logger.info(f"Deleting frame entries for {db_file.name}") + my_logger.debug(f"Deleting frame entries for {db_file.name}") if not dry_run: [session.delete(frame) for frame in db_frames] if not os.path.exists(db_file.name): my_logger.warning( - f"Database file {db_file.name} appears already" " deleted on disk" + f"Database file {db_file.name} appears already deleted on disk" ) else: my_logger.info(f"Deleting file {db_file.name}") @@ -1155,7 +1156,9 @@ def update_observation_files( logger.debug(f"Setting {obs.obs_id} stop time to {obs.stop}") session.commit() - def delete_observation_files(self, obs, session, dry_run=False, my_logger=None): + def delete_observation_files( + self, obs, session, dry_run=False, my_logger=None + ): """WARNING: Deletes files from the file system Args @@ -1394,20 +1397,33 @@ def index_timecodes(self, session=None, min_ctime=16000e5, max_ctime=None): session.add(tcf) session.commit() - def update_finalization(self, update_time, session=None): - """Update the finalization time rows in the database""" + def get_HK(self): if self.hk_db_path is None: - raise ValueError("HK database path required to update finalization" " time") + raise ValueError("HK database path required") + + if self.HK is None: + iids = [] + for server in self.finalize.get("servers", []): + for key in server.keys(): + # Append the value (iid) to the iids list + iids.append(server[key]) + + self.HK = G3tHk( + os.path.join(os.path.split(self.archive_path)[0], "hk"), + iids = iids, + db_path = self.hk_db_path, + ) + return self.HK + def update_finalization(self, update_time, session=None): + """Update the finalization time rows in the database""" + if session is None: session = self.Session() # look for new rows to add to table self._start_finalization(session) - HK = G3tHk( - os.path.join(os.path.split(self.archive_path)[0], "hk"), - self.hk_db_path, - ) + HK = self.get_HK() agent_list = session.query(Finalize).all() for agent in agent_list: @@ -1449,15 +1465,19 @@ def update_finalization(self, update_time, session=None): session.commit() def get_final_time( - self, stream_ids, start=None, stop=None, check_control=True, session=None + self, stream_ids, start=None, stop=None, check_control=True, + session=None ): - """Return the ctime to which database is finalized for a set of stream_ids - between ctimes start and stop. If check_control is True it will use the - pysmurf-monitor entries in the HK database to determine which - pysmurf-monitors were in control of which stream_ids between start and stop. + """Return the ctime to which database is finalized for a set of + stream_ids between ctimes start and stop. If check_control is True it + will use the pysmurf-monitor entries in the HK database to determine + which pysmurf-monitors were in control of which stream_ids between + start and stop. 
""" if check_control and self.hk_db_path is None: - raise ValueError("HK database path required to update finalization" " time") + raise ValueError( + "HK database path required to update finalization time" + ) if check_control and ((start is None) or (stop is None)): raise ValueError( "start and stop ctimes are required to check which" @@ -1465,10 +1485,8 @@ def get_final_time( ) if session is None: session = self.Session() - HK = G3tHk( - os.path.join(os.path.split(self.archive_path)[0], "hk"), - self.hk_db_path, - ) + + HK = self.get_HK() agent_list = [] if "servers" not in self.finalize: @@ -1730,6 +1748,51 @@ def index_action_observations( if new_session: session.close() + def find_missing_files(self, timecode, session=None): + """create a list of files in the timecode folder that are not in the + g3tsmurf database + + Arguments + ---------- + timecode (int): a level 2 timestreams timecode + + Returns + -------- + missing (list): list of file paths that are not in the g3tsmurf database + """ + if session is None: + session = self.Session() + path = os.path.join(self.archive_path, str(timecode)) + + q = session.query(Files).filter(Files.name.like(f"{path}%")) + db_list = [f.name for f in q.all()] + sys_list = walk_files(path) + missing = [] + for f in sys_list: + if f not in db_list: + missing.append(f) + + return missing + + def find_missing_files_from_obs(self, timecode, session=None): + """create a list of files in the g3tsmurf database that do not have an + assigned level 2 observation ID + + Arguments + ---------- + timecode (int): a level 2 timestreams timecode + + Returns + -------- + missing (list): list of file paths that do not have level 2 observation IDs + """ + if session is None: + session = self.Session() + path = os.path.join(self.archive_path, str(timecode)) + q = session.query(Files).filter(Files.name.like(f"{path}%")) + db_list = q.all() + return [f.name for f in db_list if f.obs_id is None] + def lookup_file(self, filename, fail_ok=False): """Lookup a file's observations details in database. Meant to look and act like core.metadata.obsfiledb.lookup_file. 
@@ -1914,7 +1977,6 @@ def load_status(self, time, stream_id=None, show_pb=False): """ return SmurfStatus.from_time(time, self, stream_id=stream_id, show_pb=show_pb) - def dump_DetDb(archive, detdb_file): """ Take a G3tSmurf archive and create a a DetDb of the type used with Context @@ -1951,7 +2013,6 @@ def dump_DetDb(archive, detdb_file): session.close() return my_db - def make_DetDb_single_obs(obsfiledb, obs_id): # find relevant files to get status c = obsfiledb.conn.execute( @@ -2016,18 +2077,15 @@ def make_DetDb_single_obs(obsfiledb, obs_id): detdb.conn.commit() return detdb - def obs_detdb_context_hook(ctx, obs_id, *args, **kwargs): ddb = make_DetDb_single_obs(ctx.obsfiledb, obs_id) ctx.obs_detdb = ddb return ddb - core.Context.hook_sets["obs_detdb_load"] = { "before-use-detdb": obs_detdb_context_hook, } - class SmurfStatus: """ This is a class that attempts to extract essential information from the @@ -2385,7 +2443,6 @@ def smurf_to_readout(self, band, chan): """ return self.mask_inv[band, chan] - def get_channel_mask( ch_list, status, archive=None, obsfiledb=None, ignore_missing=True ): @@ -2568,7 +2625,6 @@ def _get_tuneset_channel_names(status, ch_map, archive): session.close() return ruids - def _get_detset_channel_names(status, ch_map, obsfiledb): """Update channel maps with name from obsfiledb""" # tune file in status @@ -2639,7 +2695,6 @@ def _get_detset_channel_names(status, ch_map, obsfiledb): return ruids - def _get_channel_mapping(status, ch_map): """Generate baseline channel map from status object""" for i, ch in enumerate(ch_map["idx"]): @@ -2656,7 +2711,6 @@ def _get_channel_mapping(status, ch_map): ch_map[i]["channel"] = -1 return ch_map - def get_channel_info( status, mask=None, @@ -2740,7 +2794,6 @@ def get_channel_info( return ch_info - def _get_sample_info(filenames): """Scan through a list of files and count samples. Starts counting from the first file in the list. Used in load_file for sample restiction @@ -2780,7 +2833,6 @@ def _get_sample_info(filenames): start += samps return out - def split_ts_bits(c): """Split up 64 bit to 2x32 bit""" NUM_BITS_PER_INT = 32 @@ -2789,7 +2841,6 @@ def split_ts_bits(c): b = c & MAXINT return a, b - def _get_timestamps(streams, load_type=None, linearize_timestamps=True): """Calculate the timestamp field for loaded data @@ -2841,7 +2892,6 @@ def _get_timestamps(streams, load_type=None, linearize_timestamps=True): return io_load.hstack_into(None, streams["time"]) logger.error("Timing System could not be determined") - def load_file( filename, channels=None, @@ -3099,7 +3149,6 @@ def load_file( return aman - def load_g3tsmurf_obs(db, obs_id, dets=None, samples=None, no_signal=None, **kwargs): """Obsloader function for g3tsmurf data archives. 
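Before the cleanup_level2.py rewrite that follows, a rough sketch of the per-timecode flow it drives may help orient review. This is illustrative only and not part of the patch: the platform name is a placeholder, while the DataPackaging method names and the check[0]/check[1] return convention are taken from the code below.

    # Sketch of the per-timecode flow driven by cleanup_level2.py (illustrative only)
    from sotodlib.io.datapkg_completion import DataPackaging

    dpk = DataPackaging("satp1")  # hypothetical platform name
    timecode = dpk.get_first_timecode_on_disk()

    # Phase 1: confirm every book that should exist for this timecode does exist
    check = dpk.make_timecode_complete(timecode)

    # Phase 2: staged copies go once the timecode verifies as deletable
    if check[0]:
        check = dpk.verify_timecode_deletable(
            timecode, include_hk=True, verify_with_librarian=False,
        )
        if check[0]:
            dpk.delete_timecode_staged(timecode)

    # Phase 3: level 2 data goes last; this call bundles its own checks
    dpk.check_and_delete_timecode(timecode)

The actual script wraps each phase with configurable lags and timecode ranges, as the argument parser below shows.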
diff --git a/sotodlib/site_pipeline/cleanup_level2.py b/sotodlib/site_pipeline/cleanup_level2.py index e71f916e7..16c3947e9 100644 --- a/sotodlib/site_pipeline/cleanup_level2.py +++ b/sotodlib/site_pipeline/cleanup_level2.py @@ -1,52 +1,221 @@ +import numpy as np import datetime as dt from typing import Optional import argparse from sotodlib.io.imprinter import Imprinter +from sotodlib.io.datapkg_completion import DataPackaging +from sotodlib.site_pipeline.util import init_logger + +logger = init_logger(__name__, "cleanup_level2: ") + +def level2_completion( + dpk: DataPackaging, + lag: Optional[float] = 14, + min_timecode: Optional[int] = None, + max_timecode: Optional[int] = None, + raise_incomplete: Optional[bool] = True, +): + + ## build time range where we require timecodes to be complete + if min_timecode is None: + min_timecode = dpk.get_first_timecode_on_disk() + if max_timecode is None: + x = dt.datetime.now() - dt.timedelta(days=lag) + max_timecode = int( x.timestamp() // 1e5) + + logger.info( + f"Checking Timecode completion from {min_timecode} to " + f"{max_timecode}." + ) + + check_list = [] + for timecode in range(min_timecode, max_timecode): + check = dpk.make_timecode_complete(timecode) + if not check[0]: + check_list.append( (timecode, check[1]) ) + continue + check = dpk.verify_timecode_deletable( + timecode, include_hk=True, + verify_with_librarian=False, + ) + if not check[0]: + check_list.append( (timecode, check[1]) ) + + if len( check_list ) > 0 and raise_incomplete: + raise ValueError( + f"Data Packaging cannot be completed for {check_list}" + ) + +def do_delete_level2( + dpk: DataPackaging, + lag: Optional[float] = 28, + min_timecode: Optional[int] = None, + max_timecode: Optional[int] = None, + raise_incomplete: Optional[bool] =True, +): + ## build time range where we should be deleting + if min_timecode is None: + min_timecode = dpk.get_first_timecode_on_disk() + + if max_timecode is None: + x = dt.datetime.now() - dt.timedelta(days=lag) + max_timecode = int( x.timestamp() // 1e5) + + logger.info( + f"Removing Level 2 data from {min_timecode} to " + f"{max_timecode}." + ) + delete_list = [] + for timecode in range(min_timecode, max_timecode): + check = dpk.check_and_delete_timecode(timecode) + if not check[0]: + logger.error(f"Failed to remove level 2 for {timecode}") + delete_list.append( (timecode, check[1])) + continue + if len( delete_list ) > 0 and raise_incomplete: + raise ValueError( + f"Level 2 Deletion not finished for {delete_list}" + ) + +def do_delete_staged( + dpk: DataPackaging, + lag: Optional[float] = 14, + min_timecode: Optional[int] = None, + max_timecode: Optional[int] = None, + raise_incomplete: Optional[bool] =True, +): + ## build time range where we should be deleting + if min_timecode is None: + min_timecode = dpk.get_first_timecode_in_staged() + + if max_timecode is None: + x = dt.datetime.now() - dt.timedelta(days=lag) + max_timecode = int( x.timestamp() // 1e5) + + logger.info( + f"Removing staged from {min_timecode} to " + f"{max_timecode}." 
+    )
+    delete_list = []
+    for timecode in range(min_timecode, max_timecode):
+        check = dpk.make_timecode_complete(timecode)
+        if not check[0]:
+            delete_list.append( (timecode, check[1]) )
+            continue
+        check = dpk.verify_timecode_deletable(
+            timecode, include_hk=True,
+            verify_with_librarian=False,
+        )
+        if not check[0]:
+            delete_list.append( (timecode, check[1]) )
+            continue
+        check = dpk.delete_timecode_staged(timecode)
+        if not check[0]:
+            logger.error(f"Failed to remove staged for {timecode}")
+            delete_list.append( (timecode, check[1]))
+            continue
+    if len( delete_list ) > 0 and raise_incomplete:
+        raise ValueError(
+            f"Staged Deletion not finished for {delete_list}"
+        )
 
 def main(
-    config: str,
-    cleanup_delay: float = 7,
-    max_ctime: Optional[float] = None,
-    dry_run: Optional[bool] = False,
+    platform: str,
+    check_complete: Optional[bool] = False,
+    delete_staged: Optional[bool] = False,
+    delete_lvl2: Optional[bool] = False,
+    completion_lag: Optional[float] = 14,
+    min_complete_timecode: Optional[int] = None,
+    max_complete_timecode: Optional[int] = None,
+    staged_deletion_lag: Optional[float] = 14,
+    min_staged_delete_timecode: Optional[int] = None,
+    max_staged_delete_timecode: Optional[int] = None,
+    lvl2_deletion_lag: Optional[float] = 28,
+    min_lvl2_delete_timecode: Optional[int] = None,
+    max_lvl2_delete_timecode: Optional[int] = None,
 ):
     """
     Use the imprinter database to clean up already bound level 2 files.
 
     Parameters
     ----------
-    config : str
-        Path to config file for imprinter
-    cleanup_delay : float, optional
-        The amount of time to delay book deletion in units of days, by default 1
-    max_ctime : Optional[datetime], optional
-        The maximum datetime to delete level 2 data. Overrides cleanup_delay.
+    platform : str
+        Platform we're running for.
+    completion_lag : float, optional
+        The number of days in the past where we expect data packaging to be
+        fully complete.
+    min_complete_timecode : Optional[int], optional
+        The lowest timecode to run completion checking. Overrides the "start
+        from beginning" behavior.
+    max_complete_timecode : Optional[int], optional
+        The highest timecode to run completion checking. Overrides the
+        completion_lag calculated value.
     dry_run : Optional[bool],
         If true, only prints deletion to logger
     """
+    dpk = DataPackaging(platform)
 
-    if max_ctime is not None:
-        max_time = dt.datetime.utcfromtimestamp(max_ctime)
-    else:
-        max_time = None
+    if check_complete:
+        level2_completion(
+            dpk, completion_lag,
+            min_complete_timecode, max_complete_timecode,
+        )
+
+    if delete_staged:
+        do_delete_staged(
+            dpk, staged_deletion_lag,
+            min_staged_delete_timecode, max_staged_delete_timecode
+        )
+
+    if delete_lvl2:
+        do_delete_level2(
+            dpk, lvl2_deletion_lag,
+            min_lvl2_delete_timecode, max_lvl2_delete_timecode,
+        )
 
-    imprinter = Imprinter(config, db_args={'connect_args': {'check_same_thread': False}})
-    book_list = imprinter.get_level2_deleteable_books(max_time=max_time, cleanup_delay=cleanup_delay)
-    for book in book_list:
-        imprinter.delete_level2_files(book, dry_run=dry_run)
 
 def get_parser(parser=None):
     if parser is None:
         parser = argparse.ArgumentParser()
-    parser.add_argument('config', type=str, help="Config file for Imprinter")
-    parser.add_argument('--cleanup-delay', type=float, default=7,
-                        help="Days to keep level 2 data before cleaning")
-    parser.add_argument('--max-ctime', type=float,
-                        help="Maximum ctime to delete to, overrides cleanup_delay ONLY if its an earlier time")
-    parser.add_argument('--dry-run', action="store_true",
-                        help="if passed, only prints delete behavior")
+
+    parser.add_argument('platform', type=str, help="Platform for Imprinter")
+    parser.add_argument('--check-complete', action="store_true",
+        help="If passed, run completion check")
+    parser.add_argument('--delete-lvl2', action="store_true",
+        help="If passed, delete lvl2 raw data")
+    parser.add_argument('--delete-staged', action="store_true",
+        help="If passed, delete lvl2 staged data")
+
+    parser.add_argument('--completion-lag', type=float, default=14,
+        help="Buffer days before we start failing completion")
+    parser.add_argument('--min-complete-timecode', type=int,
+        help="Minimum timecode to start completion check. Overrides starting "
+        "from the beginning")
+    parser.add_argument('--max-complete-timecode', type=int,
+        help="Maximum timecode to stop completion check. Overrides the "
+        "completion-lag setting")
+
+    parser.add_argument('--lvl2-deletion-lag', type=float, default=28,
+        help="Buffer days before we start deleting level 2 raw data")
+    parser.add_argument('--min-lvl2-delete-timecode', type=int,
+        help="Minimum timecode to start level 2 raw data deletion. Overrides "
+        "starting from the beginning")
+    parser.add_argument('--max-lvl2-delete-timecode', type=int,
+        help="Maximum timecode to stop level 2 raw data deletion. Overrides the"
+        " lvl2-deletion-lag setting")
+
+    parser.add_argument('--staged-deletion-lag', type=float, default=14,
+        help="Buffer days before we start deleting level 2 staged data")
+    parser.add_argument('--min-staged-delete-timecode', type=int,
+        help="Minimum timecode to start level 2 staged data deletion. Overrides"
+        " starting from the beginning")
+    parser.add_argument('--max-staged-delete-timecode', type=int,
+        help="Maximum timecode to stop level 2 staged data deletion. Overrides"
+        " the staged-deletion-lag setting")
+
     return parser
 
 if __name__ == "__main__":
diff --git a/sotodlib/site_pipeline/update_book_plan.py b/sotodlib/site_pipeline/update_book_plan.py
index d76ac04cd..6a80e603d 100644
--- a/sotodlib/site_pipeline/update_book_plan.py
+++ b/sotodlib/site_pipeline/update_book_plan.py
@@ -2,11 +2,21 @@
 import datetime as dt
 import time
 from typing import Optional
+from sqlalchemy import not_
 
-from sotodlib.io.imprinter import Imprinter
 from sotodlib.site_pipeline.monitor import Monitor
 from sotodlib.site_pipeline.util import init_logger
 
+from sotodlib.io.imprinter import (
+    Imprinter,
+    Books,
+    BOUND,
+    UNBOUND,
+    UPLOADED,
+    FAILED,
+    DONE,
+)
+
 logger = init_logger(__name__, "update_book_plan: ")
 
 def main(
@@ -134,9 +144,13 @@ def record_book_counts(monitor, imprinter):
     log_tags = {}
     script_run = time.time()
 
+    session = imprinter.get_session()
+    def get_count( q ):
+        return session.query(Books).filter(q).count()
+
     monitor.record(
         "unbound",
-        [ len(imprinter.get_unbound_books()) ],
+        [ get_count(Books.status == UNBOUND) ],
         [script_run],
         tags,
         imprinter.config["monitor"]["measurement"],
@@ -145,7 +159,7 @@
 
     monitor.record(
         "bound",
-        [ len(imprinter.get_bound_books()) ],
+        [ get_count(Books.status == BOUND) ],
         [script_run],
         tags,
         imprinter.config["monitor"]["measurement"],
@@ -154,7 +168,7 @@
 
     monitor.record(
         "uploaded",
-        [ len(imprinter.get_uploaded_books()) ],
+        [ get_count(Books.status == UPLOADED) ],
         [script_run],
         tags,
         imprinter.config["monitor"]["measurement"],
@@ -163,7 +177,25 @@
 
     monitor.record(
         "failed",
-        [ len(imprinter.get_failed_books()) ],
+        [ get_count(Books.status == FAILED) ],
+        [script_run],
+        tags,
+        imprinter.config["monitor"]["measurement"],
+        log_tags=log_tags
+    )
+
+    monitor.record(
+        "done",
+        [ get_count(Books.status == DONE) ],
+        [script_run],
+        tags,
+        imprinter.config["monitor"]["measurement"],
+        log_tags=log_tags
+    )
+
+    monitor.record(
+        "has_level2",
+        [ get_count(not_(Books.lvl2_deleted)) ],
         [script_run],
         tags,
         imprinter.config["monitor"]["measurement"],