Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding agent to monitor external fuel tanks level. #639

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
340 changes: 340 additions & 0 deletions socs/agents/tank_level_monitor/agent.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One overall comment here is that we need a documentation page for the Agent. I described this recently in #638, so I'll link you there: #638 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another overall comment, please wrap lines to 80 characters. Lots of editors support automating this in some fashion, or have extensions to do so.

Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
import argparse
import os
import struct
import time

import serial
import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock


class TankLevelMonitor:

"""Initialize serial communication with tank sensors & creates dicts to store data depending on tanks numbers."""

def __init__(self):

self.client = serial.Serial("/dev/ttyUSB1", 9600, timeout=0.5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This /dev/ttyUSB1 will need to be configurable, as it is not guaranteed to be the same every time the device is plugged in/the computer is rebooted. I'd recommend passing it as an argument when instantiating a TankLevelMonitor object.


self.tank_data = {} # Define dictionary for Tank data, 7 fields
self.tank1_data = {}
self.tank2_data = {}
self.tank1_data_fields = ['Tank1_vol', 'Net1_vol', 'emp1_vol', 'prod1_h', 'water1_h', 'avg1_temp', 'water1_vol']
self.tank2_data_fields = ['Tank2_vol', 'Net2_vol', 'emp2_vol', 'prod2_h', 'water2_h', 'avg2_temp', 'water2_vol']
BrianJKoopman marked this conversation as resolved.
Show resolved Hide resolved

self.tank_data_ok = False
self.tank_length_to_read = 89
self.verbose = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A verbosity flag works for debugging, but I'll make the same optional suggestion as I made in #640 -- an alternative approach would be to use debug logging. This would allow you to drop the tracking of verbose/verbosity and instead use:

self.log.debug('debug statement')

The advantage here is that the LOGLEVEL environment variable can be changed without modification of this verbosity constant in the code.

If you want to implement this check out the Logging documentation. (@felipecarrero also just implemented this, you can ask him as well.) I wouldn't let it hold up the PR though.


self.client.flushOutput()

def tank_data_checker(self, tank_num):
self.tank_data_ok = False

if len(self.full_data) == 89:
if self.full_data[18:19] != b'':
if self.verbose:
print("IN DATA CHECKER 1: ", self.full_data[0:2])
if self.full_data[0:2] == b'\x01i':
if self.verbose:
print("IN DATA CHECKER 2:", self.full_data[88:89])
if self.full_data[88:89] == b'\x03':
if self.verbose:
print("IN DATA CHECKER 3:", self.full_data[18:19])
if int(self.full_data[18:19]) == int(tank_num):
if self.verbose:
print("DATA IS OK")
self.tank_data_ok = True
Comment on lines +32 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's replace this deep nesting with a series of checks. Starting with the assumption that self.tank_data_ok = False, each check should return if the condition means the data is not ok. If all checks pass, then set self.tank_data_ok = True, like this:

    def tank_data_checker(self, tank_num):
        self.tank_data_ok = False

        if len(self.full_data) != 89:
            return

        if self.full_data[18:19] == b'':
            return

        if self.verbose:
            print("IN DATA CHECKER 1: ", self.full_data[0:2])
        if self.full_data[0:2] != b'\x01i':
            return

        if self.verbose:
            print("IN DATA CHECKER 2:", self.full_data[88:89])
        if self.full_data[88:89] != b'\x03':
            return

        if self.verbose:
            print("IN DATA CHECKER 3:", self.full_data[18:19])
        if int(self.full_data[18:19]) != int(tank_num):
            return

        if self.verbose:
            print("DATA IS OK")
        self.tank_data_ok = True


def tank_data_verbosity(self, tank_num, msg=""):

if self.verbose:
print("Tank Number: ", tank_num)
print(msg, self.full_data)

"""Recieves tank number (default tank1) and returns non decoded data."""

def tank_data_reader(self, tank_num=1):

self.client.flushOutput()
self.client.flushInput()

self.client.write(b'01i201' + tank_num) # Inquiry <SOH>i201TT = 01 i201tank_num
self.full_data = {}
self.full_data = self.client.read(self.tank_length_to_read)
Comment on lines +64 to +65
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 64 isn't needed here and can be removed.


self.tank_data_checker(tank_num)

while True:
self.tank_data_verbosity(tank_num, "Entering while loop in tank_data_reader")
self.tank_data_checker(tank_num)
if self.tank_data_ok:
break
else:
self.client.flushOutput()
self.client.write(b'01i201' + tank_num)
self.full_data = {}
self.full_data = self.client.read(self.tank_length_to_read)
self.tank_data_verbosity(tank_num, "After tank_data_ok got False, full data: ")
self.client.flushOutput()
self.client.flushInput()
time.sleep(10)
Comment on lines +69 to +82
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little concerned about this loop, as it potentially goes on forever if the tank data is always bad. How often is the tank data bad? And when it's bad, do you know why?

I would rather not loop here, do the write/read once, check the data quality, and raise an exception if it fails. The main acq Process is already looping to repeat the read. The exception can be caught there and a warning printed about the data being bad, and you can move on to the next loop in acq, while possibly trying to fix the issue causing the data read to fail.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that tank data ib bad very often, so it breaks my code after some minutes, data is bad like once per 5 minutes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, yeah, I'm not proposing we allow it to break your code, just that it gets caught in the loop at the Agent level so we can tell when it's bad.


if self.verbose:
print(tank_num, self.full_data)

"""Recieves undecoded hex data and returns a dictionary with decoded & corrected data."""

def tank_decode_data(self, tank_num):

multfactor = [3.785, 3.785, 3.785, 2.54 / 100, 2.54 / 100, 5 / 9, 3.785] # correction factor
addfactor = [0, 0, 0, 0, 0, -32, 0]

data = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to above, this line 94 isn't needed and can be removed.

data = self.full_data[26:26 + 8 * 7]

for i in range(7):
data_field = data[i * 8:(i + 1) * 8] # moving through all data
decoded_data = struct.unpack('!f', bytes.fromhex(data_field.decode('ascii')))[0] # decode hex to ieee float

if tank_num == b'01':
self.tank1_data[self.tank1_data_fields[i]] = (decoded_data + addfactor[i]) * multfactor[i]
elif tank_num == b'02':
self.tank2_data[self.tank2_data_fields[i]] = (decoded_data + addfactor[i]) * multfactor[i]

def read_cycle(self):

self.tank_data_reader(b'01')
self.tank_decode_data(b'01')

self.tank_data_reader(b'02')
self.tank_decode_data(b'02')

self.tank_data = self.tank1_data
self.tank_data.update(self.tank2_data)

if self.verbose:
print("READ CYCLE: ", self.tank_data)

try:
this_cycle_data = {}
for key in self.tank_data:
this_cycle_data[key] = {'value': self.tank_data[key]}
Comment on lines +122 to +123
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds an extra layer of depth that I don't think is needed. If you change this to:

for key in self.tank_data:
    this_cycle_data[key] = self.tank_data[key]

Then the loop in the acq() Process that extracts data from this simplifies from:

for reg in regdata:
    data['data'][reg] = regdata[reg]["value"]
    field_dict = {reg: regdata[reg]['value']}
    session.data['fields'].update(field_dict)
session.data.update({'timestamp': current_time})

to

for reg in regdata:
    data['data'][reg] = regdata[reg]
    field_dict = {reg: regdata[reg]}
    session.data['fields'].update(field_dict)
session.data.update({'timestamp': current_time})

Or perhaps more simply,

for key, value in regdata.items():
    data['data'][key] = value
    field_dict = {key: value}
    session.data['fields'].update(field_dict)
session.data.update({'timestamp': current_time})

Since field_dict == regdata, this again reduces to:

for key, value in regdata.items():
    data['data'][key] = value

session.data['fields'].update(regdata)
session.data.update({'timestamp': current_time})

I made a similar comment in #640, which had the same structure: #640 (comment)

if self.verbose:
print(this_cycle_data)
return this_cycle_data
except BaseException:
pass
Comment on lines +127 to +128
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exception does this catch? A KeyError maybe? It'd be good to understand more specifically what's being raised and why, rather than catching all exceptions here and skipping to the next read.



class TankLevelMonitorAgent:
"""Monitor the External fuel Tank level via Serial COM.

Parameters
----------
agent : OCSAgent
OCSAgent object which forms this Agent
sample_interval : float
Time between samples in seconds.

Attributes
----------
agent : OCSAgent
OCSAgent object which forms this Agent
take_data : bool
Tracks whether or not the agent is actively issuing SNMP GET commands
to the ibootbar. Setting to false stops sending commands.
log : txaio.tx.Logger
txaio logger object, created by the OCSAgent
"""

def __init__(self, agent, unit=1, sample_interval=15.):

self.unit = unit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This self.unit is unused. (It looks like it's leftover from the Agent this one was based on.) Remove, along with the unit argument.

self.agent: ocs_agent.OCSAgent = agent
self.log = agent.log
self.lock = TimeoutLock()

self.pacemaker_freq = 1. / sample_interval

self.initialized = False
self.take_data = False

self.TankLevelMonitor = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conventionally we'd want to use "snakecase" here for variable names, so either self.tank_level_monitor, or something shorter like self.monitor.


agg_params = {
'frame_length': 10 * 60 # [sec]
}
self.agent.register_feed('TankLevelMonitor',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feed names must be lowercase and can use underscores. See Feed Name Rules in the OCS docs.

record=True,
agg_params=agg_params,
buffer_time=0)

def _connect(self):
"""connect()
Instantiates tank level monitor object and mark it as initialized.
"""
self.TankLevelMonitor = TankLevelMonitor()
self.initialized = True

@ocs_agent.param('auto_acquire', default=False, type=bool)
def init_TankLevelMonitor(self, session, params=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another case/naming nitpick, all lowercase with underscores please. Maybe just init_monitor.

"""init_generator(auto_acquire=False)

**Task** - Perform first time setup of the VLT.
Comment on lines +183 to +185
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple things to update in the docstring, the first line should match the function name, and the description for the task should be updated, this still refers to the VLT (which I think is a different thing, but maybe it's this monitor?)


Parameters:
auto_acquire (bool, optional): Starts data acquisition after
initialization if True. Defaults to False.

"""
if self.initialized:
return True, "Already initialized."

with self.lock.acquire_timeout(3, job='init') as acquired:
if not acquired:
self.log.warn("Could not start init because "
"{} is already running".format(self.lock.job))
return False, "Could not acquire lock."

session.set_status('starting')

self._connect()
if not self.initialized:
return False, 'Could not connect to Tank Monitor'

# Start data acquisition if requested
if params['auto_acquire']:
self.agent.start('acq')

return True, 'Tank Level Monitor initialized.'

@ocs_agent.param('_')
def acq(self, session, params=None):

Comment on lines +214 to +215
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a docstring for this Process so it'll show up in the user docs. For an example check out the docs. It's important to have the structure of session.data documented, as that structure is left up to you, and differs between Agents.

with self.lock.acquire_timeout(0, job='acq') as acquired:
if not acquired:
self.log.warn("Could not start acq because {} is already running"
.format(self.lock.job))
return False, "Could not acquire lock."

session.set_status('running')

self.take_data = True

session.data = {"fields": {}}

pm = Pacemaker(self.pacemaker_freq)
while self.take_data:
pm.sleep()

current_time = time.time()
data = {
'timestamp': current_time,
'connection': {},
'block_name': 'registers',
'data': {}
}
if not self.TankLevelMonitor.client.is_open:
self.initialized = False

"""Try to re-initialize if connection lost"""
if not self.initialized:
self._connect()

""" Only get readings if connected"""
if self.initialized:
session.data.update({'connection': {'last_attempt': time.time(),
'connected': True}})

regdata = self.TankLevelMonitor.read_cycle()

if regdata:
for reg in regdata:
data['data'][reg] = regdata[reg]["value"]
field_dict = {reg: regdata[reg]['value']}
session.data['fields'].update(field_dict)
session.data.update({'timestamp': current_time})
else:
self.log.info('Connection error or error in processing data.')
self.initialized = False

""" Continue trying to connect"""
if not self.initialized:
session.data.update({'connection': {'last_attempt': time.time(),
'connected': False}})
self.log.info('Trying to reconnect.')
continue

for field, val in data['data'].items():
_data = {
'timestamp': current_time,
'block_name': field,
'data': {field: val}
}
self.agent.publish_to_feed('TankLevelMonitor', _data)

self.agent.feeds['TankLevelMonitor'].flush_buffer()

return True, 'Acquisition exited cleanly.'

def _stop_acq(self, session, params=None):
"""
Stops acq process.
"""
if self.TankLevelMonitor.verbose:
print("DEBUG: stops acq process: ", self.take_data)
if self.take_data:
self.take_data = False
return True, 'requested to stop taking data.'
else:
return False, 'acq is not currently running'


def make_parser(parser=None):
if parser is None:
parser = argparse.ArgumentParser()

pgroup = parser.add_argument_group('Agent Options')
pgroup.add_argument("--unit", default=1,
help="unit to listen to.")
Comment on lines +300 to +301
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the --unit argument.

pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'],
help="Starting action for the agent.")
pgroup.add_argument("--sample-interval", type=float, default=15., help="Time between samples in seconds.")

return parser


def main(args=None):
# Start logging
txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))

parser = make_parser()

# Interpret options in the context of site_config.
args = site_config.parse_args(agent_class='TankLevelMonitorAgent',
parser=parser,
args=args)

# Automatically acquire data if requested (default)
init_params = False
if args.mode == 'init':
init_params = {'auto_acquire': False}
elif args.mode == 'acq':
init_params = {'auto_acquire': True}
# print('init_params', init_params)
agent, runner = ocs_agent.init_site_agent(args)

p = TankLevelMonitorAgent(agent,
unit=int(args.unit),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last place to remove unit.

sample_interval=args.sample_interval)

agent.register_task('init_TankLevelMonitor', p.init_TankLevelMonitor,
startup=init_params)
agent.register_process('acq', p.acq, p._stop_acq)
runner.run(agent, auto_reconnect=True)


if __name__ == '__main__':
main()