diff --git a/fissa/core.py b/fissa/core.py index 358413ef..ce3feeb8 100644 --- a/fissa/core.py +++ b/fissa/core.py @@ -281,11 +281,16 @@ def separation_prep(self, redo=False): fname = os.path.join(self.folder, 'preparation.npy') # try to load data from filename + if fname is None or not os.path.isfile(fname): + redo = True if not redo: try: nCell, raw, roi_polys = np.load(fname, allow_pickle=True) print('Reloading previously prepared data...') - except BaseException: + except BaseException as err: + print("An error occurred while loading {}".format(fname)) + print(err) + print("Extraction will be redone and {} overwritten".format(fname)) redo = True if redo: @@ -387,6 +392,7 @@ def separate(self, redo_prep=False, redo_sep=False): # Do data preparation if redo_prep or self.raw is None: self.separation_prep(redo_prep) + if redo_prep: redo_sep = True # Define filename to store data in @@ -395,11 +401,19 @@ def separate(self, redo_prep=False, redo_sep=False): redo_sep = True else: fname = os.path.join(self.folder, 'separated.npy') + if fname is None or not os.path.isfile(fname): + redo_sep = True if not redo_sep: try: info, mixmat, sep, result = np.load(fname, allow_pickle=True) print('Reloading previously separated data...') - except BaseException: + except BaseException as err: + print("An error occurred while loading {}".format(fname)) + print(err) + print( + "Signal separation will be redone and {} overwritten" + "".format(fname) + ) redo_sep = True # separate data, if necessary diff --git a/fissa/tests/base_test.py b/fissa/tests/base_test.py index efc7a8d3..4ad6ee6d 100644 --- a/fissa/tests/base_test.py +++ b/fissa/tests/base_test.py @@ -7,12 +7,14 @@ import unittest import os.path from inspect import getsourcefile +import sys import numpy as np from numpy.testing import (assert_almost_equal, assert_array_equal, assert_allclose, assert_equal) +import pytest # Check where the test directory is located, to be used when fetching @@ -44,6 +46,37 @@ def subTest(self, *args, **kwargs): else: yield None + @pytest.fixture(autouse=True) + def capsys(self, capsys): + self.capsys = capsys + + def recapsys(self, *captures): + """ + Capture stdout and stderr, then write them back to stdout and stderr. + + Capture is done using the `pytest.capsys` fixture. + + Parameters + ---------- + *captures : pytest.CaptureResult, optional + A series of extra captures to output. For each `capture` in + `captures`, `capture.out` and `capture.err` are written to stdout + and stderr. + + Returns + ------- + capture : pytest.CaptureResult + `capture.out` and `capture.err` contain all the outputs to stdout + and stderr since the previous capture with `~pytest.capsys`. + """ + capture_now = self.capsys.readouterr() + for capture in captures: + sys.stdout.write(capture.out) + sys.stderr.write(capture.err) + sys.stdout.write(capture_now.out) + sys.stderr.write(capture_now.err) + return capture_now + def assert_almost_equal(self, *args, **kwargs): return assert_almost_equal(*args, **kwargs) @@ -72,3 +105,35 @@ def assert_equal_dict_of_array(self, desired, actual): self.assertEqual(desired.keys(), actual.keys()) for k in desired.keys(): self.assertEqual(desired[k], actual[k]) + + def assert_starts_with(self, desired, actual): + """ + Check that a string starts with a certain substring. + + Parameters + ---------- + desired : str + Desired initial string. + actual : str-like + Actual string or string-like object. + """ + try: + self.assertTrue(len(actual) >= len(desired)) + except BaseException as err: + print("Actual string too short ({} < {} characters)".format(len(actual), len(desired))) + print("ACTUAL: {}".format(actual)) + raise + try: + return assert_equal(str(actual)[:len(desired)], desired) + except BaseException as err: + msg = "ACTUAL: {}".format(actual) + if isinstance(getattr(err, "args", None), str): + err.args += "\n" + msg + elif isinstance(getattr(err, "args", None), tuple): + if len(err.args) == 1: + err.args = (err.args[0] + "\n" + msg, ) + else: + err.args += (msg, ) + else: + print(msg) + raise diff --git a/fissa/tests/test_core.py b/fissa/tests/test_core.py index 901959e7..447fafe9 100644 --- a/fissa/tests/test_core.py +++ b/fissa/tests/test_core.py @@ -243,33 +243,163 @@ def test_prepfirst(self): self.assert_allclose(actual[0][0], self.expected_00) def test_redo(self): + """Test whether experiment redoes work when requested.""" exp = core.Experiment(self.images_dir, self.roi_zip_path, self.output_dir) + capture_pre = self.capsys.readouterr() # Clear stdout exp.separate() + capture_post = self.recapsys(capture_pre) + self.assert_starts_with("Doing", capture_post.out) + capture_pre = self.capsys.readouterr() # Clear stdout exp.separate(redo_prep=True, redo_sep=True) + capture_post = self.recapsys(capture_pre) + self.assert_starts_with("Doing", capture_post.out) + + def test_load_cache(self): + """Test whether cached output is loaded during init.""" + image_path = self.images_dir + roi_path = self.roi_zip_path + exp1 = core.Experiment(image_path, roi_path, self.output_dir) + exp1.separate() + exp = core.Experiment(image_path, roi_path, self.output_dir) + # Cache should be loaded without calling separate actual = exp.result self.assert_equal(len(actual), 1) self.assert_equal(len(actual[0]), 1) self.assert_allclose(actual[0][0], self.expected_00) - def test_noredo(self): + def test_load_cache_piecemeal(self): + """ + Test whether cached output is loaded during individual method calls. + """ image_path = self.images_dir roi_path = self.roi_zip_path exp1 = core.Experiment(image_path, roi_path, self.output_dir) exp1.separate() exp = core.Experiment(image_path, roi_path, self.output_dir) + capture_pre = self.capsys.readouterr() # Clear stdout + exp.separation_prep() + capture_post = self.recapsys(capture_pre) + self.assert_starts_with("Reloading previously prepared", capture_post.out) + capture_pre = self.capsys.readouterr() # Clear stdout exp.separate() + capture_post = self.recapsys(capture_pre) + self.assert_starts_with("Reloading previously separated", capture_post.out) actual = exp.result self.assert_equal(len(actual), 1) self.assert_equal(len(actual[0]), 1) self.assert_allclose(actual[0][0], self.expected_00) - def test_previousprep(self): + def test_load_cached_prep(self): + """ + With prep cached, test prep loads and separate waits for us to call it. + """ image_path = self.images_dir roi_path = self.roi_zip_path exp1 = core.Experiment(image_path, roi_path, self.output_dir) exp1.separation_prep() + capture_pre = self.capsys.readouterr() # Clear stdout + exp = core.Experiment(image_path, roi_path, self.output_dir) + capture_post = self.recapsys(capture_pre) # Capture and then re-output + self.assert_starts_with("Reloading previously prepared", capture_post.out) + capture_pre = self.capsys.readouterr() # Clear stdout + exp.separation_prep() + capture_post = self.recapsys(capture_pre) # Capture and then re-output + self.assert_starts_with("Reloading previously prepared", capture_post.out) + capture_pre = self.capsys.readouterr() # Clear stdout + exp.separate() + capture_post = self.recapsys(capture_pre) + self.assert_starts_with("Doing signal separation", capture_post.out) + actual = exp.result + self.assert_equal(len(actual), 1) + self.assert_equal(len(actual[0]), 1) + self.assert_allclose(actual[0][0], self.expected_00) + + @unittest.expectedFailure + def test_badprepcache_init1(self): + """ + With a faulty prep cache, test prep hits an error during init and then stops. + """ + image_path = self.images_dir + roi_path = self.roi_zip_path + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + with open(os.path.join(self.output_dir, "preparation.npy"), "w") as f: + f.write("badfilecontents") + + capture_pre = self.capsys.readouterr() # Clear stdout + exp = core.Experiment(image_path, roi_path, self.output_dir) + capture_post = self.recapsys(capture_pre) # Capture and then re-output + self.assert_starts_with("An error occurred", capture_post.out) + + self.assertTrue(exp.raw is None) + + @unittest.expectedFailure + def test_badprepcache_init2(self): + """ + With a faulty prep cache, test prep initially errors but then runs when called. + """ + image_path = self.images_dir + roi_path = self.roi_zip_path + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + with open(os.path.join(self.output_dir, "preparation.npy"), "w") as f: + f.write("badfilecontents") + + capture_pre = self.capsys.readouterr() # Clear stdout + exp = core.Experiment(image_path, roi_path, self.output_dir) + capture_post = self.recapsys(capture_pre) # Capture and then re-output + self.assert_starts_with("An error occurred", capture_post.out) + + capture_pre = self.capsys.readouterr() # Clear stdout + exp.separation_prep() + capture_post = self.recapsys(capture_pre) # Capture and then re-output + self.assert_starts_with("Doing region growing", capture_post.out) + + def test_badprepcache(self): + """ + With a faulty prep cache, test prep catches error and runs when called. + """ + image_path = self.images_dir + roi_path = self.roi_zip_path + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + exp = core.Experiment(image_path, roi_path, self.output_dir) + with open(os.path.join(self.output_dir, "preparation.npy"), "w") as f: + f.write("badfilecontents") + + capture_pre = self.capsys.readouterr() # Clear stdout + exp.separation_prep() + capture_post = self.recapsys(capture_pre) # Capture and then re-output + self.assert_starts_with("An error occurred", capture_post.out) + + capture_pre = self.capsys.readouterr() # Clear stdout + exp.separate() + capture_post = self.recapsys(capture_pre) # Capture and then re-output + self.assert_starts_with("Doing signal separation", capture_post.out) + + actual = exp.result + self.assert_equal(len(actual), 1) + self.assert_equal(len(actual[0]), 1) + self.assert_allclose(actual[0][0], self.expected_00) + + def test_badsepcache(self): + """ + With a faulty separated cache, test separate catches error and runs when called. + """ + image_path = self.images_dir + roi_path = self.roi_zip_path + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) exp = core.Experiment(image_path, roi_path, self.output_dir) + exp.separation_prep() + with open(os.path.join(self.output_dir, "separated.npy"), "w") as f: + f.write("badfilecontents") + + capture_pre = self.capsys.readouterr() # Clear stdout exp.separate() + capture_post = self.recapsys(capture_pre) # Capture and then re-output + self.assert_starts_with("An error occurred", capture_post.out) + actual = exp.result self.assert_equal(len(actual), 1) self.assert_equal(len(actual[0]), 1)