Browse Source

backend: Each request waits for the response

ble
Andreas Berthoud 4 years ago
parent
commit
265944b5ec
  1. 15
      backend/monsun_backend/command_endpoint.py
  2. 45
      backend/monsun_backend/command_execution.py
  3. 30
      backend/monsun_backend/commands.py

15
backend/monsun_backend/command_endpoint.py

@ -1,11 +1,14 @@
import logging
from typing import Optional
from flask import Response
from flask import blueprints
from flask import make_response
from flask import request
from flask_api import status
from .command_execution import enqueue_command
from . import commands
from .command_execution import execute_command
from .commands import CommandId
from .commands import CommandTarget
from .commands import get_command_id_from_name
@ -39,7 +42,7 @@ def command(role: str):
return Response(status=status.HTTP_400_BAD_REQUEST)
try:
enqueue_command(
response: Optional[commands.Response] = execute_command(
role=role,
command=command,
)
@ -47,4 +50,10 @@ def command(role: str):
logger.error(f"role {role} does not exist")
return Response(status=status.HTTP_400_BAD_REQUEST)
return Response(status=status.HTTP_200_OK)
if response is None:
return Response(status=status.HTTP_408_REQUEST_TIMEOUT)
return make_response(
response.as_dict(),
status.HTTP_200_OK,
)

45
backend/monsun_backend/command_execution.py

@ -2,8 +2,11 @@ import atexit
import logging
import time
from enum import Enum
from multiprocessing import Manager
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing.managers import SyncManager
from queue import Empty
from struct import error
from struct import unpack
from typing import Dict
@ -30,6 +33,13 @@ _logger = logging.getLogger(__file__)
_command_queue: Dict[str, Queue] = dict()
"""role name: command queue"""
_response_queue: Dict[str, Queue] = dict()
"""role name: response queue"""
_manager: Optional[SyncManager] = None
_awaiting_response_identifier_list: Dict[str, List] = dict()
"""role name: request queue"""
class State(Enum):
heart_beat = 0x0
@ -40,7 +50,9 @@ class State(Enum):
def worker_process(
role: str,
queue: Queue,
commnad_queue: Queue,
response_queue: Queue,
awaiting_response_identifier_list: List,
):
logging.basicConfig(
level=logging.DEBUG,
@ -73,7 +85,9 @@ def worker_process(
target=CommandTarget[role],
root_logger=root_logger,
serial=serial,
command_queue=queue,
command_queue=commnad_queue,
response_queue=response_queue,
awaiting_response_identifier_list=awaiting_response_identifier_list,
heartbeat_interval=heartbeat_interval,
)
except OSError:
@ -89,6 +103,8 @@ def enter_fsm(
root_logger: logging.Logger,
serial: Serial,
command_queue: Queue,
response_queue: Queue,
awaiting_response_identifier_list: List,
heartbeat_interval: float,
):
logger = root_logger.getChild("FSM")
@ -161,6 +177,12 @@ def enter_fsm(
request.process_response(
response=received_response,
)
if request.response_identifier in awaiting_response_identifier_list:
response_queue.put(received_response)
awaiting_response_identifier_list.remove(
request.response_identifier,
)
state = State.executing_command
break
else:
@ -206,17 +228,26 @@ def dequeue_command(
return None
def enqueue_command(
def execute_command(
role: str,
command: Command,
):
timeout: int = 2,
) -> Optional[Response]:
"""Add a command to the command queue
:param role: The role name
:param command: The command to enqueue
"""
if isinstance(command, Request):
_awaiting_response_identifier_list[role].append(command.response_identifier)
_command_queue[role].put(command)
try:
return _response_queue[role].get(timeout=timeout)
except Empty:
return None
class CommandInterpretationError(Exception):
"""Raised in case the command could not be interpreted"""
@ -372,17 +403,23 @@ def _end_running_process():
def start_backgroup_process():
global _process
global _command_queue
global _manager
container = get_initialize_container()
_manager = Manager()
role_name: str
for role_name in container.config.roles():
_command_queue[role_name] = Queue()
_response_queue[role_name] = Queue(maxsize=32)
_awaiting_response_identifier_list[role_name] = _manager.list()
_process[role_name] = Process(
target=worker_process,
args=(
role_name,
_command_queue[role_name],
_response_queue[role_name],
_awaiting_response_identifier_list[role_name],
),
)
_process[role_name].start()

30
backend/monsun_backend/commands.py

@ -56,6 +56,10 @@ class Response(abc.ABC):
):
pass
@abc.abstractmethod
def as_dict(self) -> Dict:
pass
class Command(abc.ABC):
def __init__(
@ -208,6 +212,9 @@ class HeartbeatResponse(Response):
):
pass
def as_dict(self) -> Dict:
return dict()
class HeartbeatRequest(Request):
@property
@ -241,10 +248,9 @@ register_response(
class LEDResponse(Response):
was_successful = True
# def __init__(self) -> None:
# super().__init__()
def __init__(self, *args, **kwargs) -> None:
self.was_successful = False
super().__init__(*args, **kwargs)
def unpack_payload(
self,
@ -252,6 +258,11 @@ class LEDResponse(Response):
):
self.was_successful = bool(data[0])
def as_dict(self) -> Dict:
return {
"was_successful": self.was_successful,
}
class LEDRequest(Request):
def __init__(
@ -337,6 +348,11 @@ class GPResponse(Response):
):
self.was_successful = bool(data[0])
def as_dict(self) -> Dict:
return {
"was_successful": self.was_successful,
}
class GPRequest(Request):
def __init__(self, command_id: Union[int, str], *args, **kwargs) -> None:
@ -391,6 +407,12 @@ class PumpResponse(Response):
self.was_successful = bool(data[0])
self.is_on = bool(data[1])
def as_dict(self) -> Dict:
return {
"was_successful": self.was_successful,
"is_on": self.is_on,
}
class PumpRequest(Request):
def __init__(self, do: str, timeout: Union[int, str] = 60, *args, **kwargs) -> None:

Loading…
Cancel
Save