Browse Source

backend: Echo commands send to STM32

Andreas Berthoud 5 years ago
parent
commit
f041fa4052
  1. 57
      backend/command.py

57
backend/command.py

@ -6,6 +6,8 @@ from dataclasses import dataclass
from enum import Enum
from multiprocessing import Process
from multiprocessing import Queue
from struct import pack
from struct import unpack
from typing import Optional
from flask import Response
@ -58,9 +60,9 @@ def worker_process(
counter = 1
with container.serial() as serial:
while True:
logger.info(f"Ping {counter} {os.getpid()}")
logger.debug(f"Ping {counter} process_id={os.getpid()}")
counter += 1
time.sleep(0.5)
time.sleep(0.1)
receive_and_log(
serial=serial,
header_size=container.config.header_size(),
@ -69,8 +71,19 @@ def worker_process(
if queue is not None:
while not queue.empty():
command_id = queue.get()
logger.debug(f"device_id {container.config.device_id()}")
logger.info(f"would execute command: {command_id}")
payload = "test value".encode()
length = len(payload)
data = pack(
">BHB" + "B" * length + "B",
int(command_id),
length,
0,
*list(payload),
0xFF,
)
serial.write(data)
class CommandId(Enum):
@ -78,12 +91,6 @@ class CommandId(Enum):
command_log = 0xFF
@dataclass
class CommandMeta:
command_id: CommandId
data_length: int
@dataclass
class LogCommand:
"""Command ID: command_log"""
@ -128,31 +135,31 @@ def receive_and_log(
while bytes_read:
logger.debug(f"bytes: {bytes_read.hex()}")
command_id_int, data_length, _ = unpack(">BHB", bytes_read[:header_size])
payload = bytes(bytes_read[header_size : header_size + data_length])
stop_byte = bytes_read[header_size + data_length]
try:
command_id = CommandId(command_id_int)
except ValueError:
logger.error(
f"invalid command {command_id_int} with payload {str(payload)}",
)
bytes_read = bytes_read[header_size + data_length + 1 :]
continue
command_id = int(bytes_read[0])
logger.debug(f"command_id: {command_id}")
data_length = (int(bytes_read[1]) << 4) + int(bytes_read[2])
logger.debug(f"data_length: {data_length}")
meta = CommandMeta(
command_id=CommandId(command_id),
data_length=data_length,
)
if meta.command_id == CommandId.command_log:
if command_id == CommandId.command_log:
command = LogCommand(
data=bytes_read[header_size : header_size + meta.data_length],
data=bytes_read[header_size : header_size + data_length],
)
command.execute()
else:
return
stop_byte = bytes_read[header_size + meta.data_length]
logger.debug(f"stop_byte: {stop_byte}")
assert stop_byte == 0xFF
bytes_read = bytes_read[header_size + meta.data_length + 1 :]
bytes_read = bytes_read[header_size + data_length + 1 :]
def start_backgroup_process():

Loading…
Cancel
Save