import logging import time from dataclasses import dataclass from enum import Enum from typing import Sequence import serial # DEVICE_ID = "/dev/tty.usbmodem207E3283544E1" DEVICE_ID = "/dev/tty.usbmodem2067368F32521" BAUDRATE = 115200 logging.basicConfig( level=logging.DEBUG, format="[%(asctime)s] [%(levelname)-8s] --- %(message)s", ) _logger = logging.getLogger(__name__) received_logger = _logger.getChild("received") received_logger.setLevel(logging.DEBUG) logging.DEBUG HEADER_SIZE = 4 class CommandId(Enum): command_none = 0 command_log = 0xFF @dataclass class CommandMeta: command_id: CommandId data_length: int @dataclass class LogCommand: """Command ID: command_log""" level: int message: str HEADER_SIZE = 1 # log level def __init__( self, data: bytes, ) -> None: self._logger = _logger.getChild(self.__class__.__name__) self._logger.setLevel(logging.INFO) level = int(data[0]) self._logger.debug(f"level: {level}") message = data[self.HEADER_SIZE :] self._logger.debug("Message: " + str(message)) self.level = level self.message = message.decode() def execute(self): received_logger.log(level=self.level, msg=self.message) def write_bytes( serial: serial.Serial, values: Sequence[int], ): data = bytearray(values) serial.write(data) def echo( serial_connection: serial.Serial, ): echo_logger = _logger.getChild("echo") serial_connection.write(b"Hello echo") time.sleep(0.1) echo_logger.info( f"received: {serial_connection.read(serial_connection.in_waiting)}", ) def receive_and_log( serial_connection: serial.Serial, ): logger = _logger.getChild("receive_and_log") logger.setLevel(logging.INFO) bytes_read = serial_connection.read(serial_connection.in_waiting) while bytes_read: logger.debug(f"bytes: {bytes_read.hex()}") # for index in range(4): # logger.debug(f"byte{index}: {bytes_read[index]}") command_id = int(bytes_read[0]) logger.debug(f"command_id: {command_id}") data_length = (int(bytes_read[1]) << 4) + int(bytes_read[2]) logger.debug(f"data_length: {data_length}") meta = CommandMeta( command_id=CommandId(command_id), data_length=data_length, ) if meta.command_id == CommandId.command_log: command = LogCommand( data=bytes_read[HEADER_SIZE : HEADER_SIZE + meta.data_length], ) command.execute() else: return stop_byte = bytes_read[HEADER_SIZE + meta.data_length] logger.debug(f"stop_byte: {stop_byte}") assert stop_byte == 0xFF bytes_read = bytes_read[HEADER_SIZE + meta.data_length + 1 :] def main(): with serial.Serial(DEVICE_ID, BAUDRATE) as serial_connection: _logger.info(f"Connection open with: {serial_connection.name}") echo(serial_connection) while True: receive_and_log(serial_connection) time.sleep(0.05) return delay = 0.1 while True: _logger.info("Set blue LED") write_bytes(serial_connection, [1, 1]) time.sleep(delay) _logger.info("Set green LED") write_bytes(serial_connection, [2, 1]) time.sleep(delay) _logger.info("Set red LED") write_bytes(serial_connection, [3, 1]) time.sleep(delay) _logger.info("Reset blue LED") write_bytes(serial_connection, [1, 0]) time.sleep(delay) _logger.info("Reset green LED") write_bytes(serial_connection, [2, 0]) time.sleep(delay) _logger.info("Reset red LED") write_bytes(serial_connection, [3, 0]) time.sleep(delay) if __name__ == "__main__": main()