update python tools to new proto names

This commit is contained in:
2024-02-25 19:51:41 +01:00
parent b09924b9c8
commit 85e08d31be
4 changed files with 49 additions and 49 deletions

View File

@@ -3,21 +3,21 @@ import struct
from google.protobuf.message import DecodeError from google.protobuf.message import DecodeError
from serial.tools import list_ports from serial.tools import list_ports
from cls_device_pb2 import RequestDeviceList, Device, ResponseDeviceList from cls_device_pb2 import RequestList, Device, ResponseList
from usb_pb2 import UsbPackageType from usb_pb2 import PackageType
from vcp_driver import * from vcp_driver import *
if __name__ == "__main__": if __name__ == "__main__":
ser = setup_connection() ser = setup_connection()
# Create a RequestDeviceList message # Create a RequestDeviceList message
request = RequestDeviceList() request = RequestList()
request.msg = 1 # or whatever value you want to set request.msg = 1 # or whatever value you want to set
# Serialize the request to a bytearray # Serialize the request to a bytearray
request_data = request.SerializeToString() request_data = request.SerializeToString()
# Send the request # Send the request
send_package(UsbPackageType.REQUEST_DEVICE_LIST, request_data, ser) send_package(PackageType.REQUEST_DEVICE_LIST, request_data, ser)
# Now wait for the response # Now wait for the response
while True: while True:
@@ -28,13 +28,13 @@ if __name__ == "__main__":
length, typeid, check = struct.unpack('<HHB', response_header) length, typeid, check = struct.unpack('<HHB', response_header)
# Check if the type is RESPONSE_DEVICE_LIST # Check if the type is RESPONSE_DEVICE_LIST
if typeid == UsbPackageType.RESPONSE_DEVICE_LIST: if typeid == PackageType.RESPONSE_DEVICE_LIST:
# Read the response data from the serial port # Read the response data from the serial port
response_data = ser.read(length) response_data = ser.read(length)
# Try to parse the data as a ResponseDeviceList message # Try to parse the data as a ResponseDeviceList message
try: try:
response = ResponseDeviceList.FromString(response_data) response = ResponseList.FromString(response_data)
# If we get here, we successfully parsed the response. Break the loop. # If we get here, we successfully parsed the response. Break the loop.
break break
except DecodeError: except DecodeError:

View File

@@ -3,33 +3,33 @@ import struct
from google.protobuf.message import DecodeError from google.protobuf.message import DecodeError
from serial.tools import list_ports from serial.tools import list_ports
from light_pb2 import LightGlobalBrightness, LightTheme, LightGlobalTheme, LightThemeSettings from light_pb2 import GlobalBrightness, Theme, GlobalTheme, ThemeSettings
from usb_pb2 import UsbPackageType from usb_pb2 import PackageType
from vcp_driver import * from vcp_driver import *
if __name__ == "__main__": if __name__ == "__main__":
ser = setup_connection() ser = setup_connection()
# Create a message # Create a message
request = LightGlobalBrightness() request = GlobalBrightness()
request.brightness = 128 request.brightness = 0
# Serialize the request to a bytearray # Serialize the request to a bytearray
request_data = request.SerializeToString() request_data = request.SerializeToString()
# Send the request # Send the request
send_package(UsbPackageType.LIGHT_GLOBAL_BRIGHT, request_data, ser) send_package(PackageType.LIGHT_GLOBAL_BRIGHT, request_data, ser)
request = LightGlobalTheme() request = GlobalTheme()
request.theme = 1 request.theme = 1
request_data = request.SerializeToString() request_data = request.SerializeToString()
send_package(UsbPackageType.LIGHT_GLOBAL_THEME, request_data, ser) send_package(PackageType.LIGHT_GLOBAL_THEME, request_data, ser)
request = LightThemeSettings() # request = ThemeSettings()
request.deviceId = 1 # request.deviceId = 1
request.theme = 1 # request.theme = 1
request.rgb = 0x00f000f0 # request.rgb = 0x00f000f0
request.brightness = 255 # request.brightness = 255
request.animation = 2 # request.animation = 2
request_data = request.SerializeToString() # request_data = request.SerializeToString()
send_package(UsbPackageType.LIGHT_SETTING_THEME, request_data, ser) # send_package(PackageType.LIGHT_SETTING_THEME, request_data, ser)

