Skip to content

Lightweight Python library for building in-memory data pipelines, providing a fluent API to layer transformations, manage context, and handle errors with elegant, chainable syntax.

License

Notifications You must be signed in to change notification settings

ringoldsdev/laygo-python

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

38 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

Laygo - simple pipelines, serious scale

Lightweight Python library for building resilient data pipelines with a fluent API, designed to scale effortlessly from a single script to hundreds of cores and thousands of distributed serverless functions.

License: MIT Python 3.12+ Built with UV Code style: Ruff


🎯 Overview

Laygo is the lightweight Python library for data pipelines that I wish existed when I first started. It's designed from the ground up to make data engineering simpler, cleaner, and more intuitive, letting you build resilient, in-memory data workflows with an elegant, fluent API.

It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead.

Key Features:

  • Fluent & Readable: Craft complex data transformations with a clean, chainable method syntax that's easy to write and maintain.

  • Performance Optimized: Process data at maximum speed using chunked processing, lazy evaluation, and list comprehensions.

  • Memory Efficient: Built-in streaming and lazy iterators allow you to handle datasets far larger than available memory.

  • Effortless Parallelism: Accelerate CPU-intensive tasks seamlessly.

  • Distributed by Design: Your pipeline script is both the manager and the worker. When deployed as a serverless function or a container, this design allows you to scale out massively by simply running more instances of the same code. Your logic scales the same way on a thousand cores as it does on one.

  • Powerful Context Management: Share state and configuration across your entire pipeline for advanced, stateful processing.

  • Resilient Error Handling: Isolate and manage errors at the chunk level, preventing a single bad record from failing your entire job.

  • Modern & Type-Safe: Leverage full support for modern Python with generic type hints for robust, maintainable code.


πŸ“¦ Installation

pip install laygo

Or for development:

git clone https://github.com/ringoldsdev/laygo-python.git
cd laygo-python
pip install -e ".[dev]"

🐳 Dev Container Setup

If you're using this project in a dev container, you'll need to configure Git to use HTTPS instead of SSH for authentication:

# Switch to HTTPS remote URL
git remote set-url origin https://github.com/ringoldsdev/laygo-python.git

# Configure Git to use HTTPS for all GitHub operations
git config --global url."https://github.com/".insteadOf "git@github.com:"

▢️ Usage

Basic Pipeline Operations

from laygo import Pipeline

# Simple data transformation
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
result = (
    Pipeline(data)
    .transform(lambda t: t.filter(lambda x: x % 2 == 0))  # Keep even numbers
    .transform(lambda t: t.map(lambda x: x * 2))          # Double them
    .to_list()
)
print(result)  # [4, 8, 12, 16, 20]

Context-Aware Operations

from laygo import Pipeline
from laygo import PipelineContext

# Create context with shared state
context: PipelineContext = {"multiplier": 3, "threshold": 10}

result = (
    Pipeline([1, 2, 3, 4, 5])
    .context(context)
    .transform(lambda t: t.map(lambda x, ctx: x * ctx["multiplier"]))
    .transform(lambda t: t.filter(lambda x, ctx: x > ctx["threshold"]))
    .to_list()
)
print(result)  # [12, 15]

ETL Pipeline Example

from laygo import Pipeline

# Sample employee data processing
employees = [
    {"name": "Alice", "age": 25, "salary": 50000},
    {"name": "Bob", "age": 30, "salary": 60000},
    {"name": "Charlie", "age": 35, "salary": 70000},
    {"name": "David", "age": 28, "salary": 55000},
]

# Extract, Transform, Load pattern
high_earners = (
    Pipeline(employees)
    .transform(lambda t: t.filter(lambda emp: emp["age"] > 28))           # Extract
    .transform(lambda t: t.map(lambda emp: {                             # Transform
        "name": emp["name"],
        "annual_salary": emp["salary"],
        "monthly_salary": emp["salary"] / 12
    }))
    .transform(lambda t: t.filter(lambda emp: emp["annual_salary"] > 55000)) # Filter
    .to_list()
)

Using Transformers Directly

from laygo import Transformer

# Create a reusable transformation pipeline
transformer = (
    Transformer.init(int)
    .filter(lambda x: x % 2 == 0)   # Keep even numbers
    .map(lambda x: x * 2)           # Double them
    .filter(lambda x: x > 5)        # Keep > 5
)

# Apply to different datasets
result1 = list(transformer([1, 2, 3, 4, 5]))  # [4, 8]
result2 = list(transformer(range(10)))          # [4, 8, 12, 16, 20]

Custom Transformer Composition

from laygo import Pipeline
from laygo import Transformer

# Create reusable transformation components
validate_data = Transformer.init(dict).filter(lambda x: x.get("id") is not None)
normalize_text = Transformer.init(dict).map(lambda x: {**x, "name": x["name"].strip().title()})

# Use transformers directly with Pipeline.transform()
result = (
    Pipeline(raw_data)
    .transform(validate_data)      # Pass transformer directly
    .transform(normalize_text)     # Pass transformer directly
    .to_list()
)

Parallel Processing

from laygo import Pipeline
from laygo import ParallelTransformer

# Process large datasets with multiple threads
large_data = range(100_000)

# Create parallel transformer
parallel_processor = (
  ParallelTransformer.init(
    int,
    max_workers=4,
    ordered=True,    # Maintain result order
    chunk_size=10000 # Process in chunks
  ).map(lambda x: x ** 2)
)

results = (
    Pipeline(large_data)
    .transform(parallel_processor)
    .transform(lambda t: t.filter(lambda x: x > 100))
    .first(1000)  # Get first 1000 results
)

Error Handling and Recovery

from laygo import Pipeline
from laygo import Transformer

def risky_operation(x):
    if x == 5:
        raise ValueError("Cannot process 5")
    return x * 2

def error_handler(chunk, error, context):
    print(f"Error in chunk {chunk}: {error}")
    return [0] * len(chunk)  # Return default values

# Pipeline with error recovery
result = (
    Pipeline([1, 2, 3, 4, 5, 6])
    .transform(lambda t: t.map(risky_operation).catch(
        lambda sub_t: sub_t.map(lambda x: x + 1),
        on_error=error_handler
    ))
    .to_list()
)

βš™οΈ Projects using Laygo

  • Efemel - A CLI tool that processes Python files as configuration markup and exports them to JSON/YAML, replacing traditional templating DSLs with native Python syntax.

πŸ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.


πŸš€ Built With


⭐ Star this repository if Laygo helps your data processing workflows!

About

Lightweight Python library for building in-memory data pipelines, providing a fluent API to layer transformations, manage context, and handle errors with elegant, chainable syntax.

Resources

License

Stars

Watchers

Forks

Packages

No packages published