Create WriteToVectorDB module #22

14 changes: 13 additions & 1 deletion examples/llm/common/content_extractor_module.py
@@ -180,6 +180,7 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int,
for chunk in split_text:
processed_data.append({
'title': file_meta.file_name,
'link': 'none',
'source': f"{file_meta.file_type}:{file_meta.file_path}",
'summary': 'none',
'content': chunk
@@ -272,6 +273,14 @@ def file_content_extractor(builder: mrc.Builder):
"txt": TextConverter()
}

chunk_params = {
file_type: {
"chunk_size": converters_meta.get(file_type, {}).get("chunk_size", chunk_size),
"chunk_overlap": converters_meta.get(file_type, {}).get("chunk_overlap", chunk_overlap)
}
for file_type in converters.keys()
}

def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:
data = []
with ThreadPoolExecutor(max_workers=num_threads) as executor:
@@ -292,7 +301,10 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:
for file_meta, future in zip(files_meta, futures):
docs = future.result()
if docs:
result = process_content(docs, file_meta, chunk_size, chunk_overlap)
file_type_chunk_params = chunk_params[file_meta.file_type]
result = process_content(docs, file_meta,
file_type_chunk_params["chunk_size"],
file_type_chunk_params["chunk_overlap"])
if result:
data.extend(result)

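The hunks above build a per-file-type chunking table up front, so `process_content` receives per-type values instead of the single global `chunk_size`/`chunk_overlap`. A minimal standalone sketch of that lookup (the `converters_meta` values here are hypothetical, not taken from this PR):

```python
# Hypothetical converter metadata: only "pdf" overrides the module defaults.
converters_meta = {"pdf": {"chunk_size": 256, "chunk_overlap": 32}}
converters = {"pdf": None, "txt": None}  # stand-ins for the file-type converters above

chunk_size, chunk_overlap = 512, 64  # module-level defaults

chunk_params = {
    file_type: {
        "chunk_size": converters_meta.get(file_type, {}).get("chunk_size", chunk_size),
        "chunk_overlap": converters_meta.get(file_type, {}).get("chunk_overlap", chunk_overlap),
    }
    for file_type in converters
}

print(chunk_params["pdf"])  # {'chunk_size': 256, 'chunk_overlap': 32}
print(chunk_params["txt"])  # {'chunk_size': 512, 'chunk_overlap': 64}
```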
57 changes: 12 additions & 45 deletions examples/llm/common/utils.py
@@ -14,6 +14,7 @@
import logging

import pymilvus
from morpheus.service.vdb.milvus_client import DATA_TYPE_MAP
from langchain.embeddings import HuggingFaceEmbeddings

from morpheus.llm.services.nemo_llm_service import NeMoLLMService
@@ -45,51 +46,17 @@ def build_llm_service(model_name: str, llm_service: str, tokens_to_generate: int
return llm_service.get_client(model_name, **model_kwargs)


def build_milvus_config(embedding_size: int):
milvus_resource_kwargs = {
"index_conf": {
"field_name": "embedding",
"metric_type": "L2",
"index_type": "HNSW",
"params": {
"M": 8,
"efConstruction": 64,
},
},
"schema_conf": {
"enable_dynamic_field": True,
"schema_fields": [
pymilvus.FieldSchema(name="id",
dtype=pymilvus.DataType.INT64,
description="Primary key for the collection",
is_primary=True,
auto_id=True).to_dict(),
pymilvus.FieldSchema(name="title",
dtype=pymilvus.DataType.VARCHAR,
description="Title or heading of the data entry",
max_length=65_535).to_dict(),
pymilvus.FieldSchema(name="source",
dtype=pymilvus.DataType.VARCHAR,
description="Source or origin of the data entry",
max_length=65_535).to_dict(),
pymilvus.FieldSchema(name="summary",
dtype=pymilvus.DataType.VARCHAR,
description="Brief summary or abstract of the data content",
max_length=65_535).to_dict(),
pymilvus.FieldSchema(name="content",
dtype=pymilvus.DataType.VARCHAR,
description="Main content or body of the data entry",
max_length=65_535).to_dict(),
pymilvus.FieldSchema(name="embedding",
dtype=pymilvus.DataType.FLOAT_VECTOR,
description="Embedding vectors representing the data entry",
dim=embedding_size).to_dict(),
],
"description": "Collection schema for diverse data sources"
}
}

return milvus_resource_kwargs
def build_milvus_config(resource_schema_config: dict):

schema_fields = []
for field_data in resource_schema_config["schema_conf"]["schema_fields"]:
field_data["dtype"] = DATA_TYPE_MAP.get(field_data["dtype"])
field_schema = pymilvus.FieldSchema(**field_data)
schema_fields.append(field_schema.to_dict())

resource_schema_config["schema_conf"]["schema_fields"] = schema_fields

return resource_schema_config


def build_milvus_service(embedding_size: int, uri: str = "http://localhost:19530"):
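The reworked `build_milvus_config` no longer hard-codes a schema; it converts a dict-based schema description into `pymilvus.FieldSchema` entries, resolving string dtype names through `DATA_TYPE_MAP`. A rough usage sketch, assuming `DATA_TYPE_MAP` maps names such as "INT64" and "FLOAT_VECTOR" to `pymilvus.DataType` members (its exact contents are not shown in this diff):

```python
import pymilvus

# Assumed shape of morpheus.service.vdb.milvus_client.DATA_TYPE_MAP (string -> pymilvus type).
DATA_TYPE_MAP = {
    "INT64": pymilvus.DataType.INT64,
    "VARCHAR": pymilvus.DataType.VARCHAR,
    "FLOAT_VECTOR": pymilvus.DataType.FLOAT_VECTOR,
}

resource_schema_config = {
    "index_conf": {"field_name": "embedding", "metric_type": "L2", "index_type": "HNSW",
                   "params": {"M": 8, "efConstruction": 64}},
    "schema_conf": {
        "enable_dynamic_field": True,
        "schema_fields": [
            {"name": "id", "dtype": "INT64", "is_primary": True, "auto_id": True,
             "description": "Primary key for the collection"},
            {"name": "embedding", "dtype": "FLOAT_VECTOR", "dim": 384,
             "description": "Embedding vectors representing the data entry"},
        ],
    },
}

# Same transformation the new build_milvus_config performs:
# string dtype -> pymilvus DataType -> FieldSchema -> dict.
for field_data in resource_schema_config["schema_conf"]["schema_fields"]:
    field_data["dtype"] = DATA_TYPE_MAP[field_data["dtype"]]

resource_schema_config["schema_conf"]["schema_fields"] = [
    pymilvus.FieldSchema(**field_data).to_dict()
    for field_data in resource_schema_config["schema_conf"]["schema_fields"]
]
```

One thing to note: the new code uses `DATA_TYPE_MAP.get(...)`, which returns `None` for an unrecognised dtype string, so a typo in the config surfaces later as a `FieldSchema` error rather than as a lookup error.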
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/common.py
@@ -202,7 +202,7 @@ def process_vdb_sources(pipe: Pipeline, config: Config, vdb_source_config: typin
return vdb_sources


def build_milvus_config(embedding_size: int) -> typing.Dict[str, typing.Any]:
def build_default_milvus_config(embedding_size: int) -> typing.Dict[str, typing.Any]:
"""
Builds the configuration for Milvus.

1 change: 0 additions & 1 deletion examples/llm/vdb_upload/pipeline.py
@@ -72,7 +72,6 @@ def pipeline(pipeline_config: Config, source_config: typing.List, vdb_config: ty
monitor_2 = pipe.add_stage(
MonitorStage(pipeline_config, description="Inference rate", unit="events", delayed_start=True))

# TODO(Bhargav): Convert WriteToVectorDBStage to module + retain backwards compatibility.
vector_db = pipe.add_stage(WriteToVectorDBStage(pipeline_config, **vdb_config))

monitor_3 = pipe.add_stage(
12 changes: 9 additions & 3 deletions examples/llm/vdb_upload/run.py
@@ -19,8 +19,9 @@
import yaml

from morpheus.config import Config, PipelineModes
from ..common.utils import build_milvus_config
from .common import build_default_milvus_config
from ..common.utils import build_rss_urls
from ..common.utils import build_milvus_config

logger = logging.getLogger(__name__)

@@ -218,7 +219,7 @@ def build_cli_configs(source_type, enable_cache, embedding_size, isolate_embeddi
cli_vdb_conf = {
'embedding_size': embedding_size,
'recreate': True,
'resource_kwargs': build_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None,
'resource_kwargs': build_default_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None,
'resource_name': vector_db_resource_name,
'service': vector_db_service,
'uri': vector_db_uri,
@@ -308,7 +309,12 @@ def build_final_config(vdb_conf_path, cli_source_conf, cli_embeddings_conf, cli_
pipeline_conf = merge_configs(vdb_pipeline_config.get('pipeline', {}), cli_pipeline_conf)
source_conf = vdb_pipeline_config.get('sources', []) + list(cli_source_conf.values())
tokenizer_conf = merge_configs(vdb_pipeline_config.get('tokenizer', {}), cli_tokenizer_conf)
vdb_conf = merge_configs(vdb_pipeline_config.get('vdb', {}), cli_vdb_conf)
vdb_conf = vdb_pipeline_config.get('vdb', {})
resource_schema = vdb_conf.pop("resource_schema", None)

if resource_schema:
vdb_conf["resource_kwargs"] = build_milvus_config(resource_schema)
vdb_conf = merge_configs(vdb_conf, cli_vdb_conf)

# TODO: class labels depends on this, so it should be a pipeline level parameter, not a vdb level parameter
pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size')
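Putting the `run.py` change together with the YAML below: the `resource_schema` block is popped out of the file's `vdb` section, converted into Milvus `resource_kwargs`, and only then merged with the CLI values so CLI overrides still win. A minimal sketch of that flow (the file path and top-level `vdb_pipeline` key follow this PR; the plain dict merge stands in for `merge_configs`):

```python
import yaml

from examples.llm.common.utils import build_milvus_config  # import path assumed from this PR's layout

with open("examples/llm/vdb_upload/vdb_config.yaml", "r", encoding="utf-8") as fh:
    vdb_pipeline_config = yaml.safe_load(fh)["vdb_pipeline"]

cli_vdb_conf = {"recreate": True, "uri": "http://localhost:19530"}  # hypothetical CLI values

vdb_conf = vdb_pipeline_config.get("vdb", {})
resource_schema = vdb_conf.pop("resource_schema", None)

if resource_schema:
    # String dtypes from the YAML are resolved to pymilvus types inside build_milvus_config.
    vdb_conf["resource_kwargs"] = build_milvus_config(resource_schema)

# merge_configs(vdb_conf, cli_vdb_conf): CLI entries override YAML entries.
vdb_conf = {**vdb_conf, **cli_vdb_conf}
```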
40 changes: 39 additions & 1 deletion examples/llm/vdb_upload/vdb_config.yaml
@@ -122,8 +122,46 @@ vdb_pipeline:
    model_name: "bert-base-uncased-hash"

  vdb:
    embedding_size: 384 # Size of the embeddings to store in the vector database
    embedding_size: 384
    recreate: True # Whether to recreate the resource if it already exists
    resource_name: "VDB2" # Identifier for the resource in the vector database
    service: "milvus" # Specify the type of vector database
    uri: "http://localhost:19530" # URI for connecting to the Vector Database server
    resource_schema:
      index_conf:
        field_name: embedding
        metric_type: L2
        index_type: HNSW
        params:
          M: 8
          efConstruction: 64

      schema_conf:
        enable_dynamic_field: true
        schema_fields:
          - name: id
            dtype: INT64
            description: Primary key for the collection
            is_primary: true
            auto_id: true
          - name: title
            dtype: VARCHAR
            description: Title or heading of the data entry
            max_length: 65_535
          - name: source
            dtype: VARCHAR
            description: Source or origin of the data entry
            max_length: 65_535
          - name: summary
            dtype: VARCHAR
            description: Brief summary or abstract of the data content
            max_length: 65_535
          - name: content
            dtype: VARCHAR
            description: Main content or body of the data entry
            max_length: 65_535
          - name: embedding
            dtype: FLOAT_VECTOR
            description: Embedding vectors representing the data entry
            dim: 384 # Size of the embeddings to store in the vector database
        description: Collection schema for diverse data sources
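The string `dtype` names above only work if they are keys of `DATA_TYPE_MAP`, and the `dim` of the `embedding` field has to agree with `embedding_size`. A small, hypothetical sanity check for the config (not part of this PR):

```python
import yaml

KNOWN_DTYPES = {"INT64", "VARCHAR", "FLOAT_VECTOR"}  # assumed subset of DATA_TYPE_MAP keys

with open("examples/llm/vdb_upload/vdb_config.yaml", "r", encoding="utf-8") as fh:
    vdb = yaml.safe_load(fh)["vdb_pipeline"]["vdb"]

fields = vdb["resource_schema"]["schema_conf"]["schema_fields"]

for field in fields:
    assert field["dtype"] in KNOWN_DTYPES, f"unknown dtype for field {field['name']!r}"

embedding = next(f for f in fields if f["name"] == "embedding")
assert embedding["dim"] == vdb["embedding_size"], "embedding dim must match embedding_size"
```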