The Hapax Graph API provides a flexible and intuitive way to build complex data processing pipelines. Inspired by frameworks like JAX and Flyte, it offers a fluent interface for composing operations and defining workflows, with built-in type checking that begins at import time and continues through graph definition and execution.
Operations are the basic building blocks of a Hapax graph. Each operation is a pure function that takes an input and produces an output. Operations are type-checked at both import time and runtime:
@ops(name="summarize", tags=["llm"])
def summarize(text: str) -> str:
"""Generate a concise summary using LLM."""
# Implementation
The `@ops` decorator performs initial type validation at import time, ensuring that the function has proper type hints. Further type checking occurs at runtime, when the operation is executed.
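To make the import-time stage concrete, here is a minimal sketch of what such a decorator can do with nothing more than the function's annotations. This is illustrative only, not Hapax's actual implementation:

```python
import inspect
import typing

def ops_sketch(name: str, tags: list | None = None):
    """Illustrative stand-in for @ops: validates annotations when the module is imported."""
    def decorator(func):
        sig = inspect.signature(func)
        hints = typing.get_type_hints(func)
        # Every parameter must carry a type hint
        for param in sig.parameters:
            if param not in hints:
                raise TypeError(f"operation '{name}': parameter '{param}' has no type hint")
        # The return value must be annotated as well
        if "return" not in hints:
            raise TypeError(f"operation '{name}': missing return type annotation")
        # Store the validated type information for later (runtime) checks
        func._op_meta = {"name": name, "tags": tags or [], "hints": hints}
        return func
    return decorator
```

Because the decorator runs when the module is imported, an unannotated function fails before any pipeline is ever executed.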
Hapax performs comprehensive type checking at multiple stages:
- Import Time (Static):
  - Validates the presence of type hints through the `@ops` decorator
  - Checks that input parameter types exist
  - Verifies that return type annotations exist
  - Stores the validated type information for later use
- Graph Definition Time:
  - Type compatibility between connected operations
  - Structural validation (cycles, missing connections)
  - Configuration and metadata validation
  - Immediate type checking when using operation composition (`>>`)
- Runtime (Dynamic):
  - Input type validation before operation execution
  - Output type validation after operation execution
  - Complete graph validation during execution
  - Type checking of operation results
  - Resource availability checks
  - Configuration validation
This multi-stage type checking ensures type safety throughout the entire lifecycle of your data processing pipeline:
- Early detection of type-related issues during development (import time)
- Immediate feedback when building graphs (definition time)
- Runtime safety guarantees during execution
For example, the following code would fail at different stages:
```python
# Fails at import time - missing type hints
@ops(name="bad_op")
def no_type_hints(x):
    return x + 1

# Fails at graph definition time - type mismatch
graph = (
    Graph("type_mismatch")
    .then(str_op)  # str -> str
    .then(int_op)  # int -> int  <- type error!
)

# Fails at runtime - actual input type doesn't match declaration
@ops(name="runtime_check")
def expect_string(text: str) -> str:
    return text.upper()

result = expect_string(123)  # Runtime type error
```
A Graph is a collection of operations connected in a specific way. The Graph class provides a fluent API for building these connections, with comprehensive type checking at definition time:
```python
# Type compatibility is checked when the graph is defined
graph = (
    Graph("name", "description")
    .then(op1)  # Type compatibility checked immediately
    .then(op2)  # Type compatibility checked immediately
)
```
Chain operations one after another with automatic type checking:
```python
graph = (
    Graph("text_processing")
    .then(clean_text)  # Returns str
    .then(tokenize)    # Expects str, returns List[str]
    .then(analyze)     # Expects List[str]
)
```
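For context, here is a sketch of operations that satisfy these signatures, reusing the `@ops` decorator from earlier. The implementations (and `analyze`'s return type) are hypothetical; the doc only fixes the input types:

```python
from typing import Dict, List

@ops(name="clean_text")
def clean_text(text: str) -> str:
    return " ".join(text.split()).lower()

@ops(name="tokenize")
def tokenize(text: str) -> List[str]:
    return text.split()

@ops(name="analyze")
def analyze(tokens: List[str]) -> Dict[str, int]:
    # Hypothetical analysis: simple token frequency counts
    return {token: tokens.count(token) for token in tokens}

result = graph("Some  RAW   input text")  # runs clean_text -> tokenize -> analyze
```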
Execute multiple operations in parallel with type-safe result collection:
```python
graph = (
    Graph("parallel_processing")
    .branch(
        sentiment_analysis,  # Branch 1: str -> float
        entity_extraction,   # Branch 2: str -> List[str]
        topic_modeling,      # Branch 3: str -> Dict[str, float]
    )
    .merge(combine_results)  # List[Union[float, List[str], Dict[str, float]]] -> Result
)
```
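The merge function receives the collected branch results as a single list, so its parameter type is the union shown in the comment above. A hypothetical `combine_results` (the result keys, and the assumption that results arrive in branch-declaration order, are illustrative):

```python
from typing import Any, Dict, List, Union

BranchResult = Union[float, List[str], Dict[str, float]]

@ops(name="combine_results")
def combine_results(results: List[BranchResult]) -> Dict[str, Any]:
    # Assumes branch outputs arrive in the order the branches were declared
    sentiment, entities, topics = results
    return {"sentiment": sentiment, "entities": entities, "topics": topics}
```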
Add type-safe branching logic:
```python
graph = (
    Graph("language_processing")
    .then(detect_language)  # str -> str
    .condition(
        lambda lang: lang != "en",
        translate,    # str -> str
        lambda x: x,  # str -> str (identity)
    )
)
```
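Both arms of a condition must share the same input and output types, which is why the identity lambda works as a typed no-op for the English path. A hypothetical `translate` operation satisfying the `str -> str` contract:

```python
@ops(name="translate", tags=["llm"])
def translate(text: str) -> str:
    # Hypothetical stand-in; a real pipeline would call a translation model here
    return f"[translated to en] {text}"
```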
Repeat an operation with type-safe condition checking:
```python
graph = (
    Graph("retry_logic")
    .loop(
        api_call,  # Request -> Response
        condition=lambda response: response.status == "success",
        max_iterations=3,
    )
)
```
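`Request` and `Response` in this snippet are not Hapax types; minimal placeholder dataclasses like the following would satisfy the loop's condition check:

```python
from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    payload: dict = field(default_factory=dict)

@dataclass
class Response:
    status: str  # inspected by the loop condition ("success" ends the loop)
    body: dict = field(default_factory=dict)
```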
Hapax provides clear error messages when validation fails:
- Type Mismatch: `TypeError: Cannot compose operations: output type List[str] does not match input type Dict[str, Any]`
- Structural Issues: `GraphValidationError: Graph contains cycles: [['op1', 'op2', 'op1']]`
- Configuration Issues: `ValueError: Missing required configuration: operation 'api_call' requires API endpoint`
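Because `.then()` and `>>` validate compatibility immediately, definition-time failures can be caught right where the graph is assembled. A sketch reusing the `str_op`/`int_op` mismatch from the earlier example:

```python
try:
    graph = Graph("type_mismatch").then(str_op).then(int_op)
except TypeError as e:
    # Raised at definition time, before any data flows through the graph
    print(f"Graph rejected: {e}")
```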
Here's a real-world example that showcases the power and flexibility of the Graph API:
```python
def create_nlp_pipeline() -> Graph[str, Dict[str, Any]]:
    return (
        Graph("nlp_pipeline", "Advanced NLP processing pipeline")
        # First detect language and translate if needed
        .then(detect_language)
        .condition(
            lambda lang: lang != "en",
            translate,
            lambda x: x,
        )
        # Then process in parallel
        .branch(
            summarize,           # Branch 1: Summarization
            sentiment_analysis,  # Branch 2: Sentiment
            extract_entities,    # Branch 3: Entity extraction
            extract_keywords,    # Branch 4: Keyword extraction
        )
        # Merge results
        .merge(combine_results)
    )
```
This pipeline:
- Detects the language of the input text
- Translates to English if needed
- Processes the text in parallel:
  - Generates a summary
  - Analyzes sentiment and emotions
  - Extracts named entities
  - Identifies key topics
- Combines all results into a structured output
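Using the pipeline is then a single call. The input text and result keys below are illustrative; the actual keys depend on what `combine_results` returns:

```python
pipeline = create_nlp_pipeline()
result = pipeline("Bonjour ! Ceci est un exemple de texte à analyser.")
print(result["summary"], result["sentiment"])  # hypothetical result keys
```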
The Graph API includes built-in type checking to ensure type safety across operations:
```python
def tokenize(text: str) -> List[str]: ...
def analyze(tokens: List[str]) -> Dict[str, float]: ...

# Types are checked when the graph is defined and again at runtime
graph = Graph("example").then(tokenize).then(analyze)
```
Hapax integrates with OpenLit for monitoring and observability:
```python
# Configure globally
set_openlit_config({
    "trace_content": True,
    "disable_metrics": False,
})

# Operations are automatically monitored
@ops(name="process", tags=["processing"])
def process(data: str) -> str:
    # OpenLit will trace this operation
    return data.upper()
```
- Modularity: Keep operations small and focused on a single task
- Type Hints: Always use type hints to catch type errors early
- Documentation: Add clear docstrings to operations
- Error Handling: Use appropriate error handling in operations
- Monitoring: Configure OpenLit for production monitoring
Operations can be composed using the `>>` operator:

```python
pipeline = tokenize >> normalize >> analyze
```
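Each `>>` performs the same immediate compatibility check as `.then()`, so composing mismatched operations fails at composition time rather than at execution. A sketch, again using the `str_op`/`int_op` pair from the validation example:

```python
try:
    broken = str_op >> int_op  # str output feeding an int input
except TypeError as e:
    print(f"Composition rejected: {e}")
```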
The Graph API automatically infers input and output types from function signatures:
@ops(name="process")
def process(text: str) -> List[str]:
# Input and output types are automatically extracted
return text.split()
Flow operators provide detailed error information:
```python
try:
    result = pipeline(text)
except BranchError as e:
    print(f"Branch errors: {e.branch_errors}")
    print(f"Partial results: {e.partial_results}")
```