diff --git a/bin/dm_xnat_extract.py b/bin/dm_xnat_extract.py index b20f07c9..2268c67b 100755 --- a/bin/dm_xnat_extract.py +++ b/bin/dm_xnat_extract.py @@ -113,7 +113,8 @@ def get_bids_config(self, config, bids_conf=None): def main(): args = read_args() - configure_logging(args.study, args.quiet, args.verbose, args.debug) + log_level = get_log_level(args) + configure_logging(args.study, log_level) if args.use_dcm2bids and not datman.exporters.DCM2BIDS_FOUND: logger.error("Failed to import Dcm2Bids. Ensure that " @@ -130,6 +131,7 @@ def main(): clobber=args.clobber, dcm2bids_config=args.dcm_config, bids_out=args.bids_out, + log_level=log_level, refresh=args.refresh ) else: @@ -297,17 +299,27 @@ def _is_file(path, parser): return args -def configure_logging(study, quiet=None, verbose=None, debug=None): - ch = logging.StreamHandler(sys.stdout) +def get_log_level(args): + """Return a string representing the log level, based on user input. + + A string representation of the log level is needed to please dcm2bids :) + """ + if args.quiet: + return "ERROR" + + if args.verbose: + return "INFO" + + if args.debug: + return "DEBUG" + + return "WARNING" - log_level = logging.WARNING - if quiet: - log_level = logging.ERROR - if verbose: - log_level = logging.INFO - if debug: - log_level = logging.DEBUG +def configure_logging(study, log_level): + ch = logging.StreamHandler(sys.stdout) + + log_level = getattr(logging, log_level) logger.setLevel(log_level) ch.setLevel(log_level) @@ -586,7 +598,10 @@ def export_scans(config, xnat, xnat_experiment, session, bids_opts=None, exporter.export(scan.download_dir) for exporter in session_exporters: - exporter.export(temp_dir) + try: + exporter.export(temp_dir) + except Exception as e: + logger.error(f"Exporter {exporter} failed - {e}") def make_session_exporters(config, session, experiment, bids_opts=None, @@ -779,7 +794,12 @@ def needs_raw(session_exporters): def needs_export(session_exporters): """Returns True if any session exporters need to be run. """ - return any([not exp.outputs_exist() for exp in session_exporters]) + try: + return any([not exp.outputs_exist() for exp in session_exporters]) + except ValueError: + # ValueError is raised when an invalid series number exists on XNAT. + # Skip these sessions + return False def needs_download(scan, session_exporters, series_exporters): diff --git a/datman/exporters.py b/datman/exporters.py index db326d42..d8ddb834 100644 --- a/datman/exporters.py +++ b/datman/exporters.py @@ -10,6 +10,7 @@ unique key that can be referenced in config files (e.g. 'nii'). """ from abc import ABC, abstractmethod +from collections import OrderedDict from datetime import datetime from glob import glob from json import JSONDecodeError @@ -31,7 +32,8 @@ get_relative_source, read_json, write_json) try: - from dcm2bids import Dcm2bids + from dcm2bids import dcm2bids, Dcm2bids + from dcm2bids.sidecar import Acquisition except ImportError: DCM2BIDS_FOUND = False else: @@ -193,8 +195,181 @@ def __init__(self, config, session, experiment, bids_opts=None, **kwargs): self.log_level = bids_opts.log_level if bids_opts else "INFO" self.dcm2bids_config = bids_opts.dcm2bids_config if bids_opts else None self.refresh = bids_opts.refresh if bids_opts else False + + # Can be removed if dcm2bids patches the log issue + self.set_log_level() + super().__init__(config, session, experiment, **kwargs) + def set_log_level(self): + """Set the dcm2bids log level based on user input. + + dcm2bids doesnt properly adjust the log level based on user input, + so adjust it here to make it less spammy. + """ + if isinstance(self.log_level, str): + try: + level = getattr(logging, self.log_level) + except AttributeError: + logger.info( + f"Unrecognized log level {self.log_level}. " + "Log level set to 'warn'" + ) + level = logging.WARNING + else: + level = self.log_level + + for logger_name in logging.root.manager.loggerDict: + if not logger_name.startswith('dcm2bids'): + continue + # Get it this way instead of accessing the dict directly in + # case the dict still contains a placeholder + logging.getLogger(logger_name).setLevel(level) + + def get_expected_scans(self): + return self.get_xnat_map() + + def get_actual_scans(self): + return self.get_local_map() + + def check_contents(self, expected, actual): + misnamed = {} + missing = {} + for scan in expected: + if scan not in actual: + # Ignore scans with error files from prev dcm2niix fails + for out_name in expected[scan]: + err_file = os.path.join( + self.bids_folder, out_name + "_niix.err" + ) + if os.path.exists(err_file): + continue + else: + missing.setdefault(scan, []).append(out_name) + continue + + # Ignore split series, we can't handle these right now. + if len(expected[scan]) != 1: + continue + if len(actual[scan]) != 1: + continue + + expected_name = expected[scan][0] + actual_name = actual[scan][0] + if expected_name == actual_name: + continue + misnamed[actual_name] = expected_name + + return misnamed, missing + + def handle_missing_scans(self, missing_scans, niix_log): + # This should be refactored + series_log = parse_niix_log(niix_log, self.experiment.scans) + for scan in missing_scans: + if scan.series not in series_log: + error_msg = ( + f"dcm2niix failed to create nifti for {scan}. " + "Data may require manual intervention or blacklisting.\n" + ) + else: + error_msg = "\n".join(series_log[scan.series]) + + for fname in missing_scans[scan]: + logger.error(error_msg) + self.write_error_file(fname, error_msg) + + def write_error_file(self, fname, error_msg): + out_name = os.path.join(self.bids_folder, fname + "_niix.err") + + root_dir, _ = os.path.split(out_name) + try: + os.makedirs(root_dir) + except FileExistsError: + pass + + try: + with open(out_name, "w") as fh: + fh.writelines(error_msg) + except Exception as e: + logger.error(f"Failed to write error log. {e}") + logger.error( + "Session may continuously redownload if log is not created." + ) + + def fix_run_numbers(self, misnamed_scans): + for orig_name in misnamed_scans: + source_path = os.path.join(self.bids_folder, orig_name) + dest_path = os.path.join( + self.bids_folder, misnamed_scans[orig_name] + ) + + if not os.path.exists(os.path.dirname(dest_path)): + os.makedirs(os.path.dirname(dest_path)) + + for found in glob(source_path + "*"): + _, ext = datman.utils.splitext(found) + os.rename(found, dest_path + ext) + + def get_xnat_parser(self): + participant = dcm2bids.Participant( + self.bids_sub, session=self.bids_ses + ) + bids_conf = dcm2bids.load_json(self.dcm2bids_config) + + xnat_sidecars = [] + for scan in self.experiment.scans: + xnat_sidecars.append(FakeSidecar(scan)) + xnat_sidecars = sorted( + xnat_sidecars, key=lambda x: int(x.data['SeriesNumber']) + ) + + xnat_parser = dcm2bids.SidecarPairing( + xnat_sidecars, remove_criteria(bids_conf['descriptions']) + ) + xnat_parser.build_graph() + xnat_parser.build_acquisitions(participant) + + # Use this to find scans that have extra 'criteria' for single match + extra_acqs = [] + for sidecar, descriptions in xnat_parser.graph.items(): + if len(descriptions) > 1: + for descr in descriptions: + acq = Acquisition(participant, srcSidecar=sidecar, **descr) + extra_acqs.append(acq) + + xnat_parser.acquisitions.extend(extra_acqs) + xnat_parser.find_runs() + + return xnat_parser + + def get_local_parser(self): + participant = dcm2bids.Participant( + self.bids_sub, session=self.bids_ses + ) + + bids_conf = dcm2bids.load_json(self.dcm2bids_config) + + bids_tmp = os.path.join( + self.bids_folder, + "tmp_dcm2bids", + f"{self.session.bids_sub}_{self.session.bids_ses}" + ) + + local_sidecars = [] + for search_path in [self.output_dir, bids_tmp]: + for item in self.find_outputs(".json", start_dir=search_path): + local_sidecars.append(dcm2bids.Sidecar(item)) + local_sidecars = sorted(local_sidecars) + + parser = dcm2bids.SidecarPairing( + local_sidecars, bids_conf["descriptions"] + ) + parser.build_graph() + parser.build_acquisitions(participant) + parser.find_runs() + + return parser + def _get_scan_dir(self, download_dir): if self.refresh: # Use existing tmp_dir instead of raw dcms @@ -220,17 +395,13 @@ def outputs_exist(self): ) return False - sidecars = self.get_sidecars() - repeat_nums = [sidecars[path].get("Repeat") for path in sidecars] - - if any([repeat == self.repeat for repeat in repeat_nums]): - return True - - if self.repeat == "01" and sidecars: - # Catch instances where adding repeat to sidecars failed. - return True + expected_scans = self.get_expected_scans() + actual_scans = self.get_actual_scans() + _, missing = self.check_contents(expected_scans, actual_scans) + if missing: + return False - return False + return True def needs_raw_data(self): return not self.outputs_exist() and not self.refresh @@ -254,7 +425,45 @@ def export(self, raw_data_dir, **kwargs): self.make_output_dir() + try: + self.run_dcm2bids(raw_data_dir) + except Exception as e: + print(f"Failed to extract data. {e}") + + try: + self.add_repeat_num() + except (PermissionError, JSONDecodeError): + logger.error( + "Failed to add repeat numbers to sidecars in " + f"{self.output_dir}. If a repeat scan is added, scans may " + "incorrectly be tagged as belonging to the later repeat." + ) + + def run_dcm2bids(self, raw_data_dir, tries=2): + if tries == 0: + logger.error(f"Dcm2bids failed to run for {self.output_dir}.") + return + input_dir = self._get_scan_dir(raw_data_dir) + + if self.refresh and not os.path.exists(input_dir): + logger.error( + f"Cannot refresh contents of {self.output_dir}, no " + f"files found at {input_dir}.") + return + + # Only run dcm2niix the first try, on the second just export the + # tmp folder contents from the last run + force_niix = False if tries == 1 else self.force_dcm2niix + + expected_scans = self.get_expected_scans() + actual_scans = self.get_actual_scans() + rename, missing = self.check_contents(expected_scans, actual_scans) + + if rename: + self.fix_run_numbers(rename) + + niix_log = [] try: dcm2bids_app = Dcm2bids( input_dir, @@ -263,24 +472,107 @@ def export(self, raw_data_dir, **kwargs): output_dir=self.bids_folder, session=self.bids_ses, clobber=self.clobber, - forceDcm2niix=self.force_dcm2niix, + forceDcm2niix=force_niix, log_level=self.log_level ) dcm2bids_app.run() except Exception as exc: logger.error( - f"Dcm2Bids failed to run for {self.output_dir}. " - f"{type(exc)}: {exc}" + f"Dcm2Bids error for {self.output_dir}. {type(exc)}: {exc}" ) + niix_log = exc.stdout + self.run_dcm2bids(raw_data_dir, tries=tries - 1) - try: - self.add_repeat_num() - except (PermissionError, JSONDecodeError): - logger.error( - "Failed to add repeat numbers to sidecars in " - f"{self.output_dir}. If a repeat scan is added, scans may " - "incorrectly be tagged as belonging to the later repeat." - ) + if not niix_log: + # No dcm2niix conversion errors to handle + return + + expected_scans = self.get_expected_scans() + actual_scans = self.get_actual_scans() + rename, missing = self.check_contents(expected_scans, actual_scans) + + if missing: + self.handle_missing_scans(missing, niix_log) + + if rename: + self.fix_run_numbers(rename) + + def report_export_issues(self, xnat_map, local_map, series_log): + rename = {} + missing = {} + for scan in xnat_map: + if scan not in local_map: + # Note the [0] should probably be dropped. kept for testing. + missing[scan] = xnat_map[scan][0] + continue + if len(xnat_map[scan]) != 1: + continue + if len(local_map[scan]) != 1: + continue + if xnat_map[scan][0] == local_map[scan][0]: + continue + rename[local_map[scan][0]] = xnat_map[scan][0] + + for scan in missing: + if scan.series not in series_log: + print(f"{scan} -> {missing[scan]} failed dcm2niix export") + for orig_name in rename: + print(f"Renaming {orig_name} -> {rename[orig_name]}") + + return rename, missing + + def get_xnat_map(self): + xnat_parser = self.get_xnat_parser() + xnat_map = {} + for acq in xnat_parser.acquisitions: + xnat_map.setdefault(acq.srcSidecar.scan, []).append(acq.dstRoot) + return xnat_map + + def get_local_map(self): + local_parser = self.get_local_parser() + # Map exported local scans to the xnat series + local_map = {} + xnat_series_nums = [scan.series for scan in self.experiment.scans] + for acq in local_parser.acquisitions: + sidecar = acq.srcSidecar + if ('Repeat' in sidecar.data and + sidecar.data['Repeat'] != self.session.session): + continue + if 'SeriesNumber' not in sidecar.data: + continue + series = str(sidecar.data['SeriesNumber']) + if series not in xnat_series_nums: + if len(series) < 3: + continue + # This may be one of the split series, which get '10' prefixed + # strip it and check again + # Convert to int to trim preceding zeries + tmp_series = str(int(str(series)[2:])) + if tmp_series not in xnat_series_nums: + # It's just not a recognized series + continue + # It IS a prefixed one, so replace with orig num + series = tmp_series + found = None + for scan in self.experiment.scans: + if scan.series == str(series): + found = scan + if not found: + continue + + # Handle previously renamed series + # This happens when there are multiple runs but an + # early one has completely failed to extract. + # (i.e. dcm2bids thinks the run number differs from what it + # _should_ be if all had extracted) + dst_path = os.path.join(self.bids_folder, acq.dstRoot) + if dst_path != acq.srcRoot: + dst_path = acq.srcRoot.replace(self.bids_folder, "") + else: + dst_path = acq.dstRoot + + local_map.setdefault(found, []).append(dst_path) + return local_map def add_repeat_num(self): orig_contents = self.get_sidecars() @@ -293,11 +585,150 @@ def add_repeat_num(self): orig_contents[path]["Repeat"] = self.repeat write_json(path, orig_contents[path]) + def find_outputs(self, ext, start_dir=None): + """Find output files with the given extension. + """ + if not ext.startswith("."): + ext = "." + ext + + if not start_dir: + start_dir = self.output_dir + + found = [] + for root, _, files in os.walk(start_dir): + for item in files: + if item.endswith(ext): + found.append(os.path.join(root, item)) + return found + def get_sidecars(self): - sidecars = glob(os.path.join(self.output_dir, "*", "*.json")) + sidecars = self.find_outputs(".json") contents = {path: read_json(path) for path in sidecars} return contents + def find_missing_scans(self): + """Find scans that exist on xnat but are missing from the bids folder. + """ + class FakeSidecar(dcm2bids.Sidecar): + """Turns XNAT series descriptions into pseudo-sidecars. + """ + def __init__(self, xnat_scan): + self.scan = xnat_scan + self.data = xnat_scan + self.compKeys = dcm2bids.DEFAULT.compKeys + + # Placeholders for compatibility with dcm2bids.Sidecar + self.root = ( + f"/tmp/{xnat_scan.series}" + + f"_{xnat_scan.description}" + + f"_{xnat_scan.subject}" + ) + self.filename = f"{self.root}.json" + self.data["SidecarFilename"] = self.filename + + @property + def data(self): + return self._data + + @data.setter + def data(self, scan): + self._data = OrderedDict() + self._data['SeriesDescription'] = scan.description + self._data['SeriesNumber'] = scan.series + + def __repr__(self): + return f"" + + def get_expected_names(participant, sidecars, bids_conf): + parser = dcm2bids.SidecarPairing( + sidecars, bids_conf["descriptions"] + ) + parser.build_graph() + parser.build_acquisitions(participant) + parser.find_runs() + return [acq.dstRoot for acq in parser.acquisitions] + + def remove_criteria(descriptions): + trim_conf = [] + for descr in bids_conf['descriptions']: + new_descr = descr.copy() + if len(descr['criteria']) > 1: + new_descr['criteria'] = OrderedDict() + new_descr['criteria']['SeriesDescription'] = descr[ + 'criteria']['SeriesDescription'] + trim_conf.append(new_descr) + return trim_conf + + participant = dcm2bids.Participant( + self.bids_sub, session=self.bids_ses + ) + + bids_conf = dcm2bids.load_json(self.dcm2bids_config) + + bids_tmp = os.path.join( + self.bids_folder, + "tmp_dcm2bids", + f"{self.session.bids_sub}_{self.session.bids_ses}" + ) + + local_sidecars = [] + for search_path in [self.output_dir, bids_tmp]: + for item in self.find_outputs(".json", start_dir=search_path): + sidecar = dcm2bids.Sidecar(item) + if ('Repeat' in sidecar.data and + sidecar.data['Repeat'] != self.session.session): + continue + local_sidecars.append(sidecar) + local_sidecars = sorted(local_sidecars) + + xnat_sidecars = [] + for scan in self.experiment.scans: + xnat_sidecars.append(FakeSidecar(scan)) + xnat_sidecars = sorted(xnat_sidecars) + + local_scans = get_expected_names( + participant, local_sidecars, bids_conf + ) + + # Use a more permissive bids_conf when finding xnat acqs + xnat_parser = dcm2bids.SidecarPairing( + xnat_sidecars, remove_criteria(bids_conf['descriptions']) + ) + xnat_parser.build_graph() + xnat_parser.build_acquisitions(participant) + # Use this to find scans that have extra 'criteria' for single match + extra_acqs = [] + for sidecar, descriptions in xnat_parser.graph.items(): + if len(descriptions) > 1: + for descr in descriptions: + acq = Acquisition(participant, srcSidecar=sidecar, **descr) + extra_acqs.append(acq) + xnat_parser.acquisitions.extend(extra_acqs) + xnat_parser.find_runs() + xnat_scans = [acq.dstRoot for acq in xnat_parser.acquisitions] + + missing_scans = [] + for scan in xnat_scans: + if scan not in local_scans: + if "run-01" in scan: + norun_scan = scan.replace("_run-01", "") + if norun_scan not in local_scans: + missing_scans.append(scan) + else: + missing_scans.append(scan) + + extra_scans = [] + for scan in local_scans: + if scan not in xnat_scans: + if "run-01" in scan: + norun_scan = scan.replace("_run-01", "") + if norun_scan not in xnat_scans: + extra_scans.append(scan) + else: + extra_scans.append(scan) + + return missing_scans, extra_scans + class NiiLinkExporter(SessionExporter): """Populates a study's nii folder with symlinks pointing to the bids dir. @@ -1226,3 +1657,91 @@ def _export_multi_echo(self, raw_data_dir): SERIES_EXPORTERS = { exp.type: exp for exp in SeriesExporter.__subclasses__() } + + +class FakeSidecar(dcm2bids.Sidecar): + """Turns XNAT series descriptions into pseudo-sidecars. + """ + def __init__(self, xnat_scan): + self.scan = xnat_scan + self.data = xnat_scan + self.compKeys = dcm2bids.DEFAULT.compKeys + + # Placeholders for compatibility with dcm2bids.Sidecar + self.root = ( + f"/tmp/{xnat_scan.series}" + + f"_{xnat_scan.description}" + + f"_{xnat_scan.subject}" + ) + self.filename = f"{self.root}.json" + self.data["SidecarFilename"] = self.filename + + @property + def data(self): + return self._data + + @data.setter + def data(self, scan): + self._data = OrderedDict() + self._data['SeriesDescription'] = scan.description + self._data['SeriesNumber'] = scan.series + + def __repr__(self): + return f"" + + +def get_expected_names(participant, sidecars, bids_conf): + parser = dcm2bids.SidecarPairing( + sidecars, bids_conf["descriptions"] + ) + parser.build_graph() + parser.build_acquisitions(participant) + parser.find_runs() + return [acq.dstRoot for acq in parser.acquisitions] + + +def remove_criteria(descriptions): + trim_conf = [] + for descr in descriptions: + new_descr = descr.copy() + if len(descr['criteria']) > 1: + new_descr['criteria'] = OrderedDict() + new_descr['criteria']['SeriesDescription'] = descr[ + 'criteria']['SeriesDescription'] + trim_conf.append(new_descr) + return trim_conf + + +def parse_niix_log(niix_output, xnat_scans): + log_lines = sort_log(niix_output.split(b"\n")) + + series_log = {} + for entry in log_lines: + for line in entry: + if line.startswith("Compress:"): + nii_path = line.split(" ")[-1] + series = str(int(os.path.basename(nii_path).split("_")[0])) + # Handle split series (they get '10' prepended to series num) + if series not in [scan.series for scan in xnat_scans]: + # drop the '10' prefix: + series = str(int(series[2:])) + series_log.setdefault(series, []).extend(entry) + return series_log + + +def sort_log(log_lines): + """Sort a dcm2nix stdout log by series that produced each entry. + """ + sorted_lines = [] + cur_idx = -1 + cur_entry = None + for idx, line in enumerate(log_lines): + line = line.decode('utf-8') + if line.startswith("Convert "): + if cur_entry: + sorted_lines.append(cur_entry) + cur_idx = idx + cur_entry = [] + if cur_idx >= 0: + cur_entry.append(line) + return sorted_lines