Merge pull request #67 from simonsobs/hk-format-v2

hk format v2

mhasself authored Jul 29, 2020
2 parents 9bfd03e + d37c7c6 commit 31fa404

Showing 6 changed files with 167 additions and 73 deletions.
82 changes: 49 additions & 33 deletions python/hk/getdata.py
@@ -46,10 +46,12 @@ def is_sub_seq(full_seq, sub_seq):


class HKArchive:
"""Container for information necessary to determine what data fields
are present in a data archive at what times. This object has
methods that can determine what fields have data over a given time
range, and can group fields that share a timeline (i.e. are
co-sampled) over that range.
"""Contains information necessary to determine what data fields are
present in a data archive at what times. It also knows how to
group fields that share a commong timeline.
"""
field_groups = None
def __init__(self, field_groups=None):
@@ -90,6 +92,13 @@ def _get_groups(self, fields=None, start=None, end=None,
fields belonging to this group. ``fgs`` is a list of
_FieldGroup objects relevant to the fields in this
group.
+ Notes:
+ Each entry in the returned list carries information for a
+ set of fields that are co-sampled over the requested time
+ interval. Each of the requested fields will thus appear
+ in at most one group.
"""
span = so3g.IntervalsDouble()
if start is None:
@@ -204,20 +213,16 @@ def get_data(self, field=None, start=None, end=None, min_stride=None,
blocks_in = []
for fg in fgrps:
for r in fg.index_info:
- fn,off = r['filename'], r['byte_offset']
+ fn, off, bidx = r['filename'], r['byte_offset'], r['block_index']
if not fn in handles:
handles[fn] = so3g.G3IndexedReader(fn)
handles[fn].Seek(off)
fn = handles[fn].Process(None)
fn = self.translator(fn[0])
assert(len(fn) == 1)
- # Find the right block.
- for blk in fn[0]['blocks']:
- assert(isinstance(blk, core.G3TimesampleMap))
- test_f = fields[0].split('.')[-1] ## dump prefix.
- if test_f in blk.keys():
- blocks_in.append(blk)
- break
+ blocks_in.append(fn[0]['blocks'][bidx])

# Sort those blocks by timestamp. (Otherwise they'll stay sorted by object id :)
blocks_in.sort(key=lambda b: b.times[0].time)
# Create a new Block for this group.
@@ -301,13 +306,16 @@ def from_g3(cls, element):


class HKArchiveScanner:
"""Consume SO Housekeeping streams and create a record of what fields
"""Consumes SO Housekeeping streams and creates a record of what fields
cover what time ranges. This can run as a G3Pipeline module, but
will not be able to record stream indexing information in that
case. If it's populated through the process_file method, then
index information (in the sense of filenames and byte offsets)
will be stored.
+ After processing frames, calling .finalize() will return an
+ HKArchive that can be used to load data more efficiently.
"""
def __init__(self):
self.session_id = None
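For orientation, the intended flow is to scan one or more files, finalize, and then query the resulting archive. A minimal sketch (the file path and field name are hypothetical, and the get_data return shape is assumed from the API above):

    import so3g
    from so3g import hk

    scanner = hk.HKArchiveScanner()
    scanner.process_file('hk_2020.g3')   # hypothetical input file
    archive = scanner.finalize()         # returns an HKArchive

    # Load co-sampled data over a time range (unix timestamps).
    fields, timelines = archive.get_data(
        ['observatory.LS240.temperature'],   # hypothetical field name
        start=1595000000., end=1595003600.)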
@@ -343,7 +351,7 @@ def Process(self, f, index_info=None):
return [f]

vers = f.get('hkagg_version', 0)
- assert(vers == 1)
+ assert(vers == 2)

if f['hkagg_type'] == so3g.HKFrameType.session:
session_id = f['session_id']
@@ -373,24 +381,19 @@ def Process(self, f, index_info=None):
prov = self.providers[f['prov_id']]
representatives = prov.blocks.keys()

- for b in f['blocks']:
+ for bidx, (bname, b) in enumerate(zip(f['block_names'], f['blocks'])):
assert(isinstance(b, core.G3TimesampleMap))
- fields = b.keys()
- if len(b.times) == 0 or len(fields) == 0:
- continue
- for block_index,rep in enumerate(representatives):
- if rep in fields:
- break
- else:
- rep = fields[0]
- prov.blocks[rep] = {'fields': fields,
- 'start': b.times[0].time / core.G3Units.seconds,
- 'index_info': []}
+ if bname not in prov.blocks:
+ prov.blocks[bname] = {'fields': list(b.keys()),
+ 'start': b.times[0].time / core.G3Units.seconds,
+ 'index_info': []}
# To ensure that the last sample is actually included
# in the semi-open intervals we use to track frames,
# the "end" time has to be after the final sample.
- prov.blocks[rep]['end'] = b.times[-1].time / core.G3Units.seconds + SPAN_BUFFER_SECONDS
- prov.blocks[rep]['index_info'].append(index_info)
+ prov.blocks[bname]['end'] = b.times[-1].time / core.G3Units.seconds + SPAN_BUFFER_SECONDS
+ ii = {'block_index': bidx}
+ ii.update(index_info)
+ prov.blocks[bname]['index_info'].append(ii)

else:
core.log_warn('Weird hkagg_type: %i' % f['hkagg_type'],
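The index_info recorded above is what get_data consumes: each entry lets the reader seek straight to one frame and pull out a single block. A sketch of that lookup, using only calls that appear in this diff (the entry values are hypothetical):

    import so3g

    entry = {'filename': 'hk_2020.g3',   # hypothetical values
             'byte_offset': 123456,
             'block_index': 0}
    reader = so3g.G3IndexedReader(entry['filename'])
    reader.Seek(entry['byte_offset'])
    frames = reader.Process(None)        # reads one frame at that offset
    assert len(frames) == 1
    block = frames[0]['blocks'][entry['block_index']]  # a G3TimesampleMap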
@@ -464,13 +467,26 @@ def process_file(self, filename, flush_after=True):


class _FieldGroup:
"""Track a set of readout fields that share a timeline. Attributes
are:
- fields (list of str): the field names.
- cover (IntervalsDouble): time range over which these fields have data.
- index_info (dict): description of how to find the actual data
(perhaps a filename and byte_offset?).
"""Container object for look-up information associated with a group of
fields that share a timeline (i.e. a group of fields that are
co-sampled over some requested time range).
Attributes:
prefix (str): Not used.
fields (list of str): The field names.
cover (IntervalsDouble): The time range (as unix timestamps)
over which these fields have data. This is expressed as an
IntervalsDouble to enable the use of Intervals arithmetic.
The range is created from the "start" and "end" arguments of
the constructor.
index_info (list of dict): Information that the consumer will
use to locate and load the data efficiently. The entries in
the list represent time-ordered frames. The look-up
information in each dict includes 'filename' (string; the
filename where the HKData frame lives), 'byte_offset' (int;
the offset into the file where the frame starts), and
'block_index' (int; the index of the frame.blocks where the
fields' data lives).
"""
def __init__(self, prefix, fields, start, end, index_info):
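As a sketch of the cover attribute described above (assuming so3g's Intervals API, with add_interval and the * intersection operator):

    import so3g

    cover = so3g.IntervalsDouble().add_interval(1595000000., 1595003600.)
    query = so3g.IntervalsDouble().add_interval(1595001000., 1595002000.)
    overlap = cover * query   # Intervals arithmetic: the intersection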
36 changes: 32 additions & 4 deletions python/hk/scanner.py
@@ -99,6 +99,7 @@ def __call__(self, f):
'timestamp_data': None, # Timestamp of most recent data frame.
'ticks': 0, # Total number of timestamps in all blocks.
'span': None, # (earliest_time, latest_time)
+ 'block_streams_map': {}, # Map from field name to block name.
}

elif f['hkagg_type'] == so3g.HKFrameType.data:
@@ -126,11 +127,28 @@ def __call__(self, f):
if vers == 0:
block_timef = lambda block: block.t
block_itemf = lambda block: [(k, block.data[k]) for k in block.data.keys()]
- elif vers == 1:
+ elif vers >= 1:
block_timef = lambda block: np.array([t.time / core.G3Units.seconds for t in b.times])
block_itemf = lambda block: [(k, block[k]) for k in block.keys()]

- for b in blocks:
+ if vers in [0]:
+ block_name = lambda block_idx: list(sorted(blocks[block_idx].data.keys()))[0]
+ if vers in [1]:
+ block_name = lambda block_idx: list(sorted(blocks[block_idx].keys()))[0]
+ elif vers >= 2:
+ block_names = f.get('block_names', [])
+ if len(block_names) != len(blocks):
+ # This is a schema error in its own right.
+ core.log_error('Frame does not have "block_names" entry, '
+ 'or it is not the same length as "blocks".',
+ unit='HKScanner')
+ self.stats['concerns']['n_error'] += 1
+ # Fall back on v1 strategy.
+ block_name = lambda block_idx: list(sorted(blocks[block_idx].keys()))[0]
+ else:
+ block_name = lambda block_idx: f['block_names'][block_idx]
+
+ for block_idx, b in enumerate(blocks):
times = block_timef(b)
if len(times):
if info['span'] is None:
@@ -140,12 +158,21 @@ def __call__(self, f):
info['span'] = min(times[0], t0), max(times[-1], t1)
t_check.append(times[0])
info['ticks'] += len(times)
- for k,v in block_itemf(b):
+ bname = block_name(block_idx)
+ for k, v in block_itemf(b):
if len(v) != len(times):
core.log_error('Field "%s" has %i samples but .t has %i samples.' %
(k, len(v), len(times)))
self.stats['concerns']['n_error'] += 1
+ # Make sure field has a block_stream registered.
+ if k not in info['block_streams_map']:
+ info['block_streams_map'][k] = bname
+ if info['block_streams_map'][k] != bname:
+ core.log_error('Field "%s" appeared in block_name %s '
+ 'and later in block_name %s.' %
+ (k, info['block_streams_map'][k], bname))
+ self.stats['concerns']['n_error'] += 1
if len(t_check) and abs(min(t_check) - t_this) > 60:
core.log_warn('data frame timestamp (%.1f) does not correspond to '
'data timestamp vectors (%s) .' % (t_this, t_check),
@@ -164,6 +191,7 @@ def __call__(self, f):

parser = argparse.ArgumentParser()
parser.add_argument('--translate', action='store_true')
+ parser.add_argument('--target-version', type=int, default=2)
parser.add_argument('files', nargs='+')
args = parser.parse_args()

@@ -175,6 +203,6 @@ def __call__(self, f):
p = core.G3Pipeline()
p.Add(core.G3Reader(f))
if args.translate:
- p.Add(hk.HKTranslator())
+ p.Add(hk.HKTranslator(target_version=args.target_version))
p.Add(HKScanner())
p.Run()
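The block_streams_map check above enforces that a field never migrates between named blocks. A toy, self-contained illustration of the same logic (block and field names are hypothetical):

    block_streams_map = {}
    frames = [('thermometry', ['T1', 'T2']),
              ('thermometry', ['T1', 'T2']),
              ('pointing', ['T1'])]          # 'T1' moved blocks: an error
    for bname, field_names in frames:
        for k in field_names:
            if k not in block_streams_map:
                block_streams_map[k] = bname
            elif block_streams_map[k] != bname:
                print('Field %r appeared in %r and later in %r.'
                      % (k, block_streams_map[k], bname))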
2 changes: 2 additions & 0 deletions python/hk/session.py
@@ -122,4 +122,6 @@ def data_frame(self, prov_id, timestamp=None):
f['prov_id'] = prov_id
f['timestamp'] = timestamp
f['blocks'] = core.G3VectorFrameObject()
+ if self.hkagg_version >= 2:
+ f['block_names'] = core.G3VectorString()
return f
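A sketch of what a v2 data frame then contains, using the session helper above; the hkagg_version constructor argument, the field, and the block name are assumptions, while data_frame, get_g3_time, and the blocks/block_names containers come from this diff:

    import numpy as np
    from spt3g import core
    import so3g
    from so3g import hk

    session = hk.HKSessionHelper(hkagg_version=2)   # assumed constructor arg
    f = session.data_frame(prov_id=0, timestamp=1595000000.)

    block = core.G3TimesampleMap()
    block.times = so3g.hk.util.get_g3_time(np.array([1595000000., 1595000001.]))
    block['temperature'] = core.G3VectorDouble([273.15, 273.16])

    f['blocks'].append(block)
    f['block_names'].append('thermometry')   # stays parallel to f['blocks']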
107 changes: 77 additions & 30 deletions python/hk/translator.py
@@ -8,22 +8,38 @@


class HKTranslator:
"""Translates SO Housekeeping frames from schema version 0 to version
1; passes v1 frames through unaffected. This can be used in a
G3Pipeline to condition HK streams for processing by v1-compatible
code.
Version 1 is not a strict superset of version 0, but the two main
structural features eliminated in v1 (block names and field
prefixes) were not really used.
"""Translates SO Housekeeping frames from schema versions {v0, v1} to
schema version 2. Passes v2 (or newer) frames through,
unmodified. This can be used in a G3Pipeline to condition
archival HK streams for processing by v2-compatible code. (Note
that code that works with the short-lived v1 schema should also
work on a v2 stream, unless it explicitly rejects based on
hkagg_version.)
Version 1/2 are not a strict superset of version 0, but the main
structural features eliminated in v1 (field prefixes) was not
really used.
"""
- TARGET_VERSION = 1

- def __init__(self):
+ def __init__(self, target_version=2, future_tolerant=True):
+ """Arguments:
+ target_version (int): 0, 1, or 2. Version to which to translate
+ the stream. The code is not able to downgrade a stream. See
+ future_tolerant parameter.
+ future_tolerant (bool): Determines the behavior of the
+ translator should it encounter a frame with hkagg_version
+ higher than target_version. If future_tolerant is True,
+ the frame will be passed through unmodified. Otherwise, a
+ ValueError will be raised.
+ """
self.stats = {'n_hk': 0,
'n_other': 0,
'versions': {}}
+ self.target_version = target_version
+ self.future_tolerant = future_tolerant

def Process(self, f):
"""Translates one frame to the target schema. Irrelevant frames are
@@ -51,37 +67,63 @@ def Process(self, f):
return [f]

# It is an HK frame.
- hkagg_version = f.get('hkagg_version', 0)
+ orig_version = f.get('hkagg_version', 0)

self.stats['n_hk'] += 1
- self.stats['versions'][hkagg_version] = self.stats['versions'].get(hkagg_version, 0) + 1
+ self.stats['versions'][orig_version] = self.stats['versions'].get(orig_version, 0) + 1

+ if orig_version > self.target_version and not self.future_tolerant:
+ raise ValueError(
+ ('Translator to v%i encountered v%i, but future_tolerant=False.')
+ % (self.TARGET_VERSION, orig_version))

- if hkagg_version == 1:
- # Good enough.
+ if orig_version >= self.target_version:
return [f]

# Always update the version, even if that's our only change...
- if 'hkagg_version' in f:
- f['hkagg_version_orig'] = hkagg_version
+ if 'hkagg_version_orig' not in f:
+ f['hkagg_version_orig'] = orig_version
del f['hkagg_version']
- f['hkagg_version'] = 1
+ f['hkagg_version'] = self.target_version

- # No difference in Session/Status for v0 -> v1.
+ # No difference in Session/Status for v0, v1, v2.
if f.get('hkagg_type') != so3g.HKFrameType.data:
return [f]

- # Pop the data blocks out of the frame.
- orig_blocks = f.pop('blocks')
- f['blocks'] = core.G3VectorFrameObject()
-
- # Now process the data blocks.
- for block in orig_blocks:
- new_block = core.G3TimesampleMap()
- new_block.times = so3g.hk.util.get_g3_time(block.t)
- for k in block.data.keys():
- v = block.data[k]
- new_block[k] = core.G3VectorDouble(v)
- f['blocks'].append(new_block)
+ if self.target_version == 0:
+ return [f]
+
+ if orig_version == 0:
+ # Pop the data blocks out of the frame.
+ orig_blocks = f.pop('blocks')
+ f['blocks'] = core.G3VectorFrameObject()
+
+ # Now process the data blocks.
+ for block in orig_blocks:
+ new_block = core.G3TimesampleMap()
+ new_block.times = so3g.hk.util.get_g3_time(block.t)
+ for k in block.data.keys():
+ v = block.data[k]
+ new_block[k] = core.G3VectorDouble(v)
+ f['blocks'].append(new_block)
+
+ if self.target_version == 1:
+ return [f]
+
+ if orig_version <= 1:
+ # Add 'block_names'. Since we don't want to start
+ # caching Block Stream information, just compute a good
+ # block name based on the alphabetically first field in
+ # the block.
+ block_names = []
+ for block in f['blocks']:
+ field_names = list(sorted(block.keys()))
+ block_names.append('block_for_%s' % field_names[0])
+ assert(len(block_names[-1]) < 256) # What have you done.
+ orig_block_names = []
+ f['block_names'] = core.G3VectorString(block_names)

return [f]

def __call__(self, *args, **kwargs):
@@ -94,16 +136,21 @@ def __call__(self, *args, **kwargs):
usage='This program can be used to convert SO HK Frames to the '
'latest schema version.')
parser.add_argument('--output-file', '-o', default='out.g3')
+ parser.add_argument('--target-version', type=int)
parser.add_argument('files', nargs='+', help=
"SO Housekeeping files to convert.")
args = parser.parse_args()

# Run me on a G3File containing a Housekeeping stream.
core.set_log_level(core.G3LogLevel.LOG_INFO)

+ translator_args = {}
+ if args.target_version is not None:
+ translator_args['target_version'] = args.target_version

print(f'Streaming to {args.output_file}')
p = core.G3Pipeline()
p.Add(core.G3Reader(args.files))
- p.Add(HKTranslator())
+ p.Add(HKTranslator(**translator_args))
p.Add(core.G3Writer(args.output_file))
p.Run()
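A quick check of the v1-to-v2 naming rule in Process above: a block is named after its alphabetically first field (field names here are hypothetical):

    field_names = list(sorted(['El_encoder', 'Az_encoder']))
    name = 'block_for_%s' % field_names[0]
    assert name == 'block_for_Az_encoder'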
