import abc
import logging
from dataclasses import dataclass
from enum import Enum
from random import randint
from struct import pack
from struct import unpack
from typing import Union

from serial import Serial


class CommandId(Enum):
    """Numeric identifiers of the commands sent in the frame header."""

    command_none = 0
    command_log = 0x1
    # even numbers are requests with optional response with ID = request ID + 1
    command_heartbeat_request = 0x2
    command_heartbeat_response = 0x3
    command_led_request = 0x4
    command_led_response = 0x5
    command_gp_request = 0x6
    command_gp_response = 0x7


def get_command_id_from_name(name: str) -> CommandId:
    """Return the command ID registered under a short command name.

    Args:
        name: One of ``"log"``, ``"led"`` or ``"gp"``.

    Raises:
        KeyError: If ``name`` is not a registered command name.
    """
    return {
        "log": CommandId.command_log,
        "led": CommandId.command_led_request,
        "gp": CommandId.command_gp_request,
    }[name]


def get_request_class(
    command_id: CommandId,
):
    """Return the ``Command`` subclass registered for ``command_id``.

    The mapping is built at call time, so the classes defined further down
    in this module are already available when it is evaluated.

    Raises:
        KeyError: If no class is registered for ``command_id``.
    """
    return {
        CommandId.command_log: LogCommand,
        CommandId.command_heartbeat_request: HeartbeatRequest,
        CommandId.command_led_request: LEDRequest,
        CommandId.command_gp_request: GPRequest,
    }[command_id]


def get_response_class(
    command_id: CommandId,
):
    """Return the ``Response`` subclass registered for ``command_id``.

    Raises:
        KeyError: If ``command_id`` is not one of the response IDs.
    """
    return {
        CommandId.command_heartbeat_response: HeartbeatResponse,
        CommandId.command_led_response: LEDResponse,
        CommandId.command_gp_response: GPResponse,
    }[command_id]


class Response(abc.ABC):
    """Base class for parsed responses.

    The first two bytes of ``data`` are read as a big-endian unsigned 16-bit
    identifier; everything after them is handed to ``unpack_payload``.
    """

    identifier: int

    def __init__(
        self,
        data: bytes,
    ) -> None:
        """Parse the identifier and let the subclass decode the payload.

        Raises:
            struct.error: If ``data`` holds fewer than two bytes.
        """
        self._logger = logging.getLogger(self.__class__.__name__)
        self.identifier = unpack(">H", data[:2])[0]
        self.unpack_payload(data[2:])

    @abc.abstractmethod
    def unpack_payload(
        self,
        data: bytes,
    ) -> None:
        """Decode the bytes that follow the 2-byte identifier."""


class Command(abc.ABC):
    """Base class for everything that can be executed against the serial port."""

    def __init__(
        self,
        root_logger: logging.Logger,
    ) -> None:
        """Create a child logger of ``root_logger`` named after the class."""
        self._logger = root_logger.getChild(self.__class__.__name__)

    @property
    @abc.abstractmethod
    def identifier(self) -> CommandId:
        """Command ID written into the frame header."""

    @abc.abstractmethod
    def execute(
        self,
        serial: Serial,
    ) -> None:
        """Carry out the command using ``serial``."""

    def send_command(self, payload: bytes, serial: Serial) -> None:
        """Frame ``payload`` and write it to ``serial``.

        Frame layout (big-endian): 1-byte command ID, 2-byte payload length,
        one byte fixed to 0, the payload, and a closing ``0xFF`` byte.

        Raises:
            struct.error: If the command ID or the payload length does not
                fit into its header field.
        """
        length = len(payload)
        # NOTE(review): the meaning of the fixed 0 byte and of the 0xFF
        # trailer is not visible here; presumably reserved / end marker.
        header = pack(">BHB", int(self.identifier.value), length, 0)
        # Concatenate instead of building a per-byte struct format string.
        serial.write(header + bytes(payload) + b"\xff")


