You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

328 lines
7.4 KiB

import abc
import logging
from dataclasses import dataclass
from enum import Enum
from random import randint
from struct import pack
from struct import unpack
from typing import Union
from serial import Serial
class CommandId(Enum):
command_none = 0
command_log = 0x1
# even numbers are requests with optional response with ID = request ID + 1
command_heartbeat_request = 0x2
command_heartbeat_response = 0x3
command_led_request = 0x4
command_led_response = 0x5
command_gp_request = 0x6
command_gp_response = 0x7
def get_command_id_from_name(name: str) -> CommandId:
return {
"log": CommandId.command_log,
"led": CommandId.command_led_request,
"gp": CommandId.command_gp_request,
}[name]
def get_request_class(
command_id: CommandId,
):
return {
CommandId.command_log: LogCommand,
CommandId.command_heartbeat_request: HeartbeatRequest,
CommandId.command_led_request: LEDRequest,
CommandId.command_gp_request: GPRequest,
}[command_id]
def get_response_class(
command_id: CommandId,
):
return {
CommandId.command_heartbeat_response: HeartbeatResponse,
CommandId.command_led_response: LEDResponse,
CommandId.command_gp_response: GPResponse,
}[command_id]
class Response(abc.ABC):
identifier: int
def __init__(
self,
data: bytes,
) -> None:
self._logger = logging.getLogger(self.__class__.__name__)
self.identifier = unpack(">H", data[:2])[0]
self.unpack_payload(data[2:])
@abc.abstractmethod
def unpack_payload(
self,
data: bytes,
):
pass
class Command(abc.ABC):
def __init__(self) -> None:
self._logger = logging.getLogger(self.__class__.__name__)
@property
@abc.abstractmethod
def identifier(self) -> CommandId:
pass
@abc.abstractmethod
def execute(
self,
serial: Serial,
):
pass
def send_command(self, payload: bytes, serial: Serial):
length = len(payload)
data = pack(
">BHB" + "B" * length + "B",
int(self.identifier.value),
length,
0,
*list(payload),
0xFF,
)
serial.write(data)
class Request(Command):
def __init__(self) -> None:
super().__init__()
self.response_identifier = randint(0, pow(2, 16) - 1)
@property
@abc.abstractmethod
def timeout(self) -> float:
pass
@abc.abstractmethod
def process_response(
self,
response: Response,
):
pass
def send_command(self, payload: bytes, serial: Serial):
response_identifier_header = pack(">H", self.response_identifier)
super().send_command(
payload=response_identifier_header + payload,
serial=serial,
)
@dataclass
class LogCommand(Command):
"""Command ID: command_log"""
level: int
message: str
HEADER_SIZE = 2 # log level + logger name length
def __init__(
self,
data: bytes,
) -> None:
super().__init__()
self._logger.setLevel(logging.INFO)
level = int(data[0])
logger_name_length = int(data[1])
self._logger.debug(f"level: {level}")
self._logger.debug(f"logger_name_length: {logger_name_length}")
logger_name = data[self.HEADER_SIZE : self.HEADER_SIZE + logger_name_length]
message = data[self.HEADER_SIZE + logger_name_length :]
self._logger.debug("logger_name " + str(logger_name))
self._logger.debug("Message: " + str(message))
self.received_logger = logging.getLogger(logger_name.decode())
self.received_logger.setLevel(logging.DEBUG)
self.level = level
self.message = message.decode()
@property
def identifier(self) -> CommandId:
return CommandId.command_log
def execute(
self,
serial: Serial,
):
self.received_logger.log(level=self.level, msg=self.message)
class HeartbeatResponse(Response):
def unpack_payload(
self,
data: bytes,
):
pass
class HeartbeatRequest(Request):
@property
def identifier(self) -> CommandId:
return CommandId.command_heartbeat_request
@property
def timeout(self) -> float:
return 0.1
def process_response(self, response: Response):
if not isinstance(response, HeartbeatResponse):
raise TypeError(f"{response} is not a {HeartbeatResponse}")
def execute(self, serial: Serial):
self.send_command(
payload=bytes(),
serial=serial,
)
class LEDResponse(Response):
was_successful = True
# def __init__(self) -> None:
# super().__init__()
def unpack_payload(
self,
data: bytes,
):
self.was_successful = bool(data[0])
if self.was_successful:
self._logger.debug("LED command was successful")
else:
self._logger.debug("LED command was not successful")
class LEDRequest(Request):
def __init__(
self,
id: Union[int, str],
command: Union[int, str],
) -> None:
"""
led_id
-------
0: green
1: red
2: blue
led_command
--------
0: off
1: on
2: toggle
"""
super().__init__()
try:
self.led_id = int(id)
except ValueError:
self.led_id = {
"green": 0,
"red": 1,
"blue": 2,
}[str(id)]
try:
self.led_command = int(command)
except ValueError:
self.led_command = {
"off": 0,
"on": 1,
"toggle": 2,
}[str(command)]
@property
def identifier(self) -> CommandId:
return CommandId.command_led_request
@property
def timeout(self) -> float:
return 0.1
def process_response(self, response: Response):
if not isinstance(response, LEDResponse):
raise TypeError(f"{response} is not a {LEDResponse}")
def execute(self, serial: Serial):
payload = pack(
">BB",
self.led_id,
self.led_command,
)
self.send_command(
payload=payload,
serial=serial,
)
class GPResponse(Response):
was_successful = True
def unpack_payload(
self,
data: bytes,
):
self.was_successful = bool(data[0])
if self.was_successful:
self._logger.debug("GP command was successful")
else:
self._logger.debug("GP command was not successful")
class GPRequest(Request):
def __init__(
self,
command_id: Union[int, str],
) -> None:
super().__init__()
self.command_id = int(command_id)
@property
def identifier(self) -> CommandId:
return CommandId.command_gp_request
@property
def timeout(self) -> float:
return 0.1
def process_response(self, response: Response):
if not isinstance(response, GPResponse):
raise TypeError(f"{response} is not a {GPResponse}")
def execute(self, serial: Serial):
payload = pack(
">B",
self.command_id,
)
self.send_command(
payload=payload,
serial=serial,
)