-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding agent to monitor external fuel tanks level. #639
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another overall comment, please wrap lines to 80 characters. Lots of editors support automating this in some fashion, or have extensions to do so. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,340 @@ | ||
import argparse | ||
import os | ||
import struct | ||
import time | ||
|
||
import serial | ||
import txaio | ||
from ocs import ocs_agent, site_config | ||
from ocs.ocs_twisted import Pacemaker, TimeoutLock | ||
|
||
|
||
class TankLevelMonitor: | ||
|
||
"""Initialize serial communication with tank sensors & creates dicts to store data depending on tanks numbers.""" | ||
|
||
def __init__(self): | ||
|
||
self.client = serial.Serial("/dev/ttyUSB1", 9600, timeout=0.5) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
|
||
self.tank_data = {} # Define dictionary for Tank data, 7 fields | ||
self.tank1_data = {} | ||
self.tank2_data = {} | ||
self.tank1_data_fields = ['Tank1_vol', 'Net1_vol', 'emp1_vol', 'prod1_h', 'water1_h', 'avg1_temp', 'water1_vol'] | ||
self.tank2_data_fields = ['Tank2_vol', 'Net2_vol', 'emp2_vol', 'prod2_h', 'water2_h', 'avg2_temp', 'water2_vol'] | ||
BrianJKoopman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
self.tank_data_ok = False | ||
self.tank_length_to_read = 89 | ||
self.verbose = True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A verbosity flag works for debugging, but I'll make the same optional suggestion as I made in #640 -- an alternative approach would be to use debug logging. This would allow you to drop the tracking of
The advantage here is that the If you want to implement this check out the Logging documentation. (@felipecarrero also just implemented this, you can ask him as well.) I wouldn't let it hold up the PR though. |
||
|
||
self.client.flushOutput() | ||
|
||
def tank_data_checker(self, tank_num): | ||
self.tank_data_ok = False | ||
|
||
if len(self.full_data) == 89: | ||
if self.full_data[18:19] != b'': | ||
if self.verbose: | ||
print("IN DATA CHECKER 1: ", self.full_data[0:2]) | ||
if self.full_data[0:2] == b'\x01i': | ||
if self.verbose: | ||
print("IN DATA CHECKER 2:", self.full_data[88:89]) | ||
if self.full_data[88:89] == b'\x03': | ||
if self.verbose: | ||
print("IN DATA CHECKER 3:", self.full_data[18:19]) | ||
if int(self.full_data[18:19]) == int(tank_num): | ||
if self.verbose: | ||
print("DATA IS OK") | ||
self.tank_data_ok = True | ||
Comment on lines
+32
to
+48
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's replace this deep nesting with a series of checks. Starting with the assumption that
|
||
|
||
def tank_data_verbosity(self, tank_num, msg=""): | ||
|
||
if self.verbose: | ||
print("Tank Number: ", tank_num) | ||
print(msg, self.full_data) | ||
|
||
"""Recieves tank number (default tank1) and returns non decoded data.""" | ||
|
||
def tank_data_reader(self, tank_num=1): | ||
|
||
self.client.flushOutput() | ||
self.client.flushInput() | ||
|
||
self.client.write(b'01i201' + tank_num) # Inquiry <SOH>i201TT = 01 i201tank_num | ||
self.full_data = {} | ||
self.full_data = self.client.read(self.tank_length_to_read) | ||
Comment on lines
+64
to
+65
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line 64 isn't needed here and can be removed. |
||
|
||
self.tank_data_checker(tank_num) | ||
|
||
while True: | ||
self.tank_data_verbosity(tank_num, "Entering while loop in tank_data_reader") | ||
self.tank_data_checker(tank_num) | ||
if self.tank_data_ok: | ||
break | ||
else: | ||
self.client.flushOutput() | ||
self.client.write(b'01i201' + tank_num) | ||
self.full_data = {} | ||
self.full_data = self.client.read(self.tank_length_to_read) | ||
self.tank_data_verbosity(tank_num, "After tank_data_ok got False, full data: ") | ||
self.client.flushOutput() | ||
self.client.flushInput() | ||
time.sleep(10) | ||
Comment on lines
+69
to
+82
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a little concerned about this loop, as it potentially goes on forever if the tank data is always bad. How often is the tank data bad? And when it's bad, do you know why? I would rather not loop here, do the write/read once, check the data quality, and raise an exception if it fails. The main There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem is that tank data ib bad very often, so it breaks my code after some minutes, data is bad like once per 5 minutes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha, yeah, I'm not proposing we allow it to break your code, just that it gets caught in the loop at the Agent level so we can tell when it's bad. |
||
|
||
if self.verbose: | ||
print(tank_num, self.full_data) | ||
|
||
"""Recieves undecoded hex data and returns a dictionary with decoded & corrected data.""" | ||
|
||
def tank_decode_data(self, tank_num): | ||
|
||
multfactor = [3.785, 3.785, 3.785, 2.54 / 100, 2.54 / 100, 5 / 9, 3.785] # correction factor | ||
addfactor = [0, 0, 0, 0, 0, -32, 0] | ||
|
||
data = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to above, this line 94 isn't needed and can be removed. |
||
data = self.full_data[26:26 + 8 * 7] | ||
|
||
for i in range(7): | ||
data_field = data[i * 8:(i + 1) * 8] # moving through all data | ||
decoded_data = struct.unpack('!f', bytes.fromhex(data_field.decode('ascii')))[0] # decode hex to ieee float | ||
|
||
if tank_num == b'01': | ||
self.tank1_data[self.tank1_data_fields[i]] = (decoded_data + addfactor[i]) * multfactor[i] | ||
elif tank_num == b'02': | ||
self.tank2_data[self.tank2_data_fields[i]] = (decoded_data + addfactor[i]) * multfactor[i] | ||
|
||
def read_cycle(self): | ||
|
||
self.tank_data_reader(b'01') | ||
self.tank_decode_data(b'01') | ||
|
||
self.tank_data_reader(b'02') | ||
self.tank_decode_data(b'02') | ||
|
||
self.tank_data = self.tank1_data | ||
self.tank_data.update(self.tank2_data) | ||
|
||
if self.verbose: | ||
print("READ CYCLE: ", self.tank_data) | ||
|
||
try: | ||
this_cycle_data = {} | ||
for key in self.tank_data: | ||
this_cycle_data[key] = {'value': self.tank_data[key]} | ||
Comment on lines
+122
to
+123
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This adds an extra layer of depth that I don't think is needed. If you change this to:
Then the loop in the
to
Or perhaps more simply,
Since
I made a similar comment in #640, which had the same structure: #640 (comment) |
||
if self.verbose: | ||
print(this_cycle_data) | ||
return this_cycle_data | ||
except BaseException: | ||
pass | ||
Comment on lines
+127
to
+128
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What exception does this catch? A |
||
|
||
|
||
class TankLevelMonitorAgent: | ||
"""Monitor the External fuel Tank level via Serial COM. | ||
|
||
Parameters | ||
---------- | ||
agent : OCSAgent | ||
OCSAgent object which forms this Agent | ||
sample_interval : float | ||
Time between samples in seconds. | ||
|
||
Attributes | ||
---------- | ||
agent : OCSAgent | ||
OCSAgent object which forms this Agent | ||
take_data : bool | ||
Tracks whether or not the agent is actively issuing SNMP GET commands | ||
to the ibootbar. Setting to false stops sending commands. | ||
log : txaio.tx.Logger | ||
txaio logger object, created by the OCSAgent | ||
""" | ||
|
||
def __init__(self, agent, unit=1, sample_interval=15.): | ||
|
||
self.unit = unit | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
self.agent: ocs_agent.OCSAgent = agent | ||
self.log = agent.log | ||
self.lock = TimeoutLock() | ||
|
||
self.pacemaker_freq = 1. / sample_interval | ||
|
||
self.initialized = False | ||
self.take_data = False | ||
|
||
self.TankLevelMonitor = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Conventionally we'd want to use "snakecase" here for variable names, so either |
||
|
||
agg_params = { | ||
'frame_length': 10 * 60 # [sec] | ||
} | ||
self.agent.register_feed('TankLevelMonitor', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Feed names must be lowercase and can use underscores. See Feed Name Rules in the OCS docs. |
||
record=True, | ||
agg_params=agg_params, | ||
buffer_time=0) | ||
|
||
def _connect(self): | ||
"""connect() | ||
Instantiates tank level monitor object and mark it as initialized. | ||
""" | ||
self.TankLevelMonitor = TankLevelMonitor() | ||
self.initialized = True | ||
|
||
@ocs_agent.param('auto_acquire', default=False, type=bool) | ||
def init_TankLevelMonitor(self, session, params=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another case/naming nitpick, all lowercase with underscores please. Maybe just |
||
"""init_generator(auto_acquire=False) | ||
|
||
**Task** - Perform first time setup of the VLT. | ||
Comment on lines
+183
to
+185
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A couple things to update in the docstring, the first line should match the function name, and the description for the task should be updated, this still refers to the VLT (which I think is a different thing, but maybe it's this monitor?) |
||
|
||
Parameters: | ||
auto_acquire (bool, optional): Starts data acquisition after | ||
initialization if True. Defaults to False. | ||
|
||
""" | ||
if self.initialized: | ||
return True, "Already initialized." | ||
|
||
with self.lock.acquire_timeout(3, job='init') as acquired: | ||
if not acquired: | ||
self.log.warn("Could not start init because " | ||
"{} is already running".format(self.lock.job)) | ||
return False, "Could not acquire lock." | ||
|
||
session.set_status('starting') | ||
|
||
self._connect() | ||
if not self.initialized: | ||
return False, 'Could not connect to Tank Monitor' | ||
|
||
# Start data acquisition if requested | ||
if params['auto_acquire']: | ||
self.agent.start('acq') | ||
|
||
return True, 'Tank Level Monitor initialized.' | ||
|
||
@ocs_agent.param('_') | ||
def acq(self, session, params=None): | ||
|
||
Comment on lines
+214
to
+215
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a docstring for this Process so it'll show up in the user docs. For an example check out the docs. It's important to have the structure of |
||
with self.lock.acquire_timeout(0, job='acq') as acquired: | ||
if not acquired: | ||
self.log.warn("Could not start acq because {} is already running" | ||
.format(self.lock.job)) | ||
return False, "Could not acquire lock." | ||
|
||
session.set_status('running') | ||
|
||
self.take_data = True | ||
|
||
session.data = {"fields": {}} | ||
|
||
pm = Pacemaker(self.pacemaker_freq) | ||
while self.take_data: | ||
pm.sleep() | ||
|
||
current_time = time.time() | ||
data = { | ||
'timestamp': current_time, | ||
'connection': {}, | ||
'block_name': 'registers', | ||
'data': {} | ||
} | ||
if not self.TankLevelMonitor.client.is_open: | ||
self.initialized = False | ||
|
||
"""Try to re-initialize if connection lost""" | ||
if not self.initialized: | ||
self._connect() | ||
|
||
""" Only get readings if connected""" | ||
if self.initialized: | ||
session.data.update({'connection': {'last_attempt': time.time(), | ||
'connected': True}}) | ||
|
||
regdata = self.TankLevelMonitor.read_cycle() | ||
|
||
if regdata: | ||
for reg in regdata: | ||
data['data'][reg] = regdata[reg]["value"] | ||
field_dict = {reg: regdata[reg]['value']} | ||
session.data['fields'].update(field_dict) | ||
session.data.update({'timestamp': current_time}) | ||
else: | ||
self.log.info('Connection error or error in processing data.') | ||
self.initialized = False | ||
|
||
""" Continue trying to connect""" | ||
if not self.initialized: | ||
session.data.update({'connection': {'last_attempt': time.time(), | ||
'connected': False}}) | ||
self.log.info('Trying to reconnect.') | ||
continue | ||
|
||
for field, val in data['data'].items(): | ||
_data = { | ||
'timestamp': current_time, | ||
'block_name': field, | ||
'data': {field: val} | ||
} | ||
self.agent.publish_to_feed('TankLevelMonitor', _data) | ||
|
||
self.agent.feeds['TankLevelMonitor'].flush_buffer() | ||
|
||
return True, 'Acquisition exited cleanly.' | ||
|
||
def _stop_acq(self, session, params=None): | ||
""" | ||
Stops acq process. | ||
""" | ||
if self.TankLevelMonitor.verbose: | ||
print("DEBUG: stops acq process: ", self.take_data) | ||
if self.take_data: | ||
self.take_data = False | ||
return True, 'requested to stop taking data.' | ||
else: | ||
return False, 'acq is not currently running' | ||
|
||
|
||
def make_parser(parser=None): | ||
if parser is None: | ||
parser = argparse.ArgumentParser() | ||
|
||
pgroup = parser.add_argument_group('Agent Options') | ||
pgroup.add_argument("--unit", default=1, | ||
help="unit to listen to.") | ||
Comment on lines
+300
to
+301
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove the |
||
pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'], | ||
help="Starting action for the agent.") | ||
pgroup.add_argument("--sample-interval", type=float, default=15., help="Time between samples in seconds.") | ||
|
||
return parser | ||
|
||
|
||
def main(args=None): | ||
# Start logging | ||
txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) | ||
|
||
parser = make_parser() | ||
|
||
# Interpret options in the context of site_config. | ||
args = site_config.parse_args(agent_class='TankLevelMonitorAgent', | ||
parser=parser, | ||
args=args) | ||
|
||
# Automatically acquire data if requested (default) | ||
init_params = False | ||
if args.mode == 'init': | ||
init_params = {'auto_acquire': False} | ||
elif args.mode == 'acq': | ||
init_params = {'auto_acquire': True} | ||
# print('init_params', init_params) | ||
agent, runner = ocs_agent.init_site_agent(args) | ||
|
||
p = TankLevelMonitorAgent(agent, | ||
unit=int(args.unit), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Last place to remove |
||
sample_interval=args.sample_interval) | ||
|
||
agent.register_task('init_TankLevelMonitor', p.init_TankLevelMonitor, | ||
startup=init_params) | ||
agent.register_process('acq', p.acq, p._stop_acq) | ||
runner.run(agent, auto_reconnect=True) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One overall comment here is that we need a documentation page for the Agent. I described this recently in #638, so I'll link you there: #638 (comment)