Module 4 #15

Merged
2 commits merged on Jul 21, 2024
2 changes: 2 additions & 0 deletions .github/workflows/module-3.yaml
@@ -8,6 +8,8 @@ on:
pull_request:
branches:
- main
paths:
- 'module-3/**'

jobs:
test:
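Both workflow files gain the same kind of `paths` filter (module-4 gets the analogous change just below), so each module's CI is triggered only by pull requests that actually touch that module's directory. A minimal sketch of the resulting trigger block, assuming the rest of the workflow is unchanged:

```yaml
# Sketch of the trigger section after this change; only the paths
# filter is new. PRs that touch nothing under module-3/ now skip
# this workflow entirely.
on:
  pull_request:
    branches:
      - main
    paths:
      - 'module-3/**'
```

One caveat worth knowing: a workflow skipped by `paths` reports no status at all, which matters if its jobs are configured as required checks on the branch.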
2 changes: 2 additions & 0 deletions .github/workflows/module-4.yaml
@@ -8,6 +8,8 @@ on:
pull_request:
branches:
- main
paths:
- 'module-4/**'

jobs:
dagster-image:
6 changes: 3 additions & 3 deletions module-4/README.md
@@ -6,13 +6,13 @@

[Practice task](./PRACTICE.md)

***
***

# Reference implementation

***

# Setup
# Setup

Create kind cluster

@@ -41,7 +41,7 @@ pip install apache-airflow-providers-cncf-kubernetes==8.3.3
Run standalone airflow

```bash
export AIRFLOW_HOME=./airflow_pipelines
export AIRFLOW_HOME=$PWD/airflow_pipelines
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export WANDB_PROJECT=****************
export WANDB_API_KEY=****************
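The `AIRFLOW_HOME` fix replaces a relative path with an absolute one built from `$PWD`. That matters because Airflow's scheduler, webserver, and task subprocesses may run with a different working directory than the shell that exported the variable, and a relative home would then resolve to the wrong place. A hedged sketch of the standalone flow with the new value, assuming it is run from `module-4/`:

```bash
# Run from module-4/. $PWD pins AIRFLOW_HOME to an absolute path so
# every Airflow subprocess resolves the same home directory.
export AIRFLOW_HOME=$PWD/airflow_pipelines
export AIRFLOW__CORE__LOAD_EXAMPLES=False
airflow standalone
```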
7 changes: 6 additions & 1 deletion module-4/airflow_pipelines/dags/inference_dag.py
@@ -23,7 +23,7 @@
with DAG(
start_date=datetime(2021, 1, 1),
catchup=False,
schedule_interval=None,
schedule_interval='0 9 * * *', # 9 AM UTC,
dag_id="inference_dag",
) as dag:
clean_storage_before_start = KubernetesPodOperator(
@@ -34,6 +34,7 @@
in_cluster=False,
namespace="default",
startup_timeout_seconds=600,
is_delete_operator_pod=False,
image_pull_policy="Always",
volumes=[volume],
volume_mounts=[volume_mount],
@@ -47,6 +48,7 @@
in_cluster=False,
namespace="default",
startup_timeout_seconds=600,
is_delete_operator_pod=False,
image_pull_policy="Always",
volumes=[volume],
volume_mounts=[volume_mount],
@@ -65,6 +67,7 @@
task_id="load_model",
env_vars={"WANDB_PROJECT": WANDB_PROJECT, "WANDB_API_KEY": WANDB_API_KEY},
in_cluster=False,
is_delete_operator_pod=False,
namespace="default",
volumes=[volume],
volume_mounts=[volume_mount],
@@ -85,6 +88,7 @@
in_cluster=False,
namespace="default",
startup_timeout_seconds=600,
is_delete_operator_pod=False,
image_pull_policy="Always",
volumes=[volume],
volume_mounts=[volume_mount],
@@ -98,6 +102,7 @@
in_cluster=False,
namespace="default",
startup_timeout_seconds=600,
is_delete_operator_pod=False,
image_pull_policy="Always",
volumes=[volume],
volume_mounts=[volume_mount],
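Two behavioral changes run through this file: the DAG moves from manual-only triggering (`schedule_interval=None`) to a daily 9 AM UTC cron, and every `KubernetesPodOperator` now sets `is_delete_operator_pod=False`, so finished pods stay around for inspection instead of being garbage-collected. A minimal sketch of the pattern, with `DOCKER_IMAGE` as a hypothetical placeholder for the image name defined elsewhere in the DAG file:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

DOCKER_IMAGE = "ghcr.io/example/airflow-pipeline:main"  # hypothetical placeholder

with DAG(
    start_date=datetime(2021, 1, 1),
    catchup=False,
    schedule_interval="0 9 * * *",  # daily at 9 AM UTC instead of manual-only
    dag_id="inference_dag",
) as dag:
    clean_storage_before_start = KubernetesPodOperator(
        name="clean_storage_before_start",
        task_id="clean_storage_before_start",
        image=DOCKER_IMAGE,
        in_cluster=False,
        namespace="default",
        startup_timeout_seconds=600,
        # Keep the completed pod instead of deleting it, so failures can
        # be debugged with `kubectl logs` / `kubectl describe pod`.
        is_delete_operator_pod=False,
        image_pull_policy="Always",
    )
```

The trade-off is that retained pods accumulate in the `default` namespace and need periodic cleanup, which is presumably what the `clean_storage_before_start` task helps with at the start of each run.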
1 change: 1 addition & 0 deletions module-4/airflow_pipelines/dags/training_dag.py
@@ -24,6 +24,7 @@
schedule_interval=None,
dag_id="training_dag",
) as dag:

clean_storage_before_start = KubernetesPodOperator(
name="clean_storage_before_start",
image=DOCKER_IMAGE,
3 changes: 2 additions & 1 deletion module-4/dagster_pipelines/.gitignore
@@ -1 +1,2 @@
dagster-home/
dagster-home/
wandb
3 changes: 1 addition & 2 deletions module-4/dagster_pipelines/text2sql_functions.py
@@ -3,13 +3,12 @@
import modal
from modal import Image


app = modal.App("ml-in-production-practice-dagster-pipeline")
env = {
"WANDB_PROJECT": os.getenv("WANDB_PROJECT"),
"WANDB_API_KEY": os.getenv("WANDB_API_KEY"),
}
custom_image = Image.from_registry("ghcr.io/kyryl-opens-ml/dagster-pipeline:pr-14").env(env)
custom_image = Image.from_registry("ghcr.io/kyryl-opens-ml/dagster-pipeline:main").env(env)
mount = modal.Mount.from_local_python_packages("dagster_pipelines", "dagster_pipelines")
timeout=10 * 60 * 60

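Switching the registry tag from `pr-14` to `main` pins the Modal jobs to the mainline image build instead of a one-off PR build, matching what the newly path-filtered CI above publishes. A hedged sketch of how a remote job can be declared on top of this image; the decorator wiring follows the Modal API shown in the file, while the function body is a placeholder:

```python
import os

import modal
from modal import Image

app = modal.App("ml-in-production-practice-dagster-pipeline")
env = {
    "WANDB_PROJECT": os.getenv("WANDB_PROJECT"),
    "WANDB_API_KEY": os.getenv("WANDB_API_KEY"),
}
custom_image = Image.from_registry("ghcr.io/kyryl-opens-ml/dagster-pipeline:main").env(env)

# Placeholder body: the real training logic ships inside the image.
@app.function(image=custom_image, timeout=10 * 60 * 60)
def training_job(dataset_chatml_pandas: dict):
    ...
```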
49 changes: 24 additions & 25 deletions module-4/dagster_pipelines/text2sql_pipeline.py
@@ -1,40 +1,40 @@
import json
import logging
import sys
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from random import randint, randrange

import datasets
import evaluate
import modal
import pandas as pd
import torch
from datasets import Dataset, DatasetDict
from peft import LoraConfig, TaskType
import transformers
from dagster import (
AssetCheckResult,
AssetExecutionContext,
Config,
Definitions,
MetadataValue,
asset,
asset_check,
)
from datasets import Dataset, DatasetDict, load_dataset
from peft import AutoPeftModelForCausalLM, LoraConfig, TaskType
from tqdm import tqdm
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
HfArgumentParser,
TrainingArguments,
pipeline,
set_seed,
)
from trl import SFTTrainer
import logging
import sys
from pathlib import Path

import datasets
import transformers
import wandb
from pathlib import Path
from random import randrange
import pandas as pd
from datasets import DatasetDict, load_dataset
from dataclasses import dataclass
import json
import logging
from pathlib import Path
import modal
import evaluate
from peft import AutoPeftModelForCausalLM
from tqdm import tqdm
from transformers import pipeline
from dagster import Config, asset, MetadataValue, AssetExecutionContext, asset_check, AssetCheckResult, Definitions
from random import randint

logger = logging.getLogger()

@@ -174,7 +174,7 @@ def train_model(dataset_chatml):
"per_device_eval_batch_size": 8,
"gradient_accumulation_steps": 4,
"learning_rate": 0.0001,
"num_train_epochs": 0.01,
"num_train_epochs": 3,
"warmup_ratio": 0.1,
"logging_first_step": True,
"logging_steps": 500,
@@ -304,6 +304,7 @@ def trained_model(process_dataset):

# modal
process_dataset_pandas = {'train': process_dataset['train'].to_pandas(), 'test': process_dataset['test'].to_pandas()}

model_training_job = modal.Function.lookup("ml-in-production-practice-dagster-pipeline", "training_job")
model_name, uri = model_training_job.remote(dataset_chatml_pandas=process_dataset_pandas)

@@ -312,8 +313,6 @@ def trained_model(process_dataset):

@asset(group_name="model", compute_kind="modal")
def model_metrics(context: AssetExecutionContext, trained_model, process_dataset):
model_path = f"/tmp/{trained_model}"
load_from_registry(model_name=trained_model, model_path=model_path)
# local
# metrics = evaluate_model(df=process_dataset['test'].to_pandas(), model_name=trained_model)
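With `load_from_registry` and the local `model_path` gone, `model_metrics` no longer pulls the model onto the Dagster machine; the commented-out local call suggests evaluation is dispatched remotely, like training above. A purely hypothetical sketch of that pattern; `evaluate_model_job` is an assumed function name, not something visible in this diff:

```python
import modal

# Hypothetical: mirror the trained_model asset by resolving a deployed
# Modal function and evaluating remotely. The function name is assumed.
evaluate_job = modal.Function.lookup(
    "ml-in-production-practice-dagster-pipeline", "evaluate_model_job"
)
metrics = evaluate_job.remote(
    df=process_dataset["test"].to_pandas(), model_name=trained_model
)
```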
