Browse Source

backend: Support client/server simultaneously

Andreas Berthoud 4 years ago
parent
commit
b6f0cdabb0
  1. 20
      backend/monsun_backend/command_endpoint.py
  2. 75
      backend/monsun_backend/command_execution.py
  3. 2
      backend/monsun_backend/container.py
  4. 8
      config_example.yml

20
backend/monsun_backend/command_endpoint.py

@ -1,3 +1,5 @@
import logging
from flask import Response
from flask import blueprints
from flask import request
@ -8,12 +10,17 @@ from .commands import CommandId
from .commands import get_command_id_from_name
from .commands import get_request_class
_logger = logging.getLogger(__name__)
bp = blueprints.Blueprint("command", __name__)
@bp.route("/command", methods=["POST", "GET"])
def command():
@bp.route("/<role>/command", methods=["POST", "GET"])
def command(role: str):
logger = _logger.getChild(f"{role}/command")
arguments = dict(request.args)
logger.debug(f"arguments: {arguments}")
cmd = arguments.pop("cmd")
try:
@ -26,6 +33,13 @@ def command():
except Exception:
return Response(status=status.HTTP_400_BAD_REQUEST)
enqueue_command(command)
try:
enqueue_command(
role=role,
command=command,
)
except KeyError:
logger.error(f"role {role} does not exist")
return Response(status=status.HTTP_400_BAD_REQUEST)
return Response(status=status.HTTP_200_OK)

75
backend/monsun_backend/command_execution.py

@ -5,6 +5,7 @@ from enum import Enum
from multiprocessing import Process
from multiprocessing import Queue
from struct import unpack
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
@ -12,6 +13,8 @@ from typing import Tuple
from serial import Serial
from backend.monsun_backend.util import log_function_call
from . import commands
from .commands import Command
from .commands import CommandId
@ -21,7 +24,8 @@ from .container import get_initialize_container
_logger = logging.getLogger(__file__)
_command_queue: Queue = Queue()
_command_queue: Dict[str, Queue] = dict()
"""role name: command queue"""
class State(Enum):
@ -32,16 +36,24 @@ class State(Enum):
def worker_process(
role: str,
queue: Queue,
):
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s] [%(name)-20s] [%(levelname)-8s] --- %(message)s",
)
logger = logging.getLogger("Command Loop")
root_logger = logging.getLogger(role)
root_logger.setLevel(logging.DEBUG)
logger = root_logger.getChild("worker_process")
logger.setLevel(logging.INFO)
container = get_initialize_container()
container.config.from_dict(
{
"role": role,
},
)
heartbeat_interval = container.config.heartbeat_interval()
serial_reconnection_wait_timeout = (
container.config.serial_reconnection_wait_timeout()
@ -55,6 +67,7 @@ def worker_process(
logger.info("connected with serial device")
connected = True
enter_fsm(
root_logger=root_logger,
serial=serial,
command_queue=queue,
heartbeat_interval=heartbeat_interval,
@ -68,11 +81,12 @@ def worker_process(
def enter_fsm(
root_logger: logging.Logger,
serial: Serial,
command_queue: Queue,
heartbeat_interval: float,
):
logger = logging.getLogger("FSM")
logger = root_logger.getChild("FSM")
state = State.executing_command
current_command: Optional[Command] = None
@ -110,7 +124,10 @@ def enter_fsm(
)
else:
request: Request = current_command
commands_, responses = receive(serial=serial)
commands_, responses = receive(
root_logger=root_logger,
serial=serial,
)
responses_received.extend(responses)
for command in commands_:
command_queue.put(command)
@ -143,7 +160,10 @@ def enter_fsm(
continue
elif state == State.receiving_command:
commands_, responses = receive(serial=serial)
commands_, responses = receive(
root_logger=root_logger,
serial=serial,
)
responses_received.extend(responses)
for command in commands_:
command_queue.put(command)
@ -164,14 +184,23 @@ def dequeue_command(
return None
def enqueue_command(command: Command):
_command_queue.put(command)
def enqueue_command(
role: str,
command: Command,
):
"""Add a command to the command queue
:param role: The role name
:param command: The command to enqueue
"""
_command_queue[role].put(command)
def receive(
root_logger: logging.Logger,
serial: Serial,
) -> Tuple[Sequence[Command], Sequence[Response]]:
logger = logging.getLogger("receive_and_log")
logger = root_logger.getChild("receive_and_log")
logger.setLevel(logging.INFO)
commands_received: List[Command] = list()
responses_received: List[Response] = list()
@ -219,21 +248,33 @@ def receive(
return commands_received, responses_received
_process: Optional[Process] = None
_process: Dict[str, Process] = dict()
"""role name: process"""
def _end_running_process():
if _process is not None:
_process.kill()
process: Process
for process in _process.values():
process.kill()
@log_function_call
def start_backgroup_process():
_logger.info("start_backgroup_process called")
global _process
global _command_queue
container = get_initialize_container()
role_name: str
for role_name in container.config.roles():
_command_queue[role_name] = Queue()
_process[role_name] = Process(
target=worker_process,
args=(
role_name,
_command_queue[role_name],
),
)
_process[role_name].start()
_process = Process(
target=worker_process,
args=(_command_queue,),
)
_process.start()
atexit.register(_end_running_process)

2
backend/monsun_backend/container.py

@ -21,7 +21,7 @@ class Container(containers.DeclarativeContainer):
serial = providers.Factory(
Serial,
port=config.device_id.required(),
port=config.device_id[config.role].required(),
baudrate=config.baudrate.required(),
)

8
config_example.yml

@ -1 +1,7 @@
device_id: /dev/tty.usbmodem207E3283544E1
roles:
- server
- client
device_id:
server: /dev/tty.usbmodem207E3283544E1
client: /dev/tty.usbmodem2067368F32521

Loading…
Cancel
Save