Browse Source

ble-dongle: Implement logger over USB

ble
Andreas Berthoud 5 years ago
parent
commit
fcd5d5aa5b
  1. 16
      nucleo-wb55-dongle-ble/Core/Inc/logger.h
  2. 118
      nucleo-wb55-dongle-ble/Core/Src/logger.c
  3. 16
      nucleo-wb55-dongle-ble/Core/Src/main.c
  4. 1
      requirements.txt
  5. 136
      serial_communication.py

16
nucleo-wb55-dongle-ble/Core/Inc/logger.h

@ -0,0 +1,16 @@
/*
* logger.h
*
* Created on: Jul 4, 2021
* Author: Andreas Berthoud
*/
#ifndef INC_LOGGER_H_
#define INC_LOGGER_H_
void log_debug(const char * format, int nargs, ...);
void log_info(const char * format, int nargs, ...);
void log_warning(const char * format, int nargs, ...);
void log_error(const char * format, int nargs, ...);
#endif /* INC_LOGGER_H_ */

118
nucleo-wb55-dongle-ble/Core/Src/logger.c

@ -0,0 +1,118 @@
/*
* logger.c
*
* Created on: Jul 4, 2021
* Author: Andreas Berthoud
*/
#include <stdarg.h>
#include <string.h>
#include "usbd_cdc_if.h"
#include "logger.h"
#define BUFFER_MAX 512
#define COMMAND_HEADER_SIZE 4
#define LOG_COMMAND_HEADER_SIZE 1
#define HEADER_SIZE (COMMAND_HEADER_SIZE + LOG_COMMAND_HEADER_SIZE)
#define MESSAGE_BUFFER_START (COMMAND_HEADER_SIZE + LOG_COMMAND_HEADER_SIZE)
#define MAX_LOG_MESSAGE_SIZE (BUFFER_MAX - COMMAND_HEADER_SIZE - LOG_COMMAND_HEADER_SIZE - 1)
#define min(a,b) \
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a < _b ? _a : _b; })
typedef struct
{
uint8_t LOG_LEVEL_DEBUG;
uint8_t LOG_LEVEL_INFO;
uint8_t LOG_LEVEL_WARNING;
uint8_t LOG_LEVEL_ERROR;
} LOGGER_SEVERTITY_T;
LOGGER_SEVERTITY_T LOGGER_SEVERTITY = {
.LOG_LEVEL_DEBUG = 10,
.LOG_LEVEL_INFO = 20,
.LOG_LEVEL_WARNING = 30,
.LOG_LEVEL_ERROR = 40,
};
/*
* A command is 4 bytes long
*
* | 1 byte | 2 bytes | 1 byte | N bytes | 1 byte |
* | command ID | length N | reserved | data | stop byte |
* */
/*
* command length = N
* | 1 byte | N-1 bytes |
* | severtity | message |
* */
char buffer[BUFFER_MAX];
void log_message(uint8_t severtiy) {
int message_length = min(strlen(buffer + MESSAGE_BUFFER_START), MAX_LOG_MESSAGE_SIZE);
int command_length = message_length + 1;
buffer[0] = 0xFF; // log command
buffer[1] = (command_length & 0x0000FF00) >> 8;
buffer[2] = command_length & 0x000000FF;
buffer[3] = 0x00; // reserved
buffer[4] = severtiy;
buffer[5 + message_length] = 0xFF; // stop byte
CDC_Transmit_FS((uint8_t*)buffer, (uint16_t)(HEADER_SIZE + message_length + 1));
}
void log_debug(const char * format, int nargs, ...) {
va_list args;
va_start(args, nargs);
vsnprintf(
buffer + MESSAGE_BUFFER_START,
MAX_LOG_MESSAGE_SIZE,
format,
args
);
va_end(args);
log_message(LOGGER_SEVERTITY.LOG_LEVEL_DEBUG);
}
void log_info(const char * format, int nargs, ...) {
va_list args;
va_start(args, nargs);
vsnprintf(
buffer + MESSAGE_BUFFER_START,
MAX_LOG_MESSAGE_SIZE,
format,
args
);
va_end(args);
log_message(LOGGER_SEVERTITY.LOG_LEVEL_INFO);
}
void log_warning(const char * format, int nargs, ...) {
va_list args;
va_start(args, nargs);
vsnprintf(
buffer + MESSAGE_BUFFER_START,
MAX_LOG_MESSAGE_SIZE,
format,
args
);
va_end(args);
log_message(LOGGER_SEVERTITY.LOG_LEVEL_WARNING);
}
void log_error(const char * format, int nargs, ...) {
va_list args;
va_start(args, nargs);
vsnprintf(
buffer + MESSAGE_BUFFER_START,
MAX_LOG_MESSAGE_SIZE,
format,
args
);
va_end(args);
log_message(LOGGER_SEVERTITY.LOG_LEVEL_ERROR);
}

16
nucleo-wb55-dongle-ble/Core/Src/main.c

