tool to send fw with usb using protbuf
This commit is contained in:
6
.gitignore
vendored
6
.gitignore
vendored
@@ -1 +1,5 @@
|
|||||||
/build
|
build/
|
||||||
|
venv/
|
||||||
|
tools/*_pb2.py
|
||||||
|
tools/__pycache__
|
||||||
|
tools/*.bin
|
||||||
20
tools/CMakeLists.txt
Normal file
20
tools/CMakeLists.txt
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
|
||||||
|
# Specify the .proto file
|
||||||
|
set(PROTO_SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../proto)
|
||||||
|
set(PROTO_FILE ${PROTO_SRC_DIR}/firmware.proto)
|
||||||
|
|
||||||
|
# Specify where you want to generate the python code
|
||||||
|
set(PYTHON_OUT_DIR ${CMAKE_CURRENT_SOURCE_DIR})
|
||||||
|
|
||||||
|
# Specify the include directory
|
||||||
|
set(PROTO_INC_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../lib/nanopb/generator/proto)
|
||||||
|
|
||||||
|
# Add the command to generate the python code
|
||||||
|
add_custom_command(
|
||||||
|
OUTPUT ${PYTHON_OUT_DIR}/firmware_pb2.py
|
||||||
|
COMMAND protoc --python_out=${PYTHON_OUT_DIR} --proto_path=${PROTO_SRC_DIR}:${PROTO_INC_DIR} ${PROTO_FILE} ${PROTO_INC_DIR}/nanopb.proto
|
||||||
|
DEPENDS ${PROTO_FILE}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add the generated python code to a custom target
|
||||||
|
add_custom_target(GeneratePythonProtobufCode ALL DEPENDS ${PYTHON_OUT_DIR}/firmware_pb2.py)
|
||||||
134
tools/send_fw.py
Normal file
134
tools/send_fw.py
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
import serial
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
from google.protobuf.internal import encoder
|
||||||
|
from google.protobuf.internal import decoder
|
||||||
|
from google.protobuf.message import DecodeError
|
||||||
|
from serial.tools import list_ports
|
||||||
|
from crccheck.crc import Crc32
|
||||||
|
|
||||||
|
from firmware_pb2 import FirmwareStart, FirmwarePackage, FirmwarePackageAck, FirmwareDone
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def load_firmware(filename):
|
||||||
|
# Open the file in binary mode
|
||||||
|
with open(filename, 'rb') as f:
|
||||||
|
data = f.read()
|
||||||
|
# Calculate the CRC32 checksum
|
||||||
|
crc = Crc32.calc(data)
|
||||||
|
# Get the file size
|
||||||
|
size = len(data)
|
||||||
|
# Split the data into 256-byte chunks
|
||||||
|
packages = [data[i:i+256] for i in range(0, len(data), 256)]
|
||||||
|
return crc, size, packages
|
||||||
|
|
||||||
|
|
||||||
|
def make_header(typeid: int, length: int) -> bytearray:
|
||||||
|
struct_format = '<HHB' # '<' for little-endian, 'H' for uint16_t, 'B' for uint8_t
|
||||||
|
# Calculate the check byte as the sum of length and type
|
||||||
|
check = (length & 0xFF) + ((length >> 8) & 0xFF) + (typeid & 0xFF) + ((typeid >> 8) & 0xFF)
|
||||||
|
packed_data = struct.pack(struct_format, length, typeid, check)
|
||||||
|
return packed_data
|
||||||
|
|
||||||
|
|
||||||
|
def send_package(typeid :int, data: bytearray, serial: serial.Serial):
|
||||||
|
head = make_header(typeid, len(data))
|
||||||
|
package = head + data
|
||||||
|
serial.write(package)
|
||||||
|
|
||||||
|
|
||||||
|
def receive_ack(serial):
|
||||||
|
# Read the header
|
||||||
|
header_data = serial.read(5) # header size is 5 bytes
|
||||||
|
length, typeid, check = struct.unpack('<HHB', header_data)
|
||||||
|
|
||||||
|
# Verify the check byte
|
||||||
|
check_calculated = (length & 0xFF) + ((length >> 8) & 0xFF) + (typeid & 0xFF) + ((typeid >> 8) & 0xFF)
|
||||||
|
if check != check_calculated:
|
||||||
|
print('Header check byte mismatch')
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Read the message
|
||||||
|
message_data = serial.read(length)
|
||||||
|
if len(message_data) != length:
|
||||||
|
print('Incomplete message')
|
||||||
|
return None
|
||||||
|
|
||||||
|
if typeid != 0xf04:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Parse the message
|
||||||
|
ack = FirmwarePackageAck()
|
||||||
|
try:
|
||||||
|
ack.ParseFromString(message_data)
|
||||||
|
except DecodeError:
|
||||||
|
print('Failed to parse FirmwarePackageAck')
|
||||||
|
return None
|
||||||
|
|
||||||
|
return ack
|
||||||
|
|
||||||
|
FILENAME = 'firmware.bin'
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
stm_port = None
|
||||||
|
for port in list_ports.comports():
|
||||||
|
print(port)
|
||||||
|
if "STM32 Virtual ComPort" in port.description:
|
||||||
|
stm_port = port.device
|
||||||
|
break
|
||||||
|
|
||||||
|
if stm_port is None:
|
||||||
|
print("STM32 Virtual ComPort not found")
|
||||||
|
exit(-1)
|
||||||
|
else:
|
||||||
|
# Open the serial port
|
||||||
|
ser = serial.Serial(stm_port,baudrate=5000000)
|
||||||
|
|
||||||
|
crc, size, packages = load_firmware(FILENAME)
|
||||||
|
|
||||||
|
# Create a FirmwareStart message
|
||||||
|
start = FirmwareStart()
|
||||||
|
start.name = FILENAME
|
||||||
|
start.size = size
|
||||||
|
start.packages = len(packages)
|
||||||
|
start.device_id = 1
|
||||||
|
start.crc_fw = crc
|
||||||
|
|
||||||
|
# Send the FirmwareStart message
|
||||||
|
print(start)
|
||||||
|
#send_package(0xf01, start.SerializeToString(), ser)
|
||||||
|
time.sleep(1) # wait for the device to process the message
|
||||||
|
|
||||||
|
# Send the firmware packages
|
||||||
|
for (i,pack_data) in enumerate(packages):
|
||||||
|
package = FirmwarePackage()
|
||||||
|
package.counter = i
|
||||||
|
package.crc_pac = Crc32.calc(pack_data)
|
||||||
|
package.device_id = start.device_id
|
||||||
|
package.data = pack_data
|
||||||
|
|
||||||
|
# Send the FirmwarePackage message
|
||||||
|
print(package)
|
||||||
|
print(hex(package.crc_pac))
|
||||||
|
exit(0)
|
||||||
|
#send_package(0xf02, package.SerializeToString(), ser)
|
||||||
|
|
||||||
|
|
||||||
|
# Wait for the FirmwarePackageAck message
|
||||||
|
ack = receive_ack(ser)
|
||||||
|
print(ack)
|
||||||
|
if not ack.ack:
|
||||||
|
print(f'Package {i} not acknowledged')
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
# Send the FirmwareDone message
|
||||||
|
done = FirmwareDone()
|
||||||
|
done.size = start.size
|
||||||
|
done.crc_fw = start.crc_fw
|
||||||
|
done.device_id = start.device_id
|
||||||
|
print(done)
|
||||||
|
send_package(0xf03, done.SerializeToString(), ser)
|
||||||
Reference in New Issue
Block a user