Browse Source

backend: Support client/server simultaneously

ble
Andreas Berthoud 4 years ago
parent
commit
c351eb4794
  1. 20
      backend/monsun_backend/command_endpoint.py
  2. 71
      backend/monsun_backend/command_execution.py
  3. 2
      backend/monsun_backend/container.py
  4. 8
      config_example.yml

20
backend/monsun_backend/command_endpoint.py

@ -1,3 +1,5 @@
import logging
from flask import Response from flask import Response
from flask import blueprints from flask import blueprints
from flask import request from flask import request
@ -8,12 +10,17 @@ from .commands import CommandId
from .commands import get_command_id_from_name from .commands import get_command_id_from_name
from .commands import get_request_class from .commands import get_request_class
_logger = logging.getLogger(__name__)
bp = blueprints.Blueprint("command", __name__) bp = blueprints.Blueprint("command", __name__)
@bp.route("/command", methods=["POST", "GET"]) @bp.route("/<role>/command", methods=["POST", "GET"])
def command(): def command(role: str):
logger = _logger.getChild(f"{role}/command")
arguments = dict(request.args) arguments = dict(request.args)
logger.debug(f"arguments: {arguments}")
cmd = arguments.pop("cmd") cmd = arguments.pop("cmd")
try: try:
@ -26,6 +33,13 @@ def command():
except Exception: except Exception:
return Response(status=status.HTTP_400_BAD_REQUEST) return Response(status=status.HTTP_400_BAD_REQUEST)
enqueue_command(command) try:
enqueue_command(
role=role,
command=command,
)
except KeyError:
logger.error(f"role {role} does not exist")
return Response(status=status.HTTP_400_BAD_REQUEST)
return Response(status=status.HTTP_200_OK) return Response(status=status.HTTP_200_OK)

71
backend/monsun_backend/command_execution.py

@ -5,6 +5,7 @@ from enum import Enum
from multiprocessing import Process from multiprocessing import Process
from multiprocessing import Queue from multiprocessing import Queue
from struct import unpack from struct import unpack
from typing import Dict
from typing import List from typing import List
from typing import Optional from typing import Optional
from typing import Sequence from typing import Sequence
@ -12,6 +13,8 @@ from typing import Tuple
from serial import Serial from serial import Serial
from backend.monsun_backend.util import log_function_call
from . import commands from . import commands
from .commands import Command from .commands import Command
from .commands import CommandId from .commands import CommandId
@ -21,7 +24,8 @@ from .container import get_initialize_container
_logger = logging.getLogger(__file__) _logger = logging.getLogger(__file__)
_command_queue: Queue = Queue() _command_queue: Dict[str, Queue] = dict()
"""role name: command queue"""
class State(Enum): class State(Enum):
@ -32,16 +36,24 @@ class State(Enum):
def worker_process( def worker_process(
role: str,
queue: Queue, queue: Queue,
): ):
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.DEBUG,
format="[%(asctime)s] [%(name)-20s] [%(levelname)-8s] --- %(message)s", format="[%(asctime)s] [%(name)-20s] [%(levelname)-8s] --- %(message)s",
) )
logger = logging.getLogger("Command Loop") root_logger = logging.getLogger(role)
root_logger.setLevel(logging.DEBUG)
logger = root_logger.getChild("worker_process")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
container = get_initialize_container() container = get_initialize_container()
container.config.from_dict(
{
"role": role,
},
)
heartbeat_interval = container.config.heartbeat_interval() heartbeat_interval = container.config.heartbeat_interval()
serial_reconnection_wait_timeout = ( serial_reconnection_wait_timeout = (
container.config.serial_reconnection_wait_timeout() container.config.serial_reconnection_wait_timeout()
@ -55,6 +67,7 @@ def worker_process(
logger.info("connected with serial device") logger.info("connected with serial device")
connected = True connected = True
enter_fsm( enter_fsm(
root_logger=root_logger,
serial=serial, serial=serial,
command_queue=queue, command_queue=queue,
heartbeat_interval=heartbeat_interval, heartbeat_interval=heartbeat_interval,
@ -68,11 +81,12 @@ def worker_process(
def enter_fsm( def enter_fsm(
root_logger: logging.Logger,
serial: Serial, serial: Serial,
command_queue: Queue, command_queue: Queue,
heartbeat_interval: float, heartbeat_interval: float,
): ):
logger = logging.getLogger("FSM") logger = root_logger.getChild("FSM")
state = State.executing_command state = State.executing_command
current_command: Optional[Command] = None current_command: Optional[Command] = None
@ -110,7 +124,10 @@ def enter_fsm(
) )
else: else:
request: Request = current_command request: Request = current_command
commands_, responses = receive(serial=serial) commands_, responses = receive(
root_logger=root_logger,
serial=serial,
)
responses_received.extend(responses) responses_received.extend(responses)
for command in commands_: for command in commands_:
command_queue.put(command) command_queue.put(command)
@ -143,7 +160,10 @@ def enter_fsm(
continue continue
elif state == State.receiving_command: elif state == State.receiving_command:
commands_, responses = receive(serial=serial) commands_, responses = receive(
root_logger=root_logger,
serial=serial,
)
responses_received.extend(responses) responses_received.extend(responses)
for command in commands_: for command in commands_:
command_queue.put(command) command_queue.put(command)
@ -164,14 +184,23 @@ def dequeue_command(
return None return None
def enqueue_command(command: Command): def enqueue_command(
_command_queue.put(command) role: str,
command: Command,
):
"""Add a command to the command queue
:param role: The role name
:param command: The command to enqueue
"""
_command_queue[role].put(command)
def receive( def receive(
root_logger: logging.Logger,
serial: Serial, serial: Serial,
) -> Tuple[Sequence[Command], Sequence[Response]]: ) -> Tuple[Sequence[Command], Sequence[Response]]:
logger = logging.getLogger("receive_and_log") logger = root_logger.getChild("receive_and_log")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
commands_received: List[Command] = list() commands_received: List[Command] = list()
responses_received: List[Response] = list() responses_received: List[Response] = list()
@ -219,21 +248,33 @@ def receive(
return commands_received, responses_received return commands_received, responses_received
_process: Optional[Process] = None _process: Dict[str, Process] = dict()
"""role name: process"""
def _end_running_process(): def _end_running_process():
if _process is not None: process: Process
_process.kill() for process in _process.values():
process.kill()
@log_function_call
def start_backgroup_process(): def start_backgroup_process():
_logger.info("start_backgroup_process called")
global _process global _process
global _command_queue
container = get_initialize_container()
_process = Process( role_name: str
for role_name in container.config.roles():
_command_queue[role_name] = Queue()
_process[role_name] = Process(
target=worker_process, target=worker_process,
args=(_command_queue,), args=(
role_name,
_command_queue[role_name],
),
) )
_process.start() _process[role_name].start()
atexit.register(_end_running_process) atexit.register(_end_running_process)

2
backend/monsun_backend/container.py

@ -21,7 +21,7 @@ class Container(containers.DeclarativeContainer):
serial = providers.Factory( serial = providers.Factory(
Serial, Serial,
port=config.device_id.required(), port=config.device_id[config.role].required(),
baudrate=config.baudrate.required(), baudrate=config.baudrate.required(),
) )

8
config_example.yml

@ -1 +1,7 @@
device_id: /dev/tty.usbmodem207E3283544E1 roles:
- server
- client
device_id:
server: /dev/tty.usbmodem207E3283544E1
client: /dev/tty.usbmodem2067368F32521

Loading…
Cancel
Save