From 62f8d6db29d7afa2abee57b5ca048b912f1f94b0 Mon Sep 17 00:00:00 2001 From: Oliver Walter Date: Sat, 3 Feb 2024 02:53:22 +0100 Subject: [PATCH] tool to send fw with usb using protbuf --- .gitignore | 6 +- tools/CMakeLists.txt | 20 +++++++ tools/send_fw.py | 134 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 tools/CMakeLists.txt create mode 100644 tools/send_fw.py diff --git a/.gitignore b/.gitignore index 42afabf..0e1d110 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ -/build \ No newline at end of file +build/ +venv/ +tools/*_pb2.py +tools/__pycache__ +tools/*.bin \ No newline at end of file diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt new file mode 100644 index 0000000..7a7a789 --- /dev/null +++ b/tools/CMakeLists.txt @@ -0,0 +1,20 @@ + +# Specify the .proto file +set(PROTO_SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../proto) +set(PROTO_FILE ${PROTO_SRC_DIR}/firmware.proto) + +# Specify where you want to generate the python code +set(PYTHON_OUT_DIR ${CMAKE_CURRENT_SOURCE_DIR}) + +# Specify the include directory +set(PROTO_INC_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../lib/nanopb/generator/proto) + +# Add the command to generate the python code +add_custom_command( + OUTPUT ${PYTHON_OUT_DIR}/firmware_pb2.py + COMMAND protoc --python_out=${PYTHON_OUT_DIR} --proto_path=${PROTO_SRC_DIR}:${PROTO_INC_DIR} ${PROTO_FILE} ${PROTO_INC_DIR}/nanopb.proto + DEPENDS ${PROTO_FILE} +) + +# Add the generated python code to a custom target +add_custom_target(GeneratePythonProtobufCode ALL DEPENDS ${PYTHON_OUT_DIR}/firmware_pb2.py) \ No newline at end of file diff --git a/tools/send_fw.py b/tools/send_fw.py new file mode 100644 index 0000000..c31c380 --- /dev/null +++ b/tools/send_fw.py @@ -0,0 +1,134 @@ +import serial +import struct +import time +from google.protobuf.internal import encoder +from google.protobuf.internal import decoder +from google.protobuf.message import DecodeError +from serial.tools import list_ports +from crccheck.crc import Crc32 + +from firmware_pb2 import FirmwareStart, FirmwarePackage, FirmwarePackageAck, FirmwareDone + + + +def load_firmware(filename): + # Open the file in binary mode + with open(filename, 'rb') as f: + data = f.read() + # Calculate the CRC32 checksum + crc = Crc32.calc(data) + # Get the file size + size = len(data) + # Split the data into 256-byte chunks + packages = [data[i:i+256] for i in range(0, len(data), 256)] + return crc, size, packages + + +def make_header(typeid: int, length: int) -> bytearray: + struct_format = '> 8) & 0xFF) + (typeid & 0xFF) + ((typeid >> 8) & 0xFF) + packed_data = struct.pack(struct_format, length, typeid, check) + return packed_data + + +def send_package(typeid :int, data: bytearray, serial: serial.Serial): + head = make_header(typeid, len(data)) + package = head + data + serial.write(package) + + +def receive_ack(serial): + # Read the header + header_data = serial.read(5) # header size is 5 bytes + length, typeid, check = struct.unpack('> 8) & 0xFF) + (typeid & 0xFF) + ((typeid >> 8) & 0xFF) + if check != check_calculated: + print('Header check byte mismatch') + return None + + # Read the message + message_data = serial.read(length) + if len(message_data) != length: + print('Incomplete message') + return None + + if typeid != 0xf04: + return None + + # Parse the message + ack = FirmwarePackageAck() + try: + ack.ParseFromString(message_data) + except DecodeError: + print('Failed to parse FirmwarePackageAck') + return None + + return ack + +FILENAME = 'firmware.bin' + + +if __name__ == "__main__": + + stm_port = None + for port in list_ports.comports(): + print(port) + if "STM32 Virtual ComPort" in port.description: + stm_port = port.device + break + + if stm_port is None: + print("STM32 Virtual ComPort not found") + exit(-1) + else: + # Open the serial port + ser = serial.Serial(stm_port,baudrate=5000000) + + crc, size, packages = load_firmware(FILENAME) + + # Create a FirmwareStart message + start = FirmwareStart() + start.name = FILENAME + start.size = size + start.packages = len(packages) + start.device_id = 1 + start.crc_fw = crc + + # Send the FirmwareStart message + print(start) + #send_package(0xf01, start.SerializeToString(), ser) + time.sleep(1) # wait for the device to process the message + + # Send the firmware packages + for (i,pack_data) in enumerate(packages): + package = FirmwarePackage() + package.counter = i + package.crc_pac = Crc32.calc(pack_data) + package.device_id = start.device_id + package.data = pack_data + + # Send the FirmwarePackage message + print(package) + print(hex(package.crc_pac)) + exit(0) + #send_package(0xf02, package.SerializeToString(), ser) + + + # Wait for the FirmwarePackageAck message + ack = receive_ack(ser) + print(ack) + if not ack.ack: + print(f'Package {i} not acknowledged') + break + + + # Send the FirmwareDone message + done = FirmwareDone() + done.size = start.size + done.crc_fw = start.crc_fw + done.device_id = start.device_id + print(done) + send_package(0xf03, done.SerializeToString(), ser)