Skip to content

Commit

Permalink
started working on workflow, main and pickfile functions, updated readme with contributing resources and added contributing.md
Browse files Browse the repository at this point in the history
  • Loading branch information
jestyr27 committed Jan 17, 2025
1 parent 63769d9 commit 3c33f25
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Work-In-Progress...

## For our general contribution guidelines, go to: https://www.uhstray.io/en/contributing
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ Python driven data assessment using pandas and flet

![Platform MVP Architecture](.images/mvp_architecture.png)

## Contributing Guidelines

- [Review our Code of Conduct](https://www.uhstray.io/en/code-of-conduct)
- [Check our CONTRIBUTING.md](./CONTRIBUTING.md)

### File Workflow Architecture

![File Workflow Architecture](.images/file_workflow.png)
Expand Down
34 changes: 34 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Entry script: wires together a demo data-processing pipeline using the
# workflow module. The pickfiles module supplies the file-selection UI.
# path: pickfiles.py
import pickfiles as pf
import workflow as wf

# Build the pipeline that owns all sources and tasks.
pipeline = wf.DataPipeline("data_processing")

# TODO: create a source-select popup (see pickfiles.main)

# Register the raw-data source with the pipeline.
source = wf.DataSource(name="raw_data",
                       connection_string="postgresql://localhost:5432/db",
                       type="postgresql")
pipeline.add_source(source)


def process_data(data):
    """Placeholder processing step; real transformation logic goes here."""
    pass


# Wrap the processing function in a Task with no upstream dependencies.
task = wf.Task(name="process_raw_data", function=process_data, dependencies=[])
pipeline.add_task(task)

# Run the whole pipeline and collect per-task statuses.
results = pipeline.execute()
62 changes: 62 additions & 0 deletions pickfiles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import flet
from flet import (
ElevatedButton,
FilePicker,
FilePickerResultEvent,
Page,
Row,
Text,
icons,
)


def main(page: Page):
    """Build the file-picker / directory-picker demo UI on *page*."""

    selected_files = Text()
    directory_path = Text()

    def on_files_picked(e: FilePickerResultEvent):
        # Show the chosen file names comma-separated, or note the cancel.
        if e.files:
            selected_files.value = ", ".join(f.name for f in e.files)
        else:
            selected_files.value = "Cancelled!"
        selected_files.update()

    def on_directory_chosen(e: FilePickerResultEvent):
        directory_path.value = e.path if e.path else "Cancelled!"
        directory_path.update()

    pick_files_dialog = FilePicker(on_result=on_files_picked)
    get_directory_dialog = FilePicker(on_result=on_directory_chosen)

    # Dialogs live in the page overlay so they stay hidden until opened.
    page.overlay.extend([pick_files_dialog, get_directory_dialog])

    files_row = Row(
        [
            ElevatedButton(
                "Pick files",
                icon=icons.UPLOAD_FILE,
                on_click=lambda _: pick_files_dialog.pick_files(allow_multiple=True),
            ),
            selected_files,
        ]
    )
    directory_row = Row(
        [
            ElevatedButton(
                "Open directory",
                icon=icons.FOLDER_OPEN,
                on_click=lambda _: get_directory_dialog.get_directory_path(),
                disabled=page.web,  # directory picking is unavailable on web
            ),
            directory_path,
        ]
    )
    page.add(files_row, directory_row)


flet.app(target=main)
209 changes: 209 additions & 0 deletions workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# define a class to manage the workflow of a data processing pipeline, including the following: sources, targets, steps, tasks, transformations, tests, and dependencies

from typing import Dict, List, Callable, Any, Optional
from datetime import datetime
from enum import Enum
import logging
from dataclasses import dataclass
import networkx as nx

class TaskStatus(Enum):
    """Lifecycle states a pipeline task can be in.

    A task starts as PENDING, becomes RUNNING while executing, and ends
    COMPLETED or FAILED; SKIPPED marks tasks bypassed because an upstream
    dependency did not complete.
    """

    PENDING = "pending"      # created, not yet executed
    RUNNING = "running"      # currently executing
    COMPLETED = "completed"  # finished without raising
    FAILED = "failed"        # raised an exception during execution
    SKIPPED = "skipped"      # not run because a dependency failed/was skipped

@dataclass
class DataSource:
    """Describes where the pipeline reads data from.

    Attributes:
        name: Identifier used to register the source on the pipeline.
        connection_string: Backend-specific connection URI.
        type: Source kind label (e.g. "postgresql").
        schema: Optional mapping describing the source's structure.
    """

    name: str
    connection_string: str
    type: str
    schema: Optional[Dict] = None

@dataclass
class DataTarget:
    """Describes where the pipeline writes data to.

    Attributes:
        name: Identifier used to register the target on the pipeline.
        connection_string: Backend-specific connection URI.
        type: Target kind label (e.g. "postgresql").
        schema: Optional mapping describing the target's structure.
    """

    name: str
    connection_string: str
    type: str
    schema: Optional[Dict] = None

class Task:
    """Represents an individual task in the pipeline.

    Args:
        name: Unique task identifier within the pipeline.
        function: Callable executed when the task runs.
        dependencies: Names of tasks that must complete before this one.
        retry_count: Intended retry budget (stored; not yet consumed here).
        timeout: Intended timeout in seconds (stored; not yet enforced here).
    """

    def __init__(
        self,
        name: str,
        function: Callable,
        dependencies: Optional[List[str]] = None,  # was mistyped List[str] = None
        retry_count: int = 3,
        timeout: int = 3600
    ):
        self.name = name
        self.function = function
        # Copy so later mutation of the caller's list cannot alter the DAG.
        self.dependencies: List[str] = list(dependencies) if dependencies else []
        self.retry_count = retry_count
        self.timeout = timeout
        self.status = TaskStatus.PENDING
        self.start_time: Optional[datetime] = None
        self.end_time: Optional[datetime] = None
        self.error: Optional[str] = None

    def execute(self, *args, **kwargs) -> Any:
        """Run the task function, tracking status, timing, and errors.

        Returns the function's result. On failure, records FAILED status
        and the error message, then re-raises. end_time is set either way.
        """
        self.start_time = datetime.now()
        self.status = TaskStatus.RUNNING

        try:
            result = self.function(*args, **kwargs)
            self.status = TaskStatus.COMPLETED
            return result
        except Exception as e:
            self.status = TaskStatus.FAILED
            self.error = str(e)
            raise
        finally:
            self.end_time = datetime.now()

class DataPipeline:
    """
    Main class for managing the data processing pipeline workflow.
    This class implements a directed acyclic graph (DAG) based workflow system
    that handles task dependencies, data transformations, and validation tests.
    """

    def __init__(self, name: str):
        self.name = name
        self.sources: Dict[str, DataSource] = {}
        self.targets: Dict[str, DataTarget] = {}
        self.tasks: Dict[str, Task] = {}
        # Keyed by task name: transformation applied to that task's result.
        self.transformations: Dict[str, Callable] = {}
        # Keyed by task name: validation test run before that task executes.
        self.tests: Dict[str, Callable] = {}
        self.dag = nx.DiGraph()
        self.logger = logging.getLogger(name)

    def add_source(self, source: DataSource) -> None:
        """Add a data source to the pipeline, keyed by its name."""
        self.sources[source.name] = source

    def add_target(self, target: DataTarget) -> None:
        """Add a data target to the pipeline, keyed by its name."""
        self.targets[target.name] = target

    def add_task(self, task: Task) -> None:
        """
        Add a task to the pipeline and update the DAG with its dependencies.

        Raises ValueError if the task name is already registered, if a
        dependency is unknown, or if the new edges would create a cycle.
        On failure the pipeline is left unchanged.
        """
        # Validate before mutating so a rejected task leaves no partial state
        # (the original added the node/edges first, then raised).
        if task.name in self.tasks:
            raise ValueError(f"Task '{task.name}' already exists in the pipeline")
        for dep in task.dependencies:
            if dep not in self.tasks:
                raise ValueError(f"Dependency task '{dep}' not found")

        self.tasks[task.name] = task
        self.dag.add_node(task.name)
        for dep in task.dependencies:
            self.dag.add_edge(dep, task.name)

        # Verify that the graph remains acyclic; roll back fully otherwise
        # (the original left the task in self.tasks after removing the node).
        if not nx.is_directed_acyclic_graph(self.dag):
            self.dag.remove_node(task.name)
            del self.tasks[task.name]
            raise ValueError("Adding this task would create a cycle in the pipeline")

    def add_transformation(self, name: str, transform_fn: Callable) -> None:
        """Add a data transformation, applied to the result of task *name*."""
        self.transformations[name] = transform_fn

    def add_test(self, name: str, test_fn: Callable) -> None:
        """Add a validation test, run just before task *name* executes."""
        self.tests[name] = test_fn

    def validate_pipeline(self) -> bool:
        """
        Validate the pipeline configuration and structure.
        Returns True if valid, raises ValueError otherwise.
        """
        # Check for cycles in the DAG
        if not nx.is_directed_acyclic_graph(self.dag):
            raise ValueError("Pipeline contains circular dependencies")

        # Verify all dependencies exist
        for task in self.tasks.values():
            for dep in task.dependencies:
                if dep not in self.tasks:
                    raise ValueError(f"Task '{task.name}' depends on non-existent task '{dep}'")

        # Verify sources and targets are properly configured
        for source in self.sources.values():
            if not source.connection_string:
                raise ValueError(f"Source '{source.name}' missing connection string")

        for target in self.targets.values():
            if not target.connection_string:
                raise ValueError(f"Target '{target.name}' missing connection string")

        return True

    def execute(self) -> Dict[str, TaskStatus]:
        """
        Execute the pipeline in dependency order.

        Returns a dictionary mapping every scheduled task name to its final
        status, including tasks that were skipped. Raises on the first task
        or validation-test failure.
        """
        self.validate_pipeline()
        execution_order = list(nx.topological_sort(self.dag))
        results: Dict[str, TaskStatus] = {}

        for task_name in execution_order:
            task = self.tasks[task_name]

            # Skip tasks whose dependencies did not all complete.
            deps_ok = all(
                self.tasks[dep].status == TaskStatus.COMPLETED
                for dep in task.dependencies
            )
            if not deps_ok:
                task.status = TaskStatus.SKIPPED
                # BUGFIX: record skipped tasks too — the docstring promises a
                # status for every task, but the original omitted these.
                results[task_name] = task.status
                continue

            try:
                # Execute any associated tests before task execution
                self._run_tests(task_name)

                # Execute the task, keeping its return value.
                task_result = task.execute()

                # BUGFIX: feed the task's actual result to its transformation.
                # The original passed results.get(task_name), which is always
                # None at this point, and discarded task.execute()'s return.
                if task_name in self.transformations:
                    self.transformations[task_name](task_result)

            except Exception as e:
                self.logger.error(f"Task '{task_name}' failed: {str(e)}")
                raise

            results[task_name] = task.status

        return results

    def _run_tests(self, task_name: str) -> None:
        """Run the validation test associated with a task, if any."""
        if task_name in self.tests:
            test_fn = self.tests[task_name]
            try:
                test_fn()
            except Exception as e:
                raise ValueError(f"Validation test failed for task '{task_name}': {str(e)}")

    def get_task_status(self, task_name: str) -> TaskStatus:
        """Get the current status of a specific task."""
        if task_name not in self.tasks:
            raise ValueError(f"Task '{task_name}' not found")
        return self.tasks[task_name].status

    def reset(self) -> None:
        """Reset the pipeline state, clearing all task statuses and timings."""
        for task in self.tasks.values():
            task.status = TaskStatus.PENDING
            task.start_time = None
            task.end_time = None
            task.error = None

0 comments on commit 3c33f25

Please sign in to comment.