Skip to content

Commit

Permalink
Merge pull request #5755 from dhsrivas/pci_api
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Richter <jarichte@redhat.com>
  • Loading branch information
richtja authored Sep 6, 2023
2 parents 4bf7543 + 3e57c44 commit cfce417
Showing 1 changed file with 164 additions and 1 deletion.
165 changes: 164 additions & 1 deletion avocado/utils/pci.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import os
import re

from avocado.utils import genio, process
from avocado.utils import genio, process, wait


def get_domains():
Expand Down Expand Up @@ -320,6 +320,169 @@ def get_driver(pci_address):
return line.rsplit(None, 1)[-1]


def unbind(driver, full_pci_address):
"""
Unbind the pci device(full_pci_address) from driver
:param driver: driver of the PCI address (full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/unbind", full_pci_address)
if wait.wait_for(
lambda: os.path.exists(
f"/sys/bus/pci/drivers/\
{driver}/{full_pci_address}"
),
timeout=5,
):
raise ValueError(f"Not able to unbind {full_pci_address} from {driver}")


def bind(driver, full_pci_address):
"""
Bind the pci device(full_pci_address) to driver
:param driver: driver of the PCI address (full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/bind", full_pci_address)
if not wait.wait_for(
lambda: os.path.exists(
f"/sys/bus/pci/drivers/\
{driver}/{full_pci_address}"
),
timeout=5,
):
raise ValueError(f"Not able to bind {full_pci_address} to {driver}")


def get_vendor_id(full_pci_address):
"""
Get vendor id of a PCI address
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: vendor id of PCI address
rtype: str
"""
cmd = f"lspci -n -s {full_pci_address}"
out = process.run(cmd, ignore_status=True, shell=True).stdout_text
if out == "":
raise ValueError(f"Not able to get {full_pci_address} vendor id")
return out.split(" ")[2].strip()


def reset_check(full_pci_address):
"""
Check if reset for "full_pci_address" is successful
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether reset for "full_pci_address" is successful
rtype: bool
"""
cmd = f"lspci -vvs {full_pci_address}"
output = process.run(cmd, ignore_status=True, shell=True).stdout_text
if output != "":
return False
return True


def rescan_check(full_pci_address):
"""
Check if rescan for full_pci_address is successful
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether rescan for full_pci_address is successful
rtype: bool
"""
cmd = f"lspci -vvs {full_pci_address}"
output = process.run(cmd, ignore_status=True, shell=True).stdout_text
if output == "":
return False
return True


def change_domain_check(dom, full_pci_address, def_dom):
"""
Check if the domain changed successfully to "dom" for "full_pci_address"
:param dom: domain type
:param def_dom: default domain of pci device(full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether domain changed successfully to "dom"
rtype: bool
"""
try:
output = genio.read_one_line(
f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type"
)
out = output.rsplit(None, 1)[-1]
if (dom not in ("auto", out)) or (dom == "auto" and out != def_dom):
return False
return True
except OSError as details:
raise ValueError(f"Change domain check failed: {details}")


def reset(full_pci_address):
"""
Remove the full_pci_address
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail(f"/sys/bus/pci/devices/{full_pci_address}/remove", "1")
if not wait.wait_for(lambda: reset_check(full_pci_address), timeout=5):
raise ValueError(f"Unsuccessful to remove {full_pci_address}")


def rescan(full_pci_address):
"""
Rescan the system and check for full_pci_address
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail("/sys/bus/pci/rescan", "1")
if not wait.wait_for(lambda: rescan_check(full_pci_address), timeout=5):
raise ValueError(f"Unsuccessful to rescan for {full_pci_address}")


def get_iommu_group(full_pci_address):
"""
Return the iommu group of full_pci_address
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: iommu group of full_pci_address
rtype: string
"""
cmd = f"lspci -vvvv -s {full_pci_address}"
out = process.run(cmd, ignore_status=True, shell=True)
for line in out.stdout_text.split("\n"):
if "IOMMU group" in line:
return line.strip().split(" ")[2]
raise ValueError(f"{full_pci_address} group not found")


def change_domain(dom, def_dom, full_pci_address):
"""
Change the domain of pci device(full_pci_address) to dom
:param dom: domain type
:param def_dom: default domain of pci device(full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail(
f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type", dom
)
if not wait.wait_for(
lambda: change_domain_check(dom, full_pci_address, def_dom), timeout=5
):
raise ValueError(f"Domain type change failed for {full_pci_address}")


def get_memory_address(pci_address):
"""
Gets the memory address of a PCI address. (first match only)
Expand Down

0 comments on commit cfce417

Please sign in to comment.