From c351eb4794bf1a8e259ad97073ffff350de0c8e3 Mon Sep 17 00:00:00 2001 From: Andreas Berthoud Date: Mon, 19 Jul 2021 11:09:15 +0200 Subject: [PATCH] backend: Support client/server simultaneously --- backend/monsun_backend/command_endpoint.py | 20 +++++- backend/monsun_backend/command_execution.py | 75 ++++++++++++++++----- backend/monsun_backend/container.py | 2 +- config_example.yml | 8 ++- 4 files changed, 83 insertions(+), 22 deletions(-) diff --git a/backend/monsun_backend/command_endpoint.py b/backend/monsun_backend/command_endpoint.py index 194a659..92ff40a 100644 --- a/backend/monsun_backend/command_endpoint.py +++ b/backend/monsun_backend/command_endpoint.py @@ -1,3 +1,5 @@ +import logging + from flask import Response from flask import blueprints from flask import request @@ -8,12 +10,17 @@ from .commands import CommandId from .commands import get_command_id_from_name from .commands import get_request_class +_logger = logging.getLogger(__name__) + bp = blueprints.Blueprint("command", __name__) -@bp.route("/command", methods=["POST", "GET"]) -def command(): +@bp.route("//command", methods=["POST", "GET"]) +def command(role: str): + logger = _logger.getChild(f"{role}/command") + arguments = dict(request.args) + logger.debug(f"arguments: {arguments}") cmd = arguments.pop("cmd") try: @@ -26,6 +33,13 @@ def command(): except Exception: return Response(status=status.HTTP_400_BAD_REQUEST) - enqueue_command(command) + try: + enqueue_command( + role=role, + command=command, + ) + except KeyError: + logger.error(f"role {role} does not exist") + return Response(status=status.HTTP_400_BAD_REQUEST) return Response(status=status.HTTP_200_OK) diff --git a/backend/monsun_backend/command_execution.py b/backend/monsun_backend/command_execution.py index 1a33cf0..8f2a753 100644 --- a/backend/monsun_backend/command_execution.py +++ b/backend/monsun_backend/command_execution.py @@ -5,6 +5,7 @@ from enum import Enum from multiprocessing import Process from multiprocessing import Queue from struct import unpack +from typing import Dict from typing import List from typing import Optional from typing import Sequence @@ -12,6 +13,8 @@ from typing import Tuple from serial import Serial +from backend.monsun_backend.util import log_function_call + from . import commands from .commands import Command from .commands import CommandId @@ -21,7 +24,8 @@ from .container import get_initialize_container _logger = logging.getLogger(__file__) -_command_queue: Queue = Queue() +_command_queue: Dict[str, Queue] = dict() +"""role name: command queue""" class State(Enum): @@ -32,16 +36,24 @@ class State(Enum): def worker_process( + role: str, queue: Queue, ): logging.basicConfig( level=logging.DEBUG, format="[%(asctime)s] [%(name)-20s] [%(levelname)-8s] --- %(message)s", ) - logger = logging.getLogger("Command Loop") + root_logger = logging.getLogger(role) + root_logger.setLevel(logging.DEBUG) + logger = root_logger.getChild("worker_process") logger.setLevel(logging.INFO) container = get_initialize_container() + container.config.from_dict( + { + "role": role, + }, + ) heartbeat_interval = container.config.heartbeat_interval() serial_reconnection_wait_timeout = ( container.config.serial_reconnection_wait_timeout() @@ -55,6 +67,7 @@ def worker_process( logger.info("connected with serial device") connected = True enter_fsm( + root_logger=root_logger, serial=serial, command_queue=queue, heartbeat_interval=heartbeat_interval, @@ -68,11 +81,12 @@ def worker_process( def enter_fsm( + root_logger: logging.Logger, serial: Serial, command_queue: Queue, heartbeat_interval: float, ): - logger = logging.getLogger("FSM") + logger = root_logger.getChild("FSM") state = State.executing_command current_command: Optional[Command] = None @@ -110,7 +124,10 @@ def enter_fsm( ) else: request: Request = current_command - commands_, responses = receive(serial=serial) + commands_, responses = receive( + root_logger=root_logger, + serial=serial, + ) responses_received.extend(responses) for command in commands_: command_queue.put(command) @@ -143,7 +160,10 @@ def enter_fsm( continue elif state == State.receiving_command: - commands_, responses = receive(serial=serial) + commands_, responses = receive( + root_logger=root_logger, + serial=serial, + ) responses_received.extend(responses) for command in commands_: command_queue.put(command) @@ -164,14 +184,23 @@ def dequeue_command( return None -def enqueue_command(command: Command): - _command_queue.put(command) +def enqueue_command( + role: str, + command: Command, +): + """Add a command to the command queue + + :param role: The role name + :param command: The command to enqueue + """ + _command_queue[role].put(command) def receive( + root_logger: logging.Logger, serial: Serial, ) -> Tuple[Sequence[Command], Sequence[Response]]: - logger = logging.getLogger("receive_and_log") + logger = root_logger.getChild("receive_and_log") logger.setLevel(logging.INFO) commands_received: List[Command] = list() responses_received: List[Response] = list() @@ -219,21 +248,33 @@ def receive( return commands_received, responses_received -_process: Optional[Process] = None +_process: Dict[str, Process] = dict() +"""role name: process""" def _end_running_process(): - if _process is not None: - _process.kill() + process: Process + for process in _process.values(): + process.kill() +@log_function_call def start_backgroup_process(): - _logger.info("start_backgroup_process called") global _process + global _command_queue + container = get_initialize_container() + + role_name: str + for role_name in container.config.roles(): + _command_queue[role_name] = Queue() + + _process[role_name] = Process( + target=worker_process, + args=( + role_name, + _command_queue[role_name], + ), + ) + _process[role_name].start() - _process = Process( - target=worker_process, - args=(_command_queue,), - ) - _process.start() atexit.register(_end_running_process) diff --git a/backend/monsun_backend/container.py b/backend/monsun_backend/container.py index f410006..6175ab0 100644 --- a/backend/monsun_backend/container.py +++ b/backend/monsun_backend/container.py @@ -21,7 +21,7 @@ class Container(containers.DeclarativeContainer): serial = providers.Factory( Serial, - port=config.device_id.required(), + port=config.device_id[config.role].required(), baudrate=config.baudrate.required(), ) diff --git a/config_example.yml b/config_example.yml index 028ae0b..0054fb2 100644 --- a/config_example.yml +++ b/config_example.yml @@ -1 +1,7 @@ -device_id: /dev/tty.usbmodem207E3283544E1 +roles: + - server + - client + +device_id: + server: /dev/tty.usbmodem207E3283544E1 + client: /dev/tty.usbmodem2067368F32521