Skip to content

Commit

Permalink
work in progress
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <johnandersenpdx@gmail.com>
  • Loading branch information
pdxjohnny committed May 27, 2022
1 parent 97a6ddc commit e256580
Show file tree
Hide file tree
Showing 15 changed files with 8,337 additions and 119 deletions.
84 changes: 84 additions & 0 deletions DFFML_MAIN_PACKAGE_OVERLAY.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# DFFML dataflow that applies overlays to another dataflow (WIP commit).
#
# Pipeline as wired in `flow`:
#   1. dataflow_todict  — serialize the dataflow being overlayed to a dict
#   2. merge            — merge the overlay dict (from seed) into that dict
#   3. dataflow_fromdict — deserialize the merged dict back to a dataflow
#   4. apply_overlay_to_dataflow_to_be_executed — apply the merged overlay
#      to the target dataflow supplied as an input
#
# NOTE(review): scrape lost the original indentation; 2-space YAML nesting
# reconstructed here — confirm against the repository copy.
definitions:
  # Each definition is a named type used to link operation inputs/outputs.
  DataFlowAfterOverlaysApplied:
    name: DataFlowAfterOverlaysApplied
    primitive: object
  DataFlowAfterOverlaysMerged:
    name: DataFlowAfterOverlaysMerged
    primitive: object
  DataFlowBeingOverlayed:
    name: DataFlowBeingOverlayed
    primitive: object
  DataFlowBeingOverlayedAsDict:
    name: DataFlowBeingOverlayedAsDict
    primitive: object
  DataFlowToApplyAsOverlay:
    name: DataFlowToApplyAsOverlay
    primitive: object
  DataFlowWeAreApplyingOverlaysToByRunningOverlayDataflowAndPassingAsAnInput:
    name: DataFlowWeAreApplyingOverlaysToByRunningOverlayDataflowAndPassingAsAnInput
    primitive: object
  # Sink definition for merge's output; name suggests it is intentionally
  # unused downstream (nothing in `flow` consumes it).
  unused.dict.dataflow:
    name: unused.dict.dataflow
    primitive: dict
flow:
  # `flow` wires each operation input either to `seed` (caller-provided)
  # or to another operation's named output.
  apply_overlay_to_dataflow_to_be_executed:
    inputs:
      dataflow_we_are_applying_overlays_to_by_running_overlay_dataflow_and_passing_as_an_input:
      - seed
      merged:
      - dataflow_fromdict: overlays_merged
  dataflow_fromdict:
    inputs:
      merged:
      - dataflow_todict: dataflow_as_dict
  dataflow_todict:
    inputs:
      dataflow:
      - seed
  merge:
    inputs:
      dst:
      - dataflow_todict: dataflow_as_dict
      src:
      - seed
linked: true
operations:
  apply_overlay_to_dataflow_to_be_executed:
    inputs:
      dataflow_we_are_applying_overlays_to_by_running_overlay_dataflow_and_passing_as_an_input: DataFlowWeAreApplyingOverlaysToByRunningOverlayDataflowAndPassingAsAnInput
      merged: DataFlowAfterOverlaysMerged
    # NOTE(review): every other operation's `name` matches an implementation
    # of the same purpose (dataflow_fromdict, dataflow_todict,
    # dataflow_merge). `dataflow_fromdict` here looks like a copy-paste
    # slip — confirm the intended implementation name for this operation.
    name: dataflow_fromdict
    outputs:
      overlayed: DataFlowAfterOverlaysApplied
    retry: 0
    stage: output
  dataflow_fromdict:
    inputs:
      merged: DataFlowBeingOverlayedAsDict
    name: dataflow_fromdict
    outputs:
      overlays_merged: DataFlowAfterOverlaysMerged
    retry: 0
    stage: output
  dataflow_todict:
    inputs:
      dataflow: DataFlowBeingOverlayed
    name: dataflow_todict
    outputs:
      dataflow_as_dict: DataFlowBeingOverlayedAsDict
    retry: 0
    stage: processing
  merge:
    inputs:
      dst: DataFlowBeingOverlayedAsDict
      src: DataFlowToApplyAsOverlay
    name: dataflow_merge
    outputs:
      # Consumed by nothing in `flow`; see unused.dict.dataflow above.
      merged_dataflow: unused.dict.dataflow
    retry: 0
    stage: output
# Default overlay value: an empty overlay (no operations), used when the
# caller supplies nothing for DataFlowToApplyAsOverlay.
seed:
- definition: DataFlowToApplyAsOverlay
  origin: seed
  value:
    operations: {}
78 changes: 76 additions & 2 deletions dffml/df/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,47 @@ def load(cls, loading: str = None):
return loading_classes


def op_sysctx(
    **kwargs,
):
    # NOTE(review): work-in-progress stub — as committed this does not parse:
    # `func` is never bound in this scope, `functools.wrap` is not a real
    # attribute (presumably `functools.wraps` was meant — TODO confirm), and
    # the `imp_enter` / `ctx_enter` dict literals below are missing their
    # values. Flagged rather than fixed because the intended wiring is not
    # yet clear from this commit.
    # TODO __doc__ may not be preserved through the wrapping here — double
    # check whether `op` already handles it or whether more is needed.
    @functools.wrap(func)
    def wrapped():
        """TODO(review): body not yet written in this WIP commit."""

    return op(
        config_cls=make_config(),
        imp_enter={
            "":
        },
        ctx_enter={
            "sysctx":
        },
    )(wrapped)


