From e5ecdd2a1d9cd3975b1efd4b02da1c502643fecb Mon Sep 17 00:00:00 2001 From: Saianeesh Keshav Haridas Date: Wed, 18 Dec 2024 12:20:21 -0500 Subject: [PATCH] feat: make ds factor a parameter and remove threadpool --- sotodlib/tod_ops/jumps.py | 46 +++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/sotodlib/tod_ops/jumps.py b/sotodlib/tod_ops/jumps.py index b98041a8c..0b2707353 100644 --- a/sotodlib/tod_ops/jumps.py +++ b/sotodlib/tod_ops/jumps.py @@ -1,5 +1,3 @@ -import concurrent.futures -import os from typing import Literal, Optional, Tuple, Union, cast, overload import numpy as np @@ -12,7 +10,6 @@ from so3g import ( matched_jumps, matched_jumps64, - clean_flag, find_quantized_jumps, find_quantized_jumps64, ) @@ -21,8 +18,6 @@ from ..flag_utils import _merge -NFUTURE = int(os.environ.get("NUM_FUTURES", min(32, int(os.cpu_count() or 0) + 4))) - def std_est( x: NDArray[np.floating], @@ -204,6 +199,8 @@ def _fix(jump_ranges, heights, x_fixed): jumps = RangesMatrix.from_mask(np.atleast_2d(jumps)) elif isinstance(jumps, Ranges): jumps = RangesMatrix.from_mask(np.atleast_2d(jumps.mask())) + if not isinstance(jumps, RangesMatrix): + raise TypeError("jumps not RangesMatrix or convertable to RangesMatrix") if heights is None: heights = estimate_heights(x_fixed, jumps.mask(), **kwargs) @@ -211,12 +208,7 @@ def _fix(jump_ranges, heights, x_fixed): heights = heights.toarray() heights = cast(NDArray[np.floating], heights) - nfuture = min(len(x_fixed), NFUTURE) - slice_size = len(x_fixed) // nfuture - slices = [slice(i * slice_size, (i + 1) * slice_size) for i in range(nfuture)] - slices[-1] = slice(slices[-1].start, len(x_fixed)) - with concurrent.futures.ThreadPoolExecutor() as e: - _ = [e.submit(_fix, jumps.ranges[s], heights[s], x_fixed[s]) for s in slices] + _fix(jumps.ranges, heights, x_fixed) return x_fixed.reshape(orig_shape) @@ -337,6 +329,7 @@ def twopi_jumps( merge=..., overwrite=..., name=..., + ds=..., **filter_pars, ) -> Tuple[RangesMatrix, csr_array, NDArray[np.floating]]: ... @@ -355,6 +348,7 @@ def twopi_jumps( merge=..., overwrite=..., name=..., + ds=..., **filter_pars, ) -> Tuple[RangesMatrix, csr_array]: ... @@ -372,6 +366,7 @@ def twopi_jumps( merge: bool = True, overwrite: bool = False, name: str = "jumps_2pi", + ds: int = 10, **filter_pars, ) -> Union[ Tuple[RangesMatrix, csr_array], Tuple[RangesMatrix, csr_array, NDArray[np.floating]] @@ -410,6 +405,8 @@ def twopi_jumps( name: String used to populate field in flagmanager if merge is True. + ds: Downsample factor used when computing noise level, the actual factor used is `ds*win_size`. + **filter_pars: Parameters to pass to _filter Returns: @@ -428,7 +425,7 @@ def twopi_jumps( raise TypeError("Signal is not an array") if atol is None: atol = nsigma * std_est( - signal.astype(float), ds=win_size * 10, win_size=win_size + signal.astype(float), ds=win_size * ds, win_size=win_size ) np.clip(atol, 1e-8, max_tol) @@ -602,6 +599,7 @@ def find_jumps( merge=..., overwrite=..., name=..., + ds=..., **filter_pars, ) -> Tuple[RangesMatrix, csr_array]: ... @@ -620,6 +618,7 @@ def find_jumps( merge=..., overwrite=..., name=..., + ds=..., **filter_pars, ) -> Tuple[RangesMatrix, csr_array, NDArray[np.floating]]: ... @@ -637,6 +636,7 @@ def find_jumps( merge: bool = True, overwrite: bool = False, name: str = "jumps", + ds: int = 10, **filter_pars, ) -> Union[ Tuple[RangesMatrix, csr_array], Tuple[RangesMatrix, csr_array, NDArray[np.floating]] @@ -678,6 +678,8 @@ def find_jumps( name: String used to populate field in flagmanager if merge is True. + ds: Downsample factor used when computing noise level, the actual factor used is `ds*win_size`. + **filter_pars: Parameters to pass to _filter Returns: @@ -697,36 +699,28 @@ def find_jumps( raise TypeError("Signal is not an array") orig_shape = signal.shape + _signal = _filter(signal, **filter_pars) + _signal = np.atleast_2d(_signal) + if len(orig_shape) > 2: raise ValueError("Jumpfinder only works on 1D or 2D data") if min_size is None and min_sigma is not None: min_size = min_sigma * std_est( - signal, ds=win_size * 10, win_size=win_size, axis=-1 + signal, ds=win_size * ds, win_size=win_size, axis=-1 ) if min_size is None: raise ValueError("min_size is somehow still None") if isinstance(min_size, np.ndarray) and np.ndim(min_size) > 1: # type: ignore raise ValueError("min_size must be 1d or a scalar") elif isinstance(min_size, (float, int)): - min_size = float(min_size) * np.ones(len(signal)) + min_size = float(min_size) * np.ones(len(_signal)) - _signal = _filter(signal, **filter_pars) - _signal = np.atleast_2d(_signal) # Mean subtract, if we don't do this then when we cumsum we get floats # that are too big and lack the precicion to find jumps well _signal -= np.mean(_signal, axis=-1)[..., None] - nfuture = min(len(_signal), NFUTURE) - slice_size = len(_signal) // nfuture - slices = [slice(i * slice_size, (i + 1) * slice_size) for i in range(nfuture)] - slices[-1] = slice(slices[-1].start, len(_signal)) - with concurrent.futures.ThreadPoolExecutor() as e: - jump_futures = [ - e.submit(_jumpfinder, _signal[s], min_size[s], win_size, exact) - for s in slices - ] - jumps = np.vstack([j.result() for j in jump_futures]).reshape(orig_shape) + jumps = _jumpfinder(_signal, min_size, win_size, exact).reshape(orig_shape) jump_ranges = RangesMatrix.from_mask(jumps).buffer(int(win_size / 2)) jumps = jump_ranges.mask()