You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
272 lines
5.8 KiB
272 lines
5.8 KiB
import abc
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from random import randint
|
|
from struct import pack
|
|
from struct import unpack
|
|
from typing import Union
|
|
|
|
from serial import Serial
|
|
|
|
|
|
class CommandId(Enum):
    """Numeric identifiers of the commands known to this module.

    Request identifiers are even numbers; the optional response to a
    request uses the request identifier plus one.
    """

    command_none = 0x0
    command_log = 0x1

    # Request/response pairs: response ID == request ID + 1.
    command_heartbeat_request = 0x2
    command_heartbeat_response = 0x3

    command_led_request = 0x4
    command_led_response = 0x5
|
|
|
|
|
|
def get_command_id_from_name(name: str) -> CommandId:
    """Translate a short command name into its ``CommandId``.

    Only ``"log"`` and ``"led"`` are known.

    Raises:
        KeyError: if *name* is not one of the known names.
    """
    ids_by_name = {
        "log": CommandId.command_log,
        "led": CommandId.command_led_request,
    }
    return ids_by_name[name]
|
|
|
|
|
|
def get_request_class(command_id: CommandId):
    """Return the class that implements the command *command_id*.

    Raises:
        KeyError: if no class is registered for *command_id*.
    """
    classes_by_id = {
        CommandId.command_log: LogCommand,
        CommandId.command_heartbeat_request: HeartbeatRequest,
        CommandId.command_led_request: LEDRequest,
    }
    return classes_by_id[command_id]
|
|
|
|
|
|
def get_response_class(command_id: CommandId):
    """Return the ``Response`` subclass that parses the response *command_id*.

    Raises:
        KeyError: if no class is registered for *command_id*.
    """
    classes_by_id = {
        CommandId.command_heartbeat_response: HeartbeatResponse,
        CommandId.command_led_response: LEDResponse,
    }
    return classes_by_id[command_id]
|
|
|
|
|
|
class Response(abc.ABC):
    """Base class for responses parsed from raw bytes.

    The first two bytes of the data are read as a big-endian unsigned
    16-bit identifier; the remaining bytes are handed to
    ``unpack_payload``.
    """

    identifier: int

    def __init__(self, data: bytes) -> None:
        """Parse *data* into the identifier and the subclass payload.

        Raises:
            struct.error: if *data* holds fewer than two bytes.
        """
        self._logger = logging.getLogger(type(self).__name__)

        header, body = data[:2], data[2:]
        (self.identifier,) = unpack(">H", header)
        self.unpack_payload(body)

    @abc.abstractmethod
    def unpack_payload(self, data: bytes):
        """Decode the bytes that follow the two-byte identifier."""
|
|
|
|
|
|
class Command(abc.ABC):
    """Base class for commands; provides the framing used by ``send_command``."""

    def __init__(self) -> None:
        self._logger = logging.getLogger(type(self).__name__)

    @property
    @abc.abstractmethod
    def identifier(self) -> CommandId:
        """The ``CommandId`` whose value is written as the first frame byte."""

    @abc.abstractmethod
    def execute(self, serial: Serial):
        """Carry out the command, using *serial* if it needs the port."""

    def send_command(self, payload: bytes, serial: Serial):
        """Frame *payload* and write the frame to *serial*.

        Frame layout (big-endian, no padding):
        1 byte command identifier, 2 bytes payload length, one zero byte,
        the payload bytes, and a trailing ``0xFF`` byte.

        Raises:
            struct.error: if the payload length does not fit in 16 bits.
        """
        length = len(payload)
        header = pack(">BHB", int(self.identifier.value), length, 0)
        frame = header + bytes(payload) + b"\xff"

        serial.write(frame)
|
|
|
|
|
|
class Request(Command):
    """A command that carries a random 16-bit response identifier.

    The identifier is prepended to every payload sent through
    ``send_command``.
    NOTE(review): presumably the device echoes it back as
    ``Response.identifier`` so the reply can be matched - confirm.
    """

    def __init__(self) -> None:
        super().__init__()

        # Any value that fits an unsigned 16-bit field (0..65535).
        self.response_identifier = randint(0, 0xFFFF)

    @property
    @abc.abstractmethod
    def timeout(self) -> float:
        """Timeout for this request (presumably seconds - TODO confirm)."""

    @abc.abstractmethod
    def process_response(self, response: Response):
        """Handle the *response* received for this request."""

    def send_command(self, payload: bytes, serial: Serial):
        """Send *payload* prefixed with the big-endian response identifier."""
        tagged_payload = pack(">H", self.response_identifier) + payload

        super().send_command(payload=tagged_payload, serial=serial)
