Skip to content

Commit

Permalink
find connected component for PCM
Browse files Browse the repository at this point in the history
Signed-off-by: Yongan Lu <yongan.lu@intel.com>
  • Loading branch information
miRoox committed Sep 1, 2021
1 parent 21759ee commit 6ff8401
Showing 1 changed file with 60 additions and 0 deletions.
60 changes: 60 additions & 0 deletions tools/tplgtool-ng/tplgtool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import os
import sys
import typing
from collections import defaultdict
from collections.abc import Callable
from construct import this, Container, ListContainer, Struct, Enum, FlagsEnum, Const, Switch, Array, Bytes, GreedyRange, FocusedSeq, Padded, Padding, PaddedString, Flag, Byte, Int16ul, Int32ul, Int64ul, Terminated
from functools import cached_property

class TplgType(enum.IntEnum):
"File and Block header data types. `SND_SOC_TPLG_TYPE_`"
Expand Down Expand Up @@ -464,6 +467,63 @@ def _node_name_in_graph(self, widget: Container) -> typing.Union[str, None]:
return ename
return None

@cached_property
def _nodes_dict(self) -> "dict[str, Container]":
nodes = {}
for widget in self._widget_list:
nodes[widget["widget"]["name"]] = widget
nodes[widget["widget"]["sname"]] = widget
return nodes

@cached_property
def _graph_edge_list(self) -> "typing.Tuple[dict[Container, list[Container]], dict[Container, list[Container]]]":
r"""
Return edge list for graph: forward edge list, backward edge list
Edge list is a dictionary like:
{ source1: [sink1, sink2, ...], ... }
"""

forward_edge = defaultdict(list)
backward_edge = defaultdict(list)
node_dict = self._nodes_dict
for edge in self._graph_list:
forward_edge[node_dict[edge["source"]]].append(node_dict[edge["sink"]])
backward_edge[node_dict[edge["sink"]]].append(node_dict[edge["source"]])
return forward_edge, backward_edge

def find_comp_for_pcm(self, pcm: Container, prefix: str) -> "list[set[Container]]":
r"""
find specified components for PCM.
return a list:
[0]: specified components connected to playback
[1]: specified components connected to capture
"""
node_dict = self._nodes_dict
prefix = prefix.upper()
return [self.find_connected_comp(node_dict[cap["name"]], lambda node: node["widget"]["name"].startswith(prefix)) for cap in pcm["caps"]]

def find_connected_comp(self, ref_node: Container, predicate: Callable[[Container], bool]) -> "set[Container]":
"find specified components connected to ref_node"

forward_edge, backward_edge = self._graph_edge_list
collected = set()
TplgAdaptor._recursive_search_comp(forward_edge, ref_node, predicate, collected)
TplgAdaptor._recursive_search_comp(backward_edge, ref_node, predicate, collected)
return collected

@staticmethod
def _recursive_search_comp(
edge_list: "dict[Container, list[Container]]",
node: Container,
predicate: Callable[[Container], bool],
collected: "set[Container]"):
if predicate(node):
collected.add(node)
for next_node in edge_list[node]:
TplgAdaptor._recursive_search_comp(edge_list, next_node, predicate, collected)

if __name__ == "__main__":
from pathlib import Path

Expand Down

0 comments on commit 6ff8401

Please sign in to comment.