implenet new fwUp behaviour in py

This commit is contained in:
2024-02-08 02:45:09 +01:00
parent 19bcaaf18c
commit 786353d4bd
2 changed files with 77 additions and 25 deletions

View File

@@ -2,11 +2,12 @@ import serial
import struct import struct
import time import time
import sys import sys
import os
from google.protobuf.message import DecodeError from google.protobuf.message import DecodeError
from serial.tools import list_ports from serial.tools import list_ports
from crccheck.crc import Crc from crccheck.crc import Crc
from firmware_pb2 import FirmwareStart, FirmwarePackage, FirmwarePackageAck, FirmwareDone, UsbPackageType from firmware_pb2 import FirmwareStart, FirmwareFileCheck , FirmwarePackage, FirmwarePackageAck, FirmwareDone, UsbPackageType
Crc32 = Crc(32,0x04C11DB7,0xffffffff) Crc32 = Crc(32,0x04C11DB7,0xffffffff)
@@ -39,7 +40,7 @@ def send_package(typeid : UsbPackageType, data: bytearray, serial: serial.Serial
serial.write(package) serial.write(package)
def receive_ack(serial): def read_header(serial):
# Read the header # Read the header
header_data = serial.read(5) # header size is 5 bytes header_data = serial.read(5) # header size is 5 bytes
length, typeid, check = struct.unpack('<HHB', header_data) length, typeid, check = struct.unpack('<HHB', header_data)
@@ -48,6 +49,13 @@ def receive_ack(serial):
check_calculated = (length & 0xFF) + ((length >> 8) & 0xFF) + (typeid & 0xFF) + ((typeid >> 8) & 0xFF) check_calculated = (length & 0xFF) + ((length >> 8) & 0xFF) + (typeid & 0xFF) + ((typeid >> 8) & 0xFF)
if check != check_calculated: if check != check_calculated:
print('Header check byte mismatch') print('Header check byte mismatch')
return None, None
return length, typeid
def receive_ack(serial):
length, typeid = read_header(serial)
if not length or typeid != UsbPackageType.FIRMWAREPACKAGEACK:
return None return None
# Read the message # Read the message
@@ -56,9 +64,6 @@ def receive_ack(serial):
print('Incomplete message') print('Incomplete message')
return None return None
if typeid != UsbPackageType.FIRMWAREPACKAGEACK:
return None
# Parse the message # Parse the message
ack = FirmwarePackageAck() ack = FirmwarePackageAck()
try: try:
@@ -69,6 +74,27 @@ def receive_ack(serial):
return ack return ack
def receive_file_check(serial):
length, typeid = read_header(serial)
if not length or typeid != UsbPackageType.FIRMWAREFILECHECK:
return None
# Read the message
message_data = serial.read(length)
if len(message_data) != length:
print('Incomplete message')
return None
# Parse the message
file_check = FirmwareFileCheck()
try:
file_check.ParseFromString(message_data)
except DecodeError:
print('Failed to parse FirmwareFileCheck')
return None
return file_check
FILENAME = 'firmware.bin' FILENAME = 'firmware.bin'
ID = 0 ID = 0
if len(sys.argv) > 1: if len(sys.argv) > 1:
@@ -96,7 +122,7 @@ if __name__ == "__main__":
# Create a FirmwareStart message # Create a FirmwareStart message
start = FirmwareStart() start = FirmwareStart()
start.name = FILENAME start.name = os.path.basename(FILENAME)
start.size = size start.size = size
start.packages = len(packages) start.packages = len(packages)
start.device_id = ID start.device_id = ID
@@ -107,26 +133,41 @@ if __name__ == "__main__":
send_package(UsbPackageType.FIRMWARESTART, start.SerializeToString(), ser) send_package(UsbPackageType.FIRMWARESTART, start.SerializeToString(), ser)
#time.sleep(1) # wait for the device to process the message #time.sleep(1) # wait for the device to process the message
# Send the firmware packages # Receive the FirmwareFileCheck message
for (i,pack_data) in enumerate(packages): file_check = receive_file_check(ser)
package = FirmwarePackage() print(file_check)
package.counter = i if not file_check:
package.crc_pac = Crc32.calc(pack_data) print('Failed to receive FirmwareFileCheck')
package.device_id = start.device_id exit(-1)
package.data = pack_data
# Send the FirmwarePackage message
print(package)
print(hex(package.crc_pac))
send_package(UsbPackageType.FIRMWAREPACKAGE, package.SerializeToString(), ser)
# Wait for the FirmwarePackageAck message if file_check.crc_fw == start.crc_fw and file_check.size == start.size and not file_check.ready_for_data:
ack = receive_ack(ser) # Skip to FirmwareDone if the CRC and size match and ready_for_data is false
print(ack) print('No need for data transfer')
if not ack.ack: elif file_check.ready_for_data:
print(f'Package {i} not acknowledged') # Send the firmware packages
exit(-1) for (i,pack_data) in enumerate(packages):
package = FirmwarePackage()
package.counter = i
package.crc_pac = Crc32.calc(pack_data)
package.device_id = start.device_id
package.data = pack_data
# Send the FirmwarePackage message
print(package)
print(hex(package.crc_pac))
send_package(UsbPackageType.FIRMWAREPACKAGE, package.SerializeToString(), ser)
# Wait for the FirmwarePackageAck message
ack = receive_ack(ser)
print(ack)
if not ack.ack:
print(f'Package {i} not acknowledged')
exit(-1)
else:
print('Error in FirmwareFileCheck message')
exit(-1)
# Send the FirmwareDone message # Send the FirmwareDone message

11
tools/update_devices.py Normal file
View File

@@ -0,0 +1,11 @@
import subprocess
import sys
def send_fw(firmware, ids):
for id in ids:
subprocess.call(['python', 'send_fw.py', firmware, str(id)])
if __name__ == "__main__":
firmware = sys.argv[1]
ids = sys.argv[2].split(';') # Split the ID string into a list
send_fw(firmware, ids)