Crinkle is a framework for organizing the execution of complex processing flows by implementing the “Chain of Responsibility” pattern (🔋 included).
Features:
- Minimalist: Translate complex computational requirements into simple, elegant code, and less of it.
- Grow large: Grow large while keeping everything maintainable and fast.
- Empower testability: Make flows easier to test by breaking large processes into small, specialized pieces that are composed together.
- Opinionated Yet Intuitive: Quick to understand and work with.
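The testability point can be illustrated without Crinkle itself. The following is a minimal sketch, assuming a processor is simply a callable that receives a context object exposing `state` and returns a bool. `OrderState`, `FakeContext`, `loyalty_gate` and `add_coupon` are hypothetical names used only for illustration; they are not part of Crinkle.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class OrderState:
    # Hypothetical stand-in for the application's state model
    loyalty: bool = False
    discounts: List[str] = field(default_factory=list)


@dataclass
class FakeContext:
    # Hypothetical stand-in for a context object exposing `.state`
    state: OrderState


def loyalty_gate(context) -> bool:
    """Return True (stop) for non-loyalty orders, False (continue) otherwise."""
    return not context.state.loyalty


def add_coupon(context) -> bool:
    """Append a coupon discount and let the chain continue."""
    context.state.discounts.append("coupon")
    return False


# Each small piece can be exercised on its own, with no flow involved
assert loyalty_gate(FakeContext(OrderState(loyalty=True))) is False
assert loyalty_gate(FakeContext(OrderState(loyalty=False))) is True

ctx = FakeContext(OrderState(loyalty=True))
assert add_coupon(ctx) is False
assert ctx.state.discounts == ["coupon"]
```

Because each step only reads and mutates the context it is given, a test needs nothing more than a small fake context and a direct function call.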
💡 Inspired by Apache Commons Chain
Requires a recent, currently supported version of Python.
$ pip install crinkle
For an introduction to the Chain of Responsibility pattern and its use cases, see Chain of Responsibility by Refactoring Guru.
from typing import Dict, List

from pydantic import BaseModel

from crinkle import Context, Flow


class Order(BaseModel):
    loyalty: bool  # just to simplify the example
    items: List
    discounts: List


context = Context[Order, Dict](
    # Order has no default values, so every field must be provided
    initial_state=Order(loyalty=True, items=[], discounts=[]),
    additional_data={},  # Optional
)

flow = Flow(name='Promotions/Discounts Flow')


@flow.processor(name='Discounts pre-conditions Processor')
def discounts_pre_conditions_processor(context: Context[Order, Dict]) -> bool:
    if context.state.loyalty:
        return False  # Go to next processor
    return True  # Stop flow without going to next processors


@flow.processor(name='Manufacturer Coupons Processor')
def manufacturer_coupons_processor(context: Context[Order, Dict]) -> bool:
    # do stuff with context
    return False  # Go to next processor


# Async is also supported
@flow.processor(name='Buy one get one Processor')
async def bogo_processor(context: Context[Order, Dict]) -> bool:
    # do stuff with context
    return False  # This is the last processor, so end of flow


# Just like Apache Commons Chain, the Flow will be forced to terminate
# if a processor returns True (this would mean processing is complete).
flow.execute(context)

# context.state holds the state of the Order after all processing
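The return-value convention used above (False continues, True stops) can be sketched in plain Python. This is only an illustration of the convention, not Crinkle's implementation; `run_chain`, `require_loyalty` and `apply_coupons` are hypothetical names, and a plain dict stands in for the context.

```python
from typing import Callable, Dict, List

Processor = Callable[[Dict], bool]


def run_chain(processors: List[Processor], context: Dict) -> bool:
    """Run processors in order and stop as soon as one returns True.

    Returns True if a processor ended the chain early, False if all ran.
    """
    for processor in processors:
        if processor(context):
            return True
    return False


def require_loyalty(context: Dict) -> bool:
    context["visited"].append("require_loyalty")
    return not context["loyalty"]  # True stops the chain


def apply_coupons(context: Dict) -> bool:
    context["visited"].append("apply_coupons")
    return False  # continue to the next processor, if any


# A loyalty order passes the gate, so both processors run
loyal = {"loyalty": True, "visited": []}
stopped_loyal = run_chain([require_loyalty, apply_coupons], loyal)

# A non-loyalty order is stopped by the first processor
guest = {"loyalty": False, "visited": []}
stopped_guest = run_chain([require_loyalty, apply_coupons], guest)
```

In this sketch the loyalty order visits both processors, while the guest order never reaches `apply_coupons`.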
Crinkle has flavors for all tastes! OOP is also supported; see the example:
from typing import Dict, List

from pydantic import BaseModel

from crinkle import ProcessorBase, ProcessorBaseAsync, Flow, Context


class Order(BaseModel):
    loyalty: bool  # just to simplify the example
    items: List
    discounts: List


context = Context[Order, Dict](
    # Order has no default values, so every field must be provided
    initial_state=Order(loyalty=True, items=[], discounts=[]),
    additional_data={},  # Optional
)


class DiscountsPreConditionsProcessor(ProcessorBase):
    def __init__(self, name: str):
        self.name = name

    def process(self, context: Context[Order, Dict]) -> bool:
        if context.state.loyalty:
            return False  # Go to next processor
        return True  # Stop flow without going to next processors


class ManufacturerCouponsProcessor(ProcessorBase):
    def __init__(self, name: str):
        self.name = name

    def process(self, context: Context[Order, Dict]) -> bool:
        # do stuff with context
        return False  # Go to next processor


class BOGOProcessor(ProcessorBaseAsync):
    def __init__(self, name: str):
        self.name = name

    # Async is also supported
    async def process(self, context: Context[Order, Dict]) -> bool:
        # do stuff with context
        return False  # This is the last processor, so end of flow


flow = Flow(
    flow_name='Promotions/Discounts Flow',
    processors=[
        DiscountsPreConditionsProcessor(name='...'),
        ManufacturerCouponsProcessor(name='...'),
        BOGOProcessor(name='...'),
    ],
)

# OR add processors one at a time (an alternative to passing `processors=` above)
flow.add_processor(DiscountsPreConditionsProcessor(name='...'))
flow.add_processor(ManufacturerCouponsProcessor(name='...'))
flow.add_processor(BOGOProcessor(name='...'))

# Just like Apache Commons Chain, the Flow will be forced to terminate
# if a processor returns True (this would mean processing is complete).
flow.execute(context)

# context.state holds the state of the Order after all processing
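Both examples mix synchronous and asynchronous processors in one flow. How Crinkle schedules them internally is not described here; the sketch below shows one possible way such a mix could be handled with the standard library, awaiting a processor's result only when it is awaitable. `run_chain_async`, `sync_step` and `async_step` are hypothetical names, and a plain dict stands in for the context.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict, List, Union

Processor = Callable[[Dict[str, Any]], Union[bool, Awaitable[bool]]]


async def run_chain_async(processors: List[Processor], context: Dict[str, Any]) -> bool:
    """Run sync and async processors in order; stop when one returns True."""
    for processor in processors:
        result = processor(context)
        # Coroutine functions return an awaitable that still has to be awaited
        if inspect.isawaitable(result):
            result = await result
        if result:
            return True
    return False


def sync_step(context: Dict[str, Any]) -> bool:
    context["log"].append("sync")
    return False


async def async_step(context: Dict[str, Any]) -> bool:
    await asyncio.sleep(0)  # yield control once, as real async I/O would
    context["log"].append("async")
    return False


mixed_context = {"log": []}
mixed_stopped = asyncio.run(run_chain_async([sync_step, async_step], mixed_context))
```

Checking the result rather than the function keeps the runner indifferent to how each processor was defined, at the cost of requiring the runner itself to be a coroutine.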
This project is licensed under the terms of the MIT license.