diff --git a/.github/workflows/module-3.yaml b/.github/workflows/module-3.yaml
index f3cea1c..8ebc123 100644
--- a/.github/workflows/module-3.yaml
+++ b/.github/workflows/module-3.yaml
@@ -8,6 +8,8 @@ on:
   pull_request:
     branches:
       - main
+    paths:
+      - 'module-3/**'
 
 jobs:
   test:
diff --git a/.github/workflows/module-4.yaml b/.github/workflows/module-4.yaml
index f9c9b29..49f624f 100644
--- a/.github/workflows/module-4.yaml
+++ b/.github/workflows/module-4.yaml
@@ -8,6 +8,8 @@ on:
   pull_request:
     branches:
       - main
+    paths:
+      - 'module-4/**'
 
 jobs:
   dagster-image:
diff --git a/module-4/README.md b/module-4/README.md
index 23a4b7e..8c3c741 100644
--- a/module-4/README.md
+++ b/module-4/README.md
@@ -6,13 +6,13 @@
 
 [Practice task](./PRACTICE.md)
 
-*** 
+***
 
 # Reference implementation
 
 ***
 
-# Setup 
+# Setup
 
 Create kind cluster
@@ -41,7 +41,7 @@ pip install apache-airflow-providers-cncf-kubernetes==8.3.3
 Run standalone airflow
 
 ```bash
-export AIRFLOW_HOME=./airflow_pipelines
+export AIRFLOW_HOME=$PWD/airflow_pipelines
 export AIRFLOW__CORE__LOAD_EXAMPLES=False
 export WANDB_PROJECT=****************
 export WANDB_API_KEY=****************
diff --git a/module-4/airflow_pipelines/dags/inference_dag.py b/module-4/airflow_pipelines/dags/inference_dag.py
index a009407..92e1888 100644
--- a/module-4/airflow_pipelines/dags/inference_dag.py
+++ b/module-4/airflow_pipelines/dags/inference_dag.py
@@ -23,7 +23,7 @@
 with DAG(
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    schedule_interval=None,
+    schedule_interval='0 9 * * *',  # 9 AM UTC,
     dag_id="inference_dag",
 ) as dag:
     clean_storage_before_start = KubernetesPodOperator(
@@ -34,6 +34,7 @@
         in_cluster=False,
         namespace="default",
         startup_timeout_seconds=600,
+        is_delete_operator_pod=False,
         image_pull_policy="Always",
         volumes=[volume],
         volume_mounts=[volume_mount],
@@ -47,6 +48,7 @@
         in_cluster=False,
         namespace="default",
         startup_timeout_seconds=600,
+        is_delete_operator_pod=False,
         image_pull_policy="Always",
         volumes=[volume],
         volume_mounts=[volume_mount],
@@ -65,6 +67,7 @@
         task_id="load_model",
         env_vars={"WANDB_PROJECT": WANDB_PROJECT, "WANDB_API_KEY": WANDB_API_KEY},
         in_cluster=False,
+        is_delete_operator_pod=False,
         namespace="default",
         volumes=[volume],
         volume_mounts=[volume_mount],
@@ -85,6 +88,7 @@
         in_cluster=False,
         namespace="default",
         startup_timeout_seconds=600,
+        is_delete_operator_pod=False,
         image_pull_policy="Always",
         volumes=[volume],
         volume_mounts=[volume_mount],
@@ -98,6 +102,7 @@
         in_cluster=False,
         namespace="default",
         startup_timeout_seconds=600,
+        is_delete_operator_pod=False,
         image_pull_policy="Always",
         volumes=[volume],
         volume_mounts=[volume_mount],
diff --git a/module-4/airflow_pipelines/dags/training_dag.py b/module-4/airflow_pipelines/dags/training_dag.py
index 0197734..8ef4214 100644
--- a/module-4/airflow_pipelines/dags/training_dag.py
+++ b/module-4/airflow_pipelines/dags/training_dag.py
@@ -24,6 +24,7 @@
     schedule_interval=None,
     dag_id="training_dag",
 ) as dag:
+
     clean_storage_before_start = KubernetesPodOperator(
         name="clean_storage_before_start",
         image=DOCKER_IMAGE,
diff --git a/module-4/dagster_pipelines/.gitignore b/module-4/dagster_pipelines/.gitignore
index e0de937..0a5d0f2 100644
--- a/module-4/dagster_pipelines/.gitignore
+++ b/module-4/dagster_pipelines/.gitignore
@@ -1 +1,2 @@
-dagster-home/
\ No newline at end of file
+dagster-home/
+wandb
\ No newline at end of file
diff --git a/module-4/dagster_pipelines/text2sql_functions.py b/module-4/dagster_pipelines/text2sql_functions.py
index d4c57a4..e8226f1 100644
--- a/module-4/dagster_pipelines/text2sql_functions.py
+++ b/module-4/dagster_pipelines/text2sql_functions.py
@@ -3,13 +3,12 @@
 import modal
 from modal import Image
 
-
 app = modal.App("ml-in-production-practice-dagster-pipeline")
 env = {
     "WANDB_PROJECT": os.getenv("WANDB_PROJECT"),
     "WANDB_API_KEY": os.getenv("WANDB_API_KEY"),
 }
-custom_image = Image.from_registry("ghcr.io/kyryl-opens-ml/dagster-pipeline:pr-14").env(env)
+custom_image = Image.from_registry("ghcr.io/kyryl-opens-ml/dagster-pipeline:main").env(env)
 
 mount = modal.Mount.from_local_python_packages("dagster_pipelines", "dagster_pipelines")
 timeout=10 * 60 * 60
diff --git a/module-4/dagster_pipelines/text2sql_pipeline.py b/module-4/dagster_pipelines/text2sql_pipeline.py
index 33a110c..4d5a7ff 100644
--- a/module-4/dagster_pipelines/text2sql_pipeline.py
+++ b/module-4/dagster_pipelines/text2sql_pipeline.py
@@ -1,40 +1,40 @@
+import json
 import logging
+import sys
+from dataclasses import dataclass
 from functools import partial
 from pathlib import Path
+from random import randint, randrange
 
+import datasets
+import evaluate
+import modal
+import pandas as pd
 import torch
-from datasets import Dataset, DatasetDict
-from peft import LoraConfig, TaskType
+import transformers
+from dagster import (
+    AssetCheckResult,
+    AssetExecutionContext,
+    Config,
+    Definitions,
+    MetadataValue,
+    asset,
+    asset_check,
+)
+from datasets import Dataset, DatasetDict, load_dataset
+from peft import AutoPeftModelForCausalLM, LoraConfig, TaskType
+from tqdm import tqdm
 from transformers import (
     AutoModelForCausalLM,
     AutoTokenizer,
     HfArgumentParser,
     TrainingArguments,
+    pipeline,
     set_seed,
 )
 from trl import SFTTrainer
-import logging
-import sys
-from pathlib import Path
-import datasets
-import transformers
 import wandb
-from pathlib import Path
-from random import randrange
-import pandas as pd
-from datasets import DatasetDict, load_dataset
-from dataclasses import dataclass
-import json
-import logging
-from pathlib import Path
-import modal
-import evaluate
-from peft import AutoPeftModelForCausalLM
-from tqdm import tqdm
-from transformers import pipeline
-from dagster import Config, asset, MetadataValue, AssetExecutionContext, asset_check, AssetCheckResult, Definitions
-from random import randint
 
 logger = logging.getLogger()
@@ -174,7 +174,7 @@ def train_model(dataset_chatml):
         "per_device_eval_batch_size": 8,
         "gradient_accumulation_steps": 4,
         "learning_rate": 0.0001,
-        "num_train_epochs": 0.01,
+        "num_train_epochs": 3,
         "warmup_ratio": 0.1,
         "logging_first_step": True,
         "logging_steps": 500,
@@ -304,6 +304,7 @@ def trained_model(process_dataset):
     # modal
     process_dataset_pandas = {'train': process_dataset['train'].to_pandas(), 'test': process_dataset['test'].to_pandas()}
+
     model_training_job = modal.Function.lookup("ml-in-production-practice-dagster-pipeline", "training_job")
     model_name, uri = model_training_job.remote(dataset_chatml_pandas=process_dataset_pandas)
@@ -312,8 +313,6 @@
 
 @asset(group_name="model", compute_kind="modal")
 def model_metrics(context: AssetExecutionContext, trained_model, process_dataset):
-    model_path = f"/tmp/{trained_model}"
-    load_from_registry(model_name=trained_model, model_path=model_path)
 
     # local
     # metrics = evaluate_model(df=process_dataset['test'].to_pandas(), model_name=trained_model)