Compare commits

...

2 Commits

  1. 15
      backend/monsun_backend/command_endpoint.py
  2. 45
      backend/monsun_backend/command_execution.py
  3. 30
      backend/monsun_backend/commands.py
  4. 6
      nucleo-wb55-ble/STM32_WPAN/App/app_ble.c

15
backend/monsun_backend/command_endpoint.py

@ -1,11 +1,14 @@
import logging import logging
from typing import Optional
from flask import Response from flask import Response
from flask import blueprints from flask import blueprints
from flask import make_response
from flask import request from flask import request
from flask_api import status from flask_api import status
from .command_execution import enqueue_command from . import commands
from .command_execution import execute_command
from .commands import CommandId from .commands import CommandId
from .commands import CommandTarget from .commands import CommandTarget
from .commands import get_command_id_from_name from .commands import get_command_id_from_name
@ -39,7 +42,7 @@ def command(role: str):
return Response(status=status.HTTP_400_BAD_REQUEST) return Response(status=status.HTTP_400_BAD_REQUEST)
try: try:
enqueue_command( response: Optional[commands.Response] = execute_command(
role=role, role=role,
command=command, command=command,
) )
@ -47,4 +50,10 @@ def command(role: str):
logger.error(f"role {role} does not exist") logger.error(f"role {role} does not exist")
return Response(status=status.HTTP_400_BAD_REQUEST) return Response(status=status.HTTP_400_BAD_REQUEST)
return Response(status=status.HTTP_200_OK) if response is None:
return Response(status=status.HTTP_408_REQUEST_TIMEOUT)
return make_response(
response.as_dict(),
status.HTTP_200_OK,
)

45
backend/monsun_backend/command_execution.py

