Skip to content

Commit

Permalink
Merge pull request #2 from lmnr-ai:inputs
Browse files Browse the repository at this point in the history
inputs
  • Loading branch information
skull8888888 authored Dec 2, 2024
2 parents 132e467 + 1c7cfe1 commit d06b578
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
python-version: '3.10'
- name: Install the project
run: uv sync --all-extras --dev
- name: Run tests
run: pytest
- name: Build package
run: uv build
- name: Publish package
Expand Down
36 changes: 31 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A lightweight task engine for building AI agents that prioritizes simplicity and

## Core Concept

Unlike traditional node-based workflows, Laminar Flow uses a dynamic task queue system built on three simple principles:
Unlike traditional node and edge-based workflows, Flow uses a dynamic task queue system built on three simple principles:

1. **Concurrent Execution** - Tasks run in parallel automatically
2. **Dynamic Scheduling** - Tasks can schedule new tasks at runtime
Expand All @@ -18,12 +18,16 @@ This task-based architecture makes complex workflows surprisingly simple:
- [x] Self-modifying dynamic workflows and cycles
- [x] Conditional branching and control flow
- [x] Streaming of tasks execution
- [x] Automatic state management and persistence
- [x] State management: load a previous state and save the current state
- [x] Start execution from a specific task
- [x] Dynamically push next tasks with specific inputs

