Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sfputil] Firmware download/upgrade CLI support for QSFP-DD #1947

Merged
merged 7 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 324 additions & 0 deletions sfputil/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import sys
import natsort
import ast
import time
import datetime

import subprocess
import click
Expand All @@ -26,12 +28,20 @@
PLATFORM_JSON = 'platform.json'
PORT_CONFIG_INI = 'port_config.ini'

EXIT_FAIL = -1
EXIT_SUCCESS = 0
ERROR_PERMISSIONS = 1
ERROR_CHASSIS_LOAD = 2
ERROR_SFPUTILHELPER_LOAD = 3
ERROR_PORT_CONFIG_LOAD = 4
ERROR_NOT_IMPLEMENTED = 5
ERROR_INVALID_PORT = 6
SMBUS_BLOCK_WRITE_SIZE = 32
# Default host password as per CMIS spec:
# http://www.qsfp-dd.com/wp-content/uploads/2021/05/CMIS5p0.pdf
CDB_DEFAULT_HOST_PASSWORD = 0x00001011

MAX_LPL_FIRMWARE_BLOCK_SIZE = 116 #Bytes

# TODO: We should share these maps and the formatting functions between sfputil and sfpshow
QSFP_DATA_MAP = {
Expand Down Expand Up @@ -225,6 +235,17 @@
# Global logger instance
log = logger.Logger(SYSLOG_IDENTIFIER)

def is_sfp_present(port_name):
physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

try:
presence = sfp.get_presence()
except NotImplementedError:
click.echo("sfp get_presence() NOT implemented!", err=True)
sys.exit(ERROR_NOT_IMPLEMENTED)

return bool(presence)

# ========================== Methods for formatting output ==========================

Expand Down Expand Up @@ -408,6 +429,19 @@ def logical_port_name_to_physical_port_list(port_name):
else:
return [int(port_name)]

def logical_port_to_physical_port_index(port_name):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this function available in some other library? can we reuse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i couldn't find one in sonic-utilities

if not platform_sfputil.is_logical_port(port_name):
click.echo("Error: invalid port '{}'\n".format(port_name))
print_all_valid_port_values()
sys.exit(ERROR_INVALID_PORT)

physical_port = logical_port_name_to_physical_port_list(port_name)[0]
if physical_port is None:
click.echo("Error: No physical port found for logical port '{}'".format(port_name))
sys.exit(EXIT_FAIL)

return physical_port


def print_all_valid_port_values():
click.echo("Valid values for port: {}\n".format(str(platform_sfputil.logical)))
Expand Down Expand Up @@ -805,6 +839,37 @@ def lpmode(port):

click.echo(tabulate(output_table, table_header, tablefmt='simple'))

def show_firmware_version(physical_port):
try:
sfp = platform_chassis.get_sfp(physical_port)
api = sfp.get_xcvr_api()
out = api.get_module_fw_info()
click.echo(out['info'])
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

# 'fwversion' subcommand
@show.command()
@click.argument('port_name', metavar='<port_name>', required=True)
def fwversion(port_name):
"""Show firmware version of the transceiver"""

physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

try:
presence = sfp.get_presence()
except NotImplementedError:
click.echo("sfp get_presence() NOT implemented!")
sys.exit(EXIT_FAIL)

if not presence:
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

show_firmware_version(physical_port)
sys.exit(EXIT_SUCCESS)

# 'lpmode' subgroup
@cli.group()
Expand Down Expand Up @@ -903,6 +968,265 @@ def reset(port_name):

i += 1

# 'firmware' subgroup
@cli.group()
def firmware():
"""Download/Upgrade firmware on the transceiver"""
pass

def run_firmware(port_name, mode):
prgeor marked this conversation as resolved.
Show resolved Hide resolved
prgeor marked this conversation as resolved.
Show resolved Hide resolved
"""
Make the inactive firmware as the current running firmware
@port_name:
@mode: 0, 1, 2, 3 different modes to run the firmware
Returns 1 on success, and exit_code = -1 on failure
"""
status = 0
physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

try:
api = sfp.get_xcvr_api()
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

if mode == 0:
click.echo("Running firmware: Non-hitless Reset to Inactive Image")
elif mode == 1:
click.echo("Running firmware: Hitless Reset to Inactive Image")
elif mode == 2:
click.echo("Running firmware: Attempt non-hitless Reset to Running Image")
elif mode == 3:
click.echo("Running firmware: Attempt Hitless Reset to Running Image")
else:
click.echo("Running firmware: Unknown mode {}".format(mode))
sys.exit(EXIT_FAIL)

try:
status = api.cdb_run_firmware(mode)
except NotImplementedError:
prgeor marked this conversation as resolved.
Show resolved Hide resolved
click.echo("This functionality is not applicable for this transceiver")
sys.exit(EXIT_FAIL)

return status

def commit_firmware(port_name):
status = 0
physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

try:
api = sfp.get_xcvr_api()
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

try:
status = api.cdb_commit_firmware()
except NotImplementedError:
click.echo("This functionality is not applicable for this transceiver")

return status

def download_firmware(port_name, filepath):
"""Download firmware on the transceiver"""
try:
fd = open(filepath, 'rb')
fd.seek(0, 2)
file_size = fd.tell()
fd.seek(0, 0)
except FileNotFoundError:
click.echo("Firmware file {} NOT found".format(filepath))
sys.exit(EXIT_FAIL)

physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)
try:
api = sfp.get_xcvr_api()
except NotImplementedError:
click.echo("This functionality is NOT applicable to this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

try:
fwinfo = api.get_module_fw_mgmt_feature()
if fwinfo['status'] == True:
startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = fwinfo['feature']
else:
click.echo("Failed to fetch CDB Firmware management features")
sys.exit(EXIT_FAIL)
except NotImplementedError:
click.echo("This functionality is NOT applicable for this transceiver")
sys.exit(ERROR_NOT_IMPLEMENTED)

click.echo('CDB: Starting firmware download')
startdata = fd.read(startLPLsize)
status = api.cdb_start_firmware_download(startLPLsize, startdata, file_size)
if status != 1:
click.echo('CDB: Start firmware download failed - status {}'.format(status))
sys.exit(EXIT_FAIL)

# Increase the optoe driver's write max to speed up firmware download
prgeor marked this conversation as resolved.
Show resolved Hide resolved
sfp.set_optoe_write_max(SMBUS_BLOCK_WRITE_SIZE)

with click.progressbar(length=file_size, label="Downloading ...") as bar:
address = 0
BLOCK_SIZE = MAX_LPL_FIRMWARE_BLOCK_SIZE if lplonly_flag else maxblocksize
remaining = file_size - startLPLsize
while remaining > 0:
count = BLOCK_SIZE if remaining >= BLOCK_SIZE else remaining
data = fd.read(count)
if len(data) != count:
click.echo("Firmware file read failed!")
sys.exit(EXIT_FAIL)

if lplonly_flag:
status = api.cdb_lpl_block_write(address, data)
else:
status = api.cdb_epl_block_write(address, data, autopaging_flag, writelength)
if (status != 1):
click.echo("CDB: firmware download failed! - status {}".format(status))
sys.exit(EXIT_FAIL)

bar.update(count)
address += count
Copy link
Contributor

@qiluo-msft qiluo-msft Nov 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count

If read less than count, you need to increase or decrease the actually read bytes. #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we make sure count matches len of the data read

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that does not match, you trigger assert. However this is a runtime error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now, exiting

remaining -= count

# Restore the optoe driver's write max to '1' (default value)
sfp.set_optoe_write_max(1)

status = api.cdb_firmware_download_complete()
click.echo('CDB: firmware download complete')
return status

# 'run' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
@click.option('--mode', default="1", type=click.Choice(["0", "1", "2", "3"]), show_default=True,
help="0 = Non-hitless Reset to Inactive Image\n \
1 = Hitless Reset to Inactive Image (Default)\n \
2 = Attempt non-hitless Reset to Running Image\n \
3 = Attempt Hitless Reset to Running Image\n")
def run(port_name, mode):
"""Run the firmware with default mode=1"""

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

status = run_firmware(port_name, int(mode))
if status != 1:
click.echo('Failed to run firmware in mode={}! CDB status: {}'.format(mode, status))
sys.exit(EXIT_FAIL)

click.echo("Firmware run in mode={} success".format(mode))

# 'commit' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
def commit(port_name):
"""Commit the running firmware"""

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

status = commit_firmware(port_name)
if status != 1:
click.echo('Failed to commit firmware! CDB status: {}'.format(status))
sys.exit(EXIT_FAIL)

click.echo("Firmware commit successful")

# 'upgrade' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
@click.argument('filepath', required=True, default=None)
def upgrade(port_name, filepath):
prgeor marked this conversation as resolved.
Show resolved Hide resolved
"""Upgrade firmware on the transceiver"""

physical_port = logical_port_to_physical_port_index(port_name)

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

show_firmware_version(physical_port)

status = download_firmware(port_name, filepath)
if status == 1:
click.echo("Firmware download complete success")
else:
click.echo("Firmware download complete failed! CDB status = {}".format(status))
prgeor marked this conversation as resolved.
Show resolved Hide resolved
sys.exit(EXIT_FAIL)

status = run_firmware(port_name, 1)
if status != 1:
click.echo('Failed to run firmware in mode=1 ! CDB status: {}'.format(status))
sys.exit(EXIT_FAIL)

click.echo("Firmware run in mode 1 successful")

status = commit_firmware(port_name)
if status != 1:
click.echo('Failed to commit firmware! CDB status: {}'.format(status))
sys.exit(EXIT_FAIL)

click.echo("Firmware commit successful")

# 'download' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
@click.argument('filepath', required=True, default=None)
def download(port_name, filepath):
"""Download firmware on the transceiver"""

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

start = time.time()
status = download_firmware(port_name, filepath)
if status == 1:
click.echo("Firmware download complete success")
else:
click.echo("Firmware download complete failed! status = {}".format(status))
sys.exit(EXIT_FAIL)
end = time.time()
click.echo("Total download Time: {}".format(str(datetime.timedelta(seconds=end-start))))


# 'unlock' subcommand
@firmware.command()
@click.argument('port_name', required=True, default=None)
@click.option('--password', type=click.INT, help="Password in integer\n")
def unlock(port_name, password):
"""Unlock the firmware download feature via CDB host password"""
physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)

if not is_sfp_present(port_name):
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

try:
api = sfp.get_xcvr_api()
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)

if password is None:
password = CDB_DEFAULT_HOST_PASSWORD
try:
status = api.cdb_enter_host_password(int(password))
except NotImplementedError:
click.echo("This functionality is not applicable for this transceiver")
sys.exit(EXIT_FAIL)

if status == 1:
click.echo("CDB: Host password accepted")
else:
click.echo("CDB: Host password NOT accepted! status = {}".format(status))

# 'version' subcommand
@cli.command()
Expand Down
Loading