From 85e08d31be51b6ead0eef6d3e9ad061f26d3fa6d Mon Sep 17 00:00:00 2001 From: Oliver Walter Date: Sun, 25 Feb 2024 19:51:41 +0100 Subject: [PATCH] update python tools to new proto names --- tools/find_devices.py | 12 +++++------ tools/light_test.py | 30 +++++++++++++------------- tools/send_fw.py | 50 +++++++++++++++++++++---------------------- tools/vcp_driver.py | 6 +++--- 4 files changed, 49 insertions(+), 49 deletions(-) diff --git a/tools/find_devices.py b/tools/find_devices.py index 521e695..bec79dc 100644 --- a/tools/find_devices.py +++ b/tools/find_devices.py @@ -3,21 +3,21 @@ import struct from google.protobuf.message import DecodeError from serial.tools import list_ports -from cls_device_pb2 import RequestDeviceList, Device, ResponseDeviceList -from usb_pb2 import UsbPackageType +from cls_device_pb2 import RequestList, Device, ResponseList +from usb_pb2 import PackageType from vcp_driver import * if __name__ == "__main__": ser = setup_connection() # Create a RequestDeviceList message - request = RequestDeviceList() + request = RequestList() request.msg = 1 # or whatever value you want to set # Serialize the request to a bytearray request_data = request.SerializeToString() # Send the request - send_package(UsbPackageType.REQUEST_DEVICE_LIST, request_data, ser) + send_package(PackageType.REQUEST_DEVICE_LIST, request_data, ser) # Now wait for the response while True: @@ -28,13 +28,13 @@ if __name__ == "__main__": length, typeid, check = struct.unpack(' bytearray: +def make_header(typeid: PackageType, length: int) -> bytearray: struct_format = ' bytearray: return packed_data -def send_package(typeid : UsbPackageType, data: bytearray, serial: serial.Serial): +def send_package(typeid : PackageType, data: bytearray, serial: serial.Serial): head = make_header(typeid, len(data)) package = head + data serial.write(package) @@ -56,7 +56,7 @@ def read_header(serial): def receive_ack(serial): length, typeid = read_header(serial) - if not length or typeid != UsbPackageType.FIRMWAREPACKAGEACK: + if not length or typeid != PackageType.FIRMWAREPACKAGEACK: return None # Read the message @@ -66,18 +66,18 @@ def receive_ack(serial): return None # Parse the message - ack = FirmwarePackageAck() + ack = PackageAck() try: ack.ParseFromString(message_data) except DecodeError: - print('Failed to parse FirmwarePackageAck') + print('Failed to parse PackageAck') return None return ack def receive_file_check(serial): length, typeid = read_header(serial) - if not length or typeid != UsbPackageType.FIRMWAREFILECHECK: + if not length or typeid != PackageType.FIRMWAREFILECHECK: return None # Read the message @@ -87,11 +87,11 @@ def receive_file_check(serial): return None # Parse the message - file_check = FirmwareFileCheck() + file_check = FileCheck() try: file_check.ParseFromString(message_data) except DecodeError: - print('Failed to parse FirmwareFileCheck') + print('Failed to parse FileCheck') return None return file_check @@ -121,60 +121,60 @@ if __name__ == "__main__": crc, size, packages = load_firmware(FILENAME) - # Create a FirmwareStart message - start = FirmwareStart() + # Create a Start message + start = Start() start.name = os.path.basename(FILENAME) start.size = size start.packages = len(packages) start.device_id = ID start.crc_fw = crc - # Send the FirmwareStart message + # Send the Start message print(start) - send_package(UsbPackageType.FIRMWARESTART, start.SerializeToString(), ser) + send_package(PackageType.FIRMWARESTART, start.SerializeToString(), ser) #time.sleep(1) # wait for the device to process the message - # Receive the FirmwareFileCheck message + # Receive the FileCheck message file_check = receive_file_check(ser) print(file_check) if not file_check: - print('Failed to receive FirmwareFileCheck') + print('Failed to receive FileCheck') exit(-1) if file_check.crc_fw == start.crc_fw and file_check.size == start.size and not file_check.ready_for_data: - # Skip to FirmwareDone if the CRC and size match and ready_for_data is false + # Skip to Done if the CRC and size match and ready_for_data is false print('No need for data transfer') elif file_check.ready_for_data: # Send the firmware packages for (i,pack_data) in enumerate(packages): - package = FirmwarePackage() + package = Package() package.counter = i package.crc_pac = Crc32.calc(pack_data) package.device_id = start.device_id package.data = pack_data - # Send the FirmwarePackage message + # Send the Package message print(package) print(hex(package.crc_pac)) - send_package(UsbPackageType.FIRMWAREPACKAGE, package.SerializeToString(), ser) + send_package(PackageType.FIRMWAREPACKAGE, package.SerializeToString(), ser) - # Wait for the FirmwarePackageAck message + # Wait for the PackageAck message ack = receive_ack(ser) print(ack) if not ack.ack: print(f'Package {i} not acknowledged') exit(-1) else: - print('Error in FirmwareFileCheck message') + print('Error in FileCheck message') exit(-1) - # Send the FirmwareDone message - done = FirmwareDone() + # Send the Done message + done = Done() done.size = start.size done.crc_fw = start.crc_fw done.device_id = start.device_id print(done) - send_package(UsbPackageType.FIRMWAREDONE, done.SerializeToString(), ser) + send_package(PackageType.FIRMWAREDONE, done.SerializeToString(), ser) diff --git a/tools/vcp_driver.py b/tools/vcp_driver.py index a471842..bafd984 100644 --- a/tools/vcp_driver.py +++ b/tools/vcp_driver.py @@ -1,11 +1,11 @@ import serial import struct -from usb_pb2 import UsbPackageType +from usb_pb2 import PackageType from serial.tools import list_ports -def make_header(typeid: UsbPackageType, length: int) -> bytearray: +def make_header(typeid: PackageType, length: int) -> bytearray: struct_format = ' bytearray: packed_data = struct.pack(struct_format, length, typeid, check) return packed_data -def send_package(typeid : UsbPackageType, data: bytearray, serial: serial.Serial): +def send_package(typeid : PackageType, data: bytearray, serial: serial.Serial): head = make_header(typeid, len(data)) package = head + data serial.write(package)