Flow is extremely lightweight, clearly written and has no external dependencies for the engine. It is designed and maintained by the [Laminar](https://github.com/lmnr-ai) team.
By removing the need to predefine edges between nodes, and opting for a dynamic task scheduling architecture, Flow makes it easy to build complex, dynamic workflows as well as simple linear ones. Flow actually helps you write better and cleaner code by making it easier to reason about control flow and dependencies.

Flow is lightweight, bloat-free, and has no external dependencies for the engine. It is designed to be simple, flexible and very powerful, and is maintained by the [Laminar](https://github.com/lmnr-ai/lmnr) team.

## Auto-instrumentation
Flow comes with auto-instrumentation for tracing using [Laminar](https://github.com/lmnr-ai/lmnr). To enable tracing, initialize the Laminar SDK with tracing enabled before using Flow.
Flow comes with auto-instrumentation for tracing using [Laminar](https://github.com/lmnr-ai/lmnr). To enable OpenTelemetry-based tracing, initialize the Laminar SDK before using Flow.

```python
from lmnr import Laminar
Expand All @@ -43,7 +47,7 @@ pip install lmnr-flow
### Basic Usage
```python
from concurrent.futures import ThreadPoolExecutor
from lmnr_flow import Flow, TaskOutput
from lmnr_flow import Flow, TaskOutput, Context

# thread pool executor is optional, defaults to 4 workers
flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
Expand Down Expand Up @@ -99,6 +103,7 @@ def streaming_task(context: Context) -> TaskOutput:
# Stream intermediate results
stream = context.get_stream()
for i in range(3):
        # push a tuple of (task_id, chunk) onto the stream
stream.put(("streaming_task", f"interim_{i}"))
return TaskOutput(output="final", next=None)

Expand Down Expand Up @@ -142,6 +147,22 @@ result = flow.run("greet", inputs={"user_name": "Alice"})
# Returns {"greet": "Hello Alice!"}
```

### Push next task with inputs
```python
def task1(ctx):
return TaskOutput("result1", ["task2"], next_inputs=[{"input1": "value1"}])

# task2 will be called with inputs={"input1": "value1"}
def task2(ctx, inputs):
assert inputs == {"input1": "value1"}
return TaskOutput("result2")

flow.add_task("task1", task1)
flow.add_task("task2", task2)
result = flow.run("task1")
# Returns {"task2": "result2"}
```

### Dynamic Routing
```python
def router(context: Context) -> TaskOutput:
Expand Down Expand Up @@ -174,6 +195,11 @@ flow.run("task2")

assert flow.context.get("task1") == "result1" # True, because it was set in the context
assert flow.context.get("task2") == "result2"


# Serialize the context to a dictionary
flow.get_context().to_dict()
# Returns {"task1": "result1", "task2": "result2"}
```

## Advanced Features
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "lmnr-flow"
version = "0.1.0"
version = "0.1.1"
description = "Lightweight task engine for building AI agents"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
48 changes: 31 additions & 17 deletions src/lmnr_flow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import traceback
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from inspect import signature
from queue import Queue
from threading import Lock
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional

from lmnr import Laminar, observe

Expand All @@ -18,15 +19,15 @@
@dataclass
class TaskOutput:
    """Value returned by a task action, describing the result and what runs next.

    The diff residue in the original left both the old un-defaulted ``next``
    declaration and the new defaulted one; only the defaulted form is valid
    (call sites such as ``TaskOutput("result2")`` rely on the defaults).
    """

    # Result value of the task; stored in the flow context under the task id.
    output: Any
    # Ids of tasks to schedule next; None marks this task as an output node.
    next: Optional[List[str]] = None
    # Optional per-task inputs for the scheduled tasks, aligned by index
    # with `next`; entries beyond its length fall back to None.
    next_inputs: Optional[List[Dict[str, Any]]] = None


@dataclass
class Task:
    """A named unit of work registered with a Flow."""

    # Unique task identifier; used as the key when scheduling and when
    # storing the task's output in the shared context.
    id: str
    # Callable executed for this task. Receives the shared Context and
    # returns a TaskOutput; it may additionally accept an `inputs` keyword,
    # which execute_task detects via signature inspection.
    action: Callable[[Context], TaskOutput]


class Flow:
def __init__(
self,
Expand All @@ -50,13 +51,18 @@ def add_task(self, name: str, action: Callable[[Context], TaskOutput]):
self.logger.info(f"Added task '{name}'")

def execute_task(
self, task: Task, task_queue: Queue, stream_queue: Optional[Queue] = None
self, task: Task, inputs: Optional[Dict[str, Any]], task_queue: Queue, stream_queue: Optional[Queue] = None
):
self.logger.info(f"Starting execution of task '{task.id}'")

try:
with Laminar.start_as_current_span(task.id, input=self.context.to_dict()):
result: TaskOutput = task.action(self.context)
with Laminar.start_as_current_span(task.id, input={"context": self.context.to_dict(), "inputs": inputs}):
# Check if action accepts inputs parameter
sig = signature(task.action)
if "inputs" in sig.parameters:
result: TaskOutput = task.action(self.context, inputs=inputs)
else:
result: TaskOutput = task.action(self.context)
Laminar.set_span_output(result)

# Set state to the output of the task
Expand All @@ -71,17 +77,20 @@ def execute_task(
self.logger.info(f"Task '{task.id}' completed as output node")
with self.output_ids_lock:
self.output_task_ids.add(task.id)
task_queue.put(__OUTPUT__)
task_queue.put((__OUTPUT__, None))
else:
self.logger.debug(
f"Task '{task.id}' scheduling next tasks: {result.next}"
)

with self.active_tasks_lock:
for next_task_id in result.next:
for i, next_task_id in enumerate(result.next):
if next_task_id in self.tasks:
if next_task_id not in self.active_tasks:
task_queue.put(next_task_id)
if result.next_inputs and i < len(result.next_inputs):
task_queue.put((next_task_id, result.next_inputs[i]))
else:
task_queue.put((next_task_id, None))
else:
raise Exception(f"Task {next_task_id} not found")

Expand All @@ -92,7 +101,7 @@ def execute_task(
with self.active_tasks_lock:
self.active_tasks.clear()

task_queue.put(__ERROR__)
task_queue.put((__ERROR__, None))

raise e

Expand All @@ -110,7 +119,7 @@ def run(
task_queue = Queue()
futures = set()

task_queue.put(start_task_id)
task_queue.put((start_task_id, inputs))

if inputs:
for key, value in inputs.items():
Expand All @@ -119,7 +128,7 @@ def run(
# Main execution loop
while True:
# block until there is a task to spawn
task_id = task_queue.get()
task_id, inputs = task_queue.get()

if task_id == __ERROR__:
# Cancel all pending futures on error
Expand All @@ -139,26 +148,31 @@ def run(
self.active_tasks.add(task_id)

task = self.tasks[task_id]
future = self._executor.submit(self.execute_task, task, task_queue)
future = self._executor.submit(self.execute_task, task, inputs, task_queue)
futures.add(future)

# Return values of the output nodes
# task_id -> value of the task
return {task_id: self.context.get(task_id) for task_id in self.output_task_ids}

@observe(name="flow.stream")
def stream(self, start_task_id: str):
def stream(self, start_task_id: str, inputs: Optional[Dict[str, Any]] = None):
print("stream")
task_queue = Queue()
stream_queue = Queue()
futures = set()

task_queue.put((start_task_id, inputs))

if inputs:
for key, value in inputs.items():
self.context.set(key, value)

self.context.set_stream(stream_queue)

def run_engine():
task_queue.put(start_task_id)
while True:
task_id = task_queue.get()
task_id, inputs = task_queue.get()

if task_id == __ERROR__:
for f in futures:
Expand All @@ -179,7 +193,7 @@ def run_engine():
self.active_tasks.add(task_id)

future = self._executor.submit(
self.execute_task, task, task_queue, stream_queue
self.execute_task, task, inputs, task_queue, stream_queue
)
futures.add(future)

Expand Down
63 changes: 62 additions & 1 deletion tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,27 @@ def task2(ctx):
assert ("task2", "result2") in results


def test_streaming_with_inputs(flow):
    """Streaming should hand flow-level inputs to the entry task and
    propagate next_inputs to the follow-up task."""

    def entry(ctx, inputs):
        # flow.stream(..., inputs=...) must reach the first task
        assert inputs == {"input1": "value1"}
        return TaskOutput("result1", ["task2"], next_inputs=[{"input1": "value1"}])

    def follow_up(ctx, inputs):
        # next_inputs declared by the entry task must reach this one
        assert inputs == {"input1": "value1"}
        return TaskOutput("result2")

    flow.add_task("task1", entry)
    flow.add_task("task2", follow_up)

    collected = list(flow.stream("task1", inputs={"input1": "value1"}))

    assert len(collected) == 2
    assert ("task1", "result1") in collected
    assert ("task2", "result2") in collected


def test_streaming_within_task(flow):
def task1(ctx):
for i in range(3):
Expand Down Expand Up @@ -291,7 +312,7 @@ def task2(ctx):
return TaskOutput("result2", ["task1"])

def task3(ctx):
return TaskOutput("final", None)
return TaskOutput("final")

flow.add_task("task1", task1)
flow.add_task("task2", task2)
Expand All @@ -307,3 +328,43 @@ def test_state_loading(flow_with_state):

assert flow_with_state.context.get("task1") == "result1"
assert flow_with_state.context.get("task2") == "result2"

def test_inputs_to_next_tasks(flow):
    """A task's next_inputs should be delivered to the task it schedules."""

    def producer(ctx):
        return TaskOutput("result1", ["task2"], next_inputs=[{"input1": "value1"}])

    def consumer(ctx, inputs):
        # the dict declared in next_inputs arrives as this task's inputs
        assert inputs["input1"] == "value1"
        return TaskOutput("result2")

    flow.add_task("task1", producer)
    flow.add_task("task2", consumer)

    assert flow.run("task1") == {"task2": "result2"}

def test_inputs_to_next_tasks_with_no_inputs(flow):
    """When next_inputs is omitted, a scheduled task receives inputs=None."""

    def head(ctx):
        return TaskOutput("result1", ["task2"], next_inputs=None)

    def tail(ctx, inputs):
        # no next_inputs were provided, so None is passed through
        assert inputs is None
        return TaskOutput("result2")

    flow.add_task("task1", head)
    flow.add_task("task2", tail)

    outcome = flow.run("task1")
    assert outcome == {"task2": "result2"}

def test_inputs_to_first_task(flow):
    """Inputs given to run() should be forwarded to the starting task."""

    def entry(ctx, inputs):
        # run(..., inputs=...) must surface here unchanged
        assert inputs == {"input1": "value1"}
        return TaskOutput("result1")

    flow.add_task("task1", entry)

    assert flow.run("task1", inputs={"input1": "value1"}) == {"task1": "result1"}

0 comments on commit d06b578

Please sign in to comment.