You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

161 lines
3.9 KiB

import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Sequence
import serial
# DEVICE_ID = "/dev/tty.usbmodem207E3283544E1"
DEVICE_ID = "/dev/tty.usbmodem2067368F32521"
BAUDRATE = 115200
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s] [%(levelname)-8s] --- %(message)s",
)
_logger = logging.getLogger(__name__)
received_logger = _logger.getChild("received")
received_logger.setLevel(logging.DEBUG)
logging.DEBUG
HEADER_SIZE = 4
class CommandId(Enum):
command_none = 0
command_log = 0xFF
@dataclass
class CommandMeta:
command_id: CommandId
data_length: int
@dataclass
class LogCommand:
"""Command ID: command_log"""
level: int
message: str
HEADER_SIZE = 1 # log level
def __init__(
self,
data: bytes,
) -> None:
self._logger = _logger.getChild(self.__class__.__name__)
self._logger.setLevel(logging.INFO)
level = int(data[0])
self._logger.debug(f"level: {level}")
message = data[self.HEADER_SIZE :]
self._logger.debug("Message: " + str(message))
self.level = level
self.message = message.decode()
def execute(self):
received_logger.log(level=self.level, msg=self.message)
def write_bytes(
serial: serial.Serial,
values: Sequence[int],
):
data = bytearray(values)
serial.write(data)
def echo(
serial_connection: serial.Serial,
):
echo_logger = _logger.getChild("echo")
serial_connection.write(b"Hello echo")
time.sleep(0.1)
echo_logger.info(
f"received: {serial_connection.read(serial_connection.in_waiting)}",
)
def receive_and_log(
serial_connection: serial.Serial,
):
logger = _logger.getChild("receive_and_log")
logger.setLevel(logging.INFO)
bytes_read = serial_connection.read(serial_connection.in_waiting)
while bytes_read:
logger.debug(f"bytes: {bytes_read.hex()}")
# for index in range(4):
# logger.debug(f"byte{index}: {bytes_read[index]}")
command_id = int(bytes_read[0])
logger.debug(f"command_id: {command_id}")
data_length = (int(bytes_read[1]) << 4) + int(bytes_read[2])
logger.debug(f"data_length: {data_length}")
meta = CommandMeta(
command_id=CommandId(command_id),
data_length=data_length,
)
if meta.command_id == CommandId.command_log:
command = LogCommand(
data=bytes_read[HEADER_SIZE : HEADER_SIZE + meta.data_length],
)
command.execute()
else:
return
stop_byte = bytes_read[HEADER_SIZE + meta.data_length]
logger.debug(f"stop_byte: {stop_byte}")
assert stop_byte == 0xFF
bytes_read = bytes_read[HEADER_SIZE + meta.data_length + 1 :]
def main():
with serial.Serial(DEVICE_ID, BAUDRATE) as serial_connection:
_logger.info(f"Connection open with: {serial_connection.name}")
echo(serial_connection)
while True:
receive_and_log(serial_connection)
time.sleep(0.05)
return
delay = 0.1
while True:
_logger.info("Set blue LED")
write_bytes(serial_connection, [1, 1])
time.sleep(delay)
_logger.info("Set green LED")
write_bytes(serial_connection, [2, 1])
time.sleep(delay)
_logger.info("Set red LED")
write_bytes(serial_connection, [3, 1])
time.sleep(delay)
_logger.info("Reset blue LED")
write_bytes(serial_connection, [1, 0])
time.sleep(delay)
_logger.info("Reset green LED")
write_bytes(serial_connection, [2, 0])
time.sleep(delay)
_logger.info("Reset red LED")
write_bytes(serial_connection, [3, 0])
time.sleep(delay)
if __name__ == "__main__":
main()