From a65f974412401f4f1ef3d646487d888337619503 Mon Sep 17 00:00:00 2001 From: Prince George Date: Wed, 24 Nov 2021 16:42:28 +0000 Subject: [PATCH] Firmware download/upgrade CLI support for QSFP-DD Signed-off-by: Prince George --- sfputil/main.py | 335 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 334 insertions(+), 1 deletion(-) diff --git a/sfputil/main.py b/sfputil/main.py index 96dfcd9e79..3333cf2d6c 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -9,6 +9,7 @@ import sys import natsort import ast +import time import subprocess import click @@ -26,12 +27,16 @@ PLATFORM_JSON = 'platform.json' PORT_CONFIG_INI = 'port_config.ini' +EXIT_FAIL = -1 ERROR_PERMISSIONS = 1 ERROR_CHASSIS_LOAD = 2 ERROR_SFPUTILHELPER_LOAD = 3 ERROR_PORT_CONFIG_LOAD = 4 ERROR_NOT_IMPLEMENTED = 5 ERROR_INVALID_PORT = 6 +SMBUS_BLOCK_WRITE_SIZE = 32 +# Default host password as per CMIS spec +CDB_DEFAULT_HOST_PASSWORD = 0x00001011 # TODO: We should share these maps and the formatting functions between sfputil and sfpshow QSFP_DATA_MAP = { @@ -408,6 +413,19 @@ def logical_port_name_to_physical_port_list(port_name): else: return [int(port_name)] +def logical_port_to_physical_port_index(port_name): + if platform_sfputil.is_logical_port(port_name) == 0: + click.echo("Error: invalid port '{}'\n".format(port_name)) + print_all_valid_port_values() + sys.exit(ERROR_INVALID_PORT) + + physical_port = logical_port_name_to_physical_port_list(port_name)[0] + if physical_port is None: + click.echo("Error: No physical port found for logical port '{}'".format(port_name)) + sys.exit(EXIT_FAIL) + + return physical_port + def print_all_valid_port_values(): click.echo("Valid values for port: {}\n".format(str(platform_sfputil.logical))) @@ -805,6 +823,36 @@ def lpmode(port): click.echo(tabulate(output_table, table_header, tablefmt='simple')) +def show_firmware_version(physical_port): + try: + sfp = platform_chassis.get_sfp(physical_port) + api = sfp.get_xcvr_api() + out = api.get_module_fw_info() + click.echo(out['info']) + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + +# 'fwversion' subcommand +@show.command() +@click.argument('port_name', metavar='', required=True) +def fwversion(port_name): + """Show firmware version of the transceiver""" + + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + presence = sfp.get_presence() + except NotImplemented: + click.echo("sfp get_presence() NOT implemented!") + sys.exit(EXIT_FAIL) + + if not presence: + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + show_firmware_version(physical_port) # 'lpmode' subgroup @cli.group() @@ -903,6 +951,291 @@ def reset(port_name): i += 1 +# 'firmware' subgroup +@cli.group() +def firmware(): + """Download/Upgrade firmware on the transceiver""" + pass + +def run_firmware(port_name, mode): + status = 0 + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + if mode == 0: + click.echo("Running firmare: Non-hitless Reset to Inactive Image") + elif mode == 1: + click.echo("Running firmware: Hitless Reset to Inactive Image") + elif mode == 2: + click.echo("Running firmware: Attempt non-hitless Reset to Running Image") + elif mode == 3: + click.echo("Running firmware: Attempt Hitless Reset to Running Image") + else: + click.echo("Running firmwaer: Unknown mode {}".format(mode)) + sys.exit(EXIT_FAIL) + + try: + status = api.cdb_run_firmware(mode) + except NotImplementedError: + click.echo("This functionality is not applicable for this transceiver") + + return status + +def commit_firmware(port_name): + status = 0 + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + try: + status = api.cdb_commit_firmware() + except NotImplementedError: + click.echo("This functionality is not applicable for this transceiver") + + return status + +def download_firmware(port_name, filepath): + """Download firmware on the transceiver""" + try: + fd = open(filepath, 'rb') + fd.seek(0, 2) + file_size = fd.tell() + fd.seek(0, 0) + except FileNotFoundError: + click.echo("Firmware file {} NOT found".format(filepath)) + sys.exit(EXIT_FAIL) + + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is NOT applicable to this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + try: + fwinfo = api.get_module_fw_mgmt_feature() + if fwinfo['status'] == True: + startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = fwinfo['feature'] + else: + click.echo("Failed to fetch CDB Firmware management features") + sys.exit(EXIT_FAIL) + except NotImplementedError: + click.echo("This functionality is NOT applicable for this transceiver") + sys.exit(ERROR_NOT_IMPLEMENTED) + + click.echo('CDB: Starting firmware download') + startdata = fd.read(startLPLsize) + status = api.cdb_start_firmware_download(startLPLsize, startdata, file_size) + if status != 1: + click.echo('CDB: Start firmware download failed - status {}'.format(status)) + sys.exit(EXIT_FAIL) + + # Increase the optoe driver's write max to speed up firmware download + sfp.set_optoe_write_max(SMBUS_BLOCK_WRITE_SIZE) + + with click.progressbar(length=file_size, label="Downloading ...") as bar: + address = 0 + BLOCK_SIZE = 116 if lplonly_flag else maxblocksize + remaining = file_size - startLPLsize + while remaining > 0: + count = BLOCK_SIZE if remaining >= BLOCK_SIZE else remaining + data = fd.read(count) + if lplonly_flag: + status = api.cdb_lpl_block_write(address, data) + else: + status = api.cdb_epl_block_write(address, data, autopaging_flag, writelength) + bar.update(count) + time.sleep(0.1) + address += count + remaining = remaining - count + + # Restore the optoe driver's write max to '1' (default value) + sfp.set_optoe_write_max(1) + + time.sleep(2) + status = api.cdb_firmware_download_complete() + click.echo('CDB: firmware download complete') + return status + +# 'run' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +@click.option('--mode', type=click.Choice(["0", "1", "2", "3"]), help="0 = Non-hitless Reset to Inactive Image\n \ + 1 = Hitless Reset to Inactive Image\n \ + 2 = Attempt non-hitless Reset to Running Image\n \ + 3 = Attempt Hitless Reset to Running Image\n") +def run(port_name, mode): + """Run the firmware with default mode=1""" + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + presence = sfp.get_presence() + except NotImplemented: + click.echo("sfp get_presence() NOT implemented!") + sys.exit(EXIT_FAIL) + + if not presence: + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + if mode is None: + mode = 1 + + status = run_firmware(port_name, int(mode)) + if status != 1: + click.echo('Failed to run firmware! CDB status: {}'.format(status)) + sys.exit(EXIT_FAIL) + + click.echo("Firmware run in mode {} success".format(mode)) + +# 'commit' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +def commit(port_name): + """Commit the running firmware""" + + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + presence = sfp.get_presence() + except NotImplemented: + click.echo("sfp get_presence() NOT implemented!") + sys.exit(EXIT_FAIL) + + if not presence: + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + status = commit_firmware(port_name) + if status != 1: + click.echo('Failed to commit firmware! CDB status: {}'.format(status)) + sys.exit(EXIT_FAIL) + + click.echo("Firmware commit successful") + +# 'upgrade' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +@click.argument('filepath', required=True, default=None) +def upgrade(port_name, filepath): + """Upgrade firmware on the transceiver""" + + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + presence = sfp.get_presence() + except NotImplemented: + click.echo("sfp get_presence() NOT implemented!") + sys.exit(EXIT_FAIL) + + if not presence: + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + show_firmware_version(physical_port) + + status = download_firmware(port_name, filepath) + if status == 1: + click.echo("Firmware download complete success") + else: + click.echo("Firmware download complete failed! CDB status = {}".format(status)) + + show_firmware_version(physical_port) + + status = run_firmware(port_name, 1) + if status != 1: + click.echo('Failed to run firmware! CDB status: {}'.format(status)) + sys.exit(EXIT_FAIL) + + click.echo("Firmware run in mode 1 successful") + + status = commit_firmware(port_name) + if status != 1: + click.echo('Failed to commit firmware! CDB status: {}'.format(status)) + sys.exit(EXIT_FAIL) + + click.echo("Firmware commit successful") + show_firmware_version(physical_port) + +# 'download' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +@click.argument('filepath', required=True, default=None) +def download(port_name, filepath): + """Download firmware on the transceiver""" + + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + presence = sfp.get_presence() + except NotImplemented: + click.echo("sfp get_presence() NOT implemented!") + sys.exit(EXIT_FAIL) + + if not presence: + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + status = download_firmware(port_name, filepath) + if status == 1: + click.echo("Firmware download complete success") + else: + click.echo("Firmware download complete failed! status = {}".format(status)) + sys.exit(EXIT_FAIL) + +# 'unlock' subcommand +@firmware.command() +@click.argument('port_name', required=True, default=None) +@click.option('--password', type=click.INT, help="Password in integer\n") +def unlock(port_name, password): + """Unlock the firmware download feature via CDB host password""" + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + presence = sfp.get_presence() + except NotImplemented: + click.echo("sfp get_presence() NOT implemented!") + sys.exit(EXIT_FAIL) + + if not presence: + click.echo("{}: SFP EEPROM not detected\n".format(port_name)) + sys.exit(EXIT_FAIL) + + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + if password is None: + password = CDB_DEFAULT_HOST_PASSWORD + try: + status = api.cdb_enter_host_password(int(password)) + except NotImplementedError: + click.echo("This functionality is not applicable for this transceiver") + sys.exit(EXIT_FAIL) + + if status == 1: + click.echo("CDB: Host password accepted") + else: + click.echo("CDB: Host password NOT accepted! status = {}".format(status)) # 'version' subcommand @cli.command() @@ -912,4 +1245,4 @@ def version(): if __name__ == '__main__': - cli() + cli() \ No newline at end of file