View File

@@ -7,8 +7,8 @@ from google.protobuf.message import DecodeError
from serial.tools import list_ports from serial.tools import list_ports
from crccheck.crc import Crc from crccheck.crc import Crc
from firmware_pb2 import FirmwareStart, FirmwareFileCheck , FirmwarePackage, FirmwarePackageAck, FirmwareDone from firmware_pb2 import Start, FileCheck , Package, PackageAck, Done
from usb_pb2 import UsbPackageType from usb_pb2 import PackageType
Crc32 = Crc(32,0x04C11DB7,0xffffffff) Crc32 = Crc(32,0x04C11DB7,0xffffffff)
@@ -26,7 +26,7 @@ def load_firmware(filename):
return crc, size, packages return crc, size, packages
def make_header(typeid: UsbPackageType, length: int) -> bytearray: def make_header(typeid: PackageType, length: int) -> bytearray:
struct_format = '<HHB' # '<' for little-endian, 'H' for uint16_t, 'B' for uint8_t struct_format = '<HHB' # '<' for little-endian, 'H' for uint16_t, 'B' for uint8_t
# Calculate the check byte as the sum of length and type # Calculate the check byte as the sum of length and type
typeidint = int(typeid) typeidint = int(typeid)
@@ -35,7 +35,7 @@ def make_header(typeid: UsbPackageType, length: int) -> bytearray:
return packed_data return packed_data
def send_package(typeid : UsbPackageType, data: bytearray, serial: serial.Serial): def send_package(typeid : PackageType, data: bytearray, serial: serial.Serial):
head = make_header(typeid, len(data)) head = make_header(typeid, len(data))
package = head + data package = head + data
serial.write(package) serial.write(package)
@@ -56,7 +56,7 @@ def read_header(serial):
def receive_ack(serial): def receive_ack(serial):
length, typeid = read_header(serial) length, typeid = read_header(serial)
if not length or typeid != UsbPackageType.FIRMWAREPACKAGEACK: if not length or typeid != PackageType.FIRMWAREPACKAGEACK:
return None return None
# Read the message # Read the message
@@ -66,18 +66,18 @@ def receive_ack(serial):
return None return None
# Parse the message # Parse the message
ack = FirmwarePackageAck() ack = PackageAck()
try: try:
ack.ParseFromString(message_data) ack.ParseFromString(message_data)
except DecodeError: except DecodeError:
print('Failed to parse FirmwarePackageAck') print('Failed to parse PackageAck')
return None return None
return ack return ack
def receive_file_check(serial): def receive_file_check(serial):
length, typeid = read_header(serial) length, typeid = read_header(serial)
if not length or typeid != UsbPackageType.FIRMWAREFILECHECK: if not length or typeid != PackageType.FIRMWAREFILECHECK:
return None return None
# Read the message # Read the message
@@ -87,11 +87,11 @@ def receive_file_check(serial):
return None return None
# Parse the message # Parse the message
file_check = FirmwareFileCheck() file_check = FileCheck()
try: try:
file_check.ParseFromString(message_data) file_check.ParseFromString(message_data)
except DecodeError: except DecodeError:
print('Failed to parse FirmwareFileCheck') print('Failed to parse FileCheck')
return None return None
return file_check return file_check
@@ -121,60 +121,60 @@ if __name__ == "__main__":
crc, size, packages = load_firmware(FILENAME) crc, size, packages = load_firmware(FILENAME)
# Create a FirmwareStart message # Create a Start message
start = FirmwareStart() start = Start()
start.name = os.path.basename(FILENAME) start.name = os.path.basename(FILENAME)
start.size = size start.size = size
start.packages = len(packages) start.packages = len(packages)
start.device_id = ID start.device_id = ID
start.crc_fw = crc start.crc_fw = crc
# Send the FirmwareStart message # Send the Start message
print(start) print(start)
send_package(UsbPackageType.FIRMWARESTART, start.SerializeToString(), ser) send_package(PackageType.FIRMWARESTART, start.SerializeToString(), ser)
#time.sleep(1) # wait for the device to process the message #time.sleep(1) # wait for the device to process the message
# Receive the FirmwareFileCheck message # Receive the FileCheck message
file_check = receive_file_check(ser) file_check = receive_file_check(ser)
print(file_check) print(file_check)
if not file_check: if not file_check:
print('Failed to receive FirmwareFileCheck') print('Failed to receive FileCheck')
exit(-1) exit(-1)
if file_check.crc_fw == start.crc_fw and file_check.size == start.size and not file_check.ready_for_data: if file_check.crc_fw == start.crc_fw and file_check.size == start.size and not file_check.ready_for_data:
# Skip to FirmwareDone if the CRC and size match and ready_for_data is false # Skip to Done if the CRC and size match and ready_for_data is false
print('No need for data transfer') print('No need for data transfer')
elif file_check.ready_for_data: elif file_check.ready_for_data:
# Send the firmware packages # Send the firmware packages
for (i,pack_data) in enumerate(packages): for (i,pack_data) in enumerate(packages):
package = FirmwarePackage() package = Package()
package.counter = i package.counter = i
package.crc_pac = Crc32.calc(pack_data) package.crc_pac = Crc32.calc(pack_data)
package.device_id = start.device_id package.device_id = start.device_id
package.data = pack_data package.data = pack_data
# Send the FirmwarePackage message # Send the Package message
print(package) print(package)
print(hex(package.crc_pac)) print(hex(package.crc_pac))
send_package(UsbPackageType.FIRMWAREPACKAGE, package.SerializeToString(), ser) send_package(PackageType.FIRMWAREPACKAGE, package.SerializeToString(), ser)
# Wait for the FirmwarePackageAck message # Wait for the PackageAck message
ack = receive_ack(ser) ack = receive_ack(ser)
print(ack) print(ack)
if not ack.ack: if not ack.ack:
print(f'Package {i} not acknowledged') print(f'Package {i} not acknowledged')
exit(-1) exit(-1)
else: else:
print('Error in FirmwareFileCheck message') print('Error in FileCheck message')
exit(-1) exit(-1)
# Send the FirmwareDone message # Send the Done message
done = FirmwareDone() done = Done()
done.size = start.size done.size = start.size
done.crc_fw = start.crc_fw done.crc_fw = start.crc_fw
done.device_id = start.device_id done.device_id = start.device_id
print(done) print(done)
send_package(UsbPackageType.FIRMWAREDONE, done.SerializeToString(), ser) send_package(PackageType.FIRMWAREDONE, done.SerializeToString(), ser)

View File

@@ -1,11 +1,11 @@
import serial import serial
import struct import struct
from usb_pb2 import UsbPackageType from usb_pb2 import PackageType
from serial.tools import list_ports from serial.tools import list_ports
def make_header(typeid: UsbPackageType, length: int) -> bytearray: def make_header(typeid: PackageType, length: int) -> bytearray:
struct_format = '<HHB' # '<' for little-endian, 'H' for uint16_t, 'B' for uint8_t struct_format = '<HHB' # '<' for little-endian, 'H' for uint16_t, 'B' for uint8_t
# Calculate the check byte as the sum of length and type # Calculate the check byte as the sum of length and type
typeidint = int(typeid) typeidint = int(typeid)
@@ -13,7 +13,7 @@ def make_header(typeid: UsbPackageType, length: int) -> bytearray:
packed_data = struct.pack(struct_format, length, typeid, check) packed_data = struct.pack(struct_format, length, typeid, check)
return packed_data return packed_data
def send_package(typeid : UsbPackageType, data: bytearray, serial: serial.Serial): def send_package(typeid : PackageType, data: bytearray, serial: serial.Serial):
head = make_header(typeid, len(data)) head = make_header(typeid, len(data))
package = head + data package = head + data
serial.write(package) serial.write(package)