Lightweight Python library for building resilient data pipelines with a fluent API, designed to scale effortlessly from a single script to hundreds of cores and thousands of distributed serverless functions.
Laygo is the lightweight Python library for data pipelines that I wish existed when I first started. It's designed from the ground up to make data engineering simpler, cleaner, and more intuitive, letting you build resilient, in-memory data workflows with an elegant, fluent API.
It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead.
Key Features:
-
Fluent & Readable: Craft complex data transformations with a clean, chainable method syntax that's easy to write and maintain.
-
Performance Optimized: Process data at maximum speed using chunked processing, lazy evaluation, and list comprehensions.
-
Memory Efficient: Built-in streaming and lazy iterators allow you to handle datasets far larger than available memory.
-
Effortless Parallelism: Accelerate CPU-intensive tasks seamlessly.
-
Distributed by Design: Your pipeline script is both the manager and the worker. When deployed as a serverless function or a container, this design allows you to scale out massively by simply running more instances of the same code. Your logic scales the same way on a thousand cores as it does on one.
-
Powerful Context Management: Share state and configuration across your entire pipeline for advanced, stateful processing.
-
Resilient Error Handling: Isolate and manage errors at the chunk level, preventing a single bad record from failing your entire job.
-
Modern & Type-Safe: Leverage full support for modern Python with generic type hints for robust, maintainable code.
pip install laygo
Or for development:
git clone https://github.com/ringoldsdev/laygo-python.git
cd laygo-python
pip install -e ".[dev]"
If you're using this project in a dev container, you'll need to configure Git to use HTTPS instead of SSH for authentication:
# Switch to HTTPS remote URL
git remote set-url origin https://github.com/ringoldsdev/laygo-python.git
# Configure Git to use HTTPS for all GitHub operations
git config --global url."https://github.com/".insteadOf "git@github.com:"
from laygo import Pipeline
# Simple data transformation
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
result = (
Pipeline(data)
.transform(lambda t: t.filter(lambda x: x % 2 == 0)) # Keep even numbers
.transform(lambda t: t.map(lambda x: x * 2)) # Double them
.to_list()
)
print(result) # [4, 8, 12, 16, 20]
from laygo import Pipeline
from laygo import PipelineContext
# Create context with shared state
context: PipelineContext = {"multiplier": 3, "threshold": 10}
result = (
Pipeline([1, 2, 3, 4, 5])
.context(context)
.transform(lambda t: t.map(lambda x, ctx: x * ctx["multiplier"]))
.transform(lambda t: t.filter(lambda x, ctx: x > ctx["threshold"]))
.to_list()
)
print(result) # [12, 15]
from laygo import Pipeline
# Sample employee data processing
employees = [
{"name": "Alice", "age": 25, "salary": 50000},
{"name": "Bob", "age": 30, "salary": 60000},
{"name": "Charlie", "age": 35, "salary": 70000},
{"name": "David", "age": 28, "salary": 55000},
]
# Extract, Transform, Load pattern
high_earners = (
Pipeline(employees)
.transform(lambda t: t.filter(lambda emp: emp["age"] > 28)) # Extract
.transform(lambda t: t.map(lambda emp: { # Transform
"name": emp["name"],
"annual_salary": emp["salary"],
"monthly_salary": emp["salary"] / 12
}))
.transform(lambda t: t.filter(lambda emp: emp["annual_salary"] > 55000)) # Filter
.to_list()
)
from laygo import Transformer
# Create a reusable transformation pipeline
transformer = (
Transformer.init(int)
.filter(lambda x: x % 2 == 0) # Keep even numbers
.map(lambda x: x * 2) # Double them
.filter(lambda x: x > 5) # Keep > 5
)
# Apply to different datasets
result1 = list(transformer([1, 2, 3, 4, 5])) # [4, 8]
result2 = list(transformer(range(10))) # [4, 8, 12, 16, 20]
from laygo import Pipeline
from laygo import Transformer
# Create reusable transformation components
validate_data = Transformer.init(dict).filter(lambda x: x.get("id") is not None)
normalize_text = Transformer.init(dict).map(lambda x: {**x, "name": x["name"].strip().title()})
# Use transformers directly with Pipeline.transform()
result = (
Pipeline(raw_data)
.transform(validate_data) # Pass transformer directly
.transform(normalize_text) # Pass transformer directly
.to_list()
)
from laygo import Pipeline
from laygo import ParallelTransformer
# Process large datasets with multiple threads
large_data = range(100_000)
# Create parallel transformer
parallel_processor = (
ParallelTransformer.init(
int,
max_workers=4,
ordered=True, # Maintain result order
chunk_size=10000 # Process in chunks
).map(lambda x: x ** 2)
)
results = (
Pipeline(large_data)
.transform(parallel_processor)
.transform(lambda t: t.filter(lambda x: x > 100))
.first(1000) # Get first 1000 results
)
from laygo import Pipeline
from laygo import Transformer
def risky_operation(x):
if x == 5:
raise ValueError("Cannot process 5")
return x * 2
def error_handler(chunk, error, context):
print(f"Error in chunk {chunk}: {error}")
return [0] * len(chunk) # Return default values
# Pipeline with error recovery
result = (
Pipeline([1, 2, 3, 4, 5, 6])
.transform(lambda t: t.map(risky_operation).catch(
lambda sub_t: sub_t.map(lambda x: x + 1),
on_error=error_handler
))
.to_list()
)
- Efemel - A CLI tool that processes Python files as configuration markup and exports them to JSON/YAML, replacing traditional templating DSLs with native Python syntax.
This project is licensed under the MIT License - see the LICENSE file for details.
- Python 3.12+ - Core language with modern type hints
- Ruff - Code formatting and linting
- Pytest - Testing framework
- DevContainers - Consistent development environment
- GitHub Actions - CI/CD automation
β Star this repository if Laygo helps your data processing workflows!