|
|
|
|
|
|
@dataclass
class LogCommand(Command):
    """Command ID: command_log

    A log record decoded from raw bytes: byte 0 is the log level and the
    remaining bytes are the message text. ``execute`` forwards the record
    to the ``"stm32wb55"`` logger.
    """

    level: int
    message: str

    HEADER_SIZE = 1  # log level

    def __init__(self, data: bytes) -> None:
        """Decode *data* into ``level`` and ``message``.

        Raises:
            IndexError: if *data* is empty.
            UnicodeDecodeError: if the message bytes are not valid UTF-8.
        """
        # This hand-written __init__ is kept by @dataclass (it never
        # replaces an __init__ the class defines), so the base initialiser
        # must be called explicitly: it creates self._logger, used below.
        super().__init__()

        self.received_logger = logging.getLogger("stm32wb55")
        self.received_logger.setLevel(logging.DEBUG)

        self._logger.setLevel(logging.INFO)

        level = int(data[0])
        self._logger.debug("level: %s", level)

        message = data[self.HEADER_SIZE :]
        self._logger.debug("Message: %s", message)

        self.level = level
        self.message = message.decode()

    @property
    def identifier(self) -> CommandId:
        """Identifier of this command: ``CommandId.command_log``."""
        return CommandId.command_log

    def execute(self, serial: Serial):
        """Emit the decoded message on the ``"stm32wb55"`` logger.

        *serial* is not used; the record's ``level`` is passed to
        ``Logger.log`` unchanged.
        """
        self.received_logger.log(level=self.level, msg=self.message)
|
|
|
|
|
|
class HeartbeatResponse(Response):
    """Response to a heartbeat request; nothing is decoded from its payload."""

    def unpack_payload(self, data: bytes):
        """Ignore *data*: this response stores nothing beyond the identifier."""
|
|
|
|
|
|
class HeartbeatRequest(Request):
    """Heartbeat request: sends an empty payload."""

    @property
    def identifier(self) -> CommandId:
        """Identifier of this request: ``CommandId.command_heartbeat_request``."""
        return CommandId.command_heartbeat_request

    @property
    def timeout(self) -> float:
        """Timeout for this request: ``0.1`` (presumably seconds - TODO confirm)."""
        return 0.1

    def process_response(self, response: Response):
        """Accept *response* only if it is a ``HeartbeatResponse``.

        Raises:
            TypeError: for any other kind of response.
        """
        if isinstance(response, HeartbeatResponse):
            return
        raise TypeError(f"{response} is not a {HeartbeatResponse}")

    def execute(self, serial: Serial):
        """Send the heartbeat request (no payload) over *serial*."""
        self.send_command(payload=b"", serial=serial)
|
|
|
|
|
|
class LEDResponse(Response):
    """Response to an LED request; its first payload byte is a success flag."""

    # Class-level default; replaced per instance by unpack_payload.
    was_successful = True

    def unpack_payload(self, data: bytes):
        """Read the success flag from the first byte of *data* and log it.

        Raises:
            IndexError: if *data* is empty.
        """
        succeeded = bool(data[0])
        self.was_successful = succeeded

        outcome = (
            "LED command was successful"
            if succeeded
            else "LED command was not successful"
        )
        self._logger.debug(outcome)
|
|
|
|
|
|
class LEDRequest(Request):
    """Request that applies a command (off / on / toggle) to one LED."""

    # Symbolic names accepted in place of the numeric codes.
    _LED_IDS = {
        "green": 0,
        "red": 1,
        "blue": 2,
    }
    _LED_COMMANDS = {
        "off": 0,
        "on": 1,
        "toggle": 2,
    }

    def __init__(self, id: Union[int, str], command: Union[int, str]) -> None:
        """
        Both arguments accept either the numeric code (as an int or a
        numeric string) or the symbolic name.

        led_id
        -------
        0: green
        1: red
        2: blue

        led_command
        --------
        0: off
        1: on
        2: toggle

        Raises ``KeyError`` for a non-numeric value that is not a known name.
        """
        super().__init__()
        self.led_id = self._to_code(id, self._LED_IDS)
        self.led_command = self._to_code(command, self._LED_COMMANDS)

    @staticmethod
    def _to_code(value, names):
        """Return ``int(value)``, falling back to a lookup of ``str(value)`` in *names*."""
        try:
            return int(value)
        except ValueError:
            return names[str(value)]

    @property
    def identifier(self) -> CommandId:
        """Identifier of this request: ``CommandId.command_led_request``."""
        return CommandId.command_led_request

    @property
    def timeout(self) -> float:
        """Timeout for this request: ``0.1`` (presumably seconds - TODO confirm)."""
        return 0.1

    def process_response(self, response: Response):
        """Accept *response* only if it is an ``LEDResponse``.

        Raises:
            TypeError: for any other kind of response.
        """
        if isinstance(response, LEDResponse):
            return
        raise TypeError(f"{response} is not a {LEDResponse}")

    def execute(self, serial: Serial):
        """Send the LED id and LED command as a two-byte payload over *serial*."""
        self.send_command(
            payload=pack(">BB", self.led_id, self.led_command),
            serial=serial,
        )
|
|
|