diff --git a/backend/monsun_backend/command_execution.py b/backend/monsun_backend/command_execution.py index d4e8873..48a461a 100644 --- a/backend/monsun_backend/command_execution.py +++ b/backend/monsun_backend/command_execution.py @@ -94,6 +94,9 @@ def enter_fsm( responses_received: List[Response] = list() time_at_beginning_waiting_for_response: float = 0.0 last_heart_beat_time: float = 0.0 + serial_receiver = SerialReceiver( + root_logger=root_logger, + ) while True: if state == State.heart_beat: @@ -129,8 +132,7 @@ def enter_fsm( ) else: request: Request = current_command - commands_, responses = receive( - root_logger=root_logger, + commands_, responses = serial_receiver.receive( serial=serial, ) responses_received.extend(responses) @@ -165,8 +167,7 @@ def enter_fsm( continue elif state == State.receiving_command: - commands_, responses = receive( - root_logger=root_logger, + commands_, responses = serial_receiver.receive( serial=serial, ) responses_received.extend(responses) @@ -201,74 +202,144 @@ def enqueue_command( _command_queue[role].put(command) -def receive( - root_logger: logging.Logger, - serial: Serial, -) -> Tuple[Sequence[Command], Sequence[Response]]: - logger = root_logger.getChild("receive_and_log") - logger.setLevel(logging.INFO) - commands_received: List[Command] = list() - responses_received: List[Response] = list() - header_size = 4 +class CommandInterpretationError(Exception): + """Raised in case the command could not be interpreted""" + - bytes_read = serial.read(serial.in_waiting) +class CommandBytesReadInsufficient(CommandInterpretationError): + """Raised in case the command could not be interpreted""" - while bytes_read: - logger.debug(f"bytes: {bytes_read.hex()}") +class CommandInterpreter: + header_size = 4 + + def __init__( + self, + root_logger: logging.Logger, + ) -> None: + self._logger = root_logger.getChild(self.__class__.__name__) + self._logger.setLevel(logging.INFO) + self.command_id_int = 0 + self.data_length = 0 + self.payload = bytes() + + def interpret( + self, + bytes_read: bytes, + ) -> bytes: + """Interpret the first command in a byte stream. + + :param bytes_read: The bytes which are not yet parsed. + :returns: The byte which are not yet parsed. + :raises CommandInterpretationError: If the command could not be parsed. + :raises CommandBytesReadInsufficient: Not enough bytes to fully parse the + command. + """ + self._logger.debug(f"bytes: {bytes_read.hex()}") try: - command_id_int, data_length, _ = unpack(">BHB", bytes_read[:header_size]) + self.command_id_int, self.data_length, _ = unpack( + ">BHB", + bytes_read[: self.header_size], + ) except error: - logger.error("error while interpreting command header") - bytes_read = None - continue + self._logger.error("error while interpreting command header") + raise CommandBytesReadInsufficient() try: - payload = bytes(bytes_read[header_size : header_size + data_length]) - stop_byte = bytes_read[header_size + data_length] + self.payload = bytes( + bytes_read[self.header_size : self.header_size + self.data_length], + ) except IndexError: - # okay, something went wrong. - logger.error( + self._logger.error( "There are less bytes than expected: " - f"Expected={header_size + data_length -1}, " + f"Expected={self.header_size + self.data_length -1}, " f"received={len(bytes_read)}", ) - logger.debug(f"bytes: {bytes_read.hex()}") - bytes_read = None - continue + self._logger.debug(f"bytes: {bytes_read.hex()}") + raise CommandBytesReadInsufficient() try: - command_id = CommandId(command_id_int) - except ValueError: - logger.error( - f"invalid command {command_id_int} with payload {str(payload)}", - ) - bytes_read = bytes_read[header_size + data_length + 1 :] - continue + stop_byte = bytes_read[self.header_size + self.data_length] + except IndexError: + self._logger.error("could not get stop byte") + raise CommandBytesReadInsufficient() - logger.debug(f"stop_byte: {stop_byte}") if stop_byte != 0xFF: - logger.error("Invalid stop byte") - bytes_read = None - else: - bytes_read = bytes_read[header_size + data_length + 1 :] + self._logger.error("Invalid stop byte") + raise CommandInterpretationError() - if command_id == CommandId.command_log: - command = commands.LogCommand( - root_logger=root_logger, - data=payload, - ) - commands_received.append(command) - elif command_id == CommandId.command_heartbeat_response: - responses_received.append(commands.HeartbeatResponse(payload)) - elif command_id == CommandId.command_led_response: - responses_received.append(commands.LEDResponse(payload)) - elif command_id == CommandId.command_gp_response: - responses_received.append(commands.GPResponse(payload)) - else: - raise RuntimeError + try: + return bytes_read[self.header_size + self.data_length + 1 :] + except IndexError: + return bytes() + + +class SerialReceiver: + def __init__( + self, + root_logger: logging.Logger, + ) -> None: + self.root_logger = root_logger + self._logger = root_logger.getChild(self.__class__.__name__) + self._logger.setLevel(logging.INFO) + + self._bytes_unread = bytearray() + + def receive( + self, + serial: Serial, + ) -> Tuple[Sequence[Command], Sequence[Response]]: + commands_received: List[Command] = list() + responses_received: List[Response] = list() + + self._bytes_unread.extend(serial.read(serial.in_waiting)) + + while self._bytes_unread: + + try: + command_interpreter = CommandInterpreter( + root_logger=self.root_logger, + ) + self._bytes_unread = bytearray( + command_interpreter.interpret( + bytes_read=self._bytes_unread, + ), + ) + # except CommandBytesReadInsufficient: + # return commands_received, responses_received + except CommandInterpretationError: + return commands_received, responses_received + + try: + command_id = CommandId(command_interpreter.command_id_int) + except ValueError: + self._logger.error( + f"invalid command {command_interpreter.command_id_int} with " + f"payload {str(command_interpreter.payload)}", + ) + + if command_id == CommandId.command_log: + command = commands.LogCommand( + root_logger=self.root_logger, + data=command_interpreter.payload, + ) + commands_received.append(command) + elif command_id == CommandId.command_heartbeat_response: + responses_received.append( + commands.HeartbeatResponse(command_interpreter.payload), + ) + elif command_id == CommandId.command_led_response: + responses_received.append( + commands.LEDResponse(command_interpreter.payload), + ) + elif command_id == CommandId.command_gp_response: + responses_received.append( + commands.GPResponse(command_interpreter.payload), + ) + else: + raise RuntimeError - return commands_received, responses_received + return commands_received, responses_received _process: Dict[str, Process] = dict()