Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: insert and delete executor #611

Merged
merged 8 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 71 additions & 24 deletions eva/executor/delete_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
from typing import Iterator

import pandas as pd
from sqlalchemy import and_, or_

from eva.catalog.catalog_manager import CatalogManager
from eva.catalog.catalog_type import TableType
from eva.catalog.models.table_catalog import TableCatalogEntry
from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import apply_predicate
from eva.expression.abstract_expression import ExpressionType
from eva.expression.comparison_expression import ComparisonExpression
from eva.expression.constant_value_expression import ConstantValueExpression
from eva.expression.logical_expression import LogicalExpression
from eva.expression.tuple_value_expression import TupleValueExpression
from eva.models.storage.batch import Batch
from eva.plan_nodes.project_plan import ProjectPlan
from eva.storage.storage_engine import StorageEngine
Expand All @@ -33,34 +39,75 @@ def __init__(self, node: ProjectPlan):
self.predicate = node.where_clause
self.catalog = CatalogManager()

def predicate_node_to_filter_clause(
self, table: TableCatalogEntry, predicate_node: ComparisonExpression
):
filter_clause = None
left = predicate_node.get_child(0)
right = predicate_node.get_child(1)

if type(left) == TupleValueExpression:
column = left.col_name
x = table.columns[column]
elif type(left) == ConstantValueExpression:
value = left.value
x = value
else:
left_filter_clause = self.predicate_node_to_filter_clause(table, left)

if type(right) == TupleValueExpression:
column = right.col_name
y = table.columns[column]
elif type(right) == ConstantValueExpression:
value = right.value
y = value
else:
right_filter_clause = self.predicate_node_to_filter_clause(table, right)

if type(predicate_node) == LogicalExpression:
if predicate_node.etype == ExpressionType.LOGICAL_AND:
filter_clause = and_(left_filter_clause, right_filter_clause)
elif predicate_node.etype == ExpressionType.LOGICAL_OR:
filter_clause = or_(left_filter_clause, right_filter_clause)

elif type(predicate_node) == ComparisonExpression:
assert (
predicate_node.etype != ExpressionType.COMPARE_CONTAINS
and predicate_node.etype != ExpressionType.COMPARE_IS_CONTAINED
), f"Predicate type {predicate_node.etype} not supported in delete"

if predicate_node.etype == ExpressionType.COMPARE_EQUAL:
filter_clause = x == y
elif predicate_node.etype == ExpressionType.COMPARE_GREATER:
filter_clause = x > y
elif predicate_node.etype == ExpressionType.COMPARE_LESSER:
filter_clause = x < y
elif predicate_node.etype == ExpressionType.COMPARE_GEQ:
filter_clause = x >= y
elif predicate_node.etype == ExpressionType.COMPARE_LEQ:
filter_clause = x <= y
elif predicate_node.etype == ExpressionType.COMPARE_NEQ:
filter_clause = x != y

return filter_clause

def exec(self, *args, **kwargs) -> Iterator[Batch]:
table_catalog = self.node.table_ref.table.table_obj
storage_engine = StorageEngine.factory(table_catalog)
del_batch = Batch()

assert (
table_catalog.table_type == TableType.STRUCTURED_DATA
), "DELETE only implemented for structured data"

del_batch = storage_engine.read(table_catalog)
del_batch = list(del_batch)[0]

# Added because of inconsistency in col_alias
# in structured data Batch project function
original_column_names = list(del_batch.frames.columns)
column_names = [
f"{table_catalog.name.lower()}.{name}"
for name in original_column_names
if not name == "_row_id"
]
column_names.insert(0, "_row_id")
del_batch.frames.columns = column_names
del_batch = apply_predicate(del_batch, self.predicate)

# All the batches that need to be deleted
del_batch.frames.columns = original_column_names
table_needed = del_batch.frames[[f"{self.predicate.children[0].col_name}"]]
for num in range(len(del_batch)):
storage_engine.delete(table_catalog, table_needed.iloc[num])

yield Batch(pd.DataFrame(["Deleted row"]))
table_to_delete_from = storage_engine._try_loading_table_via_reflection(
table_catalog.name
)

sqlalchemy_filter_clause = self.predicate_node_to_filter_clause(
table_to_delete_from, predicate_node=self.predicate
)
# verify where clause and convert to sqlalchemy supported filter
# https://stackoverflow.com/questions/34026210/where-filter-from-table-object-using-a-dictionary-or-kwargs

storage_engine.delete(table_catalog, sqlalchemy_filter_clause)
yield Batch(pd.DataFrame(["Deleted rows"]))
11 changes: 2 additions & 9 deletions eva/executor/insert_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from eva.models.storage.batch import Batch
from eva.plan_nodes.insert_plan import InsertPlan
from eva.storage.storage_engine import StorageEngine
from eva.utils.logging_manager import logger


class InsertExecutor(AbstractExecutor):
Expand All @@ -44,17 +43,11 @@ def exec(self, *args, **kwargs):
table_catalog_entry.table_type == TableType.STRUCTURED_DATA
), "INSERT only implemented for structured data"