@ -23,7 +23,10 @@
/* Private includes ----------------------------------------------------------*/
/* USER CODE BEGIN Includes */
#include <stdio.h>
#include "usbd_cdc_if.h"
#include "logger.h"
/* USER CODE END Includes */
/* Private typedef -----------------------------------------------------------*/
@ -91,6 +94,7 @@ int main(void)
MX_USB_Device_Init();
/* USER CODE BEGIN 2 */
int counter_value = 0;
/* USER CODE END 2 */
/* Infinite loop */
@ -100,7 +104,12 @@ int main(void)
HAL_GPIO_WritePin(LED_GREEN_GPIO_Port, LED_GREEN_Pin, GPIO_PIN_SET);
HAL_Delay(50);
HAL_GPIO_WritePin(LED_GREEN_GPIO_Port, LED_GREEN_Pin, GPIO_PIN_RESET);
HAL_Delay(50);
HAL_Delay(100);
log_debug("Counter value is %d", 1, counter_value++);
// log_info("Counter value is %d", 1, counter_value++);
// log_warning("Counter value is %d", 1, counter_value++);
// log_error("Counter value is %d", 1, counter_value++);
/* USER CODE END WHILE */
/* USER CODE BEGIN 3 */
@ -224,7 +233,10 @@ static void MX_GPIO_Init(void)
}
/* USER CODE BEGIN 4 */
int _write(int file, char *ptr, int len) {
CDC_Transmit_FS((uint8_t*)ptr, len);
return len;
}
/* USER CODE END 4 */
/**

1
requirements.txt

@ -1 +1,2 @@
pre-commit
pyserial>=3.5,<4

136
serial_communication.py

@ -1,15 +1,68 @@
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Sequence
import serial
#DEVICE_ID = "/dev/tty.usbmodem207E3283544E1"
# DEVICE_ID = "/dev/tty.usbmodem207E3283544E1"
DEVICE_ID = "/dev/tty.usbmodem2067368F32521"
BAUDRATE = 115200
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s] [%(levelname)-8s] --- %(message)s",
)
_logger = logging.getLogger(__name__)
received_logger = _logger.getChild("received")
received_logger.setLevel(logging.DEBUG)
logging.DEBUG
HEADER_SIZE = 4
class CommandId(Enum):
command_none = 0
command_log = 0xFF
@dataclass
class CommandMeta:
command_id: CommandId
data_length: int
@dataclass
class LogCommand:
"""Command ID: command_log"""
level: int
message: str
HEADER_SIZE = 1 # log level
def __init__(
self,
data: bytes,
) -> None:
self._logger = _logger.getChild(self.__class__.__name__)
self._logger.setLevel(logging.INFO)
level = int(data[0])
self._logger.debug(f"level: {level}")
message = data[self.HEADER_SIZE :]
self._logger.debug("Message: " + str(message))
self.level = level
self.message = message.decode()
def execute(self):
received_logger.log(level=self.level, msg=self.message)
def write_bytes(
serial: serial.Serial,
@ -19,36 +72,87 @@ def write_bytes(
serial.write(data)
def echo(
serial_connection: serial.Serial,
):
echo_logger = _logger.getChild("echo")
serial_connection.write(b"Hello echo")
time.sleep(0.1)
echo_logger.info(
f"received: {serial_connection.read(serial_connection.in_waiting)}",
)
def receive_and_log(
serial_connection: serial.Serial,
):
logger = _logger.getChild("receive_and_log")
logger.setLevel(logging.INFO)
bytes_read = serial_connection.read(serial_connection.in_waiting)
while bytes_read:
logger.debug(f"bytes: {bytes_read.hex()}")
# for index in range(4):
# logger.debug(f"byte{index}: {bytes_read[index]}")
command_id = int(bytes_read[0])
logger.debug(f"command_id: {command_id}")
data_length = (int(bytes_read[1]) << 4) + int(bytes_read[2])
logger.debug(f"data_length: {data_length}")
meta = CommandMeta(
command_id=CommandId(command_id),
data_length=data_length,
)
if meta.command_id == CommandId.command_log:
command = LogCommand(
data=bytes_read[HEADER_SIZE : HEADER_SIZE + meta.data_length],
)
command.execute()
else:
return
stop_byte = bytes_read[HEADER_SIZE + meta.data_length]
logger.debug(f"stop_byte: {stop_byte}")
assert stop_byte == 0xFF
bytes_read = bytes_read[HEADER_SIZE + meta.data_length + 1 :]
def main():
with serial.Serial(DEVICE_ID, BAUDRATE) as serial_connection:
logger.info(f"Connection open with: {serial_connection.name}")
serial_connection.write(int.to_bytes(0, 1, "little") + b'Hello')
time.sleep(0.1)
logger.info(f"in_waiting: {serial_connection.in_waiting}")
x = serial_connection.read(serial_connection.in_waiting)
logger.info(f"received: {x}")
logger.info(f"in_waiting: {serial_connection.in_waiting}")
_logger.info(f"Connection open with: {serial_connection.name}")
echo(serial_connection)
while True:
receive_and_log(serial_connection)
time.sleep(0.05)
return
delay = 0.1
while True:
logger.info("Set blue LED")
_logger.info("Set blue LED")
write_bytes(serial_connection, [1, 1])
time.sleep(delay)
logger.info("Set green LED")
_logger.info("Set green LED")
write_bytes(serial_connection, [2, 1])
time.sleep(delay)
logger.info("Set red LED")
_logger.info("Set red LED")
write_bytes(serial_connection, [3, 1])
time.sleep(delay)
logger.info("Reset blue LED")
_logger.info("Reset blue LED")
write_bytes(serial_connection, [1, 0])
time.sleep(delay)
logger.info("Reset green LED")
_logger.info("Reset green LED")
write_bytes(serial_connection, [2, 0])
time.sleep(delay)
logger.info("Reset red LED")
_logger.info("Reset red LED")
write_bytes(serial_connection, [3, 0])
time.sleep(delay)

Loading…
Cancel
Save