Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): driver and simulator for FLEX Stacker #17120

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ async def create(
alarm_keyword: Optional[str] = None,
reset_buffer_before_write: bool = False,
async_error_ack: Optional[str] = None,
number_of_retries: int = 0,
) -> AsyncResponseSerialConnection:
"""
Create a connection.
Expand Down Expand Up @@ -340,6 +341,7 @@ async def create(
error_keyword=error_keyword or "err",
alarm_keyword=alarm_keyword or "alarm",
async_error_ack=async_error_ack or "async",
number_of_retries=number_of_retries,
)

def __init__(
Expand All @@ -352,6 +354,7 @@ def __init__(
error_keyword: str,
alarm_keyword: str,
async_error_ack: str,
number_of_retries: int = 0,
) -> None:
"""
Constructor
Expand Down Expand Up @@ -383,6 +386,7 @@ def __init__(
self._name = name
self._ack = ack.encode()
self._retry_wait_time_seconds = retry_wait_time_seconds
self._number_of_retries = number_of_retries
self._error_keyword = error_keyword.lower()
self._alarm_keyword = alarm_keyword.lower()
self._async_error_ack = async_error_ack.lower()
Expand All @@ -403,7 +407,9 @@ async def send_command(
Raises: SerialException
"""
return await self.send_data(
data=command.build(), retries=retries, timeout=timeout
data=command.build(),
retries=retries or self._number_of_retries,
timeout=timeout,
)

async def send_data(
Expand All @@ -424,7 +430,9 @@ async def send_data(
async with super().send_data_lock, self._serial.timeout_override(
"timeout", timeout
):
return await self._send_data(data=data, retries=retries)
return await self._send_data(
data=data, retries=retries or self._number_of_retries
)

async def _send_data(self, data: str, retries: int = 0) -> str:
"""
Expand All @@ -439,6 +447,7 @@ async def _send_data(self, data: str, retries: int = 0) -> str:
Raises: SerialException
"""
data_encode = data.encode()
retries = retries or self._number_of_retries

for retry in range(retries + 1):
log.debug(f"{self._name}: Write -> {data_encode!r}")
Expand Down
4 changes: 2 additions & 2 deletions api/src/opentrons/drivers/command_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class CommandBuilder:
"""Class used to build GCODE commands."""

def __init__(self, terminator: str) -> None:
def __init__(self, terminator: str = "\n") -> None:
"""
Construct a command builder.

Expand All @@ -17,7 +17,7 @@ def __init__(self, terminator: str) -> None:
self._elements: List[str] = []

def add_float(
self, prefix: str, value: float, precision: Optional[int]
self, prefix: str, value: float, precision: Optional[int] = None
) -> CommandBuilder:
"""
Add a float value.
Expand Down
9 changes: 9 additions & 0 deletions api/src/opentrons/drivers/flex_stacker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .abstract import AbstractStackerDriver
from .driver import FlexStackerDriver
from .simulator import SimulatingDriver

__all__ = [
"AbstractStackerDriver",
"FlexStackerDriver",
"SimulatingDriver",
]
89 changes: 89 additions & 0 deletions api/src/opentrons/drivers/flex_stacker/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import Protocol

from .types import (
StackerAxis,
PlatformStatus,
Direction,
MoveParams,
StackerInfo,
LEDColor,
)


class AbstractStackerDriver(Protocol):
"""Protocol for the Stacker driver."""

async def connect(self) -> None:
"""Connect to stacker."""
...

async def disconnect(self) -> None:
"""Disconnect from stacker."""
...

async def is_connected(self) -> bool:
"""Check connection to stacker."""
...

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
...

async def get_device_info(self) -> StackerInfo:
"""Get Device Info."""
...

async def set_serial_number(self, sn: str) -> bool:
"""Set Serial Number."""
...

async def stop_motors(self) -> bool:
"""Stop all motor movement."""
...

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.

:return: True if limit switch is triggered, False otherwise
"""
...

async def get_platform_sensor(self, direction: Direction) -> bool:
"""Get platform sensor status.

:return: True if platform is present, False otherwise
"""
...

async def get_platform_status(self) -> PlatformStatus:
"""Get platform status."""
...

async def get_hopper_door_closed(self) -> bool:
"""Get whether or not door is closed.

:return: True if door is closed, False otherwise
"""
...

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> bool:
"""Move axis."""
...

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> bool:
"""Move until limit switch is triggered."""
...

async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool:
"""Home axis."""
...

async def set_led(
self, power: float, color: LEDColor | None = None, external: bool | None = None
) -> bool:
"""Set LED color of status bar."""
...
Loading
Loading