diff --git a/backend/monsun_backend/command_endpoint.py b/backend/monsun_backend/command_endpoint.py index e1193b4..fea1560 100644 --- a/backend/monsun_backend/command_endpoint.py +++ b/backend/monsun_backend/command_endpoint.py @@ -1,11 +1,14 @@ import logging +from typing import Optional from flask import Response from flask import blueprints +from flask import make_response from flask import request from flask_api import status -from .command_execution import enqueue_command +from . import commands +from .command_execution import execute_command from .commands import CommandId from .commands import CommandTarget from .commands import get_command_id_from_name @@ -39,7 +42,7 @@ def command(role: str): return Response(status=status.HTTP_400_BAD_REQUEST) try: - enqueue_command( + response: Optional[commands.Response] = execute_command( role=role, command=command, ) @@ -47,4 +50,10 @@ def command(role: str): logger.error(f"role {role} does not exist") return Response(status=status.HTTP_400_BAD_REQUEST) - return Response(status=status.HTTP_200_OK) + if response is None: + return Response(status=status.HTTP_408_REQUEST_TIMEOUT) + + return make_response( + response.as_dict(), + status.HTTP_200_OK, + ) diff --git a/backend/monsun_backend/command_execution.py b/backend/monsun_backend/command_execution.py index c0d796f..f1341a1 100644 --- a/backend/monsun_backend/command_execution.py +++ b/backend/monsun_backend/command_execution.py @@ -2,8 +2,11 @@ import atexit import logging import time from enum import Enum +from multiprocessing import Manager from multiprocessing import Process from multiprocessing import Queue +from multiprocessing.managers import SyncManager +from queue import Empty from struct import error from struct import unpack from typing import Dict @@ -30,6 +33,13 @@ _logger = logging.getLogger(__file__) _command_queue: Dict[str, Queue] = dict() """role name: command queue""" +_response_queue: Dict[str, Queue] = dict() +"""role name: response queue""" + +_manager: Optional[SyncManager] = None +_awaiting_response_identifier_list: Dict[str, List] = dict() +"""role name: request queue""" + class State(Enum): heart_beat = 0x0 @@ -40,7 +50,9 @@ class State(Enum): def worker_process( role: str, - queue: Queue, + commnad_queue: Queue, + response_queue: Queue, + awaiting_response_identifier_list: List, ): logging.basicConfig( level=logging.DEBUG, @@ -73,7 +85,9 @@ def worker_process( target=CommandTarget[role], root_logger=root_logger, serial=serial, - command_queue=queue, + command_queue=commnad_queue, + response_queue=response_queue, + awaiting_response_identifier_list=awaiting_response_identifier_list, heartbeat_interval=heartbeat_interval, ) except OSError: @@ -89,6 +103,8 @@ def enter_fsm( root_logger: logging.Logger, serial: Serial, command_queue: Queue, + response_queue: Queue, + awaiting_response_identifier_list: List, heartbeat_interval: float, ): logger = root_logger.getChild("FSM") @@ -161,6 +177,12 @@ def enter_fsm( request.process_response( response=received_response, ) + if request.response_identifier in awaiting_response_identifier_list: + response_queue.put(received_response) + awaiting_response_identifier_list.remove( + request.response_identifier, + ) + state = State.executing_command break else: @@ -206,17 +228,26 @@ def dequeue_command( return None -def enqueue_command( +def execute_command( role: str, command: Command, -): + timeout: int = 2, +) -> Optional[Response]: """Add a command to the command queue :param role: The role name :param command: The command to enqueue """ + if isinstance(command, Request): + _awaiting_response_identifier_list[role].append(command.response_identifier) + _command_queue[role].put(command) + try: + return _response_queue[role].get(timeout=timeout) + except Empty: + return None + class CommandInterpretationError(Exception): """Raised in case the command could not be interpreted""" @@ -372,17 +403,23 @@ def _end_running_process(): def start_backgroup_process(): global _process global _command_queue + global _manager container = get_initialize_container() + _manager = Manager() role_name: str for role_name in container.config.roles(): _command_queue[role_name] = Queue() + _response_queue[role_name] = Queue(maxsize=32) + _awaiting_response_identifier_list[role_name] = _manager.list() _process[role_name] = Process( target=worker_process, args=( role_name, _command_queue[role_name], + _response_queue[role_name], + _awaiting_response_identifier_list[role_name], ), ) _process[role_name].start() diff --git a/backend/monsun_backend/commands.py b/backend/monsun_backend/commands.py index 0aa0c23..fe6b0b9 100644 --- a/backend/monsun_backend/commands.py +++ b/backend/monsun_backend/commands.py @@ -56,6 +56,10 @@ class Response(abc.ABC): ): pass + @abc.abstractmethod + def as_dict(self) -> Dict: + pass + class Command(abc.ABC): def __init__( @@ -208,6 +212,9 @@ class HeartbeatResponse(Response): ): pass + def as_dict(self) -> Dict: + return dict() + class HeartbeatRequest(Request): @property @@ -241,10 +248,9 @@ register_response( class LEDResponse(Response): - was_successful = True - - # def __init__(self) -> None: - # super().__init__() + def __init__(self, *args, **kwargs) -> None: + self.was_successful = False + super().__init__(*args, **kwargs) def unpack_payload( self, @@ -252,6 +258,11 @@ class LEDResponse(Response): ): self.was_successful = bool(data[0]) + def as_dict(self) -> Dict: + return { + "was_successful": self.was_successful, + } + class LEDRequest(Request): def __init__( @@ -337,6 +348,11 @@ class GPResponse(Response): ): self.was_successful = bool(data[0]) + def as_dict(self) -> Dict: + return { + "was_successful": self.was_successful, + } + class GPRequest(Request): def __init__(self, command_id: Union[int, str], *args, **kwargs) -> None: @@ -391,6 +407,12 @@ class PumpResponse(Response): self.was_successful = bool(data[0]) self.is_on = bool(data[1]) + def as_dict(self) -> Dict: + return { + "was_successful": self.was_successful, + "is_on": self.is_on, + } + class PumpRequest(Request): def __init__(self, do: str, timeout: Union[int, str] = 60, *args, **kwargs) -> None: