From d472ab51528be4f7de216af740642d0539b29670 Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sun, 26 Jan 2025 18:23:52 +0000 Subject: [PATCH] Implement vision support --- sbot/future/board_manager.py | 93 +++++++++++++++++++++++++++++++ sbot/future/classless/__init__.py | 4 +- sbot/future/classless/robot.py | 2 + sbot/future/classless/utils.py | 1 + sbot/future/classless/vision.py | 65 +++++++++++++++++++++ 5 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 sbot/future/classless/vision.py diff --git a/sbot/future/board_manager.py b/sbot/future/board_manager.py index 588c69c..ee3a499 100644 --- a/sbot/future/board_manager.py +++ b/sbot/future/board_manager.py @@ -13,7 +13,17 @@ import os from typing import Callable, ClassVar, NamedTuple +from april_vision import ( + FrameSource, + USBCamera, + calibrations, + find_cameras, + generate_marker_size_mapping, +) +from april_vision import Processor as AprilCamera +from april_vision.helpers import Base64Sender from sbot.exceptions import BoardDisconnectionError +from sbot.game_specific import MARKER_SIZES from sbot.serial_wrapper import BASE_TIMEOUT, SerialWrapper from sbot.utils import BoardIdentity, get_simulator_boards, get_USB_identity from serial.tools.list_ports import comports @@ -113,6 +123,8 @@ def __init__(self) -> None: self._loaded: bool = False self.boards: dict[str, dict[str, SerialWrapper]] = {} self.outputs: dict[str, list[OutputIdentifier]] = {} + self.cameras: dict[str, AprilCamera] = {} + self._mqtt = None @classmethod def register_board(cls, board: DiscoveryTemplate) -> None: @@ -399,3 +411,84 @@ def _name_from_identifier(self, identifier: str) -> str: if template.identifier == identifier: return template.name return identifier + + def load_cameras(self) -> None: + """ + Find all connected cameras with calibration and configure tag sizes. + + Where MQTT is enabled, set up a frame sender to send annotated frames + as base64 encoded JPEG bytestreams of each image, detection is run on. + + This method will attempt to identify all connected cameras and load them + into the cameras dictionary. Cameras will be indexed by their name. + """ + if self.cameras: + raise RuntimeError("Cameras have already been loaded") + + camera_source: FrameSource + # Unroll the tag ID iterables and convert the sizes to meters + expanded_tag_sizes = generate_marker_size_mapping(MARKER_SIZES) + + if self._mqtt: + frame_sender = Base64Sender(self._mqtt.wrapped_publish) + else: + frame_sender = None + + if IN_SIMULATOR: + from sbot.simulator.camera import WebotsRemoteCameraSource + + for camera_info in get_simulator_boards('CameraBoard'): + camera_source = WebotsRemoteCameraSource(camera_info) + # The processor handles the detection and pose estimation + camera = AprilCamera( + camera_source, + calibration=camera_source.calibration, + name=camera_info.serial_number, + mask_unknown_size_tags=True, + ) + + # Set the tag sizes in the camera + camera.set_marker_sizes(expanded_tag_sizes) + + if frame_sender: + camera.detection_hook = frame_sender.annotated_frame_hook + + self.cameras[camera_info.serial_number] = camera + else: + for camera_data in find_cameras(calibrations): + cam_name = f"{camera_data.name} - {camera_data.index}" + + # The camera source handles the connection between the camera and the processor + camera_source = USBCamera.from_calibration_file( + camera_data.index, + calibration_file=camera_data.calibration, + vidpid=camera_data.vidpid, + ) + # The processor handles the detection and pose estimation + camera = AprilCamera( + camera_source, + calibration=camera_source.calibration, + name=camera_data.name, + vidpid=camera_data.vidpid, + mask_unknown_size_tags=True, + ) + + # Set the tag sizes in the camera + camera.set_marker_sizes(expanded_tag_sizes) + + if frame_sender: + camera.detection_hook = frame_sender.annotated_frame_hook + + self.cameras[cam_name] = camera + + def get_camera(self) -> AprilCamera: + """Get the first camera connected to the robot.""" + if len(self.cameras) > 1: + camera_name = next(iter(self.cameras.keys())) + logger.warning(f"Multiple cameras connected, using {camera_name!r}") + + try: + return next(iter(self.cameras.values())) + except StopIteration: + raise ValueError("No cameras connected") from None + diff --git a/sbot/future/classless/__init__.py b/sbot/future/classless/__init__.py index 6f93f67..8656e39 100644 --- a/sbot/future/classless/__init__.py +++ b/sbot/future/classless/__init__.py @@ -1,3 +1,3 @@ -from .robot import arduino, comp, led, motor, power, servo, utils +from .robot import arduino, comp, led, motor, power, servo, utils, vision -__all__ = ['arduino', 'comp', 'led', 'motor', 'power', 'servo', 'utils'] +__all__ = ['arduino', 'comp', 'led', 'motor', 'power', 'servo', 'utils', 'vision'] diff --git a/sbot/future/classless/robot.py b/sbot/future/classless/robot.py index 96db4b4..964a54e 100644 --- a/sbot/future/classless/robot.py +++ b/sbot/future/classless/robot.py @@ -15,6 +15,7 @@ from .power import Power from .servos import Servo from .utils import Utils +from .vision import Vision # Ensure that the atexit handler is registered to clean up the boards on termination ensure_atexit_on_term() @@ -24,6 +25,7 @@ motor = Motor(boards) servo = Servo(boards) arduino = Arduino(boards) +vision = Vision(boards) led = Led(boards) comp = Comp() utils = Utils(boards, comp) diff --git a/sbot/future/classless/utils.py b/sbot/future/classless/utils.py index 3e3c3f0..4b13c54 100644 --- a/sbot/future/classless/utils.py +++ b/sbot/future/classless/utils.py @@ -156,6 +156,7 @@ def load_boards(self) -> None: if not self._boards.loaded: self._boards.load_boards() self._boards.populate_outputs() + self._boards.load_cameras() else: raise RuntimeError("Boards have already been loaded") diff --git a/sbot/future/classless/vision.py b/sbot/future/classless/vision.py new file mode 100644 index 0000000..3a53bd3 --- /dev/null +++ b/sbot/future/classless/vision.py @@ -0,0 +1,65 @@ +"""An implementation of a camera board using the april_vision library.""" +from __future__ import annotations + +import logging +from pathlib import Path + +from numpy.typing import NDArray +from sbot.future.board_manager import BoardManager +from sbot.marker import Marker + +PathLike = Path | str +logger = logging.getLogger(__name__) + + +class Vision: + """ + Virtual Camera Board for detecting fiducial markers. + + Additionally, it will do pose estimation, along with some calibration + in order to determine the spatial positon and orientation of the markers + that it has detected. + """ + + __slots__ = ('_boards',) + + def __init__(self, boards: BoardManager): + self._boards = boards + + def detect_markers( + self, + *, + frame: NDArray | None = None, + save: PathLike | None = None + ) -> list[Marker]: + """ + Capture an image and identify fiducial markers. + + :param frame: An image to detect markers in, instead of capturing a new one, + :param save: If given, save the annotated frame to the path. + This is given a JPEG extension if none is provided. + :returns: list of markers that the camera could see. + """ + cam = self._boards.get_camera() + if frame is None: + frame = cam.capture() + + markers = cam.see(frame=frame) + + if save: + cam.save(save, frame=frame, detections=markers) + return [Marker.from_april_vision_marker(marker) for marker in markers] + + def capture(self, save: PathLike | None = None) -> NDArray: + """ + Get the raw image data from the camera. + + :param save: If given, save the unannotated frame to the path. + This is given a JPEG extension if none is provided. + :returns: Camera pixel data + """ + cam = self._boards.get_camera() + raw_frame = cam.capture() + if save: + cam.save(save, frame=raw_frame, annotated=False) + return raw_frame