class Request(Command):
    """A command that carries a random 16-bit identifier for its response."""

    def __init__(
        self,
        root_logger: logging.Logger,
    ) -> None:
        """Set up logging and draw the response identifier."""
        super().__init__(root_logger=root_logger)
        # Any value representable as an unsigned 16-bit integer.
        self.response_identifier = randint(0, 2**16 - 1)

    @property
    @abc.abstractmethod
    def timeout(self) -> float:
        """Timeout value for this request (unit not stated in this module)."""

    @abc.abstractmethod
    def process_response(
        self,
        response: Response,
    ) -> None:
        """Handle the response received for this request."""

    def send_command(self, payload: bytes, serial: Serial) -> None:
        """Prefix ``payload`` with the response identifier and send it.

        The 2-byte big-endian identifier becomes part of the framed payload,
        so it is included in the length field written by the base class.
        """
        response_identifier_header = pack(">H", self.response_identifier)
        super().send_command(
            payload=response_identifier_header + payload,
            serial=serial,
        )


# The class defines its own __init__, so @dataclass does not generate one; it
# still contributes __repr__ and __eq__ based on ``level`` and ``message``.
@dataclass
class LogCommand(Command):
    """Command ID: command_log

    Decodes a log record from ``data`` and, when executed, replays it on a
    child logger of ``root_logger``.

    Layout of ``data``: byte 0 is the log level, byte 1 the length of the
    logger name, followed by the logger name and then the message.
    """

    level: int
    message: str

    HEADER_SIZE = 2  # log level + logger name length

    def __init__(
        self,
        root_logger: logging.Logger,
        data: bytes,
    ) -> None:
        """Parse ``data`` into level, logger name and message.

        Raises:
            IndexError: If ``data`` holds fewer than two bytes.
            UnicodeDecodeError: If the logger name or the message cannot be
                decoded with the default codec.
        """
        super().__init__(root_logger=root_logger)
        # With the level at INFO the debug traces below stay silent.
        self._logger.setLevel(logging.INFO)

        level = int(data[0])
        logger_name_length = int(data[1])
        self._logger.debug("level: %s", level)
        self._logger.debug("logger_name_length: %s", logger_name_length)

        name_end = self.HEADER_SIZE + logger_name_length
        logger_name = data[self.HEADER_SIZE:name_end]
        message = data[name_end:]
        self._logger.debug("logger_name %s", logger_name)
        self._logger.debug("Message: %s", message)

        self.received_logger = root_logger.getChild(logger_name.decode())
        self.received_logger.setLevel(logging.DEBUG)
        self.level = level
        self.message = message.decode()

    @property
    def identifier(self) -> CommandId:
        """Always ``CommandId.command_log``."""
        return CommandId.command_log

    def execute(
        self,
        serial: Serial,
    ) -> None:
        """Emit the decoded message on the received logger; ``serial`` is unused."""
        self.received_logger.log(level=self.level, msg=self.message)


class HeartbeatResponse(Response):
    """Response to a heartbeat request; its payload is ignored."""

    def unpack_payload(
        self,
        data: bytes,
    ) -> None:
        """Nothing to decode."""


class HeartbeatRequest(Request):
    """Request with an empty payload that expects a ``HeartbeatResponse``."""

    @property
    def identifier(self) -> CommandId:
        """Always ``CommandId.command_heartbeat_request``."""
        return CommandId.command_heartbeat_request

    @property
    def timeout(self) -> float:
        """Fixed timeout of 0.1."""
        return 0.1

    def process_response(self, response: Response) -> None:
        """Check the response type.

        Raises:
            TypeError: If ``response`` is not a ``HeartbeatResponse``.
        """
        if not isinstance(response, HeartbeatResponse):
            raise TypeError(f"{response} is not a {HeartbeatResponse}")

    def execute(self, serial: Serial) -> None:
        """Send the request with an empty payload."""
        self.send_command(
            payload=bytes(),
            serial=serial,
        )


