Skip to content

Commit

Permalink
chore: reducing coverage loss (#619)
Browse files Browse the repository at this point in the history
* adding delete operation

* Adding Insert Statement

* checkpoint

* supporting multiple entries

* implemented for structured data error

* adding parser visitor for delete

* delete executor

* delete plan and rules

* adding delete to plan executor

* change position of LogicalDelete

* logical delimeter

* delete test case

* adding test case

* adding test case

* adding delete testcase

* adding predicate to delete executor

* adding delete to Image storage

* bug fix in delete

* fixing testcase

* adding test case for insert statement

* remove order_by from statement_binder.py

* better variable names, using Batch

* error message for insert

* removing order_by and limit from delete

* remove order_by and limit

* use f-string

* adding to changelog

* removing commit messages

* formatting

* fixing comments

* formatting

* eva insert f32 values

* fix: should delete range

* delete multiple rows

* udf bootstrap

* try to run tests in parallel

* minor fix for ray to work

* ray fixes

---------

Co-authored-by: Aryan-Rajoria <aryanrajoria1003@gmail.com>
Co-authored-by: Gaurav <gaurav21776@gmail.com>
  • Loading branch information
3 people authored Mar 23, 2023
1 parent b798c23 commit 77c07d7
Show file tree
Hide file tree
Showing 113 changed files with 1,028 additions and 861 deletions.
3 changes: 0 additions & 3 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,3 @@ exclude_lines =
class LogicalExchangeToPhysical(Rule):
class LogicalExchange(Operator):

[html]
show_contexts = True

76 changes: 33 additions & 43 deletions eva/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,31 +90,30 @@ def _bind_create_index_statement(self, node: CreateIndexStatement):
# TODO: create index currently only works on TableInfo, but will extend later.
assert node.table_ref.is_table_atom(), "Index can only be created on Tableinfo"

if IndexType.is_faiss_index_type(node.index_type):
if not node.udf_func:
# Feature table type needs to be float32 numpy array.
col_def = node.col_list[0]
table_ref_obj = node.table_ref.table.table_obj
col = [
col for col in table_ref_obj.columns if col.name == col_def.name
][0]
if not col.array_type == NdArrayType.FLOAT32:
raise BinderError("Index input needs to be float32.")
if not len(col.array_dimensions) == 2:
raise BinderError("Index input needs to be 2 dimensional.")
else:
# Output of the UDF should be 2 dimension and float32 type.
catalog_manager = CatalogManager()
udf_obj = catalog_manager.get_udf_catalog_entry_by_name(
node.udf_func.name
)
for output in udf_obj.outputs:
if not output.array_type == NdArrayType.FLOAT32:
raise BinderError("Index input needs to be float32.")
if not len(output.array_dimensions) == 2:
raise BinderError("Index input needs to be 2 dimensional.")
assert IndexType.is_faiss_index_type(
node.index_type
), "Index type {} is not supported.".format(node.index_type)

if not node.udf_func:
# Feature table type needs to be float32 numpy array.
col_def = node.col_list[0]
table_ref_obj = node.table_ref.table.table_obj
col = [col for col in table_ref_obj.columns if col.name == col_def.name][0]
assert (
col.array_type == NdArrayType.FLOAT32
), "Index input needs to be float32."
assert len(col.array_dimensions) == 2
else:
raise BinderError("Index type {} is not supported.".format(node.index_type))
# Output of the UDF should be 2 dimension and float32 type.
catalog_manager = CatalogManager()
udf_obj = catalog_manager.get_udf_catalog_entry_by_name(node.udf_func.name)
for output in udf_obj.outputs:
assert (
output.array_type == NdArrayType.FLOAT32
), "Index input needs to be float32."
assert (
len(output.array_dimensions) == 2
), "Index input needs to be 2 dimensional."

@bind.register(SelectStatement)
def _bind_select_statement(self, node: SelectStatement):
Expand Down Expand Up @@ -158,10 +157,9 @@ def _bind_create_mat_statement(self, node: CreateMaterializedViewStatement):
@bind.register(RenameTableStatement)
def _bind_rename_table_statement(self, node: RenameTableStatement):
self.bind(node.old_table_ref)
if node.old_table_ref.table.table_obj.table_type == TableType.STRUCTURED_DATA:
err_msg = "Rename not yet supported on structured data"
logger.exception(err_msg)
raise BinderError(err_msg)
assert (
node.old_table_ref.table.table_obj.table_type != TableType.STRUCTURED_DATA
), "Rename not yet supported on structured data"

@bind.register(TableRef)
def _bind_tableref(self, node: TableRef):
Expand Down Expand Up @@ -230,14 +228,10 @@ def _bind_func_expr(self, node: FunctionExpression):
# Verify the consistency of the UDF. If the checksum of the UDF does not match
# the one stored in the catalog, an error will be thrown and the user will be
# asked to register the UDF again.
if get_file_checksum(udf_obj.impl_file_path) != udf_obj.checksum:
err_msg = (
f"UDF file {udf_obj.impl_file_path} has been modified from the "
"registration. Please create a new UDF using the CREATE UDF command or "
"UPDATE the existing one."
)
logger.error(err_msg)
raise BinderError(err_msg)
assert (
get_file_checksum(udf_obj.impl_file_path) == udf_obj.checksum
), f"""UDF file {udf_obj.impl_file_path} has been modified from the
registration. Please create a new UDF using the CREATE UDF command or UPDATE the existing one."""

try:
node.function = load_udf_class_from_file(
Expand Down Expand Up @@ -279,10 +273,6 @@ def _bind_func_expr(self, node: FunctionExpression):
]
node.alias = Alias(node.alias.alias_name, output_aliases)

if len(node.alias.col_names) != len(node.output_objs):
err_msg = (
f"Expected {len(node.output_objs)} output columns for "
f"{node.alias.alias_name}, got {len(node.alias.col_names)}."
)
logger.error(err_msg)
raise BinderError(err_msg)
assert len(node.alias.col_names) == len(
node.output_objs
), f"""Expected {len(node.output_objs)} output columns for {node.alias.alias_name}, got {len(node.alias.col_names)}."""
20 changes: 9 additions & 11 deletions eva/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from eva.parser.create_statement import ColumnDefinition
from eva.parser.table_ref import TableInfo
from eva.parser.types import FileFormatType
from eva.utils.errors import CatalogError
from eva.utils.generic_utils import generate_file_path, get_file_checksum
from eva.utils.logging_manager import logger

Expand Down Expand Up @@ -380,14 +379,17 @@ def create_and_insert_multimedia_table_catalog_entry(
Returns:
TableCatalogEntry: newly inserted table catalog entry
"""
assert format_type in [
FileFormatType.VIDEO,
FileFormatType.IMAGE,
], f"Format Type {format_type} is not supported"

if format_type is FileFormatType.VIDEO:
columns = get_video_table_column_definitions()
table_type = TableType.VIDEO_DATA
elif format_type is FileFormatType.IMAGE:
columns = get_image_table_column_definitions()
table_type = TableType.IMAGE_DATA
else:
raise CatalogError(f"Format Type {format_type} is not supported")

return self.create_and_insert_table_catalog_entry(
TableInfo(name), columns, table_type=table_type
Expand All @@ -407,10 +409,9 @@ def get_multimedia_metadata_table_catalog_entry(
# use file_url as the metadata table name
media_metadata_name = Path(input_table.file_url).stem
obj = self.get_table_catalog_entry(media_metadata_name)
if not obj:
err = f"Table with name {media_metadata_name} does not exist in catalog"
logger.exception(err)
raise CatalogError(err)
assert (
obj is not None
), f"Table with name {media_metadata_name} does not exist in catalog"

return obj

Expand All @@ -432,10 +433,7 @@ def create_and_insert_multimedia_metadata_table_catalog_entry(
# use file_url as the metadata table name
media_metadata_name = Path(input_table.file_url).stem
obj = self.get_table_catalog_entry(media_metadata_name)
if obj:
err_msg = f"Table with name {media_metadata_name} already exists"
logger.exception(err_msg)
raise CatalogError(err_msg)
assert obj is None, "Table with name {media_metadata_name} already exists"

columns = [ColumnDefinition("file_url", ColumnType.TEXT, None, None)]
obj = self.create_and_insert_table_catalog_entry(
Expand Down
14 changes: 14 additions & 0 deletions eva/catalog/sql_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy import create_engine, event
from sqlalchemy.orm import scoped_session, sessionmaker

from eva.configuration.configuration_manager import ConfigurationManager

IDENTIFIER_COLUMN = "_row_id"

# import os
# def prefix_worker_id(uri: str):
# try:
# worker_id = os.environ["PYTEST_XDIST_WORKER"]
# base = "eva_catalog.db"
# uri = uri.replace(base, str(worker_id) + "_" + base)
# except KeyError:
# # Single threaded mode
# pass
# return uri


class SQLConfig:
"""Singleton class for configuring connection to the database.
Expand Down Expand Up @@ -46,6 +58,8 @@ def __init__(self):
Retrieves the database uri for connection from ConfigurationManager.
"""
uri = ConfigurationManager().get_value("core", "catalog_database_uri")
# parallelize using xdist
# worker_uri = prefix_worker_id(str(uri))
# set echo=True to log SQL
self.engine = create_engine(uri)

Expand Down
13 changes: 6 additions & 7 deletions eva/executor/abstract_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from typing import Iterable, List, TypeVar
from typing import Generator, Iterable, List, TypeVar

from eva.models.storage.batch import Batch
from eva.plan_nodes.abstract_plan import AbstractPlan
Expand Down Expand Up @@ -51,22 +51,21 @@ def children(self) -> List[AbstractExecutor]:
return self._children

@children.setter
def children(self, children: List["AbstractExecutor"]):
def children(self, children):
self._children = children

@property
def node(self) -> AbstractPlan:
return self._node

@abstractmethod
def validate(self):
pass

@abstractmethod
def exec(self) -> Iterable[Batch]:
def exec(self, *args, **kwargs) -> Iterable[Batch]:
"""
This method is implemented by every executor.
Contains logic for that executor;
For retrival based executor : It fetchs frame batches from
child nodes and emits it to parent node.
"""

def __call__(self, *args, **kwargs) -> Generator[Batch, None, None]:
yield from self.exec(*args, **kwargs)
33 changes: 12 additions & 21 deletions eva/executor/apply_and_merge_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
from typing import Iterator

from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import ExecutorError
from eva.models.storage.batch import Batch
from eva.plan_nodes.apply_and_merge_plan import ApplyAndMergePlan
from eva.utils.logging_manager import logger


class ApplyAndMergeExecutor(AbstractExecutor):
Expand All @@ -38,26 +36,19 @@ def __init__(self, node: ApplyAndMergePlan):
self.do_unnest = node.do_unnest
self.alias = node.alias

def validate(self):
pass

def exec(self, *args, **kwargs) -> Iterator[Batch]:
child_executor = self.children[0]
for batch in child_executor.exec(**kwargs):
res = self.func_expr.evaluate(batch)
try:
if not res.empty():
if self.do_unnest:
res.unnest()

# Merge the results to the input.
# This assumes that the batch index is preserved by the function
# call. Since both the batch and the results are sorted, we could
# perform a sorted merge, though the typical small size of the
# batch and results should not significantly impact performance.
merged_batch = Batch.join(batch, res)
merged_batch.reset_index()
yield merged_batch
except Exception as e:
logger.error(e)
raise ExecutorError(e)
if not res.empty():
if self.do_unnest:
res.unnest()

# Merge the results to the input.
# This assumes that the batch index is preserved by the function
# call. Since both the batch and the results are sorted, we could
# perform a sorted merge, though the typical small size of the
# batch and results should not significantly impact performance.
merged_batch = Batch.join(batch, res)
merged_batch.reset_index()
yield merged_batch
5 changes: 1 addition & 4 deletions eva/executor/create_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ def __init__(self, node: CreatePlan):
super().__init__(node)
self.catalog = CatalogManager()

def validate(self):
pass

def exec(self):
def exec(self, *args, **kwargs):
if not handle_if_not_exists(self.node.table_info, self.node.if_not_exists):
catalog_entry = self.catalog.create_and_insert_table_catalog_entry(
self.node.table_info, self.node.column_list
Expand Down
44 changes: 5 additions & 39 deletions eva/executor/create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ class CreateIndexExecutor(AbstractExecutor):
def __init__(self, node: CreateIndexPlan):
super().__init__(node)

def validate(self):
pass

def exec(self):
def exec(self, *args, **kwargs):
catalog_manager = CatalogManager()
if catalog_manager.get_index_catalog_entry_by_name(self.node.name):
msg = f"Index {self.node.name} already exists."
Expand All @@ -60,10 +57,12 @@ def exec(self):
# Get the index type.
index_type = self.node.index_type

assert IndexType.is_faiss_index_type(
index_type
), "Index type {} is not supported.".format(index_type)

if IndexType.is_faiss_index_type(index_type):
self._create_faiss_index()
else:
raise ExecutorError("Index type {} is not supported.".format(index_type))

yield Batch(
pd.DataFrame(
Expand All @@ -78,39 +77,6 @@ def _get_index_save_path(self):
/ Path("{}_{}.index".format(self.node.index_type, self.node.name))
)

# Comment out since Index IO is not needed for now.
# def _get_index_io_list(self, input_dim):
# # Input dimension is inferred from the actual feature.
# catalog_manager = CatalogManager()
# input_index_io = catalog_manager.index_io(
# "input_feature",
# ColumnType.NDARRAY,
# NdArrayType.FLOAT32,
# [Dimension.ANYDIM, input_dim],
# True,
# )

# # Output dimension depends on number of searched
# # feature vectors and top N similar feature vectors.
# # IndexIO has detailed documentation about input and
# # output format of index.
# id_index_io = catalog_manager.index_io(
# "logical_id",
# ColumnType.NDARRAY,
# NdArrayType.INT64,
# [Dimension.ANYDIM, Dimension.ANYDIM],
# False,
# )
# distance_index_io = catalog_manager.index_io(
# "distance",
# ColumnType.NDARRAY,
# NdArrayType.FLOAT32,
# [Dimension.ANYDIM, Dimension.ANYDIM],
# False,
# )

# return [input_index_io, id_index_io, distance_index_io]

def _create_faiss_index(self):
try:
catalog_manager = CatalogManager()
Expand Down
5 changes: 1 addition & 4 deletions eva/executor/create_mat_view_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ def __init__(self, node: CreateMaterializedViewPlan):
super().__init__(node)
self.catalog = CatalogManager()

def validate(self):
pass

def exec(self):
def exec(self, *args, **kwargs):
"""Create materialized view executor"""
if not handle_if_not_exists(self.node.view, self.node.if_not_exists):
child = self.children[0]
Expand Down
5 changes: 1 addition & 4 deletions eva/executor/create_udf_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ class CreateUDFExecutor(AbstractExecutor):
def __init__(self, node: CreateUDFPlan):
super().__init__(node)

def validate(self):
pass

def exec(self):
def exec(self, *args, **kwargs):
"""Create udf executor
Calls the catalog to insert a udf catalog entry.
Expand Down
Loading

0 comments on commit 77c07d7

Please sign in to comment.