Skip to content

Commit

Permalink
feat: Add functions to detect cycles in directed graphs.
Browse files Browse the repository at this point in the history
  • Loading branch information
ogabrielluiz committed Aug 14, 2024
1 parent 17337b4 commit 2ff3641
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions src/backend/base/langflow/graph/graph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,127 @@ def sort_up_to_vertex(graph: Dict[str, Dict[str, List[str]]], vertex_id: str, is
excluded.add(succ_id)

return list(visited)


def has_cycle(vertex_ids: list[str], edges: list[tuple[str, str]]) -> bool:
"""
Determines whether a directed graph represented by a list of vertices and edges contains a cycle.
Args:
vertex_ids (list[str]): A list of vertex IDs.
edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.
Returns:
bool: True if the graph contains a cycle, False otherwise.
"""
# Build the graph as an adjacency list
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)

# Utility function to perform DFS
def dfs(v, visited, rec_stack):
visited.add(v)
rec_stack.add(v)

for neighbor in graph[v]:
if neighbor not in visited:
if dfs(neighbor, visited, rec_stack):
return True
elif neighbor in rec_stack:
return True

rec_stack.remove(v)
return False

visited: set[str] = set()
rec_stack: set[str] = set()

for vertex in vertex_ids:
if vertex not in visited:
if dfs(vertex, visited, rec_stack):
return True

return False


def find_cycle_edge(entry_point: str, edges: list[tuple[str, str]]) -> tuple[str, str]:
"""
Find the edge that causes a cycle in a directed graph starting from a given entry point.
Args:
entry_point (str): The vertex ID from which to start the search.
edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.
Returns:
tuple[str, str]: A tuple representing the edge that causes a cycle, or None if no cycle is found.
"""
# Build the graph as an adjacency list
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)

# Utility function to perform DFS
def dfs(v, visited, rec_stack):
visited.add(v)
rec_stack.add(v)

for neighbor in graph[v]:
if neighbor not in visited:
result = dfs(neighbor, visited, rec_stack)
if result:
return result
elif neighbor in rec_stack:
return (v, neighbor) # This edge causes the cycle

rec_stack.remove(v)
return None

visited: set[str] = set()
rec_stack: set[str] = set()

return dfs(entry_point, visited, rec_stack)


def find_all_cycle_edges(entry_point: str, edges: list[tuple[str, str]]) -> list[tuple[str, str]]:
"""
Find all edges that cause cycles in a directed graph starting from a given entry point.
Args:
entry_point (str): The vertex ID from which to start the search.
edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.
Returns:
list[tuple[str, str]]: A list of tuples representing edges that cause cycles.
"""
# Build the graph as an adjacency list
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)

# Utility function to perform DFS
def dfs(v, visited, rec_stack):
visited.add(v)
rec_stack.add(v)

cycle_edges = []

for neighbor in graph[v]:
if neighbor not in visited:
cycle_edges += dfs(neighbor, visited, rec_stack)
elif neighbor in rec_stack:
cycle_edges.append((v, neighbor)) # This edge causes a cycle

rec_stack.remove(v)
return cycle_edges

visited: set[str] = set()
rec_stack: set[str] = set()

return dfs(entry_point, visited, rec_stack)


def should_continue(yielded_counts: dict[str, int], max_iterations: int | None) -> bool:
if max_iterations is None:
return True
return max(yielded_counts.values(), default=0) <= max_iterations

0 comments on commit 2ff3641

Please sign in to comment.