Skip to content

Commit

Permalink
fix: insert and delete executor (#611)
Browse files Browse the repository at this point in the history
* Insert feedback (#610)

* fix to insert and delete executor

* variable names

* removing unnecessary arrays

* use SQLalchemy for delete execution predicates

* formatting

* moving delete_predicate to expression_utils

* formatting

---------

Co-authored-by: jarulraj <arulraj@gatech.edu>
  • Loading branch information
aryan-rajoria and jarulraj authored Mar 28, 2023
1 parent c352c5e commit 274dd26
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 79 deletions.
95 changes: 71 additions & 24 deletions eva/executor/delete_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
from typing import Iterator

import pandas as pd
from sqlalchemy import and_, or_

from eva.catalog.catalog_manager import CatalogManager
from eva.catalog.catalog_type import TableType
from eva.catalog.models.table_catalog import TableCatalogEntry
from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import apply_predicate
from eva.expression.abstract_expression import ExpressionType
from eva.expression.comparison_expression import ComparisonExpression
from eva.expression.constant_value_expression import ConstantValueExpression
from eva.expression.logical_expression import LogicalExpression
from eva.expression.tuple_value_expression import TupleValueExpression
from eva.models.storage.batch import Batch
from eva.plan_nodes.project_plan import ProjectPlan
from eva.storage.storage_engine import StorageEngine
Expand All @@ -33,34 +39,75 @@ def __init__(self, node: ProjectPlan):
self.predicate = node.where_clause
self.catalog = CatalogManager()

def predicate_node_to_filter_clause(
    self, table: TableCatalogEntry, predicate_node: ComparisonExpression
):
    """Translate a predicate expression tree into a filter clause.

    Both children of ``predicate_node`` are resolved first: a
    ``TupleValueExpression`` becomes ``table.columns[<col_name>]``, a
    ``ConstantValueExpression`` becomes its raw ``value``, and any other
    child is treated as a nested predicate and translated recursively.
    The resolved operands are then combined with ``and_`` / ``or_`` for
    logical nodes, or with the matching Python comparison operator for
    comparison nodes.

    Args:
        table: object exposing a ``columns`` mapping keyed by column name.
            NOTE(review): annotated as TableCatalogEntry, but ``exec``
            passes the result of ``_try_loading_table_via_reflection``;
            confirm the intended type.
        predicate_node: root of the predicate (sub)tree, either a
            ``LogicalExpression`` or a ``ComparisonExpression``.

    Returns:
        The clause produced by combining the two resolved operands.

    Raises:
        AssertionError: if the comparison is CONTAINS or IS_CONTAINED.
        NotImplementedError: if the node or operator has no translation.
            Previously this case silently returned ``None``, which would
            then have been used as the filter of a DELETE.
    """

    def resolve_operand(node):
        # Leaves map straight to a column object or a literal value; any
        # other child is a nested predicate and is translated recursively.
        if isinstance(node, TupleValueExpression):
            return table.columns[node.col_name]
        if isinstance(node, ConstantValueExpression):
            return node.value
        return self.predicate_node_to_filter_clause(table, node)

    # A single resolved value per side avoids the unbound-variable failures
    # that separate "operand" and "sub-clause" variables would cause.
    left = resolve_operand(predicate_node.get_child(0))
    right = resolve_operand(predicate_node.get_child(1))
    etype = predicate_node.etype

    if isinstance(predicate_node, LogicalExpression):
        if etype == ExpressionType.LOGICAL_AND:
            return and_(left, right)
        if etype == ExpressionType.LOGICAL_OR:
            return or_(left, right)

    elif isinstance(predicate_node, ComparisonExpression):
        assert (
            etype != ExpressionType.COMPARE_CONTAINS
            and etype != ExpressionType.COMPARE_IS_CONTAINED
        ), f"Predicate type {predicate_node.etype} not supported in delete"

        if etype == ExpressionType.COMPARE_EQUAL:
            return left == right
        if etype == ExpressionType.COMPARE_GREATER:
            return left > right
        if etype == ExpressionType.COMPARE_LESSER:
            return left < right
        if etype == ExpressionType.COMPARE_GEQ:
            return left >= right
        if etype == ExpressionType.COMPARE_LEQ:
            return left <= right
        if etype == ExpressionType.COMPARE_NEQ:
            return left != right

    # Fail loudly instead of handing an empty filter to a DELETE statement.
    raise NotImplementedError(f"Predicate type {etype} not supported in delete")

def exec(self, *args, **kwargs) -> Iterator[Batch]:
    """Delete the rows of the target table that satisfy the WHERE predicate.

    The predicate stored on the executor is converted to a filter clause
    by ``predicate_node_to_filter_clause`` and handed to the storage
    engine in a single ``delete`` call. The earlier row-by-row path
    (read the table, apply the predicate, delete each matching row) is
    dropped: it passed a DataFrame row to ``storage_engine.delete``, which
    now expects a filter clause, and it made the method delete and yield
    twice.

    Yields:
        One Batch wrapping a DataFrame with the message "Deleted rows".

    Raises:
        AssertionError: if the table is not of type STRUCTURED_DATA.
    """
    table_catalog = self.node.table_ref.table.table_obj
    storage_engine = StorageEngine.factory(table_catalog)

    assert (
        table_catalog.table_type == TableType.STRUCTURED_DATA
    ), "DELETE only implemented for structured data"

    # The reflected table supplies the column objects the filter is built on.
    table_to_delete_from = storage_engine._try_loading_table_via_reflection(
        table_catalog.name
    )

    # verify where clause and convert to sqlalchemy supported filter
    # https://stackoverflow.com/questions/34026210/where-filter-from-table-object-using-a-dictionary-or-kwargs
    sqlalchemy_filter_clause = self.predicate_node_to_filter_clause(
        table_to_delete_from, predicate_node=self.predicate
    )

    storage_engine.delete(table_catalog, sqlalchemy_filter_clause)
    yield Batch(pd.DataFrame(["Deleted rows"]))
11 changes: 2 additions & 9 deletions eva/executor/insert_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from eva.models.storage.batch import Batch
from eva.plan_nodes.insert_plan import InsertPlan
from eva.storage.storage_engine import StorageEngine
from eva.utils.logging_manager import logger


class InsertExecutor(AbstractExecutor):
Expand All @@ -44,17 +43,11 @@ def exec(self, *args, **kwargs):
table_catalog_entry.table_type == TableType.STRUCTURED_DATA
), "INSERT only implemented for structured data"

values_to_insert = []
for i in self.node.value_list:
values_to_insert.append(i.value)
values_to_insert = [val_node.value for val_node in self.node.value_list]
tuple_to_insert = tuple(values_to_insert)
columns_to_insert = []
for i in self.node.column_list:
columns_to_insert.append(i.col_name)
columns_to_insert = [col_node.col_name for col_node in self.node.column_list]

# Adding all values to Batch for insert
logger.info(values_to_insert)
logger.info(columns_to_insert)
dataframe = pd.DataFrame([tuple_to_insert], columns=columns_to_insert)
batch = Batch(dataframe)

Expand Down
2 changes: 1 addition & 1 deletion eva/expression/expression_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def to_conjunction_list(
) -> List[AbstractExpression]:
"""Convert expression tree to list of conjunctives
Note: It does not normalize the expression tree before extracting the conjuntives.
Note: It does not normalize the expression tree before extracting the conjunctives.
Args:
expression_tree (AbstractExpression): expression tree to transform
Expand Down
30 changes: 8 additions & 22 deletions eva/storage/sqlite_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, Iterator, List
from typing import Iterator, List

import numpy as np
import pandas as pd
from sqlalchemy import Table, and_, inspect
from sqlalchemy import Table, inspect
from sqlalchemy.sql.expression import ColumnElement

from eva.catalog.catalog_type import ColumnType
from eva.catalog.models.base_model import BaseModel
Expand Down Expand Up @@ -184,34 +185,19 @@ def read(
logger.exception(err_msg)
raise Exception(err_msg)

def delete(self, table: TableCatalogEntry, where_clause: Dict[str, Any]):
def delete(
self, table: TableCatalogEntry, sqlalchemy_filter_clause: ColumnElement[bool]
):
"""Delete tuples from the table where rows satisfy the where_clause.
The filter clause is applied as given, so it is not limited to equality predicates.
Argument:
table: table metadata object of the table
where_clause (Dict[str, Any]): where clause use to find the tuples to
remove. The key should be the column name and value should be the tuple
value. The function assumes an equality condition
sqlalchemy_filter_clause: SQLAlchemy filter clause used to find the tuples to remove.
"""
try:
table_to_delete_from = self._try_loading_table_via_reflection(table.name)
table_columns = [
col.name
for col in table_to_delete_from.columns
if col.name != "_row_id"
]
filter_clause = []
# verify where clause and convert to sqlalchemy supported filter
# https://stackoverflow.com/questions/34026210/where-filter-from-table-object-using-a-dictionary-or-kwargs
for column, value in where_clause.items():
if column not in table_columns:
raise Exception(
f"where_clause contains a column {column} not in the table {table_to_delete_from}"
)
filter_clause.append(table_to_delete_from.columns[column] == value)

d = table_to_delete_from.delete().where(and_(*filter_clause))
d = table_to_delete_from.delete().where(sqlalchemy_filter_clause)
self._sql_engine.execute(d)
self._sql_session.commit()
except Exception as e:
Expand Down
6 changes: 3 additions & 3 deletions test/integration_tests/test_delete_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ def test_should_delete_single_image_in_table(self):
)

def test_should_delete_tuple_in_table(self):
delete_query = "DELETE FROM testDeleteOne WHERE id < 20;"
delete_query = """DELETE FROM testDeleteOne WHERE
id < 20 OR dummyfloat < 2 AND id < 5 AND 20 > id
AND id <= 20 AND id >= 5 OR id != 15 OR id = 15;"""
batch = execute_query_fetch_all(delete_query)

query = "SELECT * FROM testDeleteOne;"
batch = execute_query_fetch_all(query)
from eva.utils.logging_manager import logger

logger.info(batch)
np.testing.assert_array_equal(
batch.frames["testdeleteone.id"].array,
np.array([25], dtype=np.int64),
Expand Down
26 changes: 6 additions & 20 deletions test/integration_tests/test_insert_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,14 @@ def test_should_load_video_in_table(self):
query = f"""LOAD VIDEO '{self.video_file_path}' INTO MyVideo;"""
execute_query_fetch_all(query)

insert_query = """ INSERT INTO MyVideo (id, data) VALUES (
[
[
40,
[
[[40, 40, 40], [40, 40, 40]],
[[40, 40, 40], [40, 40, 40]]
]
]
]);"""
insert_query = """ INSERT INTO MyVideo (id, data) VALUES
(40, [[40, 40, 40], [40, 40, 40]],
[[40, 40, 40], [40, 40, 40]]);"""
execute_query_fetch_all(insert_query)

insert_query_2 = """ INSERT INTO MyVideo (id, data) VALUES (
[
[
41,
[
[[41, 41, 41] , [41, 41, 41]],
[[41, 41, 41], [41, 41, 41]]
]
]
]);"""
insert_query_2 = """ INSERT INTO MyVideo (id, data) VALUES
( 41, [[41, 41, 41] , [41, 41, 41]],
[[41, 41, 41], [41, 41, 41]]);"""
execute_query_fetch_all(insert_query_2)

query = "SELECT id, data FROM MyVideo WHERE id = 40"
Expand Down

0 comments on commit 274dd26

Please sign in to comment.