diff --git a/hera_cal/tests/test_vis_clean.py b/hera_cal/tests/test_vis_clean.py
index 5ab388ea0..e8e88e16e 100644
--- a/hera_cal/tests/test_vis_clean.py
+++ b/hera_cal/tests/test_vis_clean.py
@@ -21,6 +21,7 @@ from hera_cal.data import DATA_PATH
 from hera_cal import frf
 import glob
+import re
 import copy
@@ -1130,7 +1131,8 @@ def test_time_chunk_from_baseline_chunks_argparser(self):
         assert a.time_chunk_template == 'a'
         assert a.outfilename == 'a.out'

-    def test_time_chunk_from_baseline_chunks(self, tmp_path):
+    @pytest.mark.parametrize("ninterleave", [1, 2, 4, 8])
+    def test_time_chunk_from_baseline_chunks(self, tmpdir, ninterleave):
         # First, construct some cross-talk baseline files.
         datafiles = [os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.first.uvh5"),
                      os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.second.uvh5")]
@@ -1138,38 +1140,44 @@ def test_time_chunk_from_baseline_chunks(self, tmp_path):
         cals = [os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only.part1"),
                 os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only.part2")]
         # make a cache directory
-        cdir = tmp_path / "cache_temp"
-        cdir.mkdir()
+        tmp_path = tmpdir.strpath
+        cdir = os.path.join(tmp_path, "cache_temp")
+        os.mkdir(cdir)
         # cross-talk filter chunked baselines
         for filenum, file in enumerate(datafiles):
-            baselines = io.baselines_from_filelist_position(file, datafiles)
-            fname = 'temp.fragment.part.%d.h5' % filenum
-            fragment_filename = tmp_path / fname
-            frf.load_tophat_frfilter_and_write(datafiles, baseline_list=baselines, calfile_list=cals,
-                                               spw_range=[0, 20], cache_dir=cdir, read_cache=True, write_cache=True,
-                                               res_outfilename=fragment_filename, clobber=True, case='sky')
+            for interleave in range(ninterleave):
+                baselines = io.baselines_from_filelist_position(file, datafiles)
+                if ninterleave > 1:
+                    fname = f'temp.fragment.part.{filenum}.interleave_{interleave}.h5'
+                else:
+                    fname = f'temp.fragment.part.{filenum}.h5'
+                fragment_filename = os.path.join(tmp_path, fname)
+                frf.load_tophat_frfilter_and_write(datafiles, baseline_list=baselines, calfile_list=cals,
+                                                   spw_range=[0, 20], cache_dir=cdir, read_cache=True, write_cache=True,
+                                                   res_outfilename=fragment_filename, clobber=True, case='sky')
             # load in fragment and make sure the number of baselines is equal to the length of the baseline list
             hd_fragment = io.HERAData(str(fragment_filename))
             assert len(hd_fragment.bls) == len(baselines)
             assert hd_fragment.Ntimes == 60
             assert hd_fragment.Nfreqs == 20

-        fragments = glob.glob(DATA_PATH + '/test_output/temp.fragment.h5.part*')
+        fragments = sorted(glob.glob(DATA_PATH + '/test_output/temp.fragment.h5.part*'))
         # reconstitute the filtered data
         for filenum, file in enumerate(datafiles):
             # reconstitute
             fname = 'temp.reconstituted.part.%d.h5' % filenum
             vis_clean.time_chunk_from_baseline_chunks(time_chunk_template=file,
-                                                      baseline_chunk_files=glob.glob(str(tmp_path / 'temp.fragment.part.*.h5')), clobber=True,
-                                                      outfilename=str(tmp_path / fname))
+                                                      baseline_chunk_files=glob.glob(os.path.join(tmp_path, 'temp.fragment.part.*.h5')), clobber=True,
+                                                      outfilename=os.path.join(tmp_path, fname))
         # load in the reconstituted files.
-        hd_reconstituted = io.HERAData(glob.glob(str(tmp_path / 'temp.reconstituted.part.*.h5')))
+        hd_reconstituted = io.HERAData(glob.glob(os.path.join(tmp_path, 'temp.reconstituted.part.*.h5')))
         hd_reconstituted.read()
         # compare to xtalk filtering the whole file.
         frf.load_tophat_frfilter_and_write(datafile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"),
                                            calfile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only"),
-                                           res_outfilename=str(tmp_path / 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
-        hd = io.HERAData(str(tmp_path / 'temp.h5'))
+                                           res_outfilename=os.path.join(tmp_path, 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
+        hd = io.HERAData(os.path.join(tmp_path, 'temp.h5'))
         hd.read()
         assert np.all(np.isclose(hd.data_array, hd_reconstituted.data_array))
         assert np.all(np.isclose(hd.flag_array, hd_reconstituted.flag_array))
@@ -1179,23 +1187,25 @@ def test_time_chunk_from_baseline_chunks(self, tmp_path):
             # reconstitute
             fname = 'temp.reconstituted.part.%d.h5' % filenum
             vis_clean.time_chunk_from_baseline_chunks(time_chunk_template=file,
-                                                      baseline_chunk_files=glob.glob(str(tmp_path / 'temp.fragment.part.*.h5')), clobber=True,
-                                                      outfilename=str(tmp_path / fname), time_bounds=True)
+                                                      baseline_chunk_files=glob.glob(os.path.join(tmp_path, 'temp.fragment.part.*.h5')), clobber=True,
+                                                      outfilename=os.path.join(tmp_path, fname), time_bounds=True)
         # load in the reconstituted files.
-        hd_reconstituted = io.HERAData(glob.glob(str(tmp_path / 'temp.reconstituted.part.*.h5')))
+        hd_reconstituted = io.HERAData(glob.glob(os.path.join(tmp_path, 'temp.reconstituted.part.*.h5')))
         hd_reconstituted.read()
         # compare to xtalk filtering the whole file.
         frf.load_tophat_frfilter_and_write(datafile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"),
                                            calfile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only"),
-                                           res_outfilename=str(tmp_path / 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
-        hd = io.HERAData(str(tmp_path / 'temp.h5'))
+                                           res_outfilename=os.path.join(tmp_path, 'temp.h5'),
+                                           clobber=True, spw_range=[0, 20], case='sky')
+        hd = io.HERAData(os.path.join(tmp_path, 'temp.h5'))
         hd.read()
         assert np.all(np.isclose(hd.data_array, hd_reconstituted.data_array))
         assert np.all(np.isclose(hd.flag_array, hd_reconstituted.flag_array))
         assert np.all(np.isclose(hd.nsample_array, hd_reconstituted.nsample_array))
         # check warning.
         with pytest.warns(RuntimeWarning):
-            vis_clean.time_chunk_from_baseline_chunks(datafiles[0], baseline_chunk_files=datafiles[1:], clobber=True, outfilename=str(tmp_path / fname), time_bounds=True)
+            vis_clean.time_chunk_from_baseline_chunks(datafiles[0], baseline_chunk_files=datafiles[1:],
+                                                      clobber=True, outfilename=os.path.join(tmp_path, fname), time_bounds=True)

 def test_discard_autocorr_imag():
     hd = io.HERAData(os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"))
@@ -1217,5 +1227,3 @@ def test_discard_autocorr_imag():
     assert data1[first_key].dtype == np.complex64
     assert np.all(data1[first_key].imag == 0)
     assert np.all(data1[first_key] == data0[first_key])
-
-
\ No newline at end of file
diff --git a/hera_cal/vis_clean.py b/hera_cal/vis_clean.py
index 94e29222f..0adfc17e4 100644
--- a/hera_cal/vis_clean.py
+++ b/hera_cal/vis_clean.py
@@ -19,7 +19,7 @@
 from .datacontainer import DataContainer
 from .utils import echo
 from .flag_utils import factorize_flags
-
+import re

 def discard_autocorr_imag(data_container, keys=None):
@@ -794,8 +794,7 @@ def vis_clean(self, keys=None, x=None, data=None, flags=None, wgts=None,
             # get filter properties
             mfrate = max_frate[k] if max_frate is not None else None
             filter_centers, filter_half_widths = gen_filter_properties(ax=ax, horizon=horizon,
-                                                                       standoff=standoff, min_dly=min_dly,
-                                                                       bl_len=self.bllens[k[:2]], max_frate=mfrate)
+                                                                       standoff=standoff, min_dly=min_dly, bl_len=self.bllens[k[:2]], max_frate=mfrate)
             if mode != 'clean':
                 suppression_factors = [[tol], [tol]] if ax == 'both' else [tol]
                 self.fourier_filter(filter_centers=filter_centers, filter_half_widths=filter_half_widths,
@@ -2023,7 +2022,9 @@ def time_chunk_from_baseline_chunks(time_chunk_template, baseline_chunk_files, o
         output will trim the extra frequencies in the time_chunk and write out trimmed freqs.
         The same is true for polarizations.
     baseline_chunk_files : list of strings
-        list of paths to baseline-chunk files to select time-chunk file from.
+        list of paths to baseline-chunk files to select the time-chunk file from. If the file names contain "interleave_",
+        this function will automatically identify the number of unique interleaves, split the file list into interleaved
+        sets, and retrieve integrations based on the times in the first file.
     outfilename : string
         name of the output file to write.
     clobber : bool optional.
@@ -2040,11 +2041,38 @@ def time_chunk_from_baseline_chunks(time_chunk_template, baseline_chunk_files, o
     -------
     Nothing
     """
+    # check whether "interleave" appears in the baseline-chunk filenames. If so, make sure that they all have it,
+    # split them into sets, and make sure that the sets all have the same number of files.
+    interleave_mode = "interleave" in baseline_chunk_files[0]
+
+    for fname in baseline_chunk_files:
+        if ("interleave" in fname) != interleave_mode:
+            raise ValueError("Either all of the files or none of the files may have 'interleave' in their names.")
+    if interleave_mode:
+        interleave_indices = np.unique([int(re.findall("interleave_[0-9]{1,10}", fname)[0][11:]) for fname in baseline_chunk_files])
+        ninterleave = len(interleave_indices)
+        interleave_sets = {}
+        interleave_index_dict = {}
+        for inum, interleave in enumerate(interleave_indices):
+            # match on the parsed index rather than a raw substring so that interleave_1 does not also match interleave_10.
+            interleave_sets[inum] = sorted([fname for fname in baseline_chunk_files
+                                            if int(re.findall("interleave_[0-9]{1,10}", fname)[0][11:]) == interleave])
+            interleave_index_dict[inum] = interleave
+        # check that all interleave sets have the same length.
+        if len(np.unique([len(interleave_sets[k]) for k in interleave_sets])) > 1:
+            raise ValueError("If you are providing interleaved files, each interleaved set must have the same number of files in it!")
+    else:
+        ninterleave = 1
+        interleave_sets = {0: baseline_chunk_files}
+        interleave_index_dict = {0: 0}
+
     hd_time_chunk = io.HERAData(time_chunk_template)
-    hd_baseline_chunk = io.HERAData(baseline_chunk_files[0])
     times = hd_time_chunk.times
+    hd_baseline_chunk = io.HERAData(baseline_chunk_files[0])
     freqs = hd_baseline_chunk.freqs
     polarizations = hd_baseline_chunk.pols
     # read in the template file, but only include polarizations, frequencies
     # from the baseline_chunk_file files.
     if not time_bounds:
@@ -2056,34 +2084,56 @@ def time_chunk_from_baseline_chunks(time_chunk_template, baseline_chunk_files, o
         # for each baseline_chunk_file, read in only the times relevant to the templatefile.
         # and update the data, flags, nsamples array of the template file
         # with the baseline_chunk_file data.
-        for baseline_chunk_file in baseline_chunk_files:
-            hd_baseline_chunk = io.HERAData(baseline_chunk_file)
-            # find times that are close.
-            tload = []
-            # use tolerance in times that is set by the time resolution of the dataset.
-            atol = np.median(np.diff(hd_baseline_chunk.times)) / 10.
-            all_times = np.unique(hd_baseline_chunk.times)
-            for t in all_times:
-                if np.any(np.isclose(t, hd_time_chunk.times, atol=atol, rtol=0)):
-                    tload.append(t)
-            d, f, n = hd_baseline_chunk.read(times=tload, axis='blt')
-            hd_time_chunk.update(flags=f, data=d, nsamples=n)
-        # now that we've updated everything, we write the output file.
-        hd_time_chunk.write_uvh5(outfilename, clobber=clobber)
+        for inum in interleave_sets:
+            for baseline_chunk_file in interleave_sets[inum]:
+                hd_baseline_chunk = io.HERAData(baseline_chunk_file)
+                # find times that are close.
+                tload = []
+                # use tolerance in times that is set by the time resolution of the dataset.
+                atol = np.median(np.diff(hd_baseline_chunk.times)) / 10.
+                all_times = np.unique(hd_baseline_chunk.times)
+                for t in all_times:
+                    if np.any(np.isclose(t, hd_time_chunk.times, atol=atol, rtol=0)):
+                        tload.append(t)
+                d, f, n = hd_baseline_chunk.read(times=tload, axis='blt')
+                hd_time_chunk.update(flags=f, data=d, nsamples=n)
+            # now that we've updated this interleave, we write the output file.
+            if ninterleave > 1:
+                outfilename_i = outfilename.replace('.uvh5', f'.interleave_{interleave_index_dict[inum]}.uvh5')
+            else:
+                outfilename_i = outfilename
+            hd_time_chunk.write_uvh5(outfilename_i, clobber=clobber)
+            # reinstantiate the writer file for the next interleave.
+            hd_time_chunk = io.HERAData(time_chunk_template)
+            hd_time_chunk.read(times=times, frequencies=freqs, polarizations=polarizations)
     else:
         dt_time_chunk = np.median(np.diff(hd_time_chunk.times)) / 2.
         tmax = hd_time_chunk.times.max() + dt_time_chunk
         tmin = hd_time_chunk.times.min() - dt_time_chunk
-        hd_combined = io.HERAData(baseline_chunk_files)
+        t_select0 = (hd_baseline_chunk.times >= tmin) & (hd_baseline_chunk.times < tmax)
+        # use the same time selection for all interleaves even though their time grids are offset.
+        # if the interleaves have different Ntimes,
+        # we will only use the Ntimes from the first interleave.
+        t_select = t_select0[:hd_baseline_chunk.Ntimes]
         # we only compare centers of baseline files to time limits of time-file.
         # this is to prevent integrations that straddle file boundaries from being dropped.
         # when we perform reconstitution.
-        t_select = (hd_baseline_chunk.times >= tmin) & (hd_baseline_chunk.times < tmax)
-        if np.any(t_select):
-            hd_combined.read(times=hd_baseline_chunk.times[t_select], axis='blt')
-            hd_combined.write_uvh5(outfilename, clobber=clobber)
-        else:
-            warnings.warn("no times selected. No outputfile produced.", RuntimeWarning)
+        for inum in interleave_sets:
+            hd_baseline_chunk_iset = io.HERAData(interleave_sets[inum][0])
+            hd_combined = io.HERAData(interleave_sets[inum])
+            if np.any(t_select):
+                hd_combined.read(times=hd_baseline_chunk_iset.times[t_select], axis='blt')
+                if ninterleave > 1:
+                    outfilename_interleave = outfilename.replace('.uvh5', f'.interleave_{interleave_index_dict[inum]}.uvh5')
+                else:
+                    outfilename_interleave = outfilename
+                hd_combined.write_uvh5(outfilename_interleave, clobber=clobber)
+            else:
+                warnings.warn("no times selected. No output file produced.", RuntimeWarning)


 def time_chunk_from_baseline_chunks_argparser():
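For reference, the interleave detection, grouping, and validation added to time_chunk_from_baseline_chunks can be exercised in isolation. The sketch below is not part of the patch: the helper name group_interleaves and the filenames are hypothetical, and it only mirrors the logic of the new code path.

import re
from collections import defaultdict

def group_interleaves(baseline_chunk_files):
    # Either every filename carries an "interleave_<N>" tag or none does;
    # a mix of tagged and untagged files is an error.
    tagged = ["interleave" in f for f in baseline_chunk_files]
    if any(tagged) and not all(tagged):
        raise ValueError("Either all of the files or none of the files may have 'interleave' in their names.")
    if not any(tagged):
        return {0: sorted(baseline_chunk_files)}
    sets = defaultdict(list)
    for fname in baseline_chunk_files:
        # group on the parsed integer index so interleave_1 never swallows interleave_10.
        index = int(re.search(r"interleave_([0-9]+)", fname).group(1))
        sets[index].append(fname)
    sets = {k: sorted(v) for k, v in sets.items()}
    # each interleaved set must contain the same number of baseline chunks.
    if len({len(v) for v in sets.values()}) > 1:
        raise ValueError("Each interleaved set must have the same number of files.")
    return sets

files = [f"temp.fragment.part.{p}.interleave_{i}.h5" for p in range(2) for i in range(3)]
for index, chunk_set in group_interleaves(files).items():
    print(index, chunk_set)

Grouping on the parsed integer index rather than on a raw substring is the same design choice made in the vis_clean.py hunk above, where the set construction compares extracted indices instead of testing a substring like interleave_1, which would also match interleave_10.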