NV-Ingest-Client is a tool designed for efficient ingestion and processing of large datasets. It provides both a Python API and a command-line interface to cater to various ingestion needs.

Table of Contents

  1. Installation
  2. Usage
  3. Command Line Interface (CLI)
  4. Examples


To install NV-Ingest-Client, run the following command in your terminal:

pip install [REPO_ROOT]/client

This command installs both the API libraries and the nv-ingest-cli tool which can subsequently be called from the command line.

API Libraries


Specification for creating a job for submission to the nv-ingest microservice.

  • Parameters:

    • payload (Dict): The payload data for the job.
    • tasks (Optional[List], optional): A list of tasks to be added to the job. Defaults to None.
    • source_id (Optional[str], optional): An identifier for the source of the job. Defaults to None.
    • source_name (Optional[str], optional): A name for the source of the job. Defaults to None.
    • document_type (Optional[str], optional): Type of the document. Defaults to 'txt'.
    • job_id (Optional[Union[UUID, str]], optional): A unique identifier for the job. Defaults to a new UUID.
    • extended_options (Optional[Dict], optional): Additional options for job processing. Defaults to None.
  • Attributes:

    • _payload (Dict): Storage for the payload data.
    • _tasks (List): Storage for the list of tasks.
    • _source_id (str): Storage for the source identifier.
    • _job_id (UUID): Storage for the job's unique identifier.
    • _extended_options (Dict): Storage for the additional options.
  • Methods:

    • to_dict() -> Dict:
      • Description: Converts the job specification to a dictionary for JSON serialization.
      • Returns: Dict: Dictionary representation of the job specification.
    • add_task(task):
      • Description: Adds a task to the job specification.
      • Parameters:
        • task: The task to be added. Assumes the task has a to_dict() method.
      • Raises:
        • ValueError: If the task does not have a to_dict() method or is not an instance of Task.
  • Properties:

    • payload: Getter/Setter for the payload data.
    • job_id: Getter/Setter for the job's unique identifier.
    • source_id: Getter/Setter for the source identifier.
    • source_name: Getter/Setter for the source name.
  • Example Usage:

    job_spec = JobSpec(
        payload={"data": "Example data"},
        tasks=[extract_task, split_task],
        extended_options={"tracing_options": {"trace": True}}


Task Factory

  • Function: task_factory(task_type, **kwargs)

    • Description: Factory method for creating task objects based on the provided task type. It dynamically selects the appropriate task class from a mapping and initializes it with any additional keyword arguments.
    • Parameters:
      • task_type (TaskType or str): The type of the task to create. Can be an enum member of TaskType or a string representing a valid task type.
      • **kwargs (dict): Additional keyword arguments to pass to the task's constructor.
    • Returns:
      • Task: An instance of the task corresponding to the given task type.
    • Raises:
      • ValueError: If an invalid task type is provided, or if any unexpected keyword arguments are passed that do not match the task constructor's parameters.
  • Example:

    # Assuming TaskType has 'Extract' and 'Split' as valid members and corresponding classes are defined.
    extract_task = task_factory('extract', document_type='PDF', extract_text=True)
    split_task = task_factory('split', split_by='sentence', split_length=100)


Object for document extraction tasks, extending the Task class.

  • Method: __init__(document_type, extract_method='pdfium', extract_text=False, extract_images=False, extract_tables=False)

    • Parameters:
      • document_type: Type of document.
      • extract_method: Method used for extraction. Default is 'pdfium'.
      • extract_text: Boolean indicating if text should be extracted. Default is False.
      • extract_images: Boolean indicating if images should be extracted. Default is False.
      • extract_tables: Boolean indicating if tables should be extracted. Default is False.
    • Description: Sets up configuration for the extraction task.
  • Method: to_dict()

    • Description: Converts task details to a dictionary for submission to message client. Includes handling for specific methods and document types.
    • Returns: Dict: Dictionary containing task type and properties.
  • Example:

    extract_task = ExtractTask(


Object for document splitting tasks, extending the Task class.

  • Method: __init__(split_by=None, split_length=None, split_overlap=None, max_character_length=None, sentence_window_size=None)

    • Parameters:
      • split_by: Criterion for splitting, e.g., 'word', 'sentence', 'passage'.
      • split_length: The length of each split segment.
      • split_overlap: Overlap length between segments.
      • max_character_length: Maximum character length for a split.
      • sentence_window_size: Window size for sentence-based splits.
    • Description: Sets up configuration for the splitting task.
  • Method: to_dict()

    • Description: Converts task details to a dictionary for submission to message client.
    • Returns: Dict: Dictionary containing task type and properties.
  • Example:

    split_task = SplitTask(


The NvIngestClient class provides a comprehensive suite of methods to handle job submission and retrieval processes efficiently. Below are the public methods available:


  • __init__: Initializes the NvIngestClient with customizable client allocator and Redis configuration.

    • Parameters:
      • message_client_allocator: A callable that returns an instance of the client used for communication.
      • message_client_hostname: Hostname of the message client server. Defaults to "localhost".
      • message_client_port: Port number of the message client server. Defaults to 7670.
      • message_client_kwargs: Additional keyword arguments for the message client.
      • msg_counter_id: Redis key for tracking message counts. Defaults to "nv-ingest-message-id".
      • worker_pool_size: Number of worker processes in the pool. Defaults to 1.
  • Example:

    client = NvIngestClient(
      message_client_hostname="localhost", # Host where nv-ingest-ms-runtime is running
      message_client_port=7670 # REST port, defaults to 7670

Submission Methods


Submits a job to a specified job queue. This method can optionally wait for a response if blocking is set to True.

  • Parameters:

    • job_id: The unique identifier of the job to be submitted.
    • job_queue_id: The ID of the job queue where the job will be submitted.
  • Returns:

    • Optional[Dict]: The job result if blocking is True and a result is available before the timeout, otherwise None.
  • Raises:

    • Exception: If submitting the job fails.
  • Example:

    client.submit_job(job_id, "morpheus_task_queue")


Submits multiple jobs to a specified job queue. This method does not wait for any of the jobs to complete.

  • Parameters:

    • job_ids: A list of job IDs to be submitted.
    • job_queue_id: The ID of the job queue where the jobs will be submitted.
  • Returns:

    • List[Union[Dict, None]]: A list of job results if blocking is True and results are available before the timeout, otherwise None.
  • Example:

    client.submit_jobs([job_id0, job_id1], "morpheus_task_queue")


Asynchronously submits one or more jobs to a specified job queue using a thread pool. This method handles both single job ID or a list of job IDs.

  • Parameters:

    • job_ids: A single job ID or a list of job IDs to be submitted.
    • job_queue_id: The ID of the job queue where the jobs will be submitted.
  • Returns:

    • Dict[Future, str]: A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.
  • Notes:

    • This method queues the jobs for asynchronous submission and returns a mapping of futures to job IDs.
    • It does not wait for any of the jobs to complete.
    • Ensure that each job is in the proper state before submission.
  • Example:

    client.submit_job_async(job_id, "morpheus_task_queue")

Job Retrieval


  • Description: Fetches the job result from a message client, handling potential errors and state changes.

  • Method: fetch_job_result(job_id, timeout=10, data_only=True)

  • Parameters:

    • job_id (str): The identifier of the job.
    • timeout (float, optional): Timeout for the fetch operation in seconds. Defaults to 10.
    • data_only (bool, optional): If true, only returns the data part of the job result.
  • Returns:

    • Tuple[Dict, str]: The job result and the job ID.
  • Raises:

    • ValueError: If there is an error in decoding the job result.
    • TimeoutError: If the fetch operation times out.
    • Exception: For all other unexpected issues.
  • Example:

    job_id = client.add_job(job_spec)
    client.submit_job(job_id, TASK_QUEUE)
    generated_metadata = client.fetch_job_result(
        job_id, timeout=DEFAULT_JOB_TIMEOUT


  • Description: Fetches job results for a list or a single job ID asynchronously and returns a mapping of futures to job IDs.

  • Method: fetch_job_result_async(job_ids, timeout=10, data_only=True)

  • Parameters:

    • job_ids (Union[str, List[str]]): A single job ID or a list of job IDs.
    • timeout (float, optional): Timeout for fetching each job result, in seconds. Defaults to 10.
    • data_only (bool, optional): Whether to return only the data part of the job result.
  • Returns:

    • Dict[Future, str]: A dictionary mapping each future to its corresponding job ID.
  • Raises:

    • No explicit exceptions raised but leverages the exceptions from fetch_job_result.
  • Example:

    job_id = client.add_job(job_spec)
    client.submit_job(job_id, TASK_QUEUE)
    generated_metadata = client.fetch_job_result_async(
        job_id, timeout=DEFAULT_JOB_TIMEOUT

Job and Task Management


  • Description: Returns the number of jobs currently tracked by the client.

  • Method: job_count()

  • Returns: Integer representing the total number of jobs.

  • Example:



  • Description: Adds a job specification to the job tracking system.

  • Method: add_job(job_spec)

  • Parameters:

    • job_spec (JobSpec, optional): The job specification to add. If not provided, a new job ID will be generated.
  • Returns: String representing the job ID of the added job.

  • Raises:

    • ValueError: If a job with the specified job ID already exists.
  • Example:

    extract_task = ExtractTask(
    job_id = client.add_job(job_spec)


  • Description: Creates a new job with specified parameters and adds it to the job tracking dictionary.
  • Method: create_job(payload, source_id, source_name, document_type, tasks, job_id, extended_options)
  • Parameters:
    • payload (str): The payload associated with the job.
    • source_id (str): The source identifier for the job.
    • source_name (str): The unique name of the job's source data.
    • document_type (str, optional): The type of document to be processed.
    • tasks (list, optional): A list of tasks to be associated with the job.
    • job_id (uuid.UUID | str, optional): The unique identifier for the job.
    • extended_options (dict, optional): Additional options for job creation.
  • Returns: String representing the job ID.
  • Raises:
    • ValueError: If a job with the specified job ID already exists.


  • Description: Adds a task to an existing job.

  • Method: add_task(job_id, task)

  • Parameters:

    • job_id (str): The job ID to which the task will be added.
    • task (Task): The task to add.
  • Raises:

    • ValueError: If the job does not exist or is not in the correct state.
  • Example:

    job_spec = JobSpec(
            "tracing_options": {
                "trace": True,
                "ts_send": time.time_ns(),
    extract_task = ExtractTask(


  • Description: Creates a task with specified parameters and adds it to an existing job.

  • Method: create_task(job_id, task_type, task_params)

  • Parameters:

    • job_id (uuid.UUID | str): The unique identifier of the job.
    • task_type (TaskType): The type of the task.
    • task_params (dict, optional): Parameters for the task.
  • Raises:

    • ValueError: If the job does not exist or if an attempt is made to modify a job after its submission.
  • Example:

    job_id = client.add_job(job_spec)
    client.create_task(job_id, DedupTask, {content_type: "image", filter: True})

CLI Tool

After installation, you can use the nv-ingest-cli tool from the command line to manage and process datasets.

CLI Options

Here are the options provided by the CLI, explained:

  • --batch_size: Specifies the number of documents to process in a single batch. Default is 10. Must be 1 or more.
  • --doc: Adds a new document to be processed. Supports multiple entries. Files must exist.
  • --dataset: Specifies the path to a dataset definition file.
  • --client: Sets the client type with choices including REST, Redis, Kafka. Default is Redis.
  • --client_host: Specifies the DNS name or URL for the endpoint.
  • --client_port: Sets the port number for the client endpoint.
  • --client_kwargs: Provides additional arguments to pass to the client. Default is {}.
  • --concurrency_n: Defines the number of inflight jobs to maintain at one time. Default is 1.
  • --dry_run: Enables a dry run without executing actions.
  • --output_directory: Specifies the output directory for results.
  • --log_level: Sets the log level. Choices are DEBUG, INFO, WARNING, ERROR, CRITICAL. Default is INFO.
  • --shuffle_dataset: Shuffles the dataset before processing if enabled. Default is true.
  • --task: Allows for specification of tasks in JSON format. Supports multiple tasks.


You can find a notebook with examples using the client here.