def op_dataflow(
    system_context,
    *args,
    **kwargs,
):
    # NOTE(review): work-in-progress stub — as committed this does not run:
    # `func` and `opsc` are unbound in this scope, `functools.wrap` is not a
    # real attribute (presumably `functools.wraps` — TODO confirm), and
    # `system_context` / `args` / `kwargs` are accepted but never used.
    # Flagged rather than fixed because the intended behavior is not yet
    # clear from this commit.
    # TODO __doc__ may not be preserved through the wrapping here — double
    # check whether `op` already handles it or whether more is needed.
    @functools.wrap(func)
    def wrapped():
        """TODO(review): body not yet written in this WIP commit."""

    return opsc(
        SystemContext(
            inputs=[
            ],
        ),
    )(wrapped)


def op(
*args,
imp_enter=None,
Expand Down Expand Up @@ -371,6 +412,35 @@ def wrap(func):
.replace(" ", "")
)

config_cls=make_config(),
imp_enter={
"":
},
ctx_enter={
"sysctx":
},
)(wrapped)


def op_dataflow(
system_context,
*args,
**kwargs,
):
# TODO Something about __doc__ not perseved with functools.wrap? Double
# check to see if we need to do anything or maybe op already handles it? Do
# we need to do anything more here?
@functools.wrap(func)
def wrapped():

return opsc(
SystemContext(
inputs=[
],
),



# Create the test method which creates the contexts and runs
async def test(**kwargs):
async with func.imp(BaseConfig()) as obj:
Expand Down Expand Up @@ -464,9 +534,13 @@ async def convert_asyncgen(outputs):
)
return func

# This case handles if op was called with no arguments, args will be a tuple
# with one element, that element being func, the function to wrap.
if args:
# This case handles if op was called with no keyword arguments, args
# will be a tuple with one element, that element being func, the
# function to wrap.
if len(args) > 1:
raise NotImpelmentedError("Finish after/while we work on entities/alice/alice/conversation.py")

return wrap(args[0])

return wrap
Expand Down
30 changes: 16 additions & 14 deletions dffml/high_level/dataflow.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import asyncio
from typing import Optional, Tuple, List, Union, Dict, Any, AsyncIterator

from ..overlay.overlay import Overlay
from ..overlay.overlay import Overlay, SystemContext, DFFMLOverlaysInstalled
from ..df.types import DataFlow, Input
from ..df.memory import MemoryOrchestrator
from ..df.base import BaseInputSetContext, BaseOrchestrator, BaseInputSet


class _LOAD_DEFAULT:
class _APPLY_INSTALLED_OVERLAYS:
    """Sentinel type for dataflow.run()'s ``overlay`` parameter.

    Its module-level singleton instance (``APPLY_INSTALLED_OVERLAYS``)
    distinguishes "apply the overlays installed via entrypoints" from an
    explicit ``None`` (apply no overlay) or a caller-supplied overlay.
    """
    pass


LOAD_DEFAULT = _LOAD_DEFAULT()
APPLY_INSTALLED_OVERLAYS = _APPLY_INSTALLED_OVERLAYS()


async def run(
Expand All @@ -21,7 +21,7 @@ async def run(
strict: bool = True,
ctx: Optional[BaseInputSetContext] = None,
halt: Optional[asyncio.Event] = None,
overlay: Union[None, _LOAD_DEFAULT, DataFlow] = LOAD_DEFAULT,
overlay: Union[None, _APPLY_INSTALLED_OVERLAYS, SystemContext] = APPLY_INSTALLED_OVERLAYS,
) -> AsyncIterator[Tuple[BaseInputSetContext, Dict[str, Any]]]:
"""
Run a DataFlow
Expand Down Expand Up @@ -189,20 +189,22 @@ async def run(
# TODO(alice) Rework once we have system context. Run overlay system context
# using orchestrator from that. System context is basic clay a dataclass
# with the properties as this functions arguments.
if overlay is LOAD_DEFAULT:
if overlay is APPLY_INSTALLED_OVERLAYS:
# Load defaults via entrypoints, aka installed dataflows registered as
# plugins.
# TODO Maybe pass orchestrator to default
overlay = await Overlay.default(orchestrator)
# Apply overlay if given or installed
if overlay is not None:
# This effectivly creates a new system context, a direct ancestor of the
# of the one that got passed in and the overlay. Therefore they are both
# listed in the input parents when we finally split this out so that run
# is called as an operation, where the overlay is applied prior to
# calling run.
dataflow = await overlay.apply(orchestrator, dataflow)
overlay = DFFMLOverlaysInstalled
async with orchestrator:
# Apply overlay if given or installed
if overlay is not None:
# This effectivly creates a new system context, a direct ancestor of the
# of the one that got passed in and the overlay. Therefore they are both
# listed in the input parents when we finally split this out so that run
# is called as an operation, where the overlay is applied prior to
# calling run.
async with overlay_cls(orchestrator=orchestrator) as overlay:
async with overlay() as overlay_context:
dataflow = await overlay_context.apply(dataflow)
async with orchestrator(dataflow) as ctx:
async for ctx, results in ctx.run(*input_sets, strict=strict):
yield ctx, results
Loading

0 comments on commit e256580

Please sign in to comment.