updated docs to correspond to the code #31

Merged
merged 2 commits · May 1, 2024
67 changes: 33 additions & 34 deletions data-processing-lib/doc/advanced-transform-tutorial.md
@@ -1,24 +1,23 @@
# Advanced Transform Tutorial

In this example, we implement an [ededup](../../transforms/universal/ededup) transform that
removes duplicate documents across all files. In this tutorial, we will show the following:

* How to write the `ededup` transform to generate the output table.
* How to define transform-specific metadata that can be associated
with each table transformation and aggregated across all transformations
in a single run of the transform.
* How to implement a custom `TransformRuntime` to create supporting Ray objects and supplement
transform-specific metadata with information about these statistics.
* How to define command line arguments that can be used to configure
the operation of our _ededup_ transform.

The complete task involves the following:

* EdedupTransform - class that implements the specific transformation
* EdedupRuntime - class that implements a custom TransformRuntime to create supporting Ray objects and enhance job output
statistics
* EdedupTableTransformConfiguration - class that provides configuration for the
EdedupTransform and EdedupRuntime, including the transform runtime class and the command line arguments used to
configure them.
* main() - simple creation and use of the TransformLauncher.

@@ -40,7 +39,6 @@ First, let's define the transform class. To do this we extend
the base abstract/interface class
[AbstractTableTransform](../src/data_processing/transform/table_transform.py),
which requires definition of the following:

* an initializer (i.e. `init()`) that accepts a dictionary of configuration
data. For this example, the configuration data will only be defined by
command line arguments (defined below).
@@ -127,7 +125,7 @@ metadata.
```
The single input to this method is the in-memory pyarrow table to be transformed.
The return of this function is a list of tables and optional metadata. In the
case of a simple 1:1 table conversion, the list will contain a single table: the result of removing
duplicates from the input table.
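
To make this concrete, here is a minimal sketch of such a `transform()` method; the class name and the duplicate-detection predicate are illustrative placeholders, not the actual ededup code:

```python
import pyarrow as pa


class EdedupTransformSketch:
    """Illustrative only: the real EdedupTransform consults shared hash actors."""

    def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict]:
        # Placeholder predicate: keep every row; real ededup would drop
        # rows whose document hash has already been seen.
        keep = pa.array([True] * table.num_rows)
        out = table.filter(keep)
        # Free-form numeric metadata, aggregated by the framework across calls.
        metadata = {"source_documents": table.num_rows, "result_documents": out.num_rows}
        return [out], metadata
```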

The metadata is a free-form dictionary of keys with numeric values that will be aggregated
@@ -140,13 +138,12 @@ First, let's define the transform runtime class. To do this we extend
the base abstract/interface class
[DefaultTableTransformRuntime](../src/data_processing/ray/transform_runtime.py),
which requires definition of the following:

* an initializer (i.e. `init()`) that accepts a dictionary of configuration
data. For this example, the configuration data will only be defined by
command line arguments (defined below).
* the `get_transform_config()` method that takes `data_access_factory`, `statistics actor`, and
`list of files to process` and produces a dictionary of parameters used by the transform.
* the `compute_execution_stats()` method that takes a dictionary of metadata, enhances it, and
produces an enhanced metadata dictionary.

We start with the simple definition of the class and its initializer
@@ -173,15 +170,14 @@ adds their handles to the transform parameters
)
return {"hashes": self.filters} | self.params
```
The inputs to this method include a set of parameters that might not all be needed by this transformer, but
rather form a superset of the parameters that can be used by different implementations of the transform runtime
(see, for example, [fuzzy dedup](../../transforms/universal/fdedup), etc.).
The return of this function is a dictionary of information for transformer initialization. In this
implementation we add additional parameters to the input dictionary, but in general it can be a completely
new dictionary built here.
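
To make the pattern concrete, here is a hedged sketch of how such supporting Ray actors might be created; `HashFilter` and `EdedupRuntimeSketch` are illustrative names, not the actual ededup implementation:

```python
import ray


@ray.remote
class HashFilter:
    """Hypothetical actor holding one shard of the set of document hashes."""

    def __init__(self):
        self.hashes = set()


class EdedupRuntimeSketch:
    """Illustrative runtime; the real EdedupRuntime lives in the ededup transform."""

    def __init__(self, params: dict):
        self.params = params

    def get_transform_config(self, data_access_factory, statistics, files) -> dict:
        # One hash actor per requested shard; the handles are passed to every
        # transform instance through the parameter dictionary.
        self.filters = [
            HashFilter.options(num_cpus=self.params["hash_cpu"]).remote()
            for _ in range(self.params["num_hashes"])
        ]
        return {"hashes": self.filters} | self.params
```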

Finally, we define the `compute_execution_stats()` method, which enhances the metadata collected by the
statistics class:

```python
@@ -201,12 +197,12 @@ class
dedup_prst = 100 * (1.0 - stats.get("result_documents", 1) / stats.get("source_documents", 1))
return {"number of hashes": sum_hash, "hash memory, GB": sum_hash_mem, "de duplication %": dedup_prst} | stats
```
The input to this method is a dictionary of metadata collected by the statistics object. The method enhances it
with information collected by the hash actors and with custom computations based on the statistics data.
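
As a quick sanity check of the percentage formula shown in the snippet above:

```python
stats = {"source_documents": 1000, "result_documents": 900}
dedup_prst = 100 * (1.0 - stats.get("result_documents", 1) / stats.get("source_documents", 1))
print(dedup_prst)  # 10.0, i.e. 10% of the documents were removed as duplicates
```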

## EdedupTableTransformConfiguration

The final class we need to implement is the `EdedupTableTransformConfiguration` class, whose initializer
defines the following:

* The short name for the transform
@@ -217,9 +213,12 @@ define the following:
First we define the class and its initializer,

```python
short_name = "ededup"
cli_prefix = f"{short_name}_"

class EdedupTableTransformConfiguration(DefaultTableTransformConfiguration):
    def __init__(self):
        super().__init__(name=short_name, runtime_class=EdedupRuntime, transform_class=EdedupTransform)
        self.params = {}
```

@@ -237,23 +236,22 @@ In our case we will use `noop_`.

```python
def add_input_params(self, parser: ArgumentParser) -> None:
    parser.add_argument(f"--{cli_prefix}hash_cpu", type=float, default=0.5, help="number of CPUs per hash")
    parser.add_argument(f"--{cli_prefix}num_hashes", type=int, default=0, help="number of hash actors to use")
    parser.add_argument(f"--{cli_prefix}doc_column", type=str, default="contents", help="key for accessing data")
```
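
As a standalone illustration of how these prefixed flags surface after parsing (assuming `cli_prefix == "ededup_"` as defined earlier):

```python
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--ededup_num_hashes", type=int, default=0, help="number of hash actors to use")
args = parser.parse_args(["--ededup_num_hashes", "2"])
print(args.ededup_num_hashes)  # 2
```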
Next we implement a method that is called after the framework has parsed the CLI args
and which allows us to capture the `EdedupTransform`-specific arguments and optionally validate them.

```python
def apply_input_params(self, args: Namespace) -> bool:
    captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
    self.params = self.params | captured
    if self.params["num_hashes"] <= 0:
        logger.info(f"Number of hashes should be greater than zero, provided {self.params['num_hashes']}")
        return False
    logger.info(f"exact dedup params are {self.params}")
    return True
```
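
A hedged illustration of the expected round trip, assuming `CLIArgumentProvider.capture_parameters()` strips the CLI prefix while collecting values (implied by the `self.params["num_hashes"]` lookup above):

```python
# command line:  --ededup_hash_cpu 0.5 --ededup_num_hashes 2 --ededup_doc_column contents
# self.params:   {"hash_cpu": 0.5, "num_hashes": 2, "doc_column": "contents"}
```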

## main()
@@ -272,14 +270,15 @@ A single method `launch()` is then invoked to run the transform in a Ray cluster
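
As a hedged sketch only (the import path and constructor keyword are assumptions, not confirmed by this diff), `main()` typically boils down to:

```python
from data_processing.ray import TransformLauncher  # assumed import path

if __name__ == "__main__":
    # Create the launcher with our configuration and run the transform.
    launcher = TransformLauncher(transform_runtime_config=EdedupTableTransformConfiguration())
    launcher.launch()
```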

## Running

Assuming the above `main()` is placed in `ededup_transform.py`, we can run the transform on local data as follows:

```shell
python ededup_transform.py --ededup_hash_cpu 0.5 --ededup_num_hashes 2 --ededup_doc_column "contents" \
    --run_locally True \
    --data_local_config="{'input_folder': '<project location>/transforms/universal/ededup/test-data/input', 'output_folder': '<project location>/transforms/universal/ededup/output'}"
```
This is a minimal set of options to run locally.
See the [launcher options](launcher-options.md) for a complete list of
transform-independent command line options.

22 changes: 11 additions & 11 deletions data-processing-lib/doc/launcher-options.md
@@ -17,11 +17,11 @@ usage: noop_transform.py [-h]
[--data_max_files MAX_FILES]
[--data_files_to_use DATA_FILES_TO_USE]
[--data_num_samples DATA_NUM_SAMPLES]
[--runtime_num_workers NUM_WORKERS]
[--runtime_worker_options WORKER_OPTIONS]
[--runtime_pipeline_id PIPELINE_ID] [--job_id JOB_ID]
[--runtime_creation_delay CREATION_DELAY]
[--runtime_code_location CODE_LOCATION]

Driver for NOOP processing

@@ -57,9 +57,9 @@ options:
files extensions to use, default .parquet
--data_num_samples DATA_NUM_SAMPLES
number of randomly picked files to use
--runtime_num_workers NUM_WORKERS
number of workers
--runtime_worker_options WORKER_OPTIONS
AST string defining worker resource requirements.
num_cpus: Required number of CPUs.
num_gpus: Required number of GPUs
@@ -70,12 +70,12 @@ options:
scheduling_strategy, _metadata, concurrency_groups, lifetime, max_concurrency, max_restarts,
max_task_retries, max_pending_calls, namespace, get_if_exists
Example: { 'num_cpus': '8', 'num_gpus': '1', 'resources': '{"special_hardware": 1, "custom_label": 1}' }
--runtime_pipeline_id PIPELINE_ID
pipeline id
--runtime_job_id JOB_ID job id
--runtime_creation_delay CREATION_DELAY
delay between actors' creation
--runtime_code_location CODE_LOCATION
AST string containing code location
github: Github repository URL.
commit_hash: github commit hash