|
|
|
@ -5,6 +5,8 @@ from enum import Enum |
|
|
|
from random import randint |
|
|
|
from struct import pack |
|
|
|
from struct import unpack |
|
|
|
from typing import Dict |
|
|
|
from typing import Type |
|
|
|
from typing import Union |
|
|
|
|
|
|
|
from serial import Serial |
|
|
|
@ -25,41 +27,15 @@ class CommandId(Enum): |
|
|
|
command_gp_request = 0x6 |
|
|
|
command_gp_response = 0x7 |
|
|
|
|
|
|
|
command_pump_request = 0x8 |
|
|
|
command_pump_response = 0x9 |
|
|
|
|
|
|
|
|
|
|
|
class CommandTarget(Enum): |
|
|
|
client = 0 |
|
|
|
server = 1 |
|
|
|
|
|
|
|
|
|
|
|
def get_command_id_from_name(name: str) -> CommandId: |
|
|
|
return { |
|
|
|
"log": CommandId.command_log, |
|
|
|
"led": CommandId.command_led_request, |
|
|
|
"gp": CommandId.command_gp_request, |
|
|
|
}[name] |
|
|
|
|
|
|
|
|
|
|
|
def get_request_class( |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
return { |
|
|
|
CommandId.command_log: LogCommand, |
|
|
|
CommandId.command_heartbeat_request: HeartbeatRequest, |
|
|
|
CommandId.command_led_request: LEDRequest, |
|
|
|
CommandId.command_gp_request: GPRequest, |
|
|
|
}[command_id] |
|
|
|
|
|
|
|
|
|
|
|
def get_response_class( |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
return { |
|
|
|
CommandId.command_heartbeat_response: HeartbeatResponse, |
|
|
|
CommandId.command_led_response: LEDResponse, |
|
|
|
CommandId.command_gp_response: GPResponse, |
|
|
|
}[command_id] |
|
|
|
|
|
|
|
|
|
|
|
class Response(abc.ABC): |
|
|
|
|
|
|
|
identifier: int |
|
|
|
@ -148,6 +124,43 @@ class Request(Command): |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
__command_alias: Dict[str, CommandId] = dict() |
|
|
|
__request_by_id: Dict[CommandId, Type[Request]] = dict() |
|
|
|
__response_by_id: Dict[CommandId, Type[Response]] = dict() |
|
|
|
|
|
|
|
|
|
|
|
def get_command_id_from_name(name: str) -> CommandId: |
|
|
|
return __command_alias[name] |
|
|
|
|
|
|
|
|
|
|
|
def get_request_class( |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
return __request_by_id[command_id] |
|
|
|
|
|
|
|
|
|
|
|
def get_response_class( |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
return __response_by_id[command_id] |
|
|
|
|
|
|
|
|
|
|
|
def register_request( |
|
|
|
cls: Type[Request], |
|
|
|
command_id: CommandId, |
|
|
|
alias: str, |
|
|
|
): |
|
|
|
__command_alias[alias] = command_id |
|
|
|
__request_by_id[command_id] = cls |
|
|
|
|
|
|
|
|
|
|
|
def register_response( |
|
|
|
cls: Type[Response], |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
__response_by_id[command_id] = cls |
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
|
class LogCommand(Command): |
|
|
|
"""Command ID: command_log""" |
|
|
|
@ -216,6 +229,17 @@ class HeartbeatRequest(Request): |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
register_request( |
|
|
|
cls=HeartbeatRequest, |
|
|
|
command_id=CommandId.command_heartbeat_request, |
|
|
|
alias="hp", |
|
|
|
) |
|
|
|
register_response( |
|
|
|
cls=HeartbeatResponse, |
|
|
|
command_id=CommandId.command_heartbeat_response, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class LEDResponse(Response): |
|
|
|
was_successful = True |
|
|
|
|
|
|
|
@ -293,6 +317,17 @@ class LEDRequest(Request): |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
register_request( |
|
|
|
cls=LEDRequest, |
|
|
|
command_id=CommandId.command_led_request, |
|
|
|
alias="led", |
|
|
|
) |
|
|
|
register_response( |
|
|
|
cls=LEDResponse, |
|
|
|
command_id=CommandId.command_led_response, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class GPResponse(Response): |
|
|
|
was_successful = True |
|
|
|
|
|
|
|
@ -333,3 +368,84 @@ class GPRequest(Request): |
|
|
|
payload=payload, |
|
|
|
serial=serial, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
register_request( |
|
|
|
cls=GPRequest, |
|
|
|
command_id=CommandId.command_gp_request, |
|
|
|
alias="gp", |
|
|
|
) |
|
|
|
register_response( |
|
|
|
cls=GPResponse, |
|
|
|
command_id=CommandId.command_gp_response, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class PumpResponse(Response): |
|
|
|
was_successful = True |
|
|
|
|
|
|
|
def unpack_payload( |
|
|
|
self, |
|
|
|
data: bytes, |
|
|
|
): |
|
|
|
self.was_successful = bool(data[0]) |
|
|
|
self.is_on = bool(data[1]) |
|
|
|
|
|
|
|
|
|
|
|
class PumpRequest(Request): |
|
|
|
def __init__(self, do: str, timeout: Union[int, str] = 60, *args, **kwargs) -> None: |
|
|
|
super().__init__(*args, **kwargs) |
|
|
|
try: |
|
|
|
self.do = { |
|
|
|
"turn_off": 0, |
|
|
|
"turn_on": 1, |
|
|
|
"is_on": 2, |
|
|
|
}[do] |
|
|
|
except KeyError: |
|
|
|
self.do = int(do) |
|
|
|
|
|
|
|
self.pump_timeout = int(timeout) |
|
|
|
|
|
|
|
@property |
|
|
|
def identifier(self) -> CommandId: |
|
|
|
return CommandId.command_pump_request |
|
|
|
|
|
|
|
@property |
|
|
|
def timeout(self) -> float: |
|
|
|
return 1 |
|
|
|
|
|
|
|
def process_response(self, response: Response): |
|
|
|
if not isinstance(response, PumpResponse): |
|
|
|
raise TypeError(f"{response} is not a {PumpResponse}") |
|
|
|
|
|
|
|
if response.was_successful: |
|
|
|
self._logger.debug("Pump command was successful") |
|
|
|
else: |
|
|
|
self._logger.debug("Pump command was not successful") |
|
|
|
|
|
|
|
if response.is_on: |
|
|
|
self._logger.debug("Pump is on") |
|
|
|
else: |
|
|
|
self._logger.debug("Pump is off") |
|
|
|
|
|
|
|
def execute(self, serial: Serial): |
|
|
|
payload = pack( |
|
|
|
">BH", |
|
|
|
self.do, |
|
|
|
self.pump_timeout, |
|
|
|
) |
|
|
|
self.send_command( |
|
|
|
payload=payload, |
|
|
|
serial=serial, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
register_request( |
|
|
|
cls=PumpRequest, |
|
|
|
command_id=CommandId.command_pump_request, |
|
|
|
alias="pump", |
|
|
|
) |
|
|
|
register_response( |
|
|
|
cls=PumpResponse, |
|
|
|
command_id=CommandId.command_pump_response, |
|
|
|
) |
|
|
|
|