@ -2,8 +2,11 @@ import atexit
import logging import logging
import time import time
from enum import Enum from enum import Enum
from multiprocessing import Manager
from multiprocessing import Process from multiprocessing import Process
from multiprocessing import Queue from multiprocessing import Queue
from multiprocessing.managers import SyncManager
from queue import Empty
from struct import error from struct import error
from struct import unpack from struct import unpack
from typing import Dict from typing import Dict
@ -30,6 +33,13 @@ _logger = logging.getLogger(__file__)
_command_queue: Dict[str, Queue] = dict() _command_queue: Dict[str, Queue] = dict()
"""role name: command queue""" """role name: command queue"""
_response_queue: Dict[str, Queue] = dict()
"""role name: response queue"""
_manager: Optional[SyncManager] = None
_awaiting_response_identifier_list: Dict[str, List] = dict()
"""role name: request queue"""
class State(Enum): class State(Enum):
heart_beat = 0x0 heart_beat = 0x0
@ -40,7 +50,9 @@ class State(Enum):
def worker_process( def worker_process(
role: str, role: str,
queue: Queue, commnad_queue: Queue,
response_queue: Queue,
awaiting_response_identifier_list: List,
): ):
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.DEBUG,
@ -73,7 +85,9 @@ def worker_process(
target=CommandTarget[role], target=CommandTarget[role],
root_logger=root_logger, root_logger=root_logger,
serial=serial, serial=serial,
command_queue=queue, command_queue=commnad_queue,
response_queue=response_queue,
awaiting_response_identifier_list=awaiting_response_identifier_list,
heartbeat_interval=heartbeat_interval, heartbeat_interval=heartbeat_interval,
) )
except OSError: except OSError:
@ -89,6 +103,8 @@ def enter_fsm(
root_logger: logging.Logger, root_logger: logging.Logger,
serial: Serial, serial: Serial,
command_queue: Queue, command_queue: Queue,
response_queue: Queue,
awaiting_response_identifier_list: List,
heartbeat_interval: float, heartbeat_interval: float,
): ):
logger = root_logger.getChild("FSM") logger = root_logger.getChild("FSM")
@ -161,6 +177,12 @@ def enter_fsm(
request.process_response( request.process_response(
response=received_response, response=received_response,
) )
if request.response_identifier in awaiting_response_identifier_list:
response_queue.put(received_response)
awaiting_response_identifier_list.remove(
request.response_identifier,
)
state = State.executing_command state = State.executing_command
break break
else: else:
@ -206,17 +228,26 @@ def dequeue_command(
return None return None
def enqueue_command( def execute_command(
role: str, role: str,
command: Command, command: Command,
): timeout: int = 2,
) -> Optional[Response]:
"""Add a command to the command queue """Add a command to the command queue
:param role: The role name :param role: The role name
:param command: The command to enqueue :param command: The command to enqueue
""" """
if isinstance(command, Request):
_awaiting_response_identifier_list[role].append(command.response_identifier)
_command_queue[role].put(command) _command_queue[role].put(command)
try:
return _response_queue[role].get(timeout=timeout)
except Empty:
return None
class CommandInterpretationError(Exception): class CommandInterpretationError(Exception):
"""Raised in case the command could not be interpreted""" """Raised in case the command could not be interpreted"""
@ -372,17 +403,23 @@ def _end_running_process():
def start_backgroup_process(): def start_backgroup_process():
global _process global _process
global _command_queue global _command_queue
global _manager
container = get_initialize_container() container = get_initialize_container()
_manager = Manager()
role_name: str role_name: str
for role_name in container.config.roles(): for role_name in container.config.roles():
_command_queue[role_name] = Queue() _command_queue[role_name] = Queue()
_response_queue[role_name] = Queue(maxsize=32)
_awaiting_response_identifier_list[role_name] = _manager.list()
_process[role_name] = Process( _process[role_name] = Process(
target=worker_process, target=worker_process,
args=( args=(
role_name, role_name,
_command_queue[role_name], _command_queue[role_name],
_response_queue[role_name],
_awaiting_response_identifier_list[role_name],
), ),
) )
_process[role_name].start() _process[role_name].start()

30
backend/monsun_backend/commands.py

@ -56,6 +56,10 @@ class Response(abc.ABC):
): ):
pass pass
@abc.abstractmethod
def as_dict(self) -> Dict:
pass
class Command(abc.ABC): class Command(abc.ABC):
def __init__( def __init__(
@ -208,6 +212,9 @@ class HeartbeatResponse(Response):
): ):
pass pass
def as_dict(self) -> Dict:
return dict()
class HeartbeatRequest(Request): class HeartbeatRequest(Request):
@property @property
@ -241,10 +248,9 @@ register_response(
class LEDResponse(Response): class LEDResponse(Response):
was_successful = True def __init__(self, *args, **kwargs) -> None:
self.was_successful = False
# def __init__(self) -> None: super().__init__(*args, **kwargs)
# super().__init__()
def unpack_payload( def unpack_payload(
self, self,
@ -252,6 +258,11 @@ class LEDResponse(Response):
): ):
self.was_successful = bool(data[0]) self.was_successful = bool(data[0])
def as_dict(self) -> Dict:
return {
"was_successful": self.was_successful,
}
class LEDRequest(Request): class LEDRequest(Request):
def __init__( def __init__(
@ -337,6 +348,11 @@ class GPResponse(Response):
): ):
self.was_successful = bool(data[0]) self.was_successful = bool(data[0])
def as_dict(self) -> Dict:
return {
"was_successful": self.was_successful,
}
class GPRequest(Request): class GPRequest(Request):
def __init__(self, command_id: Union[int, str], *args, **kwargs) -> None: def __init__(self, command_id: Union[int, str], *args, **kwargs) -> None:
@ -391,6 +407,12 @@ class PumpResponse(Response):
self.was_successful = bool(data[0]) self.was_successful = bool(data[0])
self.is_on = bool(data[1]) self.is_on = bool(data[1])
def as_dict(self) -> Dict:
return {
"was_successful": self.was_successful,
"is_on": self.is_on,
}
class PumpRequest(Request): class PumpRequest(Request):
def __init__(self, do: str, timeout: Union[int, str] = 60, *args, **kwargs) -> None: def __init__(self, do: str, timeout: Union[int, str] = 60, *args, **kwargs) -> None:

6
nucleo-wb55-ble/STM32_WPAN/App/app_ble.c

@ -37,7 +37,7 @@
/* Private includes ----------------------------------------------------------*/ /* Private includes ----------------------------------------------------------*/
/* USER CODE BEGIN Includes */ /* USER CODE BEGIN Includes */
#include "commands.h"
/* USER CODE END Includes */ /* USER CODE END Includes */
/* Private typedef -----------------------------------------------------------*/ /* Private typedef -----------------------------------------------------------*/
@ -485,7 +485,8 @@ void APP_BLE_Init( void )
/** /**
* Start to Advertise to be connected by P2P Client * Start to Advertise to be connected by P2P Client
*/ */
Adv_Request(APP_BLE_FAST_ADV); log_debug("APP_BLE_Init", "Start advertising...", 0);
Adv_Request(APP_BLE_FAST_ADV);
/* USER CODE BEGIN APP_BLE_Init_2 */ /* USER CODE BEGIN APP_BLE_Init_2 */
@ -524,6 +525,7 @@ SVCCTL_UserEvtFlowStatus_t SVCCTL_App_Notification( void *pckt )
} }
/* restart advertising */ /* restart advertising */
log_debug("SVCCTL_App_Notification", "Start advertising...", 0);
Adv_Request(APP_BLE_FAST_ADV); Adv_Request(APP_BLE_FAST_ADV);
/** /**

Loading…
Cancel
Save