diff --git a/docker-compose.yaml b/docker-compose.yaml index 2c52908..4d4e10c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -59,6 +59,7 @@ x-airflow-common: &airflow-common AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true" AIRFLOW__CORE__LOAD_EXAMPLES: "false" AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session" + AIRFLOW__WEBSERVER__SECRET_KEY: "secretkey" # yamllint disable rule:line-length # Use simple http server on scheduler for health checks # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server diff --git a/poetry.lock b/poetry.lock index 00279a8..b381f26 100644 --- a/poetry.lock +++ b/poetry.lock @@ -211,6 +211,7 @@ apache-airflow-providers-google = {version = "*", optional = true, markers = "ex apache-airflow-providers-http = "*" apache-airflow-providers-imap = "*" apache-airflow-providers-openlineage = {version = "*", optional = true, markers = "extra == \"openlineage\""} +apache-airflow-providers-postgres = {version = "*", optional = true, markers = "extra == \"postgres\""} apache-airflow-providers-sftp = {version = "*", optional = true, markers = "extra == \"sftp\""} apache-airflow-providers-smtp = "*" apache-airflow-providers-sqlite = "*" @@ -691,6 +692,27 @@ openlineage-python = ">=1.16.0" [package.extras] common-sql = ["apache-airflow-providers-common-sql"] +[[package]] +name = "apache-airflow-providers-postgres" +version = "5.11.2" +description = "Provider package apache-airflow-providers-postgres for Apache Airflow" +optional = false +python-versions = "~=3.8" +files = [ + {file = "apache_airflow_providers_postgres-5.11.2-py3-none-any.whl", hash = "sha256:7f1dce7abaf36f9fdb3a5153bdfbc6e325202c4b573e58041815d32fac431549"}, + {file = "apache_airflow_providers_postgres-5.11.2.tar.gz", hash = "sha256:07698b96b1de2e9dee26de9296c404e6c8b5f86c7c32dcbb3c8107b8e218fdb2"}, +] + 
+[package.dependencies] +apache-airflow = ">=2.7.0" +apache-airflow-providers-common-sql = ">=1.3.1" +psycopg2-binary = ">=2.9.4" + +[package.extras] +amazon = ["apache-airflow-providers-amazon (>=2.6.0)"] +common-sql = ["apache-airflow-providers-common-sql"] +openlineage = ["apache-airflow-providers-openlineage"] + [[package]] name = "apache-airflow-providers-sftp" version = "4.10.2" @@ -7524,4 +7546,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = "^3.10, <3.11" -content-hash = "cdaea3f606819e1c089fb0beeda74099d22eb4af58db5dfdddccef1bfddafa80" +content-hash = "e265039be8deb1d7f10b2f938937637a8aeefd4146996005ab508c0424ff5229" diff --git a/pyproject.toml b/pyproject.toml index ab5fcef..605255c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ packages = [{include = "ot_orchestration", from = "src"}] [tool.poetry.dependencies] python = "^3.10, <3.11" -apache-airflow = {extras = ["apache-beam", "async", "celery", "google", "kubernetes", "openlineage", "postgress", "sftp"], version = "^2.9.2"} +apache-airflow = {extras = ["apache-beam", "async", "celery", "google", "kubernetes", "openlineage", "postgres", "sftp"], version = "^2.9.2"} apache-airflow-providers-google = "^10.19.0" returns = {extras = ["compatible-mypy"], version = "^0.23.0"} pydantic = "^2.7.4" diff --git a/src/ot_orchestration/common_airflow.py b/src/ot_orchestration/common_airflow.py index bb8fa8f..2c14e75 100644 --- a/src/ot_orchestration/common_airflow.py +++ b/src/ot_orchestration/common_airflow.py @@ -43,3 +43,13 @@ "schedule": "@once", "catchup": False, } + +platform_dag_kwargs = { + "start_date": pendulum.now(tz="Europe/London").subtract(days=1), + "schedule": "@once", + "catchup": False, + "tags": [ + "platform", + "experimental", + ], +} diff --git a/src/ot_orchestration/dags/configs/pis.yaml b/src/ot_orchestration/dags/configs/pis.yaml new file mode 100644 index 0000000..f852bf3 --- /dev/null +++ b/src/ot_orchestration/dags/configs/pis.yaml @@ -0,0 
+1,394 @@ +--- +work_dir: ./work +gcs_url: gs://open-targets-pre-data-releases/24.06dev-test/input +log_level: TRACE +force: no + +scratchpad: + efo_release_version: v3.65.0 + ensembl_release_version: '111' + chembl_release_version: '34' + release: '24.06' + +steps: + baseline_expression: + - name: download_latest baseline_expression + source: gs://otar000-evidence_input/BaselineExpression/json + destination: expression-inputs/baseline_expression.json.gz + + disease: + - name: download efo otar_slim + source: https://github.com/EBISPOT/efo/releases/download/${efo_release_version}/efo_otar_slim.owl + destination: ontology-inputs/ontology-efo.owl + - name: download hpo + source: https://raw.githubusercontent.com/obophenotype/human-phenotype-ontology/v2021-10-10/hp.json + destination: ontology-inputs/ontology-hpo.json + - name: download hpo hpo-phenotypes + source: http://purl.obolibrary.org/obo/hp/hpoa/phenotype.hpoa + destination: ontology-inputs/hpo-phenotypes.hpoa + - name: download mondo + source: https://github.com/monarch-initiative/mondo/releases/download/v2021-12-30/mondo.json + destination: ontology-inputs/ontology-mondo.json + + drug: + - name: download drugbank vocabulary + source: https://go.drugbank.com/releases/5-1-9/downloads/all-drugbank-vocabulary + destination: chembl-inputs/drugbank_vocabulary.csv + - name: download_latest drugbank + source: gs://otar001-core/DrugBank/annotation + destination: chembl-inputs/drugbank.csv.gz + - name: elasticsearch chembl drug indications + url: https://www.ebi.ac.uk/chembl/elk/es + destination: chembl-inputs/chembl_${chembl_release_version}_drug_indication.jsonl + index: chembl_${chembl_release_version}_drug_indication + fields: + - _metadata + - molecule_chembl_id + - efo_id + - max_phase_for_ind + - indication_refs + - name: elasticsearch chembl mechanisms of action + url: https://www.ebi.ac.uk/chembl/elk/es + destination: chembl-inputs/chembl_${chembl_release_version}_mechanism.jsonl + index: 
chembl_${chembl_release_version}_mechanism + fields: + - _metadata + - molecule_chembl_id + - target_chembl_id + - action_type + - mechanism_of_action + - mechanism_refs + - record_id + - parent_molecule_chembl_id + - name: elasticsearch chembl molecules + url: https://www.ebi.ac.uk/chembl/elk/es + destination: chembl-inputs/chembl_${chembl_release_version}_molecule.jsonl + index: chembl_${chembl_release_version}_molecule + fields: + - molecule_chembl_id + - molecule_hierarchy + - molecule_type + - pref_name + - first_approval + - max_phase + - withdrawn_flag + - black_box_warning + - molecule_synonyms + - cross_references + - chebi_par_id + - molecule_structures + - name: elasticsearch chembl targets + url: https://www.ebi.ac.uk/chembl/elk/es + destination: chembl-inputs/chembl_${chembl_release_version}_target.jsonl + index: chembl_${chembl_release_version}_target + fields: + - _metadata + - target_chembl_id + - target_components + - target_type + - pref_name + - name: elasticsearch chembl drug warnings + url: https://www.ebi.ac.uk/chembl/elk/es + destination: chembl-inputs/chembl_${chembl_release_version}_drug_warning.jsonl + index: chembl_${chembl_release_version}_drug_warning + fields: + - _metadata + - molecule_chembl_id + - parent_molecule_chembl_id + - efo_id + - efo_term + - efo_id_for_warning_class + - warning_class + - warning_country + - warning_description + - warning_id + - warning_refs + - warning_type + - warning_year + + evidence: + - name: download_latest cancerbiomarkers + source: gs://otar000-evidence_input/CancerBiomarkers/json + destination: evidence-files/cancerbiomarkers.json.gz + - name: download_latest chembl + source: gs://otar000-evidence_input/ChEMBL/json + destination: evidence-files/chembl.json.gz + - name: download_latest clingen + source: gs://otar000-evidence_input/ClinGen/json + destination: evidence-files/clingen.json.gz + - name: download_latest crispr + source: gs://otar000-evidence_input/CRISPR/json + destination: 
evidence-files/crispr.json.gz + - name: download_latest crispr_screen + source: gs://otar000-evidence_input/Crispr_screens/json + destination: evidence-files/crispr_screen.json.gz + - name: download_latest gene2phenotype + source: gs://otar000-evidence_input/Gene2Phenotype/json + destination: evidence-files/gene2phenotype.json.gz + - name: download_latest gene_burden + source: gs://otar000-evidence_input/GeneBurden/json + destination: evidence-files/gene_burden.json.gz + - name: download_latest evidences + source: gs://otar000-evidence_input/Genetics_portal/json + destination: evidence-files/genetics-portal-evidences.json.gz + - name: download_latest genomics_england + source: gs://otar000-evidence_input/GenomicsEngland/json + destination: evidence-files/genomics_england.json.gz + - name: download_latest impc + source: gs://otar000-evidence_input/IMPC/json + destination: evidence-files/impc.json.gz + - name: download_latest intogen + source: gs://otar000-evidence_input/IntOgen/json + destination: evidence-files/intogen.json.gz + - name: download_latest orphanet + source: gs://otar000-evidence_input/Orphanet/json + destination: evidence-files/orphanet.json.gz + - name: download_latest progeny + source: gs://otar000-evidence_input/PROGENy/json + destination: evidence-files/progeny.json.gz + - name: download_latest slapenrich + source: gs://otar000-evidence_input/SLAPEnrich/json + destination: evidence-files/slapenrich.json.gz + - name: download_latest sysbio + source: gs://otar000-evidence_input/SysBio/json + destination: evidence-files/sysbio.json.gz + - name: download_latest reactome + source: gs://otar006-reactome + destination: evidence-files/reactome.json.gz + - name: get_file_list cosmic + source: gs://otar007-cosmic + pattern: '!hallmarks' + sentinel: cosmic_file_list + - name: download_latest cosmic + source: ${cosmic_file_list} + destination: evidence-files/cosmic.json.gz + - name: download_latest atlas + source: gs://otar010-atlas + destination: 
evidence-files/atlas.json.bz2 + - name: download_latest uniprot + source: gs://otar011-uniprot + destination: evidence-files/uniprot.json.gz + - name: download_latest eva + source: gs://otar012-eva/disease-target-evidence + destination: evidence-files/eva.json.gz + + expression: + - name: download expression hierarchy + source: https://raw.githubusercontent.com/opentargets/expression_hierarchy/master/process/curation.tsv + destination: expression-inputs/expression_hierarchy_curation.tsv + - name: download tissue translation map + source: https://raw.githubusercontent.com/opentargets/expression_hierarchy/master/process/map_with_efos.json + destination: expression-inputs/tissue-translation-map.json + - name: download normal tissues + source: https://www.proteinatlas.org/download/normal_tissue.tsv.zip + destination: expression-inputs/normal_tissue.tsv.gz + - name: download_latest baseline expression binned + source: gs://atlas_baseline_expression/baseline_expression_binned + destination: expression-inputs/baseline_expression_binned.tsv + - name: download_latest baseline expression counts + source: gs://atlas_baseline_expression/baseline_expression_counts + destination: expression-inputs/baseline_expression_counts.tsv + - name: download_latest baseline expression zscore binned + source: gs://atlas_baseline_expression/baseline_expression_zscore_binned + destination: expression-inputs/baseline_expression_zscore_binned.tsv + + go: + - name: download gene-ontology + source: http://purl.obolibrary.org/obo/go.obo + destination: gene-ontology-inputs/go.obo + + homologues: + - name: explode homologues + foreach: + - species: caenorhabditis_elegans + - species: canis_lupus_familiaris + - species: cavia_porcellus + - species: danio_rerio + - species: drosophila_melanogaster + - species: macaca_mulatta + - species: mus_musculus + - species: oryctolagus_cuniculus + - species: pan_troglodytes + - species: rattus_norvegicus + - species: sus_scrofa + - species: xenopus_tropicalis + - 
species: homo_sapiens + do: + - name: download ${species} gene dictionary + source: https://ftp.ensembl.org/pub/release-${ensembl_release_version}/json/${species}/${species}.json + destination: target-inputs/homologue/gene_dictionary/${species}.json + - name: download ${species} protein homologies + source: https://ftp.ensembl.org/pub/release-${ensembl_release_version}/tsv/ensembl-compara/homologies/${species}/Compara.${ensembl_release_version}.protein_default.homologies.tsv.gz + destination: target-inputs/homologue/homologies/protein-${species}.tsv.gz + - name: download ${species} ncrna homologies + source: https://ftp.ensembl.org/pub/release-${ensembl_release_version}/tsv/ensembl-compara/homologies/${species}/Compara.${ensembl_release_version}.ncrna_default.homologies.tsv.gz + destination: target-inputs/homologue/homologies/ncrna-${species}.tsv.gz + + + interactions: + - name: download ensembl interactions grch38 + source: https://ftp.ensembl.org/pub/release-${ensembl_release_version}/gtf/homo_sapiens/Homo_sapiens.GRCh38.${ensembl_release_version}.chr.gtf.gz + destination: interactions-inputs/Homo_sapiens.GRCh38.chr.gtf.gz + - name: download human 9606 idmapping + source: https://ftp.ebi.ac.uk/pub/databases/uniprot/current_release/knowledgebase/idmapping/by_organism/HUMAN_9606_idmapping.dat.gz + destination: interactions-inputs/HUMAN_9606_idmapping.dat.gz + - name: download intact interactors + source: https://ftp.ebi.ac.uk/pub/databases/intact/various/ot_graphdb/current/data/interactor_pair_interactions.json + destination: interactions-inputs/intact-interactors.json + - name: download rna central + source: https://ftp.ebi.ac.uk/pub/databases/RNAcentral/current_release/id_mapping/database_mappings/ensembl.tsv + destination: interactions-inputs/rna_central_ensembl.tsv + - name: download_latest string interactions + source: otar001-core/stringInteractions + destination: interactions-inputs/string-interactions.txt.gz + + literature: + - name: download literature + 
source: https://ftp.ebi.ac.uk/pub/databases/pmc/DOI/PMID_PMCID_DOI.csv.gz + destination: literature-inputs/PMID_PMCID_DOI.csv.gz + + mouse_phenotypes: + - name: download_latest mouse phenotypes + source: otar001-core/MousePhenotypes + destination: mouse-phenotypes-inputs/mouse_phenotypes.json.gz + + openfda: + - name: download blacklisted events + source: https://raw.githubusercontent.com/opentargets/platform-etl-backend/master/src/main/resources/blacklisted_events.txt + destination: fda-inputs/blacklisted_events.txt + - name: explode fda events + foreach_function: urls_from_json + foreach_function_args: + source: https://api.fda.gov/download.json + destination: fda-inputs/fda_events.json + json_path: .results.drug.event.partitions[].file + do: + - name: download fda events ${destination} + source: ${source} + destination: fda-inputs/${destination} + + otar: + - name: download otar meta spreadsheet + source: https://docs.google.com/spreadsheets/d/1CV_shXJy1ACM09HZBB_-3Nl6l_dfkrA26elMAF0ttHs/export?format=csv&gid=1179867447 + destination: otar-inputs/otar_meta.csv + - name: download otar project to efo spreadsheet + source: https://docs.google.com/spreadsheets/d/1CV_shXJy1ACM09HZBB_-3Nl6l_dfkrA26elMAF0ttHs/export?format=csv&gid=72910172 + destination: otar-inputs/otar_project_to_efo.csv + + pharmacogenomics: + - name: download_latest pharmacogenomics + source: otar012-eva/pharmacogenomics + destination: pharmacogenomics-inputs/pharmacogenomics.json.gz + + ppp_evidence: + - name: download_latest validation lab + source: otar013-ppp/validation_lab + destination: evidence-files/validation_lab.json.gz + - name: download_latest encore + source: otar013-ppp/encore + destination: evidence-files/encore.json.gz + - name: download_latest ot_crispr + source: otar013-ppp/ot_crispr + destination: evidence-files/ot_crispr.json.gz + + reactome: + - name: download reactome pathways + source: https://reactome.org/download/current/ReactomePathways.txt + destination: 
reactome-inputs/ReactomePathways.txt + - name: download reactome pathways relation + source: https://reactome.org/download/current/ReactomePathwaysRelation.txt + destination: reactome-inputs/ReactomePathwaysRelation.txt + + so: + - name: download so + source: https://raw.githubusercontent.com/The-Sequence-Ontology/SO-Ontologies/master/Ontology_Files/so.json + destination: so-inputs/so.json + + target: + - name: download hgnc complete set + source: https://ftp.ebi.ac.uk/pub/databases/genenames/new/json/hgnc_complete_set.json + destination: target-inputs/genenames/hgnc_complete_set.json + - name: download Ensembl2Reactome + source: https://reactome.org/download/current/Ensembl2Reactome.txt + destination: target-inputs/reactome/Ensembl2Reactome.txt + - name: download human_all_hcop_sixteen_column + source: https://ftp.ebi.ac.uk/pub/databases/genenames/hcop/human_all_hcop_sixteen_column.txt.gz + destination: target-inputs/genenames/human_all_hcop_sixteen_column.txt.gz + - name: download gene identifiers + source: https://cog.sanger.ac.uk/cmp/download/gene_identifiers_latest.csv.gz + destination: target-inputs/project-scores/gene_identifiers_latest.csv.gz + - name: download uniprot + source: https://rest.uniprot.org/uniprotkb/stream?compressed=true&format=txt&query=%28%28reviewed%3Atrue%29%29%20AND%20%28model_organism%3A9606%29 + destination: target-inputs/uniprot/uniprot.txt.gz + - name: download uniprot-ssl + source: https://rest.uniprot.org/locations/stream?compressed=true&fields=id%2Cname%2Ccategory&format=tsv&query=%28%2A%29 + destination: target-inputs/uniprot/uniprot-ssl.tsv.gz + - name: download gencode + source: https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_40/gencode.v40.annotation.gff3.gz + destination: target-inputs/gencode/gencode.gff3.gz + - name: download ensembl + source: https://ftp.ebi.ac.uk/pub/databases/RNAcentral/current_release/id_mapping/database_mappings/ensembl.tsv + destination: target-inputs/go/ensembl.tsv + - name: 
download goa_human gaf + source: https://ftp.ebi.ac.uk/pub/databases/GO/goa/HUMAN/goa_human.gaf.gz + destination: target-inputs/go/goa_human.gaf.gz + - name: download goa_human_rna gaf + source: https://ftp.ebi.ac.uk/pub/databases/GO/goa/HUMAN/goa_human_rna.gaf.gz + destination: target-inputs/go/goa_human_rna.gaf.gz + - name: download goa_human gpa + source: https://ftp.ebi.ac.uk/pub/databases/GO/goa/HUMAN/goa_human.gpa.gz + destination: target-inputs/go/goa_human_eco.gpa.gz + - name: download ensembl vertebrates + source: https://ftp.ensembl.org/pub/release-${ensembl_release_version}/species_EnsemblVertebrates.txt + destination: target-inputs/homologue/species_EnsemblVertebrates.txt + - name: download homosapiens gene_info + source: https://ftp.ncbi.nlm.nih.gov/gene/DATA/GENE_INFO/Mammalia/Homo_sapiens.gene_info.gz + destination: target-inputs/ncbi/Homo_sapiens.gene_info.gz + - name: download gnomad + source: https://storage.googleapis.com/gcp-public-data--gnomad/release/2.1.1/constraint/gnomad.v2.1.1.lof_metrics.by_gene.txt.bgz + destination: target-inputs/gnomad/gnomad_lof_by_gene.txt.gz + - name: download protein atlas subcellular location + source: https://www.proteinatlas.org/download/subcellular_location.tsv.zip + destination: target-inputs/hpa/subcellular_location.tsv.gz + - name: download essentiality matrices + source: https://cog.sanger.ac.uk/cmp/download/essentiality_matrices.zip + destination: target-inputs/project-scores/essentiality_matrices.zip + - name: download ensembl + source: https://ftp.ensembl.org/pub/release-${ensembl_release_version}/json/homo_sapiens/homo_sapiens.json + destination: target-inputs/ensembl/homo_sapiens.jsonl + - name: download_latest essentiality + source: gs://otar000-evidence_input/Essentiality/json + destination: target-inputs/gene-essentiality/essentiality.json.gz + - name: download_latest subcellular locations + source: gs://otar001-core/subcellularLocations + destination: target-inputs/hpa/subcellular_locations_ssl.tsv 
+ - name: download_latest teps + source: gs://otar001-core/TEPs + destination: target-inputs/tep/tep.json.gz + - name: download_latest chemical probes + source: gs://otar001-core/ChemicalProbes/annotation + destination: target-inputs/chemicalprobes/chemicalprobes.json.gz + - name: download_latest target safety + source: gs://otar001-core/TargetSafety/json + destination: target-inputs/safety/safetyLiabilities.json.gz + - name: download_latest tractability + source: gs://otar001-core/Tractability/${release} + destination: target-inputs/tractability/tractability.tsv + - name: get_file_list cosmic + source: gs://otar007-cosmic + pattern: 'hallmarks' + sentinel: cosmic_file_list + - name: download_latest cosmic + source: ${cosmic_file_list} + destination: evidence-files/cosmic.json.gz + + target_engine: + - name: download protein atlas + source: https://www.proteinatlas.org/download/proteinatlas.json.gz + destination: targetEngine-inputs/proteinatlas.json.gz + - name: download uniprot locations + source: https://rest.uniprot.org/locations/stream?compressed=true&fields=id%2Cname%2Ccategory%2Cgene_ontologies%2Cpart_of%2Cis_a&format=tsv&query=%28%2A%29 + destination: targetEngine-inputs/uniprot_locations.tsv.gz + - name: download mouse phenotype scores + source: https://raw.githubusercontent.com/opentargets/target_engine/main/src/data_flow/phenotypeScores/20230825_mousePheScores.csv + destination: targetEngine-inputs/mouse_pheno_scores.csv diff --git a/src/ot_orchestration/dags/gwas_catalog_dag.py b/src/ot_orchestration/dags/gwas_catalog_dag.py index e232661..88b3d47 100644 --- a/src/ot_orchestration/dags/gwas_catalog_dag.py +++ b/src/ot_orchestration/dags/gwas_catalog_dag.py @@ -29,7 +29,6 @@ @dag(start_date=RUN_DATE, dag_id=GWAS_CATALOG_CONFIG_DAG_ID) def gwas_catalog_dag(**kwargs: Dag_Params) -> None: """GWAS catalog DAG.""" - # [START PREPARE CURATION MANIFEST] @task_group(group_id="curation") def gwas_catalog_curation() -> None: diff --git 
a/src/ot_orchestration/dags/platform_input_support.py b/src/ot_orchestration/dags/platform_input_support.py new file mode 100644 index 0000000..cdf29c4 --- /dev/null +++ b/src/ot_orchestration/dags/platform_input_support.py @@ -0,0 +1,96 @@ +"""DAG for the Platform Input Support phase of the platform pipeline. + +The platform input support phase will run a series of tasks that fetch the input +data for the platform pipeline. Each step is completely independent, and they can +be run in parallel. Each step runs in a Cloud Run job. The steps are defined in the +`pis.yaml` configuration file, and the DAG is created dynamically from that file. +""" + +from pathlib import Path + +from airflow.decorators import task +from airflow.models.dag import DAG +from airflow.models.taskinstance import TaskInstance +from airflow.providers.google.cloud.operators.cloud_run import ( + CloudRunCreateJobOperator, + CloudRunDeleteJobOperator, +) +from airflow.utils.task_group import TaskGroup + +from ot_orchestration.common_airflow import ( + GCP_REGION, + platform_dag_kwargs, + shared_dag_args, +) +from ot_orchestration.operators.cloud_run_fetch_logs_operator import ( + CloudRunExecuteJobWithLogsOperator, +) +from ot_orchestration.utils.cloud_run import clean_name, create_cloud_run_job, strhash +from ot_orchestration.utils.utils import read_yaml_config + +PIS_CONFIG_PATH = Path(__file__).parent / "configs" / "pis.yaml" +PIS_IMAGE = "europe-west1-docker.pkg.dev/open-targets-eu-dev/platform-input-support-test/platform-input-support-test:latest" +PIS_GCP_PROJECT = "open-targets-eu-dev" +PIS_MACHINE_SPEC = {"cpu": "1", "memory": "512Mi"} + + +def get_steps() -> list[str]: + """Read the steps from the PIS configuration file.""" + yaml_config = read_yaml_config(PIS_CONFIG_PATH) + return yaml_config["steps"].keys() + + +with DAG( + "platform_input_support", + default_args=shared_dag_args, + description="Open Targets Platform — platform-input-support", + **platform_dag_kwargs, +) as dag: + for 
step_name in get_steps(): + clean_step_name = clean_name(step_name) + + with TaskGroup(group_id=clean_step_name): + + @task + def create_job(task_instance: TaskInstance | None = None, clean_step_name: str = clean_step_name, step_name: str = step_name): + """Create a Cloud Run job.""" + job_name = f"{clean_step_name}-{strhash(task_instance.run_id)}" + env = {"PIS_STEP": step_name} + c = CloudRunCreateJobOperator( + task_id=f"{task_instance.task_id}_create", + project_id=PIS_GCP_PROJECT, + region=GCP_REGION, + job_name=job_name, + job=create_cloud_run_job(PIS_IMAGE, env, PIS_MACHINE_SPEC), + dag=dag, + ) + c.execute(context=task_instance.get_template_context()) + + @task + def execute_job(task_instance: TaskInstance | None = None, clean_step_name: str = clean_step_name): + """Execute a Cloud Run job.""" + job_name = f"{clean_step_name}-{strhash(task_instance.run_id)}" + e = CloudRunExecuteJobWithLogsOperator( + task_id=f"{task_instance.task_id}_execute", + project_id=PIS_GCP_PROJECT, + region=GCP_REGION, + job_name=job_name, + dag=dag, + ) + e.execute(context=task_instance.get_template_context()) + + @task + def delete_job(task_instance: TaskInstance | None = None, clean_step_name: str = clean_step_name): + """Delete a Cloud Run job.""" + job_name = f"{clean_step_name}-{strhash(task_instance.run_id)}" + d = CloudRunDeleteJobOperator( + task_id=f"{task_instance.task_id}_delete", + project_id=PIS_GCP_PROJECT, + region=GCP_REGION, + job_name=job_name, + trigger_rule="all_done", + dag=dag, + ) + d.execute(context=task_instance.get_template_context()) + + create_job() >> execute_job() >> delete_job() diff --git a/src/ot_orchestration/operators/cloud_run.py b/src/ot_orchestration/operators/cloud_run.py new file mode 100644 index 0000000..033db31 --- /dev/null +++ b/src/ot_orchestration/operators/cloud_run.py @@ -0,0 +1,34 @@ +"""Custom operator to execute a Cloud Run job and fetch logs from it.""" + +from airflow.providers.google.cloud.operators.cloud_run import ( + CloudRunExecuteJobOperator, +) +from airflow.utils.decorators import apply_defaults +from google.cloud import logging + + +class
CloudRunExecuteJobWithLogsOperator(CloudRunExecuteJobOperator): + """Custom operator to execute a Cloud Run job and fetch logs from it.""" + + @apply_defaults + def __init__(self, *args, project_id, region, job_name, **kwargs): + super().__init__( + project_id=project_id, + region=region, + job_name=job_name, + *args, + **kwargs, + ) + self.project_id = project_id + self.region = region + self.job_name = job_name + + def execute(self, context): + """Execute the Cloud Run job and then fetch logs.""" + super().execute(context) + + client = logging.Client(project=self.project_id) + query = f'resource.type = "cloud_run_job" resource.labels.job_name = "{self.job_name}"' + entries = client.list_entries(filter_=query, order_by=logging.ASCENDING) + for entry in entries: + self.log.info(entry.payload) diff --git a/src/ot_orchestration/operators/cloud_run_fetch_logs_operator.py b/src/ot_orchestration/operators/cloud_run_fetch_logs_operator.py new file mode 100644 index 0000000..033db31 --- /dev/null +++ b/src/ot_orchestration/operators/cloud_run_fetch_logs_operator.py @@ -0,0 +1,34 @@ +"""Custom operator to execute a Cloud Run job and fetch logs from it.""" + +from airflow.providers.google.cloud.operators.cloud_run import ( + CloudRunExecuteJobOperator, +) +from airflow.utils.decorators import apply_defaults +from google.cloud import logging + + +class CloudRunExecuteJobWithLogsOperator(CloudRunExecuteJobOperator): + """Custom operator to execute a Cloud Run job and fetch logs from it.""" + + @apply_defaults + def __init__(self, *args, project_id, region, job_name, **kwargs): + super().__init__( + project_id=project_id, + region=region, + job_name=job_name, + *args, + **kwargs, + ) + self.project_id = project_id + self.region = region + self.job_name = job_name + + def execute(self, context): + """Execute the Cloud Run job and then fetch logs.""" + super().execute(context) + + client = logging.Client(project=self.project_id) + query = f'resource.type = "cloud_run_job" 
resource.labels.job_name = "{self.job_name}"' + entries = client.list_entries(filter_=query, order_by=logging.ASCENDING) + for entry in entries: + self.log.info(entry.payload) diff --git a/src/ot_orchestration/utils/cloud_run.py b/src/ot_orchestration/utils/cloud_run.py new file mode 100644 index 0000000..5eb77c1 --- /dev/null +++ b/src/ot_orchestration/utils/cloud_run.py @@ -0,0 +1,33 @@ +"""Airflow boilerplate code that interfaces with Cloud Run operators which can be shared by several DAGs.""" + +import hashlib +import re + +from google.cloud import run_v2 + + +def clean_name(step_name: str) -> str: + """Clean a string into a valid cloud run job name.""" + clean_step_name = re.sub(r"[^a-z0-9-]", "-", step_name.lower()) + return f"platform-input-support-{clean_step_name}" + + +def strhash(s: str) -> str: + """Create a simple hash from a string.""" + return hashlib.sha256(s.encode()).hexdigest()[:5] + + +def create_cloud_run_job( + image: str, env: dict[str, str], limits: dict[str, str] +) -> run_v2.Job: + """Create a Cloud Run job instance for a given step.""" + job = run_v2.Job() + env = [run_v2.EnvVar({"name": k, "value": v}) for k, v in env.items()] + container = run_v2.Container( + image=image, + resources=run_v2.ResourceRequirements(limits=limits), + env=env, + ) + job.template.template.containers.append(container) + job.template.template.max_retries = 0 + return job