"""Flask blueprint and background worker bridging HTTP commands to a serial device.

Frames exchanged over the serial line have the layout used by ``pack``/``unpack``
below: a ``>BHB`` header (command id, big-endian payload length, one byte that is
written as 0 and ignored on receipt), the payload bytes, and a 0xFF stop byte.
"""
import atexit
import logging
import os
import time
from dataclasses import dataclass
from enum import Enum
from multiprocessing import Process
from multiprocessing import Queue
from struct import error as StructError
from struct import pack
from struct import unpack
from typing import Optional

from flask import Response
from flask import blueprints
from flask import request
from flask_api import status
from serial import Serial

from .container import get_container
from .container import initialize_container

bp = blueprints.Blueprint("command", __name__)

_process: Optional[Process] = None
_queue: Queue = Queue()

_logger = logging.getLogger(__name__)

# Wire format shared by the sender (worker_process) and the parser
# (receive_and_log).
_HEADER_FORMAT = ">BHB"
_STOP_BYTE = 0xFF


@bp.route("/command", methods=["POST", "GET"])
def command():
    """Queue the ``command-id`` query argument for the worker process.

    Always answers 200, whether or not a command id was supplied.
    """
    logger = logging.getLogger("test")
    command_id = request.args.get("command-id")
    if _queue is not None and command_id is not None:
        logger.info("put in queue: %s", command_id)
        _queue.put(command_id)
    return Response(status=status.HTTP_200_OK)


def _end_running_process():
    """Kill the worker process if one was started (registered with atexit)."""
    if _process is not None:
        _process.kill()


def worker_process(
    queue: Queue,
    config,
):
    """Run the serial I/O loop; intended as the target of a ``Process``.

    Each iteration parses whatever the device has sent, then writes one frame
    per command id waiting in ``queue``. The loop never returns.

    Args:
        queue: Command ids (as received from the HTTP endpoint) to send.
        config: Configuration passed to ``initialize_container``.
    """
    logging.basicConfig(
        level=logging.DEBUG,
        format="[%(asctime)s] [%(name)-20s] [%(levelname)-8s] --- %(message)s",
    )
    logger = logging.getLogger("worker_process")
    logger.setLevel(logging.INFO)

    initialize_container(config)
    container = get_container()

    # Every outgoing command currently carries the same fixed payload.
    payload = "test value".encode()
    stop = bytes([_STOP_BYTE])

    counter = 1
    with container.serial() as serial:
        while True:
            logger.debug("Ping %s process_id=%s", counter, os.getpid())
            counter += 1
            time.sleep(0.01)

            receive_and_log(
                serial=serial,
                header_size=container.config.header_size(),
            )

            if queue is None:
                continue
            while not queue.empty():
                command_id = queue.get()
                try:
                    header = pack(_HEADER_FORMAT, int(command_id), len(payload), 0)
                except (ValueError, StructError):
                    # The id comes straight from an HTTP query string; a
                    # non-numeric or out-of-range value must not kill the loop.
                    logger.error("dropping invalid command id %r", command_id)
                    continue
                serial.write(header + payload + stop)


class CommandId(Enum):
    """Command identifiers recognised in the first header byte."""

    command_none = 0
    command_log = 0xFF


@dataclass
class LogCommand:
    """Command ID: command_log

    The payload is one log-level byte followed by the message text.
    """

    level: int
    message: str

    HEADER_SIZE = 1  # log level

    def __init__(
        self,
        data: bytes,
    ) -> None:
        """Parse ``data`` into ``level`` and ``message``.

        Raises:
            IndexError: If ``data`` is empty.
        """
        self._logger = logging.getLogger(self.__class__.__name__)
        self.received_logger = logging.getLogger("stm32wb55")
        self.received_logger.setLevel(logging.DEBUG)
        self._logger.setLevel(logging.INFO)

        level = int(data[0])
        self._logger.debug("level: %s", level)
        message = data[self.HEADER_SIZE :]
        self._logger.debug("Message: %s", message)

        self.level = level
        # Bytes from the device may be corrupted; substitute undecodable bytes
        # instead of raising so one bad message cannot stop the reader.
        self.message = message.decode(errors="replace")

    def execute(self):
        """Emit the message on the ``stm32wb55`` logger at the received level."""
        self.received_logger.log(level=self.level, msg=self.message)


def receive_and_log(
    serial: Serial,
    header_size: int,
):
    """Read the bytes currently waiting on ``serial`` and handle each frame.

    Log frames are forwarded to the logging system. Frames with an unknown
    command id are reported and skipped. A valid non-log command stops
    processing and drops the rest of the buffer. Truncated frames and frames
    with a wrong stop byte are reported and the rest of the buffer is
    discarded, because the parser has no way to resynchronise.

    Args:
        serial: Open serial port to read from.
        header_size: Number of header bytes preceding the payload.
    """
    logger = logging.getLogger("receive_and_log")
    logger.setLevel(logging.INFO)

    bytes_read = serial.read(serial.in_waiting)
    while bytes_read:
        logger.debug("bytes: %s", bytes_read.hex())

        if len(bytes_read) < header_size:
            logger.warning(
                "discarding %d byte(s): incomplete frame header", len(bytes_read)
            )
            return
        command_id_int, data_length, _ = unpack(
            _HEADER_FORMAT, bytes_read[:header_size]
        )

        payload_end = header_size + data_length
        if len(bytes_read) <= payload_end:
            # Payload or stop byte has not fully arrived.
            logger.warning(
                "discarding %d byte(s): incomplete frame", len(bytes_read)
            )
            return

        payload = bytes(bytes_read[header_size:payload_end])
        stop_byte = bytes_read[payload_end]
        remainder = bytes_read[payload_end + 1 :]

        try:
            command_id = CommandId(command_id_int)
        except ValueError:
            logger.error(
                f"invalid command {command_id_int} with payload {str(payload)}",
            )
            bytes_read = remainder
            continue

        if command_id != CommandId.command_log:
            # Only log commands are handled; anything else ends processing.
            return

        logger.debug("stop_byte: %s", stop_byte)
        if stop_byte != _STOP_BYTE:
            logger.error(
                "invalid stop byte 0x%02X; discarding %d byte(s)",
                stop_byte,
                len(bytes_read),
            )
            return

        if data_length < LogCommand.HEADER_SIZE:
            logger.error("log command without a level byte; frame skipped")
        else:
            LogCommand(data=payload).execute()

        bytes_read = remainder


def start_backgroup_process():
    """Start the serial worker process and register its cleanup at exit.

    Does nothing if a previously started worker is still alive.
    """
    global _process
    _logger.warning("start_backgroup_process called")

    if _process is not None and _process.is_alive():
        # A second worker would orphan the first one.
        _logger.warning("worker process already running; not starting another")
        return

    _process = Process(
        target=worker_process,
        args=(
            _queue,
            get_container().config(),
        ),
    )
    _process.start()
    atexit.register(_end_running_process)