import atexit
import logging
import time
from enum import Enum
from multiprocessing import Process
from multiprocessing import Queue
from struct import unpack
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple

from serial import Serial

from . import commands
from .commands import Command
from .commands import CommandId
from .commands import Request
from .commands import Response
from .container import get_container
from .container import initialize_container

_logger = logging.getLogger(__file__)

# Queue shared between the calling process and the background worker.
_command_queue: Queue = Queue()


class State(Enum):
    heart_beat = 0x0
    executing_command = 0x1
    executing_command_waiting_for_response = 0x2
    receiving_command = 0x10


def worker_process(
    queue: Queue,
    config,
):
    """Run the command loop and reconnect whenever the serial link drops."""
    logging.basicConfig(
        level=logging.DEBUG,
        format="[%(asctime)s] [%(name)-20s] [%(levelname)-8s] --- %(message)s",
    )
    logger = logging.getLogger("Command Loop")
    logger.setLevel(logging.INFO)

    initialize_container(config)
    container = get_container()
    heartbeat_interval = container.config.heartbeat_interval()
    serial_reconnection_wait_timeout = (
        container.config.serial_reconnection_wait_timeout()
    )
    connected = False

    logger.info("entering command loop...")
    while True:
        try:
            with container.serial() as serial:
                logger.info("connected with serial device")
                connected = True
                enter_fsm(
                    serial=serial,
                    command_queue=queue,
                    heartbeat_interval=heartbeat_interval,
                )
        except OSError:
            # Log the loss only once per disconnect, then keep retrying.
            if connected:
                logger.warning("connection to serial device lost")
                connected = False
            time.sleep(serial_reconnection_wait_timeout)
            logger.warning("reconnecting...")


def enter_fsm(
    serial: Serial,
    command_queue: Queue,
    heartbeat_interval: float,
):
    """Cycle through executing commands, receiving data, and heartbeats."""
    logger = logging.getLogger("FSM")
    state = State.executing_command
    current_command: Optional[Command] = None
    responses_received: List[Response] = list()
    time_at_beginning_waiting_for_response: float = 0.0
    last_heart_beat_time: float = 0.0

    while True:
        if state == State.heart_beat:
            # Queue a heartbeat request once the interval has elapsed.
            if time.time() - heartbeat_interval > last_heart_beat_time:
                command_queue.put(commands.HeartbeatRequest())
                last_heart_beat_time = time.time()
            state = State.executing_command
            continue

        elif state == State.executing_command:
            current_command = dequeue_command(queue=command_queue)
            if current_command is None:
                state = State.receiving_command
                continue

            current_command.execute(serial=serial)
            if isinstance(current_command, Request):
                time_at_beginning_waiting_for_response = time.time()
                state = State.executing_command_waiting_for_response
            continue

        elif state == State.executing_command_waiting_for_response:
            if not isinstance(current_command, Request):
                raise RuntimeError(
                    "entered state 'executing_command_waiting_for_response' but "
                    "current command does not expect a response.",
                )
            else:
                request: Request = current_command
                commands_, responses = receive(serial=serial)
                responses_received.extend(responses)
                for command in commands_:
                    command_queue.put(command)

                while responses_received:
                    received_response: Response = responses_received.pop(0)
                    if request.response_identifier == received_response.identifier:
                        request.process_response(
                            response=received_response,
                        )
                        state = State.executing_command
                        break
                    else:
                        logger.warning(
                            f"received response with ID {received_response.identifier} "
                            "but expected response with ID "
                            f"{request.response_identifier}",
                        )
                else:
                    # No matching response was found; give up after the timeout.
                    if (
                        time.time() - request.timeout
                        > time_at_beginning_waiting_for_response
                    ):
                        logger.error(
                            "Timeout while waiting for response with ID "
                            f"{request.response_identifier}",
                        )
                        current_command = None
                        state = State.executing_command
            continue

        elif state == State.receiving_command:
            commands_, responses = receive(serial=serial)
            responses_received.extend(responses)
            for command in commands_:
                command_queue.put(command)
            state = State.heart_beat
            continue

        else:
            raise RuntimeError(f"Invalid state: {state}")


def dequeue_command(
    queue: Queue,
) -> Optional[Command]:
    """Return the next queued command, or None if the queue is empty."""
    if not queue.empty():
        return queue.get()
    return None


def enqueue_command(command: Command):
    _command_queue.put(command)


def receive(
    serial: Serial,
) -> Tuple[Sequence[Command], Sequence[Response]]:
    """Read all pending bytes and split them into commands and responses.

    Frame layout: 4-byte header (">BHB": command ID, payload length, one
    unused byte), the payload, then a 0xFF stop byte.
    NOTE: the parser assumes that only complete frames are in the buffer.
    """
    logger = logging.getLogger("receive_and_log")
    logger.setLevel(logging.INFO)

    commands_received: List[Command] = list()
    responses_received: List[Response] = list()
    header_size = 4

    bytes_read = serial.read(serial.in_waiting)
    while bytes_read:
        logger.debug(f"bytes: {bytes_read.hex()}")
        command_id_int, data_length, _ = unpack(">BHB", bytes_read[:header_size])
        payload = bytes(bytes_read[header_size : header_size + data_length])
        stop_byte = bytes_read[header_size + data_length]

        try:
            command_id = CommandId(command_id_int)
        except ValueError:
            logger.error(
                f"invalid command {command_id_int} with payload {str(payload)}",
            )
            # Skip the unknown frame and continue with the remaining bytes.
            bytes_read = bytes_read[header_size + data_length + 1 :]
            continue

        logger.debug(f"stop_byte: {stop_byte}")
        if stop_byte != 0xFF:
            logger.error("Invalid stop byte")
            # Discard the rest of the buffer; the loop ends after this frame.
            bytes_read = None
        else:
            bytes_read = bytes_read[header_size + data_length + 1 :]

        if command_id == CommandId.command_log:
            command = commands.LogCommand(
                data=payload,
            )
            commands_received.append(command)
        elif command_id == CommandId.command_heartbeat_response:
            responses_received.append(commands.HeartbeatResponse(payload))
        elif command_id == CommandId.command_led_response:
            responses_received.append(commands.LEDResponse(payload))
        elif command_id == CommandId.command_gp_response:
            responses_received.append(commands.GPResponse(payload))
        else:
            raise RuntimeError(f"unhandled command ID: {command_id}")

    return commands_received, responses_received


_process: Optional[Process] = None


def _end_running_process():
    if _process is not None:
        _process.kill()


def start_backgroup_process():
    _logger.warning("start_backgroup_process called")
    global _process
    _process = Process(
        target=worker_process,
        args=(
            _command_queue,
            get_container().config(),
        ),
    )
    _process.start()
    atexit.register(_end_running_process)