|
|
|
@ -5,6 +5,8 @@ from enum import Enum |
|
|
|
from random import randint |
|
|
|
from struct import pack |
|
|
|
from struct import unpack |
|
|
|
from typing import Dict |
|
|
|
from typing import Type |
|
|
|
from typing import Union |
|
|
|
|
|
|
|
from serial import Serial |
|
|
|
@ -31,35 +33,6 @@ class CommandTarget(Enum): |
|
|
|
server = 1 |
|
|
|
|
|
|
|
|
|
|
|
def get_command_id_from_name(name: str) -> CommandId: |
|
|
|
return { |
|
|
|
"log": CommandId.command_log, |
|
|
|
"led": CommandId.command_led_request, |
|
|
|
"gp": CommandId.command_gp_request, |
|
|
|
}[name] |
|
|
|
|
|
|
|
|
|
|
|
def get_request_class( |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
return { |
|
|
|
CommandId.command_log: LogCommand, |
|
|
|
CommandId.command_heartbeat_request: HeartbeatRequest, |
|
|
|
CommandId.command_led_request: LEDRequest, |
|
|
|
CommandId.command_gp_request: GPRequest, |
|
|
|
}[command_id] |
|
|
|
|
|
|
|
|
|
|
|
def get_response_class( |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
return { |
|
|
|
CommandId.command_heartbeat_response: HeartbeatResponse, |
|
|
|
CommandId.command_led_response: LEDResponse, |
|
|
|
CommandId.command_gp_response: GPResponse, |
|
|
|
}[command_id] |
|
|
|
|
|
|
|
|
|
|
|
class Response(abc.ABC): |
|
|
|
|
|
|
|
identifier: int |
|
|
|
@ -148,6 +121,43 @@ class Request(Command): |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
__command_alias: Dict[str, CommandId] = dict() |
|
|
|
__request_by_id: Dict[CommandId, Type[Request]] = dict() |
|
|
|
__response_by_id: Dict[CommandId, Type[Response]] = dict() |
|
|
|
|
|
|
|
|
|
|
|
def get_command_id_from_name(name: str) -> CommandId: |
|
|
|
return __command_alias[name] |
|
|
|
|
|
|
|
|
|
|
|
def get_request_class( |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
return __request_by_id[command_id] |
|
|
|
|
|
|
|
|
|
|
|
def get_response_class( |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
return __response_by_id[command_id] |
|
|
|
|
|
|
|
|
|
|
|
def register_request( |
|
|
|
cls: Type[Request], |
|
|
|
command_id: CommandId, |
|
|
|
alias: str, |
|
|
|
): |
|
|
|
__command_alias[alias] = command_id |
|
|
|
__request_by_id[command_id] = cls |
|
|
|
|
|
|
|
|
|
|
|
def register_response( |
|
|
|
cls: Type[Response], |
|
|
|
command_id: CommandId, |
|
|
|
): |
|
|
|
__response_by_id[command_id] = cls |
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
|
class LogCommand(Command): |
|
|
|
"""Command ID: command_log""" |
|
|
|
@ -216,6 +226,17 @@ class HeartbeatRequest(Request): |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
register_request( |
|
|
|
cls=HeartbeatRequest, |
|
|
|
command_id=CommandId.command_heartbeat_request, |
|
|
|
alias="hp", |
|
|
|
) |
|
|
|
register_response( |
|
|
|
cls=HeartbeatResponse, |
|
|
|
command_id=CommandId.command_heartbeat_response, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class LEDResponse(Response): |
|
|
|
was_successful = True |
|
|
|
|
|
|
|
@ -293,6 +314,17 @@ class LEDRequest(Request): |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
register_request( |
|
|
|
cls=LEDRequest, |
|
|
|
command_id=CommandId.command_led_request, |
|
|
|
alias="led", |
|
|
|
) |
|
|
|
register_response( |
|
|
|
cls=LEDResponse, |
|
|
|
command_id=CommandId.command_led_response, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class GPResponse(Response): |
|
|
|
was_successful = True |
|
|
|
|
|
|
|
@ -333,3 +365,14 @@ class GPRequest(Request): |
|
|
|
payload=payload, |
|
|
|
serial=serial, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
register_request( |
|
|
|
cls=GPRequest, |
|
|
|
command_id=CommandId.command_gp_request, |
|
|
|
alias="gp", |
|
|
|
) |
|
|
|
register_response( |
|
|
|
cls=GPResponse, |
|
|
|
command_id=CommandId.command_gp_response, |
|
|
|
) |
|
|
|
|