vdk-dag: improve DAGs docs and example (#1984)
What: 
Improve DAGs user-facing documentation and example based on the feedback
and discussions with users.
The feedback we got from users is:

- The Requirements section in the example links VDK DAGs to the VEP; it should link to the README instead. > Addressed.
- Would it be possible to run the example with a DB of choice, or am I required to use Trino? > Addressed.
- The team name has to be provided in several places, which could lead to consistency issues. > Addressed.
- The DAG-specific configuration variables are not very visible in the README. > Addressed.

Signed-off-by: Yoan Salambashev <ysalambashev@vmware.com>
yonitoo authored May 9, 2023
1 parent e6d68ca commit 289d6b7
Showing 15 changed files with 146 additions and 158 deletions.
105 changes: 52 additions & 53 deletions examples/dag-with-args-example/README.md
@@ -7,8 +7,8 @@
from separate JSON files, and will subsequently insert the data into Trino tables. The next three jobs will read the
data inserted by the previous two jobs, and will print the data to the terminal. The sixth Data Job will be a DAG job
which will manage the other five and ensure that the third, fourth and fifth jobs run only when the previous two finish
-successfully. All the Trino-related details (tables, schema, catalog) will be passed individually to each job as job
-arguments in JSON format.
+successfully. All the DB-related details (tables, schema) will be passed individually to each job as job arguments
+in JSON format. Trino is used here, but any other database would work.

The DAG Job uses a job input object separate from the one usually used for job
operations in VDK Data Jobs, and it must be imported.
@@ -45,7 +45,8 @@ To run this example, you need:
* Versatile Data Kit
* Trino DB
* `vdk-trino` - VDK plugin for a connection to a Trino database
-* [VDK DAGs](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-meta-jobs)
+* [VDK DAGs README](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-dag)
+* [VDK DAGs Specification](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-dag)
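
A hedged setup sketch, assuming a plain pip environment (the package names are inferred from this example's
requirements files and the VDK quickstart; adjust to your setup):

```console
pip install quickstart-vdk vdk-trino vdk-dag
```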

## Configuration

@@ -123,7 +124,6 @@ def run(job_input: IJobInput):
data_job_dir = pathlib.Path(job_input.get_job_directory())
data_file = data_job_dir / "data.json"

-    db_catalog = job_input.get_arguments().get("db_catalog")
db_schema = job_input.get_arguments().get("db_schema")
db_table = job_input.get_arguments().get("db_table")

@@ -133,7 +133,7 @@ def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = f"""
-    INSERT INTO {db_catalog}.{db_schema}.{db_table} VALUES
+    INSERT INTO {db_schema}.{db_table} VALUES
""" + ", ".join(
str(i) for i in rows
)
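
The schema and table are not hard-coded: they arrive through `job_input.get_arguments()`. As a hedged sketch, on a
local run the same arguments could be supplied as a JSON string via the `--arguments` flag (values mirror the DAG
configuration shown later in this example):

```console
vdk run ingest-job-table-one --arguments '{"db_schema": "default", "db_table": "test_dag_one"}'
```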
@@ -225,7 +225,6 @@ def run(job_input: IJobInput):
data_job_dir = pathlib.Path(job_input.get_job_directory())
data_file = data_job_dir / "data.json"

-    db_catalog = job_input.get_arguments().get("db_catalog")
db_schema = job_input.get_arguments().get("db_schema")
db_table = job_input.get_arguments().get("db_table")

@@ -235,13 +234,13 @@ def run(job_input: IJobInput):

rows = [tuple(i.values()) for i in data]
insert_query = f"""
-    INSERT INTO {db_catalog}.{db_schema}.{db_table} VALUES
+    INSERT INTO {db_schema}.{db_table} VALUES
""" + ", ".join(
str(i) for i in rows
)

create_query = f"""
-    CREATE TABLE IF NOT EXISTS {db_catalog}.{db_schema}.{db_table}
+    CREATE TABLE IF NOT EXISTS {db_schema}.{db_table}
(
id varchar,
first_name varchar,
@@ -298,9 +297,7 @@ vdk-trino

```
read-job-usa/
-├── 10_transform.py
-├── 20_drop_table_one.sql
-├── 30_drop_table_two.sql
+├── 10_read.py
├── config.ini
├── requirements.txt
```
@@ -313,16 +310,15 @@ from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
-    db_catalog = job_input.get_arguments().get("db_catalog")
db_schema = job_input.get_arguments().get("db_schema")
db_tables = job_input.get_arguments().get("db_tables")

job1_data = job_input.execute_query(
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[0]} "
f"SELECT * FROM {db_schema}.{db_tables[0]} "
f"WHERE Country = 'USA'"
)
job2_data = job_input.execute_query(
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[1]} "
f"SELECT * FROM {db_schema}.{db_tables[1]} "
f"WHERE Country = 'USA'"
)

@@ -361,7 +357,7 @@ vdk-trino

```
read-job-canada/
-├── 10_transform.py
+├── 10_read.py
├── config.ini
├── requirements.txt
```
@@ -374,16 +370,15 @@ from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
-    db_catalog = job_input.get_arguments().get("db_catalog")
db_schema = job_input.get_arguments().get("db_schema")
db_tables = job_input.get_arguments().get("db_tables")

job1_data = job_input.execute_query(
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[0]} "
f"SELECT * FROM {db_schema}.{db_tables[0]} "
f"WHERE Country = 'Canada'"
)
job2_data = job_input.execute_query(
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[1]} "
f"SELECT * FROM {db_schema}.{db_tables[1]} "
f"WHERE Country = 'Canada'"
)

@@ -422,7 +417,7 @@ vdk-trino

```
read-job-rest-of-world/
-├── 10_transform.py
+├── 10_read.py
├── 20_drop_table_one.sql
├── 30_drop_table_two.sql
├── config.ini
@@ -437,16 +432,15 @@ from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
-    db_catalog = job_input.get_arguments().get("db_catalog")
db_schema = job_input.get_arguments().get("db_schema")
db_tables = job_input.get_arguments().get("db_tables")

job1_data = job_input.execute_query(
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[0]} "
f"SELECT * FROM {db_schema}.{db_tables[0]} "
f"WHERE Country NOT IN ('USA', 'Canada')"
)
job2_data = job_input.execute_query(
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[1]} "
f"SELECT * FROM {db_schema}.{db_tables[1]} "
f"WHERE Country NOT IN ('USA', 'Canada')"
)

@@ -511,75 +505,70 @@ dag-job/
<summary>dag_job.py</summary>

```python
-from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput
+from vdk.plugin.dag.dag_runner import DagInput


JOBS_RUN_ORDER = [
{
"job_name": "ingest-job-table-one",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_table": "test_dag_one",
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": [],
},
{
"job_name": "ingest-job-table-two",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_table": "test_dag_two",
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": [],
},
{
"job_name": "read-job-usa",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_tables": ["test_dag_one", "test_dag_two"],
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
},
{
"job_name": "read-job-canada",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_tables": ["test_dag_one", "test_dag_two"],
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
},
{
"job_name": "read-job-rest-of-world",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_tables": ["test_dag_one", "test_dag_two"],
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
},
]


-def run(job_input):
-    MetaJobInput().run_meta_job(JOBS_RUN_ORDER)
+def run(job_input) -> None:
+    DagInput().run_dag(JOBS_RUN_ORDER)

```
</details>

-Note that the `run_meta_job` method belongs to the `MetaJobInput` object which must be imported
+Note that the `run_dag` method belongs to the `DagInput` object, which must be imported
and instantiated separately from the `IJobInput` object that is passed to the `run` function by default.
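
A minimal sketch of that distinction, using only names that appear in this example (the single-entry list is purely
illustrative):

```python
from vdk.api.job_input import IJobInput
from vdk.plugin.dag.dag_runner import DagInput

# A one-job DAG, just to show the shape of the call.
JOBS_RUN_ORDER = [
    {
        "job_name": "ingest-job-table-one",
        "team_name": "my-team",
        "fail_dag_on_error": True,
        "arguments": {"db_schema": "default", "db_table": "test_dag_one"},
        "depends_on": [],
    }
]


def run(job_input: IJobInput):
    # job_input (an IJobInput) is injected by VDK for regular job operations;
    # the DAG itself is triggered via a separately instantiated DagInput.
    DagInput().run_dag(JOBS_RUN_ORDER)
```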

<details>
@@ -598,44 +587,46 @@ and instantiated separately from the default `IJobInput` object which is passed
team = my-team

[vdk]
-meta_jobs_max_concurrent_running_jobs = 2
-meta_jobs_delayed_jobs_min_delay_seconds = 1
-meta_jobs_delayed_jobs_randomized_added_delay_seconds = 1
+dags_max_concurrent_running_jobs = 2
+dags_delayed_jobs_min_delay_seconds = 1
+dags_delayed_jobs_randomized_added_delay_seconds = 1
```
</details>

+<details>
+<summary>requirements.txt</summary>
+
+```text
+vdk-dag
+```
+</details>
+
+Note that the VDK DAG Job does not require the `vdk-trino` dependency.
+Component jobs are responsible for their own dependencies, and the DAG Job only handles their triggering.
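
Concretely, each component job carries its own database dependency while the DAG Job carries only the orchestration
plugin (both requirements files appear in this example):

```text
read-job-usa/requirements.txt  ->  vdk-trino
dag-job/requirements.txt       ->  vdk-dag
```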

### Configuration details

-Setting [meta_jobs_max_concurrent_running_jobs](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L87)
+Setting [dags_max_concurrent_running_jobs](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin_configuration.py#L87)
to 2 in the DAG Job config.ini file would mean that the jobs in the DAG will be executed in the following order:
* ingest-job-table-one, ingest-job-table-two
* read-job-usa, read-job-canada
* read-job-rest-of-world

-When the ingest jobs are both finished, all of the read jobs are ready to start but when the aforementioned limit is
+When the ingest jobs are both finished, all the read jobs are ready to start, but when the aforementioned limit is
hit (after read-job-usa and read-job-canada are started), the following message is logged:

![DAG concurrent running jobs limit hit](images/dag-concurrent-running-jobs-limit-hit.png)
Then the delayed read-job-rest-of-world is started after any of the currently running Data Jobs finishes.

The other two configurations are set in order to have a short fixed delay for delayed jobs such as the last read job.
-Check the [configuration](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py)
+Check the [configuration](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin_configuration.py)
for more details.
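
To make the resulting order concrete, here is a self-contained toy sketch (plain Python, not the actual vdk-dag
scheduler; it assumes each wave of jobs finishes together, whereas the real scheduler admits a delayed job as soon as
any running job finishes):

```python
# Toy model: run jobs whose dependencies are done, at most 2 at a time.
jobs = {
    "ingest-job-table-one": [],
    "ingest-job-table-two": [],
    "read-job-usa": ["ingest-job-table-one", "ingest-job-table-two"],
    "read-job-canada": ["ingest-job-table-one", "ingest-job-table-two"],
    "read-job-rest-of-world": ["ingest-job-table-one", "ingest-job-table-two"],
}

MAX_CONCURRENT = 2  # mirrors dags_max_concurrent_running_jobs
finished = set()
while len(finished) < len(jobs):
    # Jobs whose dependencies have all finished and which have not run yet.
    ready = [job for job, deps in jobs.items()
             if job not in finished and set(deps) <= finished]
    running, delayed = ready[:MAX_CONCURRENT], ready[MAX_CONCURRENT:]
    print("running:", running, "| delayed:", delayed)
    finished.update(running)
```

Running it prints the three waves listed above, with read-job-rest-of-world reported as delayed in the second wave.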

-<details>
-<summary>requirements.txt</summary>
-
-```text
-vdk-meta-jobs
-```
-</details>
-
-Note that the VDK DAG Job does not require the `vdk-trino` dependency.
-Component jobs are responsible for their own dependencies, and the DAG Job only handles their triggering.

## Execution

+[Here](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-dag#high-level-design) you can read
+more about the DAG execution.

### Create and deploy Data Jobs

To do so, open a terminal, navigate to the parent directory of the data job
@@ -671,6 +662,14 @@ vdk create -n dag-job -t my-team --no-template && \
vdk deploy -n dag-job -t my-team -p dag-job -r "dag-with-args-example"
```

+Note: The team name has to be consistent everywhere: in config.ini, in each job of the DAG's list of jobs, and
+when creating and deploying the jobs. Instead of passing the team name each time, you can set a default value:
+```console
+vdk set-default -t my-team
+```
+This default is then used by all commands that require a team. However, you would still have to provide the same
+team name in the config.ini file and in the DAG's list of jobs.
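
For reference, the three places in this example that must agree on the team name (all shown above):

```text
dag-job/config.ini  ->  team = my-team
dag_job.py          ->  "team_name": "my-team"   (every entry in JOBS_RUN_ORDER)
vdk CLI             ->  vdk create/deploy ... -t my-team
```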

### Run DAG Job

You can now run your DAG Job through the Execution API by using one of the following commands*:
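The command list itself is truncated in this diff view. As a hedged illustration only, assuming the `vdk execute`
command of vdk-control-cli (check the full README for the exact invocations):

```console
vdk execute --start -n dag-job -t my-team
```
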
21 changes: 8 additions & 13 deletions examples/dag-with-args-example/dag-job/dag_job.py
@@ -1,66 +1,61 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
-from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput
+from vdk.plugin.dag.dag_runner import DagInput


JOBS_RUN_ORDER = [
{
"job_name": "ingest-job-table-one",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_table": "test_dag_one",
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": [],
},
{
"job_name": "ingest-job-table-two",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_table": "test_dag_two",
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": [],
},
{
"job_name": "read-job-usa",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_tables": ["test_dag_one", "test_dag_two"],
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
},
{
"job_name": "read-job-canada",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_tables": ["test_dag_one", "test_dag_two"],
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
},
{
"job_name": "read-job-rest-of-world",
"team_name": "my-team",
"fail_meta_job_on_error": True,
"fail_dag_on_error": True,
"arguments": {
"db_tables": ["test_dag_one", "test_dag_two"],
"db_schema": "default",
"db_catalog": "memory",
},
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
},
]


-def run(job_input):
-    MetaJobInput().run_meta_job(JOBS_RUN_ORDER)
+def run(job_input) -> None:
+    DagInput().run_dag(JOBS_RUN_ORDER)
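
A usage sketch: assuming a configured Control Service and already-deployed component jobs (the DAG only triggers
them, so they must exist), the DAG Job itself can also be started locally with:

```console
vdk run dag-job
```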
1 change: 1 addition & 0 deletions examples/dag-with-args-example/dag-job/requirements.txt
@@ -0,0 +1 @@
+vdk-dag