Skip to content

Commit

Permalink
Implement vision support
Browse files Browse the repository at this point in the history
  • Loading branch information
WillB97 committed Jan 26, 2025
1 parent 84e45e4 commit d472ab5
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 2 deletions.
93 changes: 93 additions & 0 deletions sbot/future/board_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@
import os
from typing import Callable, ClassVar, NamedTuple

from april_vision import (
FrameSource,
USBCamera,
calibrations,
find_cameras,
generate_marker_size_mapping,
)
from april_vision import Processor as AprilCamera
from april_vision.helpers import Base64Sender
from sbot.exceptions import BoardDisconnectionError
from sbot.game_specific import MARKER_SIZES
from sbot.serial_wrapper import BASE_TIMEOUT, SerialWrapper
from sbot.utils import BoardIdentity, get_simulator_boards, get_USB_identity
from serial.tools.list_ports import comports
Expand Down Expand Up @@ -113,6 +123,8 @@ def __init__(self) -> None:
self._loaded: bool = False
self.boards: dict[str, dict[str, SerialWrapper]] = {}
self.outputs: dict[str, list[OutputIdentifier]] = {}
self.cameras: dict[str, AprilCamera] = {}
self._mqtt = None

@classmethod
def register_board(cls, board: DiscoveryTemplate) -> None:
Expand Down Expand Up @@ -399,3 +411,84 @@ def _name_from_identifier(self, identifier: str) -> str:
if template.identifier == identifier:
return template.name
return identifier

def load_cameras(self) -> None:
"""
Find all connected cameras with calibration and configure tag sizes.
Where MQTT is enabled, set up a frame sender to send annotated frames
as base64 encoded JPEG bytestreams of each image, detection is run on.
This method will attempt to identify all connected cameras and load them
into the cameras dictionary. Cameras will be indexed by their name.
"""
if self.cameras:
raise RuntimeError("Cameras have already been loaded")

camera_source: FrameSource
# Unroll the tag ID iterables and convert the sizes to meters
expanded_tag_sizes = generate_marker_size_mapping(MARKER_SIZES)

if self._mqtt:
frame_sender = Base64Sender(self._mqtt.wrapped_publish)
else:
frame_sender = None

if IN_SIMULATOR:
from sbot.simulator.camera import WebotsRemoteCameraSource

for camera_info in get_simulator_boards('CameraBoard'):
camera_source = WebotsRemoteCameraSource(camera_info)
# The processor handles the detection and pose estimation
camera = AprilCamera(
camera_source,
calibration=camera_source.calibration,
name=camera_info.serial_number,
mask_unknown_size_tags=True,
)

# Set the tag sizes in the camera
camera.set_marker_sizes(expanded_tag_sizes)

if frame_sender:
camera.detection_hook = frame_sender.annotated_frame_hook

self.cameras[camera_info.serial_number] = camera
else:
for camera_data in find_cameras(calibrations):
cam_name = f"{camera_data.name} - {camera_data.index}"

# The camera source handles the connection between the camera and the processor
camera_source = USBCamera.from_calibration_file(
camera_data.index,
calibration_file=camera_data.calibration,
vidpid=camera_data.vidpid,
)
# The processor handles the detection and pose estimation
camera = AprilCamera(
camera_source,
calibration=camera_source.calibration,
name=camera_data.name,
vidpid=camera_data.vidpid,
mask_unknown_size_tags=True,
)

# Set the tag sizes in the camera
camera.set_marker_sizes(expanded_tag_sizes)

if frame_sender:
camera.detection_hook = frame_sender.annotated_frame_hook

self.cameras[cam_name] = camera

def get_camera(self) -> AprilCamera:
"""Get the first camera connected to the robot."""
if len(self.cameras) > 1:
camera_name = next(iter(self.cameras.keys()))
logger.warning(f"Multiple cameras connected, using {camera_name!r}")

try:
return next(iter(self.cameras.values()))
except StopIteration:
raise ValueError("No cameras connected") from None

4 changes: 2 additions & 2 deletions sbot/future/classless/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .robot import arduino, comp, led, motor, power, servo, utils
from .robot import arduino, comp, led, motor, power, servo, utils, vision

__all__ = ['arduino', 'comp', 'led', 'motor', 'power', 'servo', 'utils']
__all__ = ['arduino', 'comp', 'led', 'motor', 'power', 'servo', 'utils', 'vision']
2 changes: 2 additions & 0 deletions sbot/future/classless/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .power import Power
from .servos import Servo
from .utils import Utils
from .vision import Vision

# Ensure that the atexit handler is registered to clean up the boards on termination
ensure_atexit_on_term()
Expand All @@ -24,6 +25,7 @@
motor = Motor(boards)
servo = Servo(boards)
arduino = Arduino(boards)
vision = Vision(boards)
led = Led(boards)
comp = Comp()
utils = Utils(boards, comp)
Expand Down
1 change: 1 addition & 0 deletions sbot/future/classless/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def load_boards(self) -> None:
if not self._boards.loaded:
self._boards.load_boards()
self._boards.populate_outputs()
self._boards.load_cameras()
else:
raise RuntimeError("Boards have already been loaded")

Expand Down
65 changes: 65 additions & 0 deletions sbot/future/classless/vision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""An implementation of a camera board using the april_vision library."""
from __future__ import annotations

import logging
from pathlib import Path

from numpy.typing import NDArray
from sbot.future.board_manager import BoardManager
from sbot.marker import Marker

PathLike = Path | str
logger = logging.getLogger(__name__)


class Vision:
"""
Virtual Camera Board for detecting fiducial markers.
Additionally, it will do pose estimation, along with some calibration
in order to determine the spatial positon and orientation of the markers
that it has detected.
"""

__slots__ = ('_boards',)

def __init__(self, boards: BoardManager):
self._boards = boards

def detect_markers(
self,
*,
frame: NDArray | None = None,
save: PathLike | None = None
) -> list[Marker]:
"""
Capture an image and identify fiducial markers.
:param frame: An image to detect markers in, instead of capturing a new one,
:param save: If given, save the annotated frame to the path.
This is given a JPEG extension if none is provided.
:returns: list of markers that the camera could see.
"""
cam = self._boards.get_camera()
if frame is None:
frame = cam.capture()

markers = cam.see(frame=frame)

if save:
cam.save(save, frame=frame, detections=markers)
return [Marker.from_april_vision_marker(marker) for marker in markers]

def capture(self, save: PathLike | None = None) -> NDArray:
"""
Get the raw image data from the camera.
:param save: If given, save the unannotated frame to the path.
This is given a JPEG extension if none is provided.
:returns: Camera pixel data
"""
cam = self._boards.get_camera()
raw_frame = cam.capture()
if save:
cam.save(save, frame=raw_frame, annotated=False)
return raw_frame

0 comments on commit d472ab5

Please sign in to comment.