class LEDResponse(Response):
    """Response to an LED request carrying a one-byte success flag."""

    was_successful = True

    def unpack_payload(
        self,
        data: bytes,
    ) -> None:
        """Read the success flag from the first payload byte.

        Raises:
            IndexError: If ``data`` is empty.
        """
        self.was_successful = bool(data[0])


class LEDRequest(Request):
    """Request that applies a command to one LED."""

    def __init__(
        self,
        root_logger: logging.Logger,
        id: Union[int, str],
        command: Union[int, str],
    ) -> None:
        """
        Both arguments accept either the numeric value (as ``int`` or numeric
        string) or the name listed below.

        led_id
        -------
        0: green
        1: red
        2: blue

        led_command
        --------
        0: off
        1: on
        2: toggle

        Raises:
            KeyError: If a non-numeric string is not one of the listed names.
        """
        super().__init__(root_logger=root_logger)

        # Numeric input wins; only non-numeric strings go through the tables.
        try:
            self.led_id = int(id)
        except ValueError:
            self.led_id = {
                "green": 0,
                "red": 1,
                "blue": 2,
            }[str(id)]

        try:
            self.led_command = int(command)
        except ValueError:
            self.led_command = {
                "off": 0,
                "on": 1,
                "toggle": 2,
            }[str(command)]

    @property
    def identifier(self) -> CommandId:
        """Always ``CommandId.command_led_request``."""
        return CommandId.command_led_request

    @property
    def timeout(self) -> float:
        """Fixed timeout of 0.1."""
        return 0.1

    def process_response(self, response: Response) -> None:
        """Log whether the LED command succeeded.

        Raises:
            TypeError: If ``response`` is not an ``LEDResponse``.
        """
        if not isinstance(response, LEDResponse):
            raise TypeError(f"{response} is not a {LEDResponse}")

        if response.was_successful:
            self._logger.debug("LED command was successful")
        else:
            self._logger.debug("LED command was not successful")

    def execute(self, serial: Serial) -> None:
        """Send the LED id and the LED command as two unsigned bytes."""
        payload = pack(
            ">BB",
            self.led_id,
            self.led_command,
        )
        self.send_command(
            payload=payload,
            serial=serial,
        )


class GPResponse(Response):
    """Response to a GP request carrying a one-byte success flag."""

    was_successful = True

    def unpack_payload(
        self,
        data: bytes,
    ) -> None:
        """Read the success flag from the first payload byte.

        Raises:
            IndexError: If ``data`` is empty.
        """
        self.was_successful = bool(data[0])


class GPRequest(Request):
    """Request that sends a single-byte GP command id."""

    def __init__(
        self,
        root_logger: logging.Logger,
        command_id: Union[int, str],
    ) -> None:
        """Store ``command_id`` converted to ``int``.

        Raises:
            ValueError: If ``command_id`` is a string that is not an integer.
        """
        super().__init__(root_logger=root_logger)
        self.command_id = int(command_id)

    @property
    def identifier(self) -> CommandId:
        """Always ``CommandId.command_gp_request``."""
        return CommandId.command_gp_request

    @property
    def timeout(self) -> float:
        """Fixed timeout of 0.1."""
        return 0.1

    def process_response(self, response: Response) -> None:
        """Log whether the GP command succeeded.

        Raises:
            TypeError: If ``response`` is not a ``GPResponse``.
        """
        if not isinstance(response, GPResponse):
            raise TypeError(f"{response} is not a {GPResponse}")

        if response.was_successful:
            self._logger.debug("GP command was successful")
        else:
            self._logger.debug("GP command was not successful")

    def execute(self, serial: Serial) -> None:
        """Send the command id as one unsigned byte."""
        payload = pack(
            ">B",
            self.command_id,
        )
        self.send_command(
            payload=payload,
            serial=serial,
        )