|
|
|
@ -2,8 +2,11 @@ import atexit |
|
|
|
import logging |
|
|
|
import time |
|
|
|
from enum import Enum |
|
|
|
from multiprocessing import Manager |
|
|
|
from multiprocessing import Process |
|
|
|
from multiprocessing import Queue |
|
|
|
from multiprocessing.managers import SyncManager |
|
|
|
from queue import Empty |
|
|
|
from struct import error |
|
|
|
from struct import unpack |
|
|
|
from typing import Dict |
|
|
|
@ -30,6 +33,13 @@ _logger = logging.getLogger(__file__) |
|
|
|
_command_queue: Dict[str, Queue] = dict() |
|
|
|
"""role name: command queue""" |
|
|
|
|
|
|
|
_response_queue: Dict[str, Queue] = dict() |
|
|
|
"""role name: response queue""" |
|
|
|
|
|
|
|
_manager: Optional[SyncManager] = None |
|
|
|
_awaiting_response_identifier_list: Dict[str, List] = dict() |
|
|
|
"""role name: request queue""" |
|
|
|
|
|
|
|
|
|
|
|
class State(Enum): |
|
|
|
heart_beat = 0x0 |
|
|
|
@ -40,7 +50,9 @@ class State(Enum): |
|
|
|
|
|
|
|
def worker_process( |
|
|
|
role: str, |
|
|
|
queue: Queue, |
|
|
|
commnad_queue: Queue, |
|
|
|
response_queue: Queue, |
|
|
|
awaiting_response_identifier_list: List, |
|
|
|
): |
|
|
|
logging.basicConfig( |
|
|
|
level=logging.DEBUG, |
|
|
|
@ -73,7 +85,9 @@ def worker_process( |
|
|
|
target=CommandTarget[role], |
|
|
|
root_logger=root_logger, |
|
|
|
serial=serial, |
|
|
|
command_queue=queue, |
|
|
|
command_queue=commnad_queue, |
|
|
|
response_queue=response_queue, |
|
|
|
awaiting_response_identifier_list=awaiting_response_identifier_list, |
|
|
|
heartbeat_interval=heartbeat_interval, |
|
|
|
) |
|
|
|
except OSError: |
|
|
|
@ -89,6 +103,8 @@ def enter_fsm( |
|
|
|
root_logger: logging.Logger, |
|
|
|
serial: Serial, |
|
|
|
command_queue: Queue, |
|
|
|
response_queue: Queue, |
|
|
|
awaiting_response_identifier_list: List, |
|
|
|
heartbeat_interval: float, |
|
|
|
): |
|
|
|
logger = root_logger.getChild("FSM") |
|
|
|
@ -161,6 +177,12 @@ def enter_fsm( |
|
|
|
request.process_response( |
|
|
|
response=received_response, |
|
|
|
) |
|
|
|
if request.response_identifier in awaiting_response_identifier_list: |
|
|
|
response_queue.put(received_response) |
|
|
|
awaiting_response_identifier_list.remove( |
|
|
|
request.response_identifier, |
|
|
|
) |
|
|
|
|
|
|
|
state = State.executing_command |
|
|
|
break |
|
|
|
else: |
|
|
|
@ -206,17 +228,26 @@ def dequeue_command( |
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def enqueue_command( |
|
|
|
def execute_command( |
|
|
|
role: str, |
|
|
|
command: Command, |
|
|
|
): |
|
|
|
timeout: int = 2, |
|
|
|
) -> Optional[Response]: |
|
|
|
"""Add a command to the command queue |
|
|
|
|
|
|
|
:param role: The role name |
|
|
|
:param command: The command to enqueue |
|
|
|
""" |
|
|
|
if isinstance(command, Request): |
|
|
|
_awaiting_response_identifier_list[role].append(command.response_identifier) |
|
|
|
|
|
|
|
_command_queue[role].put(command) |
|
|
|
|
|
|
|
try: |
|
|
|
return _response_queue[role].get(timeout=timeout) |
|
|
|
except Empty: |
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
class CommandInterpretationError(Exception): |
|
|
|
"""Raised in case the command could not be interpreted""" |
|
|
|
@ -372,17 +403,23 @@ def _end_running_process(): |
|
|
|
def start_backgroup_process(): |
|
|
|
global _process |
|
|
|
global _command_queue |
|
|
|
global _manager |
|
|
|
container = get_initialize_container() |
|
|
|
|
|
|
|
_manager = Manager() |
|
|
|
role_name: str |
|
|
|
for role_name in container.config.roles(): |
|
|
|
_command_queue[role_name] = Queue() |
|
|
|
_response_queue[role_name] = Queue(maxsize=32) |
|
|
|
_awaiting_response_identifier_list[role_name] = _manager.list() |
|
|
|
|
|
|
|
_process[role_name] = Process( |
|
|
|
target=worker_process, |
|
|
|
args=( |
|
|
|
role_name, |
|
|
|
_command_queue[role_name], |
|
|
|
_response_queue[role_name], |
|
|
|
_awaiting_response_identifier_list[role_name], |
|
|
|
), |
|
|
|
) |
|
|
|
_process[role_name].start() |
|
|
|
|