values_to_insert = []
for i in self.node.value_list:
values_to_insert.append(i.value)
values_to_insert = [val_node.value for val_node in self.node.value_list]
tuple_to_insert = tuple(values_to_insert)
columns_to_insert = []
for i in self.node.column_list:
columns_to_insert.append(i.col_name)
columns_to_insert = [col_node.col_name for col_node in self.node.column_list]

# Adding all values to Batch for insert
logger.info(values_to_insert)
logger.info(columns_to_insert)
dataframe = pd.DataFrame([tuple_to_insert], columns=columns_to_insert)
batch = Batch(dataframe)

Expand Down
2 changes: 1 addition & 1 deletion eva/expression/expression_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def to_conjunction_list(
) -> List[AbstractExpression]:
"""Convert expression tree to list of conjuntives

Note: It does not normalize the expression tree before extracting the conjuntives.
Note: It does not normalize the expression tree before extracting the conjunctives.

Args:
expression_tree (AbstractExpression): expression tree to transform
Expand Down
30 changes: 8 additions & 22 deletions eva/storage/sqlite_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, Iterator, List
from typing import Iterator, List

import numpy as np
import pandas as pd
from sqlalchemy import Table, and_, inspect
from sqlalchemy import Table, inspect
from sqlalchemy.sql.expression import ColumnElement

from eva.catalog.catalog_type import ColumnType
from eva.catalog.models.base_model import BaseModel
Expand Down Expand Up @@ -184,34 +185,19 @@ def read(
logger.exception(err_msg)
raise Exception(err_msg)

def delete(self, table: TableCatalogEntry, where_clause: Dict[str, Any]):
def delete(
self, table: TableCatalogEntry, sqlalchemy_filter_clause: ColumnElement[bool]
):
"""Delete tuples from the table where rows satisfy the where_clause.
The current implementation only handles equality predicates.

Argument:
table: table metadata object of the table
where_clause (Dict[str, Any]): where clause use to find the tuples to
remove. The key should be the column name and value should be the tuple
value. The function assumes an equality condition
where_clause: clause used to find the tuples to remove.
"""
try:
table_to_delete_from = self._try_loading_table_via_reflection(table.name)
table_columns = [
col.name
for col in table_to_delete_from.columns
if col.name != "_row_id"
]
filter_clause = []
# verify where clause and convert to sqlalchemy supported filter
# https://stackoverflow.com/questions/34026210/where-filter-from-table-object-using-a-dictionary-or-kwargs
for column, value in where_clause.items():
if column not in table_columns:
raise Exception(
f"where_clause contains a column {column} not in the table {table_to_delete_from}"
)
filter_clause.append(table_to_delete_from.columns[column] == value)

d = table_to_delete_from.delete().where(and_(*filter_clause))
d = table_to_delete_from.delete().where(sqlalchemy_filter_clause)
self._sql_engine.execute(d)
self._sql_session.commit()
except Exception as e:
Expand Down
6 changes: 3 additions & 3 deletions test/integration_tests/test_delete_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ def test_should_delete_single_image_in_table(self):
)

def test_should_delete_tuple_in_table(self):
delete_query = "DELETE FROM testDeleteOne WHERE id < 20;"
delete_query = """DELETE FROM testDeleteOne WHERE
id < 20 OR dummyfloat < 2 AND id < 5 AND 20 > id
AND id <= 20 AND id >= 5 OR id != 15 OR id = 15;"""
batch = execute_query_fetch_all(delete_query)

query = "SELECT * FROM testDeleteOne;"
batch = execute_query_fetch_all(query)
from eva.utils.logging_manager import logger

logger.info(batch)
np.testing.assert_array_equal(
batch.frames["testdeleteone.id"].array,
np.array([25], dtype=np.int64),
Expand Down
26 changes: 6 additions & 20 deletions test/integration_tests/test_insert_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,14 @@ def test_should_load_video_in_table(self):
query = f"""LOAD VIDEO '{self.video_file_path}' INTO MyVideo;"""
execute_query_fetch_all(query)

insert_query = """ INSERT INTO MyVideo (id, data) VALUES (
[
[
40,
[
[[40, 40, 40], [40, 40, 40]],
[[40, 40, 40], [40, 40, 40]]
]
]
]);"""
insert_query = """ INSERT INTO MyVideo (id, data) VALUES
(40, [[40, 40, 40], [40, 40, 40]],
[[40, 40, 40], [40, 40, 40]]);"""
execute_query_fetch_all(insert_query)

insert_query_2 = """ INSERT INTO MyVideo (id, data) VALUES (
[
[
41,
[
[[41, 41, 41] , [41, 41, 41]],
[[41, 41, 41], [41, 41, 41]]
]
]
]);"""
insert_query_2 = """ INSERT INTO MyVideo (id, data) VALUES
( 41, [[41, 41, 41] , [41, 41, 41]],
[[41, 41, 41], [41, 41, 41]]);"""
execute_query_fetch_all(insert_query_2)

query = "SELECT id, data FROM MyVideo WHERE id = 40"
Expand Down