Skip to content

Commit

Permalink
feat: Delete and Insert operators for structured data (#587)
Browse files Browse the repository at this point in the history
- Added support for inserting and deleting tuples in structured data storage engine
- Made `batch_mem_size` an optional parameter
  • Loading branch information
jarulraj authored Feb 17, 2023
1 parent 034a74d commit 86825d3
Show file tree
Hide file tree
Showing 38 changed files with 673 additions and 108 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### [Breaking Changes]
### [Added]

* PR #568: feat: INSERT and DELETE

### [Changed]

* PR #581: fix: debugging issue in yolov5
Expand Down
7 changes: 7 additions & 0 deletions eva/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from eva.parser.alias import Alias
from eva.parser.create_index_statement import CreateIndexStatement
from eva.parser.create_mat_view_statement import CreateMaterializedViewStatement
from eva.parser.delete_statement import DeleteTableStatement
from eva.parser.explain_statement import ExplainStatement
from eva.parser.rename_statement import RenameTableStatement
from eva.parser.select_statement import SelectStatement
Expand Down Expand Up @@ -143,6 +144,12 @@ def _bind_select_statement(self, node: SelectStatement):
self.bind(node.union_link)
self._binder_context = current_context

@bind.register(DeleteTableStatement)
def _bind_delete_statement(self, node: DeleteTableStatement):
    """Bind a DELETE statement: resolve the target table reference and,
    when a WHERE clause is present, bind its predicate expression too."""
    self.bind(node.table_ref)
    predicate = node.where_clause
    if predicate:
        self.bind(predicate)

@bind.register(CreateMaterializedViewStatement)
def _bind_create_mat_statement(self, node: CreateMaterializedViewStatement):
self.bind(node.query)
Expand Down
86 changes: 86 additions & 0 deletions eva/executor/delete_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# coding=utf-8
# Copyright 2018-2022 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Generator, Iterator

import pandas as pd

from eva.catalog.catalog_manager import CatalogManager
from eva.catalog.catalog_type import TableType
from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import ExecutorError, apply_predicate
from eva.models.storage.batch import Batch
from eva.plan_nodes.project_plan import ProjectPlan
from eva.storage.storage_engine import StorageEngine
from eva.utils.logging_manager import logger


class DeleteExecutor(AbstractExecutor):
    """Executor for DELETE statements.

    Only structured-data tables are supported: the table is read back in
    full, the WHERE predicate is applied to select the victim rows, and each
    matching row is handed to the storage engine for deletion.
    """

    def __init__(self, node: ProjectPlan):
        super().__init__(node)
        # Predicate taken from the DELETE statement's WHERE clause.
        self.predicate = node.where_clause
        self.catalog = CatalogManager()

    def validate(self):
        pass

    def exec(self, **kwargs) -> Iterator[Batch]:
        """Delete all rows matching the predicate; yield a status batch.

        Raises:
            ExecutorError: wraps any error raised while reading, filtering,
                or deleting rows (including the NotImplementedError raised
                for unsupported table types).
        """
        try:
            table_catalog = self.node.table_ref.table.table_obj
            storage_engine = StorageEngine.factory(table_catalog)

            # DELETE is only implemented for structured data; video, image,
            # and any other table types are rejected up front.  (The original
            # code repeated these checks in a second, unreachable if-chain.)
            if table_catalog.table_type != TableType.STRUCTURED_DATA:
                raise NotImplementedError("DELETE only implemented for structured data")

            del_batch = list(storage_engine.read(table_catalog))[0]

            # Qualify column names with the table name (except the internal
            # _row_id column) because of an inconsistency in col_alias in the
            # structured-data Batch project function.
            original_column_names = list(del_batch.frames.columns)
            column_names = [
                f"{table_catalog.name.lower()}.{name}"
                for name in original_column_names
                if not name == "_row_id"
            ]
            column_names.insert(0, "_row_id")
            del_batch.frames.columns = column_names

            # del_batch now holds exactly the rows that must be deleted.
            del_batch = apply_predicate(del_batch, self.predicate)

            # Restore the unqualified names and delete the matching rows
            # one at a time via the storage engine.
            del_batch.frames.columns = original_column_names
            table_needed = del_batch.frames[
                [f"{self.predicate.children[0].col_name}"]
            ]
            for num in range(len(del_batch)):
                storage_engine.delete(table_catalog, table_needed.iloc[num])
            yield Batch(pd.DataFrame(["Deleted row"]))

        except Exception as e:
            logger.error(e)
            # Chain the cause so the original traceback is preserved.
            raise ExecutorError(e) from e

    def __call__(self, **kwargs) -> Generator[Batch, None, None]:
        yield from self.exec(**kwargs)
69 changes: 63 additions & 6 deletions eva/executor/insert_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,78 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd

from eva.catalog.catalog_manager import CatalogManager
from eva.catalog.catalog_type import TableType
from eva.catalog.models.table_catalog import TableCatalogEntry
from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import ExecutorError
from eva.models.storage.batch import Batch
from eva.plan_nodes.insert_plan import InsertPlan
from eva.storage.abstract_storage_engine import AbstractStorageEngine
from eva.storage.storage_engine import StorageEngine
from eva.utils.logging_manager import logger


class InsertExecutor(AbstractExecutor):
    """Executor for INSERT statements (structured data only)."""

    def __init__(self, node: InsertPlan):
        super().__init__(node)
        self.catalog = CatalogManager()

    def validate(self):
        pass

    def exec(self):
        """
        Based on the table it constructs a valid tuple using the values
        provided. Right now we assume there are no missing values.

        Yields:
            Batch: a single status batch on success.

        Raises:
            ExecutorError: wraps any unexpected error, including the
                NotImplementedError raised for non-structured tables.
        """
        # NOTE: the original body started with a stray `raise
        # NotImplementedError` (a leftover removed line) that made
        # everything below unreachable; it has been deleted.
        storage_engine = None
        table_catalog_entry = None
        try:
            # Get catalog entry
            table_name = self.node.table_ref.table.table_name
            database_name = self.node.table_ref.table.database_name
            table_catalog_entry = self.catalog.get_table_catalog_entry(
                table_name, database_name
            )

            # Implemented only for STRUCTURED_DATA
            if table_catalog_entry.table_type != TableType.STRUCTURED_DATA:
                raise NotImplementedError("INSERT only implemented for structured data")

            # Collect the literal values and target columns from the plan.
            values_to_insert = [expr.value for expr in self.node.value_list]
            tuple_to_insert = tuple(values_to_insert)
            columns_to_insert = [expr.col_name for expr in self.node.column_list]

            # Adding all values to Batch for insert
            logger.info(values_to_insert)
            logger.info(columns_to_insert)
            dataframe = pd.DataFrame([tuple_to_insert], columns=columns_to_insert)
            batch = Batch(dataframe)

            storage_engine = StorageEngine.factory(table_catalog_entry)
            storage_engine.write(table_catalog_entry, batch)
        except Exception as e:
            err_msg = f"Insert failed: encountered unexpected error {str(e)}"
            logger.error(err_msg)
            # Chain the cause so the original traceback is preserved.
            raise ExecutorError(err_msg) from e
        else:
            # NOTE(review): this reports the number of values in the single
            # inserted tuple, not a row count — kept for compatibility.
            yield Batch(
                pd.DataFrame([f"Number of rows loaded: {str(len(values_to_insert))}"])
            )

    def _rollback_load(
        self,
        storage_engine: AbstractStorageEngine,
        table_obj: TableCatalogEntry,
        do_create: bool,
    ):
        """Best-effort rollback: drop the table if this operation created it."""
        try:
            if do_create:
                storage_engine.drop(table_obj)
        except Exception as e:
            # BUG FIX: the original message referenced self.media_type,
            # which InsertExecutor never defines and would itself raise
            # AttributeError inside this handler.
            logger.exception(
                f"Unexpected Exception {e} occured while rolling back. This is bad as the table can be in a corrupt state. Please verify the table {table_obj} for correctness."
            )
4 changes: 4 additions & 0 deletions eva/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from eva.executor.create_index_executor import CreateIndexExecutor
from eva.executor.create_mat_view_executor import CreateMaterializedViewExecutor
from eva.executor.create_udf_executor import CreateUDFExecutor
from eva.executor.delete_executor import DeleteExecutor
from eva.executor.drop_executor import DropExecutor
from eva.executor.drop_udf_executor import DropUDFExecutor
from eva.executor.executor_utils import ExecutorError
Expand Down Expand Up @@ -138,6 +139,9 @@ def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
executor_node = ApplyAndMergeExecutor(node=plan)
elif plan_opr_type == PlanOprType.FAISS_INDEX_SCAN:
executor_node = FaissIndexScanExecutor(node=plan)
elif plan_opr_type == PlanOprType.DELETE:
executor_node = DeleteExecutor(node=plan)

# EXPLAIN does not need to build execution tree for its children
if plan_opr_type != PlanOprType.EXPLAIN:
# Build Executor Tree for children
Expand Down
9 changes: 0 additions & 9 deletions eva/experimental/ray/optimizer/rules/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
if TYPE_CHECKING:
from eva.optimizer.optimizer_context import OptimizerContext

from eva.configuration.configuration_manager import ConfigurationManager
from eva.experimental.ray.planner.exchange_plan import ExchangePlan
from eva.expression.function_expression import FunctionExpression
from eva.optimizer.operators import (
Expand Down Expand Up @@ -92,19 +91,11 @@ def apply(self, before: LogicalGet, context: OptimizerContext):
# Configure the batch_mem_size. It decides the number of rows
# read in a batch from storage engine.
# ToDO: Experiment heuristics.

batch_mem_size = 30000000 # 30mb
config_batch_mem_size = ConfigurationManager().get_value(
"executor", "batch_mem_size"
)
if config_batch_mem_size:
batch_mem_size = config_batch_mem_size
scan = SeqScanPlan(None, before.target_list, before.alias)
lower = ExchangePlan(parallelism=1)
lower.append_child(
StoragePlan(
before.table_obj,
batch_mem_size=batch_mem_size,
predicate=before.predicate,
sampling_rate=before.sampling_rate,
)
Expand Down
5 changes: 1 addition & 4 deletions eva/optimizer/memo.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,5 @@ def add_group_expr(

assert (
expr.group_id is not UNDEFINED_GROUP_ID
), """Expr
should have a
valid group
id"""
), """Expr should have a valid group id"""
return expr
50 changes: 49 additions & 1 deletion eva/optimizer/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class OperatorType(IntEnum):
LOGICALFILTER = auto()
LOGICALPROJECT = auto()
LOGICALINSERT = auto()
LOGICALDELETE = auto()
LOGICALCREATE = auto()
LOGICALRENAME = auto()
LOGICALDROP = auto()
Expand Down Expand Up @@ -398,7 +399,7 @@ class LogicalInsert(Operator):
"""[Logical Node for Insert operation]
Arguments:
table(TableCatalogEntry): table to intert data into
table(TableCatalogEntry): table to insert data into
column_list{List[AbstractExpression]}:
[After binding annotated column_list]
value_list{List[AbstractExpression]}:
Expand Down Expand Up @@ -451,6 +452,53 @@ def __hash__(self) -> int:
)


class LogicalDelete(Operator):
    """Logical plan node for the DELETE operation.

    Arguments:
        table_ref(TableCatalogEntry): table to delete tuples from,
        where_clause(AbstractExpression): the predicate used to select which rows to delete,
    """

    def __init__(
        self,
        table_ref: TableRef,
        where_clause: AbstractExpression = None,
        children=None,
    ):
        super().__init__(OperatorType.LOGICALDELETE, children)
        self._table_ref = table_ref
        self._where_clause = where_clause

    @property
    def table_ref(self):
        # Target table of the delete.
        return self._table_ref

    @property
    def where_clause(self):
        # Row-selection predicate; may be None (delete all rows).
        return self._where_clause

    def __eq__(self, other):
        # Compare the subtrees first, then this node's own attributes.
        subtree_equal = super().__eq__(other)
        if not isinstance(other, LogicalDelete):
            return False
        return (
            subtree_equal
            and self.table_ref == other.table_ref
            and self.where_clause == other.where_clause
        )

    def __hash__(self) -> int:
        return hash((super().__hash__(), self.table_ref, self.where_clause))


class LogicalCreate(Operator):
"""Logical node for create table operations
Expand Down
Loading

0 comments on commit 86825d3

Please sign in to comment.