diff --git a/_add_atlas b/_add_atlas
deleted file mode 100755
index a615761..0000000
--- a/_add_atlas
+++ /dev/null
@@ -1,305 +0,0 @@
-#!/usr/bin/env python3
-"""
-Identify new ATLAS data and add to the SBN SIS database.
-
-Includes fits and diff files.
-
-"""
-
-import os
-import sys
-import shlex
-import logging
-import sqlite3
-import argparse
-from glob import glob
-import logging.handlers
-from packaging.version import Version
-
-from astropy.time import Time
-import astropy.units as u
-import pds4_tools
-
-from sbn_survey_image_service.data.add import add_label
-from sbn_survey_image_service.services.database_provider import data_provider_session
-from sbn_survey_image_service.models import HarvestLog
-
-
-class LabelError(Exception):
-    pass
-
-
-def get_logger():
-    return logging.getLogger("SBNSIS/Add ATLAS")
-
-
-def setup_logger(args):
-    logger = get_logger()
-
-    if len(logger.handlers) > 0:
-        # already set up
-        return logger
-
-    if not os.path.exists(os.path.dirname(args.log)):
-        os.makedirs(os.path.dirname(args.log), exist_ok=True)
-
-    logger.setLevel(logging.DEBUG)
-
-    formatter = logging.Formatter("%(levelname)s:%(name)s:%(asctime)s: %(message)s")
-
-    handler = logging.StreamHandler(sys.stderr)
-    handler.setLevel(logging.DEBUG if args.verbose else logging.ERROR)
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-
-    handler = logging.FileHandler(args.log)
-    handler.setLevel(logging.DEBUG if args.verbose else logging.INFO)
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-
-    logger.info("%s", " ".join([shlex.quote(s) for s in sys.argv]))
-
-    return logger
-
-
-def get_time_of_last() -> Time:
-    """Get the time of the last file validation."""
-
-    date: Time
-    with data_provider_session() as session:
-        entry: HarvestLog = (
-            session.query(HarvestLog)
-            .filter(HarvestLog.source == "atlas")
-            .order_by(HarvestLog.end.desc())
-            .one()
-        )
-        date = Time(entry.time_of_last)
-    return date
-
-
-def is_harvest_processing() -> bool:
-    """True if a harvester log entry is set to 'processing'."""
-
-    is_processing: bool
-    with data_provider_session() as session:
-        is_processing = (
-            session.query(HarvestLog)
-            .filter(HarvestLog.source == "atlas")
-            .filter(HarvestLog.end == "processing")
-            .first()
-        ) is not None
-    return is_processing
-
-
-def get_collections(db, start, stop):
-    """Get collections validated between start and stop.
-
-    The rows are ordered so that if a fatal error occurs the next run might be
-    able to recover.
-
-    """
-
-    cursor = db.execute(
-        """SELECT * FROM nn
-        WHERE current_status = 'validated'
-        AND recorded_at > ? AND recorded_at < ?
-        ORDER BY recorded_at
-        """,
-        (start.unix, stop.unix),
-    )
-    return cursor.fetchall()
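-
-
-# A minimal usage sketch of the harvest-log helpers above, assuming an open
-# read-only SQLite connection ``db`` and at least one prior harvest-log
-# entry: resume from the last harvest and scan up to the present.
-#
-#     start = get_time_of_last()
-#     stop = Time.now()
-#     for row in get_collections(db, start, stop):
-#         print(row["location"], row["recorded_at"])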
-
-
-def collection_version(collection) -> Version:
-    """Get the collection version."""
-    is_collection = (
-        collection.label.find("Identification_Area/product_class").text
-        == "Product_Collection"
-    )
-    vid = collection.label.find("Identification_Area/version_id")
-    if not is_collection or vid is None:
-        raise LabelError("This does not appear to be a valid PDS4 label.")
-    return Version(vid.text)
-
-
-def get_lidvid(filename):
-    """Return the LIDVID."""
-    product = pds4_tools.read(filename, quiet=True, lazy_load=True)
-    lid = product.label.find("Identification_Area/logical_identifier").text
-    vid = product.label.find("Identification_Area/version_id").text
-    return "::".join((lid, vid))
-
-
-def get_image_labels(collection, data_directory) -> list:
-    """Get the label file names of the image products to ingest.
-
-    The label file names for all LIDVIDs ending with ".fits" or ".diff" in the
-    collection inventory will be returned.
-
-    Candidate labels are collected from XML files within `data_directory`.
-
-    """
-
-    logger = get_logger()
-    files = {}
-    count = 0
-    for fn in glob(f"{data_directory}/*xml"):
-        if not fn.endswith((".fits.xml", ".diff.xml")):
-            continue
-        files[get_lidvid(fn)] = fn
-        count += 1
-        if (count % 100) == 0:
-            logger.debug("%d files read", count)
-    logger.debug("%d files read", count)
-
-    image_files = []
-    for lidvid in collection[0].data["LIDVID_LID"]:
-        lid = lidvid.split("::")[0]
-        if not lid.endswith((".fits", ".diff")):
-            continue
-        if lidvid not in files:
-            raise LabelError(f"{lidvid} not found in {data_directory}")
-        image_files.append(files[lidvid])
-    return image_files
-
-
-parser = argparse.ArgumentParser()
-parser.add_argument(
-    "database", type=os.path.normpath, help="ATLAS-PDS processing database"
-)
-mutex = parser.add_mutually_exclusive_group()
-mutex.add_argument(
-    "--since-date", type=Time, help="harvest metadata validated since this date"
-)
-mutex.add_argument(
-    "--past",
-    type=int,
-    help="harvest metadata validated in the past PAST hours",
-)
-mutex.add_argument(
-    "--between-dates",
-    type=Time,
-    nargs=2,
-    help="harvest metadata validated between these dates",
-)
-
-parser.add_argument(
-    "--log", default="./logging/add-atlas.log", help="log messages to this file"
-)
-parser.add_argument(
-    "--verbose", "-v", action="store_true", help="log debugging messages"
-)
-args = parser.parse_args()
-
-logger = setup_logger(args)
-
-# set up database
-try:
-    db = sqlite3.connect(f"file:{args.database}?mode=ro", uri=True)
-    db.row_factory = sqlite3.Row
-except Exception as exc:
-    logger.error("Could not connect to database %s", args.database)
-    raise exc
-
-logger.info("Connected to database %s", args.database)
-
-if args.between_dates is not None:
-    start = args.between_dates[0]
-    stop = args.between_dates[-1]
-    logger.info(
-        "Checking for collections validated between %s and %s", start.iso, stop.iso
-    )
-elif args.past is not None:
-    date = Time.now() - args.past * u.hr
-    logger.info(
-        "Checking for collections validated in the past %d hr (since %s)",
-        args.past,
-        date.iso,
-    )
-elif args.since_date:
-    date = args.since_date
-    logger.info("Checking for collections validated since %s", date.iso)
-else:
-    date = get_time_of_last()
-
-if args.between_dates is None:
-    start = date
-    stop = Time.now()
-
-results = get_collections(db, start, stop)
-
-if len(results) == 0:
-    logger.info("No new data collections found.")
-else:
-    logger.debug("Found %d new collections.", len(results))
-    for row in results:
-        logger.debug("%d|%d|%s", row["oc"], row["nn"], row["location"])
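-
-    # A possible concurrency guard (a sketch): is_harvest_processing() is
-    # defined above but never called; checking it here would stop a second
-    # harvester from starting while another run is marked "processing".
-    #
-    #     if is_harvest_processing():
-    #         logger.error("Another harvest is already in progress.")
-    #         sys.exit(1)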
-
-    with data_provider_session() as session:
-        harvest_log: HarvestLog = HarvestLog(
-            start=Time.now().iso,
-            end="processing",
-            source="atlas",
-            time_of_last="",
-            files=0,
-            added=0,
-            errors=0,
-        )
-
-        # save to database now with "processing" status to prevent other
-        # instances from running
-        session.add(harvest_log)
-        session.commit()
-
-        for i, row in enumerate(results):
-            logger.info("%d collections to process.", len(results) - i)
-
-            collections = [
-                pds4_tools.read(fn, quiet=True)
-                for fn in glob(f"/n/{row['location']}/collection_{row['nn']}*.xml")
-            ]
-
-            # find the latest collection lidvid and save to the log
-            versions = [collection_version(label) for label in collections]
-            latest = collections[versions.index(max(versions))]
-            lid = latest.label.find("Identification_Area/logical_identifier").text
-            vid = latest.label.find("Identification_Area/version_id").text
-            logger.info("Found collection %s::%s", lid, vid)
-
-            # Find image products in the data directory
-            data_directory = f"/n/{row['location']}/data"
-            logger.debug(
-                "Inspecting directory %s for image products",
-                data_directory,
-            )
-            files = get_image_labels(latest, data_directory)
-            logger.debug("%d image products to add", len(files))
-
-            # harvest metadata
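-            # add_label() is assumed to return 1 for a newly added product
-            # and 0 for one already in the database (inferred from the
-            # bookkeeping below); exceptions are counted as errors.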
-            count = 0
-            errors = 0
-            for label in files:
-                try:
-                    count += add_label(label, session)
-                except Exception as exc:
-                    logger.error(exc)
-                    errors += 1
-            logger.info("%d files added", count)
-            logger.info("%d files already in the database", len(files) - count - errors)
-            logger.info("%d files errored", errors)
-
-            # update harvest log
-            harvest_log.files += len(files)
-            harvest_log.added += count
-            harvest_log.errors += errors
-            harvest_log.time_of_last = max(
-                harvest_log.time_of_last, Time(row["recorded_at"], format="unix").iso
-            )
-
-            # save interim results
-            session.merge(harvest_log)
-            session.commit()
-
-        harvest_log.end = Time.now().iso
-        session.merge(harvest_log)
-        session.commit()
-
-logger.info("Finished.")
diff --git a/sbn_survey_image_service/models/__init__.py b/sbn_survey_image_service/models/__init__.py
index 196ed46..05aecfd 100644
--- a/sbn_survey_image_service/models/__init__.py
+++ b/sbn_survey_image_service/models/__init__.py
@@ -3,4 +3,3 @@
 
 from .base import Base
 from .image import Image
-from .harvest_log import HarvestLog
diff --git a/sbn_survey_image_service/models/harvest_log.py b/sbn_survey_image_service/models/harvest_log.py
deleted file mode 100644
index be32202..0000000
--- a/sbn_survey_image_service/models/harvest_log.py
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed under a 3-clause BSD style license - see LICENSE.rst
-"""SBN Survey Image Service data models.
-
-HarvestLog: ORM model for a metadata harvesting log.
-
-"""
-
-from sqlalchemy import Column, String, Integer
-from .base import Base
-
-
-class HarvestLog(Base):
-    """ORM class for metadata harvesting log."""
-
-    __tablename__ = "harvest_log"
-
-    id: int = Column(Integer, primary_key=True)
-
-    start: str = Column(String, nullable=False, index=True)
-    """Date and time the harvesting started (UTC)."""
-
-    end: str = Column(String, nullable=False, index=True)
-    """Date and time the harvesting ended (UTC)."""
-
-    source: str = Column(String, nullable=False)
-    """Data source being harvested."""
-
-    time_of_last: str = Column(String, nullable=False)
-    """Time stamp of the last file added.
-
-    This is intended to be used as the basis for discovering what new data
-    could be added from external archives.
-
-    """
-
-    files: int = Column(Integer, nullable=False)
-    """Number of files harvested."""
-
-    added: int = Column(Integer, nullable=False)
-    """Number of files added to the database."""
-
-    errors: int = Column(Integer, nullable=False)
-    """Number of files that errored."""
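-
-
-# A minimal usage sketch, assuming an open SQLAlchemy session (e.g., from
-# data_provider_session()); this mirrors how the harvester looks up the most
-# recent "atlas" run to resume from:
-#
-#     last = (
-#         session.query(HarvestLog)
-#         .filter(HarvestLog.source == "atlas")
-#         .order_by(HarvestLog.end.desc())
-#         .first()
-#     )
-#     resume_time = last.time_of_last if last is not None else None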