You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
161 lines
3.9 KiB
161 lines
3.9 KiB
import logging
|
|
import time
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from typing import Sequence
|
|
|
|
import serial
|
|
|
|
# DEVICE_ID = "/dev/tty.usbmodem207E3283544E1"
|
|
DEVICE_ID = "/dev/tty.usbmodem2067368F32521"
|
|
BAUDRATE = 115200
|
|
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format="[%(asctime)s] [%(levelname)-8s] --- %(message)s",
|
|
)
|
|
_logger = logging.getLogger(__name__)
|
|
received_logger = _logger.getChild("received")
|
|
received_logger.setLevel(logging.DEBUG)
|
|
|
|
logging.DEBUG
|
|
|
|
HEADER_SIZE = 4
|
|
|
|
|
|
class CommandId(Enum):
|
|
command_none = 0
|
|
command_log = 0xFF
|
|
|
|
|
|
@dataclass
|
|
class CommandMeta:
|
|
command_id: CommandId
|
|
data_length: int
|
|
|
|
|
|
@dataclass
|
|
class LogCommand:
|
|
"""Command ID: command_log"""
|
|
|
|
level: int
|
|
message: str
|
|
|
|
HEADER_SIZE = 1 # log level
|
|
|
|
def __init__(
|
|
self,
|
|
data: bytes,
|
|
) -> None:
|
|
self._logger = _logger.getChild(self.__class__.__name__)
|
|
|
|
self._logger.setLevel(logging.INFO)
|
|
|
|
level = int(data[0])
|
|
self._logger.debug(f"level: {level}")
|
|
|
|
message = data[self.HEADER_SIZE :]
|
|
self._logger.debug("Message: " + str(message))
|
|
|
|
self.level = level
|
|
self.message = message.decode()
|
|
|
|
def execute(self):
|
|
received_logger.log(level=self.level, msg=self.message)
|
|
|
|
|
|
def write_bytes(
|
|
serial: serial.Serial,
|
|
values: Sequence[int],
|
|
):
|
|
data = bytearray(values)
|
|
serial.write(data)
|
|
|
|
|
|
def echo(
|
|
serial_connection: serial.Serial,
|
|
):
|
|
echo_logger = _logger.getChild("echo")
|
|
serial_connection.write(b"Hello echo")
|
|
time.sleep(0.1)
|
|
echo_logger.info(
|
|
f"received: {serial_connection.read(serial_connection.in_waiting)}",
|
|
)
|
|
|
|
|
|
def receive_and_log(
|
|
serial_connection: serial.Serial,
|
|
):
|
|
logger = _logger.getChild("receive_and_log")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
bytes_read = serial_connection.read(serial_connection.in_waiting)
|
|
|
|
while bytes_read:
|
|
|
|
logger.debug(f"bytes: {bytes_read.hex()}")
|
|
# for index in range(4):
|
|
# logger.debug(f"byte{index}: {bytes_read[index]}")
|
|
|
|
command_id = int(bytes_read[0])
|
|
logger.debug(f"command_id: {command_id}")
|
|
|
|
data_length = (int(bytes_read[1]) << 4) + int(bytes_read[2])
|
|
logger.debug(f"data_length: {data_length}")
|
|
|
|
meta = CommandMeta(
|
|
command_id=CommandId(command_id),
|
|
data_length=data_length,
|
|
)
|
|
|
|
if meta.command_id == CommandId.command_log:
|
|
command = LogCommand(
|
|
data=bytes_read[HEADER_SIZE : HEADER_SIZE + meta.data_length],
|
|
)
|
|
command.execute()
|
|
else:
|
|
return
|
|
|
|
stop_byte = bytes_read[HEADER_SIZE + meta.data_length]
|
|
logger.debug(f"stop_byte: {stop_byte}")
|
|
assert stop_byte == 0xFF
|
|
|
|
bytes_read = bytes_read[HEADER_SIZE + meta.data_length + 1 :]
|
|
|
|
|
|
def main():
|
|
with serial.Serial(DEVICE_ID, BAUDRATE) as serial_connection:
|
|
_logger.info(f"Connection open with: {serial_connection.name}")
|
|
|
|
echo(serial_connection)
|
|
|
|
while True:
|
|
receive_and_log(serial_connection)
|
|
time.sleep(0.05)
|
|
|
|
return
|
|
delay = 0.1
|
|
while True:
|
|
_logger.info("Set blue LED")
|
|
write_bytes(serial_connection, [1, 1])
|
|
time.sleep(delay)
|
|
_logger.info("Set green LED")
|
|
write_bytes(serial_connection, [2, 1])
|
|
time.sleep(delay)
|
|
_logger.info("Set red LED")
|
|
write_bytes(serial_connection, [3, 1])
|
|
time.sleep(delay)
|
|
|
|
_logger.info("Reset blue LED")
|
|
write_bytes(serial_connection, [1, 0])
|
|
time.sleep(delay)
|
|
_logger.info("Reset green LED")
|
|
write_bytes(serial_connection, [2, 0])
|
|
time.sleep(delay)
|
|
_logger.info("Reset red LED")
|
|
write_bytes(serial_connection, [3, 0])
|
|
time.sleep(delay)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|