diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7d56880f7..bbc611c49 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 ### [Breaking Changes]
 ### [Added]
+
+* PR #568: feat: support for INSERT and DELETE statements
+
 ### [Changed]
 * PR #581: fix: debugging issue in yolov5
diff --git a/eva/binder/statement_binder.py b/eva/binder/statement_binder.py
index 11949d6a6..f7be1f1fe 100644
--- a/eva/binder/statement_binder.py
+++ b/eva/binder/statement_binder.py
@@ -30,6 +30,7 @@ from eva.parser.alias import Alias
 from eva.parser.create_index_statement import CreateIndexStatement
 from eva.parser.create_mat_view_statement import CreateMaterializedViewStatement
+from eva.parser.delete_statement import DeleteTableStatement
 from eva.parser.explain_statement import ExplainStatement
 from eva.parser.rename_statement import RenameTableStatement
 from eva.parser.select_statement import SelectStatement
@@ -143,6 +144,12 @@ def _bind_select_statement(self, node: SelectStatement):
         self.bind(node.union_link)
         self._binder_context = current_context
 
+    @bind.register(DeleteTableStatement)
+    def _bind_delete_statement(self, node: DeleteTableStatement):
+        self.bind(node.table_ref)
+        if node.where_clause:
+            self.bind(node.where_clause)
+
     @bind.register(CreateMaterializedViewStatement)
     def _bind_create_mat_statement(self, node: CreateMaterializedViewStatement):
         self.bind(node.query)
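Note on the binder change above: DeleteTableStatement plugs into the same singledispatch-based bind() used by the other statements. A minimal sketch of the new path, assuming the StatementBinderContext class from eva/binder (the statement would normally come from the parser rather than be built by hand):

    # Sketch only: how a DELETE statement reaches _bind_delete_statement.
    from eva.binder.statement_binder import StatementBinder
    from eva.binder.statement_binder_context import StatementBinderContext

    def bind_delete(delete_stmt):
        binder = StatementBinder(StatementBinderContext())
        # bind() dispatches on the statement type; for DeleteTableStatement
        # it binds table_ref and, when present, the WHERE predicate.
        binder.bind(delete_stmt)
        return delete_stmt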
diff --git a/eva/executor/delete_executor.py b/eva/executor/delete_executor.py
new file mode 100644
index 000000000..d7e602403
--- /dev/null
+++ b/eva/executor/delete_executor.py
@@ -0,0 +1,79 @@
+# coding=utf-8
+# Copyright 2018-2022 EVA
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Generator, Iterator
+
+import pandas as pd
+
+from eva.catalog.catalog_manager import CatalogManager
+from eva.catalog.catalog_type import TableType
+from eva.executor.abstract_executor import AbstractExecutor
+from eva.executor.executor_utils import ExecutorError, apply_predicate
+from eva.models.storage.batch import Batch
+from eva.plan_nodes.delete_plan import DeletePlan
+from eva.storage.storage_engine import StorageEngine
+from eva.utils.logging_manager import logger
+
+
+class DeleteExecutor(AbstractExecutor):
+    """Executor for DELETE FROM <table> [WHERE <predicate>] statements."""
+
+    def __init__(self, node: DeletePlan):
+        super().__init__(node)
+        self.predicate = node.where_clause
+        self.catalog = CatalogManager()
+
+    def validate(self):
+        pass
+
+    def exec(self, **kwargs) -> Iterator[Batch]:
+        try:
+            table_catalog = self.node.table_ref.table.table_obj
+            storage_engine = StorageEngine.factory(table_catalog)
+
+            if table_catalog.table_type != TableType.STRUCTURED_DATA:
+                raise NotImplementedError("DELETE only implemented for structured data")
+
+            del_batch = storage_engine.read(table_catalog)
+            del_batch = list(del_batch)[0]
+
+            # Qualify the column names (e.g., "table.col") before applying the
+            # predicate. This works around an inconsistency in col_alias in the
+            # structured-data Batch project function.
+            original_column_names = list(del_batch.frames.columns)
+            column_names = [
+                f"{table_catalog.name.lower()}.{name}"
+                for name in original_column_names
+                if not name == "_row_id"
+            ]
+            column_names.insert(0, "_row_id")
+            del_batch.frames.columns = column_names
+
+            # del_batch now holds exactly the rows that need to be deleted.
+            del_batch = apply_predicate(del_batch, self.predicate)
+
+            del_batch.frames.columns = original_column_names
+            table_needed = del_batch.frames[
+                [f"{self.predicate.children[0].col_name}"]
+            ]
+            for num in range(len(del_batch)):
+                storage_engine.delete(table_catalog, table_needed.iloc[num])
+            yield Batch(pd.DataFrame(["Deleted row"]))
+
+        except Exception as e:
+            logger.error(e)
+            raise ExecutorError(e)
+
+    def __call__(self, **kwargs) -> Generator[Batch, None, None]:
+        yield from self.exec(**kwargs)
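In practice DeleteExecutor is reached through the regular query path rather than instantiated directly. A minimal usage sketch, mirroring test_delete_executor.py below (assumes a bootstrapped catalog and an existing structured table):

    from eva.server.command_handler import execute_query_fetch_all

    # Video and image tables raise NotImplementedError in DeleteExecutor;
    # only structured tables are supported for now.
    batch = execute_query_fetch_all("DELETE FROM testDeleteOne WHERE id < 20;")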
diff --git a/eva/executor/insert_executor.py b/eva/executor/insert_executor.py
index 21671ccc6..cd38fc62d 100644
--- a/eva/executor/insert_executor.py
+++ b/eva/executor/insert_executor.py
@@ -12,21 +12,78 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pandas as pd
+
+from eva.catalog.catalog_manager import CatalogManager
+from eva.catalog.catalog_type import TableType
+from eva.catalog.models.table_catalog import TableCatalogEntry
 from eva.executor.abstract_executor import AbstractExecutor
+from eva.executor.executor_utils import ExecutorError
+from eva.models.storage.batch import Batch
 from eva.plan_nodes.insert_plan import InsertPlan
+from eva.storage.abstract_storage_engine import AbstractStorageEngine
+from eva.storage.storage_engine import StorageEngine
+from eva.utils.logging_manager import logger
 
 
 class InsertExecutor(AbstractExecutor):
     def __init__(self, node: InsertPlan):
         super().__init__(node)
+        self.catalog = CatalogManager()
 
     def validate(self):
         pass
 
     def exec(self):
-        """
-        Based on the table it constructs a valid tuple using the values
-        provided.
-        Right now we assume there are no missing values
-        """
-        raise NotImplementedError
+        storage_engine = None
+        table_catalog_entry = None
+        try:
+            # Get catalog entry
+            table_name = self.node.table_ref.table.table_name
+            database_name = self.node.table_ref.table.database_name
+            table_catalog_entry = self.catalog.get_table_catalog_entry(
+                table_name, database_name
+            )
+
+            # Implemented only for STRUCTURED_DATA
+            if table_catalog_entry.table_type != TableType.STRUCTURED_DATA:
+                raise NotImplementedError("INSERT only implemented for structured data")
+
+            values_to_insert = []
+            for i in self.node.value_list:
+                values_to_insert.append(i.value)
+            tuple_to_insert = tuple(values_to_insert)
+            columns_to_insert = []
+            for i in self.node.column_list:
+                columns_to_insert.append(i.col_name)
+
+            # Adding all values to Batch for insert
+            logger.info(values_to_insert)
+            logger.info(columns_to_insert)
+            dataframe = pd.DataFrame([tuple_to_insert], columns=columns_to_insert)
+            batch = Batch(dataframe)
+
+            storage_engine = StorageEngine.factory(table_catalog_entry)
+            storage_engine.write(table_catalog_entry, batch)
+        except Exception as e:
+            err_msg = f"Insert failed: encountered unexpected error {str(e)}"
+            logger.error(err_msg)
+            raise ExecutorError(err_msg)
+        else:
+            yield Batch(
+                pd.DataFrame(["Number of rows loaded: 1"])
+            )
+
+    def _rollback_load(
+        self,
+        storage_engine: AbstractStorageEngine,
+        table_obj: TableCatalogEntry,
+        do_create: bool,
+    ):
+        try:
+            if do_create:
+                storage_engine.drop(table_obj)
+        except Exception as e:
+            logger.exception(
+                f"Unexpected Exception {e} occurred while rolling back. This is bad as the table {table_obj} can be in a corrupt state. Please verify the table for correctness."
+            )
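The INSERT path is exercised the same way. A short sketch mirroring test_insert_executor.py (assumes a CSVTable with a single TEXT column called name, as created in that test's setUp):

    from eva.server.command_handler import execute_query_fetch_all

    # Each INSERT statement writes exactly one row, so the executor
    # reports "Number of rows loaded: 1".
    execute_query_fetch_all(
        "INSERT INTO CSVTable (name) VALUES ('test_eva/similarity/data/sad.jpg');"
    )
    batch = execute_query_fetch_all("SELECT name FROM CSVTable;")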
diff --git a/eva/executor/plan_executor.py b/eva/executor/plan_executor.py
index 929d2bb3f..57e8786a1 100644
--- a/eva/executor/plan_executor.py
+++ b/eva/executor/plan_executor.py
@@ -20,6 +20,7 @@
 from eva.executor.create_index_executor import CreateIndexExecutor
 from eva.executor.create_mat_view_executor import CreateMaterializedViewExecutor
 from eva.executor.create_udf_executor import CreateUDFExecutor
+from eva.executor.delete_executor import DeleteExecutor
 from eva.executor.drop_executor import DropExecutor
 from eva.executor.drop_udf_executor import DropUDFExecutor
 from eva.executor.executor_utils import ExecutorError
@@ -138,6 +139,9 @@ def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
             executor_node = ApplyAndMergeExecutor(node=plan)
         elif plan_opr_type == PlanOprType.FAISS_INDEX_SCAN:
             executor_node = FaissIndexScanExecutor(node=plan)
+        elif plan_opr_type == PlanOprType.DELETE:
+            executor_node = DeleteExecutor(node=plan)
+
         # EXPLAIN does not need to build execution tree for its children
         if plan_opr_type != PlanOprType.EXPLAIN:
             # Build Executor Tree for children
diff --git a/eva/experimental/ray/optimizer/rules/rules.py b/eva/experimental/ray/optimizer/rules/rules.py
index 490d639d9..6049ef12f 100644
--- a/eva/experimental/ray/optimizer/rules/rules.py
+++ b/eva/experimental/ray/optimizer/rules/rules.py
@@ -21,7 +21,6 @@
 if TYPE_CHECKING:
     from eva.optimizer.optimizer_context import OptimizerContext
-from eva.configuration.configuration_manager import ConfigurationManager
 from eva.experimental.ray.planner.exchange_plan import ExchangePlan
 from eva.expression.function_expression import FunctionExpression
 from eva.optimizer.operators import (
@@ -92,19 +91,11 @@ def apply(self, before: LogicalGet, context: OptimizerContext):
         # Configure the batch_mem_size. It decides the number of rows
         # read in a batch from storage engine.
         # ToDO: Experiment heuristics.
-
-        batch_mem_size = 30000000  # 30mb
-        config_batch_mem_size = ConfigurationManager().get_value(
-            "executor", "batch_mem_size"
-        )
-        if config_batch_mem_size:
-            batch_mem_size = config_batch_mem_size
         scan = SeqScanPlan(None, before.target_list, before.alias)
         lower = ExchangePlan(parallelism=1)
         lower.append_child(
             StoragePlan(
                 before.table_obj,
-                batch_mem_size=batch_mem_size,
                 predicate=before.predicate,
                 sampling_rate=before.sampling_rate,
             )
diff --git a/eva/optimizer/memo.py b/eva/optimizer/memo.py
index b3f939a14..17244d01f 100644
--- a/eva/optimizer/memo.py
+++ b/eva/optimizer/memo.py
@@ -128,8 +128,5 @@ def add_group_expr(
         assert (
             expr.group_id is not UNDEFINED_GROUP_ID
-        ), """Expr
-        should have a
-        valid group
-        id"""
+        ), """Expr should have a valid group id"""
         return expr
diff --git a/eva/optimizer/operators.py b/eva/optimizer/operators.py
index 6e78858e5..267e73d09 100644
--- a/eva/optimizer/operators.py
+++ b/eva/optimizer/operators.py
@@ -40,6 +40,7 @@ class OperatorType(IntEnum):
     LOGICALFILTER = auto()
     LOGICALPROJECT = auto()
     LOGICALINSERT = auto()
+    LOGICALDELETE = auto()
     LOGICALCREATE = auto()
     LOGICALRENAME = auto()
     LOGICALDROP = auto()
@@ -398,7 +399,7 @@ class LogicalInsert(Operator):
     """[Logical Node for Insert operation]
 
     Arguments:
-        table(TableCatalogEntry): table to intert data into
+        table(TableCatalogEntry): table to insert data into
         column_list{List[AbstractExpression]}:
             [After binding annotated column_list]
         value_list{List[AbstractExpression]}:
@@ -451,6 +452,53 @@ def __hash__(self) -> int:
         )
 
 
+class LogicalDelete(Operator):
+    """[Logical Node for Delete Operation]
+
+    Arguments:
+        table_ref(TableRef): table to delete tuples from
+        where_clause(AbstractExpression): the predicate used to select which rows to delete
+
+    """
+
+    def __init__(
+        self,
+        table_ref: TableRef,
+        where_clause: AbstractExpression = None,
+        children=None,
+    ):
+        super().__init__(OperatorType.LOGICALDELETE, children)
+        self._table_ref = table_ref
+        self._where_clause = where_clause
+
+    @property
+    def table_ref(self):
+        return self._table_ref
+
+    @property
+    def where_clause(self):
+        return self._where_clause
+
+    def __eq__(self, other):
+        is_subtree_equal = super().__eq__(other)
+        if not isinstance(other, LogicalDelete):
+            return False
+        return (
+            is_subtree_equal
+            and self.table_ref == other.table_ref
+            and self.where_clause == other.where_clause
+        )
+
+    def __hash__(self) -> int:
+        return hash(
+            (
+                super().__hash__(),
+                self.table_ref,
+                self.where_clause,
+            )
+        )
+
+
 class LogicalCreate(Operator):
     """Logical node for create table operations
diff --git a/eva/optimizer/rules/rules.py b/eva/optimizer/rules/rules.py
index 05e562e40..131d36c34 100644
--- a/eva/optimizer/rules/rules.py
+++ b/eva/optimizer/rules/rules.py
@@ -41,7 +41,6 @@
 if TYPE_CHECKING:
     from eva.optimizer.optimizer_context import OptimizerContext
-from eva.configuration.configuration_manager import ConfigurationManager
 from eva.optimizer.operators import (
     Dummy,
     LogicalApplyAndMerge,
@@ -49,6 +48,7 @@
     LogicalCreateIndex,
     LogicalCreateMaterializedView,
     LogicalCreateUDF,
+    LogicalDelete,
     LogicalDrop,
     LogicalDropUDF,
     LogicalExplain,
@@ -75,6 +75,7 @@
 from eva.plan_nodes.create_index_plan import CreateIndexPlan
 from eva.plan_nodes.create_plan import CreatePlan
 from eva.plan_nodes.create_udf_plan import CreateUDFPlan
+from eva.plan_nodes.delete_plan import DeletePlan
 from eva.plan_nodes.drop_plan import DropPlan
 from eva.plan_nodes.drop_udf_plan import DropUDFPlan
 from eva.plan_nodes.faiss_index_scan_plan import
FaissIndexScanPlan @@ -691,6 +692,22 @@ def apply(self, before: LogicalInsert, context: OptimizerContext): yield after +class LogicalDeleteToPhysical(Rule): + def __init__(self): + pattern = Pattern(OperatorType.LOGICALDELETE) + super().__init__(RuleType.LOGICAL_DELETE_TO_PHYSICAL, pattern) + + def promise(self): + return Promise.LOGICAL_DELETE_TO_PHYSICAL + + def check(self, before: Operator, context: OptimizerContext): + return True + + def apply(self, before: LogicalDelete, context: OptimizerContext): + after = DeletePlan(before.table_ref, before.where_clause) + yield after + + class LogicalLoadToPhysical(Rule): def __init__(self): pattern = Pattern(OperatorType.LOGICALLOADDATA) @@ -703,20 +720,9 @@ def check(self, before: Operator, context: OptimizerContext): return True def apply(self, before: LogicalLoadData, context: OptimizerContext): - # Configure the batch_mem_size. - # We assume the optimizer decides the batch_mem_size. - # ToDO: Experiment heuristics. - - batch_mem_size = 30000000 # 30mb - config_batch_mem_size = ConfigurationManager().get_value( - "executor", "batch_mem_size" - ) - if config_batch_mem_size: - batch_mem_size = config_batch_mem_size after = LoadDataPlan( before.table_info, before.path, - batch_mem_size, before.column_list, before.file_options, ) @@ -735,21 +741,10 @@ def check(self, before: Operator, context: OptimizerContext): return True def apply(self, before: LogicalUpload, context: OptimizerContext): - # Configure the batch_mem_size. - # We assume the optimizer decides the batch_mem_size. - # ToDO: Experiment heuristics. - - batch_mem_size = 30000000 # 30mb - config_batch_mem_size = ConfigurationManager().get_value( - "executor", "batch_mem_size" - ) - if config_batch_mem_size: - batch_mem_size = config_batch_mem_size after = UploadPlan( before.path, before.video_blob, before.table_info, - batch_mem_size, before.column_list, before.file_options, ) @@ -772,18 +767,10 @@ def apply(self, before: LogicalGet, context: OptimizerContext): # Configure the batch_mem_size. It decides the number of rows # read in a batch from storage engine. # ToDO: Experiment heuristics. 
-
-        batch_mem_size = 30000000  # 30mb
-        config_batch_mem_size = ConfigurationManager().get_value(
-            "executor", "batch_mem_size"
-        )
-        if config_batch_mem_size:
-            batch_mem_size = config_batch_mem_size
         after = SeqScanPlan(None, before.target_list, before.alias)
         after.append_child(
             StoragePlan(
                 before.table_obj,
-                batch_mem_size=batch_mem_size,
                 predicate=before.predicate,
                 sampling_rate=before.sampling_rate,
             )
diff --git a/eva/optimizer/rules/rules_base.py b/eva/optimizer/rules/rules_base.py
index 765940715..72e41acf6 100644
--- a/eva/optimizer/rules/rules_base.py
+++ b/eva/optimizer/rules/rules_base.py
@@ -55,6 +55,7 @@ class RuleType(Flag):
     LOGICAL_ORDERBY_TO_PHYSICAL = auto()
     LOGICAL_LIMIT_TO_PHYSICAL = auto()
     LOGICAL_INSERT_TO_PHYSICAL = auto()
+    LOGICAL_DELETE_TO_PHYSICAL = auto()
     LOGICAL_LOAD_TO_PHYSICAL = auto()
     LOGICAL_UPLOAD_TO_PHYSICAL = auto()
     LOGICAL_CREATE_TO_PHYSICAL = auto()
@@ -96,6 +97,7 @@ class Promise(IntEnum):
     LOGICAL_ORDERBY_TO_PHYSICAL = auto()
     LOGICAL_LIMIT_TO_PHYSICAL = auto()
     LOGICAL_INSERT_TO_PHYSICAL = auto()
+    LOGICAL_DELETE_TO_PHYSICAL = auto()
     LOGICAL_RENAME_TO_PHYSICAL = auto()
     LOGICAL_DROP_TO_PHYSICAL = auto()
     LOGICAL_LOAD_TO_PHYSICAL = auto()
diff --git a/eva/optimizer/rules/rules_manager.py b/eva/optimizer/rules/rules_manager.py
index 208ad58c8..9c60cd4c3 100644
--- a/eva/optimizer/rules/rules_manager.py
+++ b/eva/optimizer/rules/rules_manager.py
@@ -35,6 +35,7 @@
     LogicalCreateMaterializedViewToPhysical,
     LogicalCreateToPhysical,
     LogicalCreateUDFToPhysical,
+    LogicalDeleteToPhysical,
     LogicalDerivedGetToPhysical,
     LogicalDropToPhysical,
     LogicalDropUDFToPhysical,
@@ -97,6 +98,7 @@ def __init__(self):
             LogicalCreateUDFToPhysical(),
             LogicalDropUDFToPhysical(),
             LogicalInsertToPhysical(),
+            LogicalDeleteToPhysical(),
             LogicalLoadToPhysical(),
             LogicalUploadToPhysical(),
             LogicalSampleToUniformSample(),
diff --git a/eva/optimizer/statement_to_opr_convertor.py b/eva/optimizer/statement_to_opr_convertor.py
index 8edc62228..16103399f 100644
--- a/eva/optimizer/statement_to_opr_convertor.py
+++ b/eva/optimizer/statement_to_opr_convertor.py
@@ -18,6 +18,7 @@
     LogicalCreateIndex,
     LogicalCreateMaterializedView,
     LogicalCreateUDF,
+    LogicalDelete,
     LogicalDrop,
     LogicalDropUDF,
     LogicalExplain,
@@ -25,6 +26,7 @@
     LogicalFunctionScan,
     LogicalGet,
     LogicalGroupBy,
+    LogicalInsert,
     LogicalJoin,
     LogicalLimit,
     LogicalLoadData,
@@ -42,6 +44,7 @@
 from eva.parser.create_mat_view_statement import CreateMaterializedViewStatement
 from eva.parser.create_statement import CreateTableStatement
 from eva.parser.create_udf_statement import CreateUDFStatement
+from eva.parser.delete_statement import DeleteTableStatement
 from eva.parser.drop_statement import DropTableStatement
 from eva.parser.drop_udf_statement import DropUDFStatement
 from eva.parser.explain_statement import ExplainStatement
@@ -186,6 +189,14 @@ def visit_insert(self, statement: AbstractStatement):
         Arguments:
             statement {AbstractStatement} - - [input insert statement]
         """
+        # The legacy insert conversion below is kept as a string literal for reference
+        insert_data_opr = LogicalInsert(
+            statement.table_ref,
+            statement.column_list,
+            statement.value_list,
+        )
+        self._plan = insert_data_opr
+        """
         table_ref = statement.table
         table_metainfo = bind_dataset(table_ref.table)
@@ -317,6 +328,13 @@ def visit_create_index(self, statement: CreateIndexStatement):
         )
         self._plan = create_index_opr
 
+    def visit_delete(self, statement: DeleteTableStatement):
+        delete_opr = LogicalDelete(
+            statement.table_ref,
+            statement.where_clause,
+        )
+        self._plan = delete_opr
+
     def visit(self, statement: AbstractStatement):
        """Based on the instance of the statement the corresponding
        visit is called.
@@ -351,6 +369,8 @@ def visit(self, statement: AbstractStatement):
             self.visit_explain(statement)
         elif isinstance(statement, CreateIndexStatement):
             self.visit_create_index(statement)
+        elif isinstance(statement, DeleteTableStatement):
+            self.visit_delete(statement)
         return self._plan
 
     @property
diff --git a/eva/parser/delete_statement.py b/eva/parser/delete_statement.py
new file mode 100644
index 000000000..8637a509e
--- /dev/null
+++ b/eva/parser/delete_statement.py
@@ -0,0 +1,63 @@
+# coding=utf-8
+# Copyright 2018-2022 EVA
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from eva.expression.abstract_expression import AbstractExpression
+from eva.parser.statement import AbstractStatement
+from eva.parser.table_ref import TableRef
+from eva.parser.types import StatementType
+
+
+class DeleteTableStatement(AbstractStatement):
+    """Delete Table Statement constructed after parsing the input query
+
+    Attributes:
+        _table_ref: table reference in the delete table statement
+        _where_clause: predicate of the delete query, represented as an expression tree.
+    """
+
+    def __init__(
+        self,
+        table_ref: TableRef,
+        where_clause: AbstractExpression = None,
+    ):
+        super().__init__(StatementType.DELETE)
+        self._table_ref = table_ref
+        self._where_clause = where_clause
+
+    def __str__(self) -> str:
+        delete_str = f"DELETE FROM {self._table_ref}"
+        if self._where_clause is not None:
+            delete_str += " WHERE " + str(self._where_clause)
+
+        return delete_str
+
+    @property
+    def table_ref(self):
+        return self._table_ref
+
+    @property
+    def where_clause(self):
+        return self._where_clause
+
+    @where_clause.setter
+    def where_clause(self, where_expr: AbstractExpression):
+        self._where_clause = where_expr
+
+    def __eq__(self, other):
+        if not isinstance(other, DeleteTableStatement):
+            return False
+        return self._table_ref == other._table_ref and self.where_clause == other.where_clause
+
+    def __hash__(self) -> int:
+        return hash((super().__hash__(), self.table_ref, self.where_clause))
diff --git a/eva/parser/eva.lark b/eva/parser/eva.lark
index 6f2b32ba2..41c19d5a6 100644
--- a/eva/parser/eva.lark
+++ b/eva/parser/eva.lark
@@ -70,7 +70,7 @@ drop_udf: DROP UDF if_exists? udf_name
 
 // Primary DML Statements
 
-delete_statement: DELETE FROM table_name (WHERE expression)? order_by_clause? (LIMIT decimal_literal)?
+delete_statement: DELETE FROM table_name (WHERE where_expr)?
 
 insert_statement: INSERT INTO? table_name
     (("(" uid_list ")")? insert_statement_value)
@@ -510,6 +510,7 @@
 REAL_LITERAL: (DEC_DIGIT+)? "." DEC_DIGIT+
     | DEC_DIGIT+ "." EXPONENT_NUM_PART
     | (DEC_DIGIT+)? "." (DEC_DIGIT+ EXPONENT_NUM_PART)
     | DEC_DIGIT+ EXPONENT_NUM_PART
+    | (DEC_DIGIT+) "." (DEC_DIGIT+) "e" "-"? DEC_DIGIT+
 
 // Hack for dotID
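For reference, this is roughly the statement the new grammar rule and visitor produce for DELETE FROM MyTable WHERE id < 20. A sketch only: the expression classes come from eva/expression and TableInfo/TableRef from eva/parser/table_ref.py, and their exact constructor signatures may differ slightly from this illustration:

    from eva.expression.abstract_expression import ExpressionType
    from eva.expression.comparison_expression import ComparisonExpression
    from eva.expression.constant_value_expression import ConstantValueExpression
    from eva.expression.tuple_value_expression import TupleValueExpression
    from eva.parser.delete_statement import DeleteTableStatement
    from eva.parser.table_ref import TableInfo, TableRef

    # Hand-built equivalent of parsing "DELETE FROM MyTable WHERE id < 20;"
    predicate = ComparisonExpression(
        ExpressionType.COMPARE_LESSER,
        TupleValueExpression(col_name="id"),
        ConstantValueExpression(20),
    )
    stmt = DeleteTableStatement(TableRef(TableInfo(table_name="MyTable")), predicate)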
diff --git a/eva/parser/insert_statement.py b/eva/parser/insert_statement.py
index 65f4fb9f4..391ecb50b 100644
--- a/eva/parser/insert_statement.py
+++ b/eva/parser/insert_statement.py
@@ -36,12 +36,12 @@ class InsertTableStatement(AbstractStatement):
 
     def __init__(
         self,
-        table: TableRef,
+        table_ref: TableRef,
         column_list: List[AbstractExpression] = None,
         value_list: List[AbstractExpression] = None,
     ):
         super().__init__(StatementType.INSERT)
-        self._table = table
+        self._table_ref = table_ref
         self._column_list = column_list
         self._value_list = value_list
 
@@ -52,8 +52,8 @@ def __str__(self) -> str:
         return print_str
 
     @property
-    def table(self) -> TableRef:
-        return self._table
+    def table_ref(self) -> TableRef:
+        return self._table_ref
 
     @property
     def column_list(self) -> List[AbstractExpression]:
@@ -67,7 +67,7 @@ def __eq__(self, other):
         if not isinstance(other, InsertTableStatement):
             return False
         return (
-            self.table == other.table
+            self.table_ref == other.table_ref
             and self.column_list == other.column_list
             and self.value_list == other.value_list
         )
@@ -76,7 +76,7 @@ def __hash__(self) -> int:
         return hash(
             (
                 super().__hash__(),
-                self.table,
+                self.table_ref,
                 tuple(self.column_list),
                 tuple(self.value_list),
             )
         )
diff --git a/eva/parser/lark_visitor/__init__.py b/eva/parser/lark_visitor/__init__.py
index df2c56f71..4c69000d7 100644
--- a/eva/parser/lark_visitor/__init__.py
+++ b/eva/parser/lark_visitor/__init__.py
@@ -18,6 +18,7 @@
 
 from eva.parser.lark_visitor._common_clauses_ids import CommonClauses
 from eva.parser.lark_visitor._create_statements import CreateTable
+from eva.parser.lark_visitor._delete_statement import Delete
 from eva.parser.lark_visitor._drop_statement import DropTable
 from eva.parser.lark_visitor._explain_statement import Explain
 from eva.parser.lark_visitor._expressions import Expressions
@@ -41,7 +42,6 @@ class LarkBaseInterpreter(visitors.Interpreter):
 
     # Override default behavior of Interpreter
     def visit_children(self, tree: Tree[_Leaf_T]) -> List:
-
         output = [
             self._visit_tree(child) if isinstance(child, Tree) else child
             for child in tree.children
@@ -70,6 +70,7 @@ class LarkInterpreter(
     DropTable,
     Show,
     Explain,
+    Delete,
 ):
     def __init__(self, query):
         super().__init__()
diff --git a/eva/parser/lark_visitor/_delete_statement.py b/eva/parser/lark_visitor/_delete_statement.py
new file mode 100644
index 000000000..6937a6d21
--- /dev/null
+++ b/eva/parser/lark_visitor/_delete_statement.py
@@ -0,0 +1,37 @@
+# coding=utf-8
+# Copyright 2018-2022 EVA
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from lark.tree import Tree
+
+from eva.parser.delete_statement import DeleteTableStatement
+from eva.parser.table_ref import TableRef
+
+
+##################################################################
+# DELETE STATEMENTS
+##################################################################
+class Delete:
+    def delete_statement(self, tree):
+        table_ref = None
+        where_clause = None
+        for child in tree.children:
+            if isinstance(child, Tree):
+                if child.data == "table_name":
+                    table_name = self.visit(child)
+                    table_ref = TableRef(table_name)
+                elif child.data == "where_expr":
+                    where_clause = self.visit(child)
+
+        delete_stmt = DeleteTableStatement(table_ref, where_clause)
+        return delete_stmt
diff --git a/eva/parser/types.py b/eva/parser/types.py
index 3730c431f..67e8442fb 100644
--- a/eva/parser/types.py
+++ b/eva/parser/types.py
@@ -33,6 +33,7 @@ class StatementType(EVAEnum):
     RENAME  # noqa: F821
     DROP  # noqa: F821
     INSERT  # noqa: F821
+    DELETE  # noqa: F821
     CREATE_UDF  # noqa: F821
     LOAD_DATA  # noqa: F821
     UPLOAD  # noqa: F821
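Putting the parser and optimizer pieces together, the DELETE path from statement to physical plan looks roughly like this. A sketch only: StatementToPlanConvertor is the class defined in statement_to_opr_convertor.py, and apply() needs the optimizer context it would normally receive from the optimizer:

    from eva.optimizer.rules.rules import LogicalDeleteToPhysical
    from eva.optimizer.statement_to_opr_convertor import StatementToPlanConvertor

    def plan_delete(delete_stmt, optimizer_context):
        # DeleteTableStatement -> LogicalDelete
        logical_delete = StatementToPlanConvertor().visit(delete_stmt)
        # LogicalDelete -> DeletePlan; apply() is a generator
        rule = LogicalDeleteToPhysical()
        return next(rule.apply(logical_delete, optimizer_context))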
diff --git a/eva/plan_nodes/delete_plan.py b/eva/plan_nodes/delete_plan.py
new file mode 100644
index 000000000..8042eace2
--- /dev/null
+++ b/eva/plan_nodes/delete_plan.py
@@ -0,0 +1,61 @@
+# coding=utf-8
+# Copyright 2018-2022 EVA
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from eva.expression.abstract_expression import AbstractExpression
+from eva.parser.table_ref import TableRef
+from eva.plan_nodes.abstract_plan import AbstractPlan
+from eva.plan_nodes.types import PlanOprType
+
+
+class DeletePlan(AbstractPlan):
+    """This plan is used for storing information required for delete
+    operations.
+
+    Args:
+        table_ref (TableRef): table to delete rows from
+        where_clause (AbstractExpression): predicate used to select
+                                           the rows to delete
+    """
+
+    def __init__(
+        self,
+        table_ref: TableRef,
+        where_clause: AbstractExpression = None,
+    ):
+        super().__init__(PlanOprType.DELETE)
+        self._table_ref = table_ref
+        self._where_clause = where_clause
+
+    @property
+    def table_ref(self):
+        return self._table_ref
+
+    @property
+    def where_clause(self):
+        return self._where_clause
+
+    def __str__(self):
+        return "DeletePlan(table={}, \
+            where_clause={})".format(
+            self.table_ref, self._where_clause
+        )
+
+    def __hash__(self) -> int:
+        return hash(
+            (
+                super().__hash__(),
+                self.table_ref,
+                self._where_clause,
+            )
+        )
diff --git a/eva/plan_nodes/insert_plan.py b/eva/plan_nodes/insert_plan.py
index a84afdcb8..ecfc48323 100644
--- a/eva/plan_nodes/insert_plan.py
+++ b/eva/plan_nodes/insert_plan.py
@@ -33,28 +33,28 @@ class InsertPlan(AbstractPlan):
 
     def __init__(
         self,
-        table: TableCatalogEntry,
+        table_ref: TableCatalogEntry,
         column_list: List[AbstractExpression],
         value_list: List[AbstractExpression],
     ):
         super().__init__(PlanOprType.INSERT)
-        self.table = table
-        self.columns_list = column_list
+        self.table_ref = table_ref
+        self.column_list = column_list
         self.value_list = value_list
 
     def __str__(self):
         return "InsertPlan(table={}, \
             column_list={}, \
             value_list={})".format(
-            self.table, self.columns_list, self.value_list
+            self.table_ref, self.column_list, self.value_list
         )
 
     def __hash__(self) -> int:
         return hash(
             (
                 super().__hash__(),
-                self.table,
+                self.table_ref,
                 tuple(self.column_list),
                 tuple(self.value_list),
             )
         )
diff --git a/eva/plan_nodes/load_data_plan.py b/eva/plan_nodes/load_data_plan.py
index 74cd8e191..26c5bc352 100644
--- a/eva/plan_nodes/load_data_plan.py
+++ b/eva/plan_nodes/load_data_plan.py
@@ -36,16 +36,16 @@ def __init__(
         self,
         table_info: TableInfo,
         file_path: Path,
-        batch_mem_size: int,
         column_list: List[AbstractExpression] = None,
         file_options: dict = None,
+        batch_mem_size: int = 30000000,
     ):
         super().__init__(PlanOprType.LOAD_DATA)
         self._table_info = table_info
         self._file_path = file_path
-        self._batch_mem_size = batch_mem_size
         self._column_list = column_list
         self._file_options = file_options
+        self._batch_mem_size = batch_mem_size
 
     @property
     def table_info(self):
@@ -69,14 +69,14 @@ def file_options(self):
 
     def __str__(self):
         return "LoadDataPlan(table_id={}, file_path={}, \
-            batch_mem_size={}, \
             column_list={}, \
-            file_options={})".format(
+            file_options={}, \
+            batch_mem_size={})".format(
             self.table_info,
             self.file_path,
-            self.batch_mem_size,
             self.column_list,
             self.file_options,
+            self.batch_mem_size,
         )
 
     def __hash__(self) -> int:
@@ -85,8 +85,8 @@ def __hash__(self) -> int:
             super().__hash__(),
             self.table_info,
             self.file_path,
-            self.batch_mem_size,
             tuple(self.column_list),
             frozenset(self.file_options.items()),
+            self.batch_mem_size,
         )
     )
diff --git a/eva/plan_nodes/storage_plan.py b/eva/plan_nodes/storage_plan.py
index 2e9032e0f..10448bee8 100644
--- a/eva/plan_nodes/storage_plan.py
+++ b/eva/plan_nodes/storage_plan.py
@@ -37,7 +37,6 @@ class StoragePlan(AbstractPlan):
     def __init__(
         self,
         table: TableCatalogEntry,
-        batch_mem_size: int,
         skip_frames: int = 0,
         offset: int = None,
         limit: int = None,
@@ -45,6 +44,7 @@ def __init__(
         curr_shard: int = 0,
         predicate: AbstractExpression = None,
         sampling_rate: int = None,
+        batch_mem_size: int = 30000000,
     ):
        super().__init__(PlanOprType.STORAGE_PLAN)
         self._table = table
diff --git a/eva/plan_nodes/types.py b/eva/plan_nodes/types.py
index c63cc9ba9..c32d5b93d 100644
--- a/eva/plan_nodes/types.py
+++ b/eva/plan_nodes/types.py
@@ -22,6 +22,7 @@ class PlanOprType(Enum):
     STORAGE_PLAN = auto()
     PP_FILTER = auto()
     INSERT = auto()
+    DELETE = auto()
     CREATE = auto()
     RENAME = auto()
     DROP = auto()
diff --git a/eva/plan_nodes/upload_plan.py b/eva/plan_nodes/upload_plan.py
index 0086acd5b..53df6b611 100644
--- a/eva/plan_nodes/upload_plan.py
+++ b/eva/plan_nodes/upload_plan.py
@@ -38,17 +38,17 @@ def __init__(
         file_path: Path,
         video_blob: str,
         table_info: TableInfo,
-        batch_mem_size: int,
         column_list: List[AbstractExpression] = None,
         file_options: dict = None,
+        batch_mem_size: int = 30000000,
     ):
         super().__init__(PlanOprType.UPLOAD)
         self._file_path = file_path
         self._video_blob = video_blob
         self._table_info = table_info
-        self._batch_mem_size = batch_mem_size
         self._column_list = column_list
         self._file_options = file_options
+        self._batch_mem_size = batch_mem_size
 
     @property
     def file_path(self):
@@ -78,15 +78,15 @@ def __str__(self):
         return "UploadPlan(file_path={}, \
             video_blob={}, \
             table_id={}, \
-            batch_mem_size={}, \
             column_list={}, \
-            file_options={})".format(
+            file_options={}, \
+            batch_mem_size={})".format(
             self.file_path,
             "video blob",
             self.table_info,
-            self.batch_mem_size,
             self.column_list,
             self.file_options,
+            self.batch_mem_size,
         )
 
     def __hash__(self) -> int:
@@ -96,8 +96,8 @@ def __hash__(self) -> int:
             self.file_path,
             self.video_blob,
             self.table_info,
-            self.batch_mem_size,
             tuple(self.column_list),
             frozenset(self.file_options.items()),
+            self.batch_mem_size,
         )
     )
diff --git a/eva/readers/abstract_reader.py b/eva/readers/abstract_reader.py
index e00e180cc..05d5f4b91 100644
--- a/eva/readers/abstract_reader.py
+++ b/eva/readers/abstract_reader.py
@@ -32,13 +32,13 @@ class AbstractReader(metaclass=ABCMeta):
         file_url (str): path to read data from
     """
 
-    def __init__(self, file_url: str, batch_mem_size: int, offset=None):
+    def __init__(self, file_url: str, offset=None, batch_mem_size: int = 30000000):
         # Opencv doesn't support pathlib.Path so convert to raw str
         if isinstance(file_url, Path):
             file_url = str(file_url)
         self.file_url = file_url
-        self.batch_mem_size = batch_mem_size
         self.offset = offset
+        self.batch_mem_size = batch_mem_size
 
     def read(self) -> Iterator[Batch]:
         """
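With batch_mem_size now a trailing parameter that defaults to 30 MB, most call sites can simply omit it; the updated test_select_executor.py further down does exactly that. A small sketch (reader import path as used in the EVA readers package; adjust if it differs):

    from eva.readers.opencv_reader import OpenCVReader

    reader = OpenCVReader("data/mnist/mnist.mp4")  # uses the 30000000-byte default
    small = OpenCVReader("data/mnist/mnist.mp4", batch_mem_size=3000)  # explicit override
    for batch in reader.read():
        ...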
diff --git a/eva/readers/image/opencv_image_reader.py b/eva/readers/image/opencv_image_reader.py
index 859510931..b37383699 100644
--- a/eva/readers/image/opencv_image_reader.py
+++ b/eva/readers/image/opencv_image_reader.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
 from typing import Dict, Iterator
 
 import cv2
@@ -33,3 +34,11 @@ def _read(self) -> Iterator[Dict]:
             raise Exception(err_msg)
         else:
             yield {"data": frame}
+
+    def delete(self):
+        file_url = str(self.file_url)
+        try:
+            os.remove(file_url)
+        except Exception as e:
+            raise FileNotFoundError(f"Could not remove image file {file_url}: {e}")
+        # TODO: handle directory-backed data (e.g., via shutil) if needed
diff --git a/eva/storage/abstract_storage_engine.py b/eva/storage/abstract_storage_engine.py
index 37f1948e7..bd5d05e5a 100644
--- a/eva/storage/abstract_storage_engine.py
+++ b/eva/storage/abstract_storage_engine.py
@@ -52,7 +52,7 @@ def write(self, table: TableCatalogEntry, rows: Batch):
     def read(
         self,
         table: TableCatalogEntry,
-        batch_mem_size: int,
+        batch_mem_size: int = 30000000,
         predicate: AbstractExpression = None,
     ) -> Iterator[Batch]:
         """Interface responsible for yielding row/rows to the client.
diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py
index 129c4e7f2..b0d4d0ba5 100644
--- a/eva/storage/sqlite_storage_engine.py
+++ b/eva/storage/sqlite_storage_engine.py
@@ -148,9 +148,7 @@ def write(self, table: TableCatalogEntry, rows: Batch):
         raise Exception(err_msg)
 
     def read(
-        self,
-        table: TableCatalogEntry,
-        batch_mem_size: int,
+        self, table: TableCatalogEntry, batch_mem_size: int = 30000000
     ) -> Iterator[Batch]:
         """
         Reads the table and return a batch iterator for the
diff --git a/eva/udfs/udf_bootstrap_queries.py b/eva/udfs/udf_bootstrap_queries.py
index b7bee9c76..094af91bc 100644
--- a/eva/udfs/udf_bootstrap_queries.py
+++ b/eva/udfs/udf_bootstrap_queries.py
@@ -179,8 +179,8 @@ def init_builtin_udfs(mode="debug"):
         ArrayCount_udf_query,
         Timestamp_udf_query,
         Crop_udf_query,
-        Open_udf_query,
         YoloV5_udf_query,
+        Open_udf_query,
         Similarity_udf_query
         # Disabled because required packages (eg., easy_ocr might not be preinstalled)
         # face_detection_udf_query,
diff --git a/eva/udfs/yolo_object_detector.py b/eva/udfs/yolo_object_detector.py
index 6235425c5..3b51f5d2f 100644
--- a/eva/udfs/yolo_object_detector.py
+++ b/eva/udfs/yolo_object_detector.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 from typing import List
 
 import pandas as pd
@@ -43,6 +44,7 @@ def name(self) -> str:
         return "yolo"
 
     def setup(self, threshold=0.85):
+        logging.getLogger("yolov5").setLevel(logging.CRITICAL)  # silence yolov5 logging
         self.threshold = threshold
         self.model = torch.hub.load("ultralytics/yolov5", "yolov5s", verbose=False)
diff --git a/eva/utils/logging_manager.py b/eva/utils/logging_manager.py
index 987c0644c..31fb21308 100644
--- a/eva/utils/logging_manager.py
+++ b/eva/utils/logging_manager.py
@@ -27,4 +27,4 @@
 logger = logging.getLogger(__name__)
 logger.addHandler(LOG_handler)
-logger.setLevel(logging.INFO)
+logger.setLevel(logging.WARN)
diff --git a/setup.py b/setup.py
index 2c4af4f3c..433f8a8e3 100644
--- a/setup.py
+++ b/setup.py
@@ -101,7 +101,7 @@ def read(path, encoding="utf-8"):
     "facenet-pytorch>=2.5.2",  # FACE DETECTION
     "easyocr>=1.5.0",  # OCR EXTRACTION
     "ipython",
-    "yolov5<=7.0.6", # OBJECT DETECTION
+    "yolov5<=7.0.6",  # OBJECT DETECTION
     "detoxify"  # TEXT TOXICITY CLASSIFICATION
 ]
diff --git a/test/data/features.csv b/test/data/features.csv
new file mode 100644
index 000000000..a6b3b8aae
--- /dev/null
+++ b/test/data/features.csv
@@ -0,0 +1,4 @@
+,demo.name
+0,test_eva/similarity/data/sad.jpg
+1,test_eva/similarity/data/happy.jpg
+2,test_eva/similarity/data/angry.jpg
diff --git a/test/integration_tests/test_delete_executor.py b/test/integration_tests/test_delete_executor.py
new file mode 100644
index 000000000..cdca20a61
--- /dev/null
+++ b/test/integration_tests/test_delete_executor.py
@@ -0,0 +1,134 @@
+# coding=utf-8
+# Copyright 2018-2022 EVA
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+from test.util import file_remove, load_inbuilt_udfs
+
+import numpy as np
+
+from eva.catalog.catalog_manager import CatalogManager
+from eva.configuration.configuration_manager import ConfigurationManager
+from eva.configuration.constants import EVA_ROOT_DIR
+from eva.server.command_handler import execute_query_fetch_all
+
+
+class DeleteExecutorTest(unittest.TestCase):
+    def setUp(self):
+        # Bootstrap configuration manager.
+        ConfigurationManager()
+
+        # Reset catalog.
+ CatalogManager().reset() + + load_inbuilt_udfs() + + create_table_query = """ + CREATE TABLE IF NOT EXISTS testDeleteOne + ( + id INTEGER, + feat NDARRAY FLOAT32(1, 3), + input NDARRAY UINT8(1, 3) + ); + """ + execute_query_fetch_all(create_table_query) + + insert_query1 = """ + INSERT INTO testDeleteOne (id, feat, input) + VALUES (5, [[0, 0, 0]], [[0, 0, 0]]); + """ + execute_query_fetch_all(insert_query1) + insert_query2 = """ + INSERT INTO testDeleteOne (id, feat, input) + VALUES (15, [[100, 100, 100]], [[100, 100, 100]]); + """ + execute_query_fetch_all(insert_query2) + insert_query3 = """ + INSERT INTO testDeleteOne (id, feat, input) + VALUES (25, [[200, 200, 200]], [[200, 200, 200]]); + """ + execute_query_fetch_all(insert_query3) + + #################################################### + # Create a table for testing Delete with Video Data# + #################################################### + + path = f"{EVA_ROOT_DIR}/data/sample_videos/1/*.mp4" + query = f'LOAD VIDEO "{path}" INTO TestDeleteVideos;' + _ = execute_query_fetch_all(query) + + def tearDown(self): + file_remove("dummy.avi") + + # integration test + @unittest.skip("Not supported in current version") + def test_should_delete_single_video_in_table(self): + path = f"{EVA_ROOT_DIR}/data/sample_videos/1/2.mp4" + delete_query = f"""DELETE FROM TestDeleteVideos WHERE name="{path}";""" + batch = execute_query_fetch_all(delete_query) + + query = "SELECT name FROM MyVideo" + batch = execute_query_fetch_all(query) + self.assertIsNone( + np.testing.assert_array_equal( + batch.frames["data"][0], + np.array([[[40, 40, 40], [40, 40, 40]], [[40, 40, 40], [40, 40, 40]]]), + ) + ) + + query = "SELECT id, data FROM MyVideo WHERE id = 41;" + batch = execute_query_fetch_all(query) + self.assertIsNone( + np.testing.assert_array_equal( + batch.frames["data"][0], + np.array([[[41, 41, 41], [41, 41, 41]], [[41, 41, 41], [41, 41, 41]]]), + ) + ) + + @unittest.skip("Not supported in current version") + def test_should_delete_single_image_in_table(self): + path = f"{EVA_ROOT_DIR}/data/sample_videos/1/2.mp4" + delete_query = f"""DELETE FROM TestDeleteVideos WHERE name="{path}";""" + batch = execute_query_fetch_all(delete_query) + + query = "SELECT name FROM MyVideo" + batch = execute_query_fetch_all(query) + self.assertIsNone( + np.testing.assert_array_equal( + batch.frames["data"][0], + np.array([[[40, 40, 40], [40, 40, 40]], [[40, 40, 40], [40, 40, 40]]]), + ) + ) + + query = "SELECT id, data FROM MyVideo WHERE id = 41;" + batch = execute_query_fetch_all(query) + self.assertIsNone( + np.testing.assert_array_equal( + batch.frames["data"][0], + np.array([[[41, 41, 41], [41, 41, 41]], [[41, 41, 41], [41, 41, 41]]]), + ) + ) + + def test_should_delete_tuple_in_table(self): + delete_query = "DELETE FROM testDeleteOne WHERE id < 20;" + batch = execute_query_fetch_all(delete_query) + + query = "SELECT * FROM testDeleteOne;" + batch = execute_query_fetch_all(query) + from eva.utils.logging_manager import logger + + logger.info(batch) + np.testing.assert_array_equal( + batch.frames["testdeleteone.id"].array, + np.array([25], dtype=np.int64), + ) diff --git a/test/integration_tests/test_insert_executor.py b/test/integration_tests/test_insert_executor.py index fe8724428..27ef9d1d5 100644 --- a/test/integration_tests/test_insert_executor.py +++ b/test/integration_tests/test_insert_executor.py @@ -13,12 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import unittest -from test.util import create_sample_video, file_remove +from test.util import create_sample_video, file_remove, load_inbuilt_udfs import numpy as np +import pandas as pd from eva.catalog.catalog_manager import CatalogManager from eva.server.command_handler import execute_query_fetch_all +from eva.utils.logging_manager import logger class InsertExecutorTest(unittest.TestCase): @@ -27,6 +29,15 @@ def setUp(self): CatalogManager().reset() create_sample_video() + query = """CREATE TABLE IF NOT EXISTS CSVTable + ( + name TEXT(100) + ); + """ + execute_query_fetch_all(query) + + load_inbuilt_udfs() + def tearDown(self): file_remove("dummy.avi") @@ -36,14 +47,28 @@ def test_should_load_video_in_table(self): query = """LOAD VIDEO 'dummy.avi' INTO MyVideo;""" execute_query_fetch_all(query) - insert_query = """ INSERT INTO MyVideo (id, data) VALUES (40, - [[[40, 40, 40] , [40, 40, 40]], - [[40, 40, 40], [40, 40, 40]]]);""" + insert_query = """ INSERT INTO MyVideo (id, data) VALUES ( + [ + [ + 40, + [ + [[40, 40, 40], [40, 40, 40]], + [[40, 40, 40], [40, 40, 40]] + ] + ] + ]);""" execute_query_fetch_all(insert_query) - insert_query_2 = """ INSERT INTO MyVideo (id, data) VALUES (41, - [[[41, 41, 41] , [41, 41, 41]], - [[41, 41, 41], [41, 41, 41]]]);""" + insert_query_2 = """ INSERT INTO MyVideo (id, data) VALUES ( + [ + [ + 41, + [ + [[41, 41, 41] , [41, 41, 41]], + [[41, 41, 41], [41, 41, 41]] + ] + ] + ]);""" execute_query_fetch_all(insert_query_2) query = "SELECT id, data FROM MyVideo WHERE id = 40" @@ -63,3 +88,30 @@ def test_should_load_video_in_table(self): np.array([[[41, 41, 41], [41, 41, 41]], [[41, 41, 41], [41, 41, 41]]]), ) ) + + def test_should_insert_tuples_in_table(self): + data = pd.read_csv("./test/data/features.csv") + for i in data.iterrows(): + logger.info(i[1][1]) + query = f"""INSERT INTO CSVTable (name) VALUES ( + '{i[1][1]}' + );""" + logger.info(query) + batch = execute_query_fetch_all(query) + + query = "SELECT name FROM CSVTable;" + batch = execute_query_fetch_all(query) + logger.info(batch) + + self.assertIsNone( + np.testing.assert_array_equal( + batch.frames["csvtable.name"].array, + np.array( + [ + "test_eva/similarity/data/sad.jpg", + "test_eva/similarity/data/happy.jpg", + "test_eva/similarity/data/angry.jpg", + ] + ), + ) + ) diff --git a/test/integration_tests/test_select_executor.py b/test/integration_tests/test_select_executor.py index effe83648..c4ad6b656 100644 --- a/test/integration_tests/test_select_executor.py +++ b/test/integration_tests/test_select_executor.py @@ -153,7 +153,7 @@ def test_should_load_and_select_real_video_in_table(self): select_query = "SELECT id, data FROM MNIST;" actual_batch = execute_query_fetch_all(select_query) actual_batch.sort("mnist.id") - video_reader = OpenCVReader("data/mnist/mnist.mp4", batch_mem_size=30000000) + video_reader = OpenCVReader("data/mnist/mnist.mp4") expected_batch = Batch(frames=pd.DataFrame()) for batch in video_reader.read(): batch.frames["name"] = "mnist.mp4" diff --git a/test/optimizer/rules/test_rules.py b/test/optimizer/rules/test_rules.py index b7981cb06..2372215da 100644 --- a/test/optimizer/rules/test_rules.py +++ b/test/optimizer/rules/test_rules.py @@ -38,6 +38,7 @@ LogicalCreateMaterializedViewToPhysical, LogicalCreateToPhysical, LogicalCreateUDFToPhysical, + LogicalDeleteToPhysical, LogicalDerivedGetToPhysical, LogicalDropToPhysical, LogicalDropUDFToPhysical, @@ -190,6 +191,7 @@ def test_supported_rules(self): LogicalCreateUDFToPhysical(), LogicalDropUDFToPhysical(), 
LogicalInsertToPhysical(), + LogicalDeleteToPhysical(), LogicalLoadToPhysical(), LogicalUploadToPhysical(), LogicalSampleToUniformSample(), diff --git a/test/plan_nodes/test_plan.py b/test/plan_nodes/test_plan.py index 0d7c73851..5516ec405 100644 --- a/test/plan_nodes/test_plan.py +++ b/test/plan_nodes/test_plan.py @@ -111,13 +111,13 @@ def test_load_data_plan(self): column_list = None batch_mem_size = 3000 plan_str = "LoadDataPlan(table_id={}, file_path={}, \ - batch_mem_size={}, \ column_list={}, \ - file_options={})".format( - table_info, file_path, batch_mem_size, column_list, file_options + file_options={}, \ + batch_mem_size={})".format( + table_info, file_path, column_list, file_options, batch_mem_size ) plan = LoadDataPlan( - table_info, file_path, batch_mem_size, column_list, file_options + table_info, file_path, column_list, file_options, batch_mem_size ) self.assertEqual(plan.opr_type, PlanOprType.LOAD_DATA) self.assertEqual(plan.table_info, table_info) @@ -138,23 +138,18 @@ def test_upload_plan(self): plan_str = "UploadPlan(file_path={}, \ video_blob={}, \ table_id={}, \ - batch_mem_size={}, \ column_list={}, \ - file_options={})".format( + file_options={}, \ + batch_mem_size={})".format( file_path, "video blob", table_info, - batch_mem_size, column_list, file_options, + batch_mem_size, ) plan = UploadPlan( - file_path, - video_blob, - table_info, - batch_mem_size, - column_list, - file_options, + file_path, video_blob, table_info, column_list, file_options, batch_mem_size ) self.assertEqual(plan.opr_type, PlanOprType.UPLOAD) self.assertEqual(plan.file_path, file_path)
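Taken together, a minimal end-to-end session exercising both new statements looks like the sketch below (mirroring the integration tests above; the table and column names here are illustrative, not part of the PR):

    from eva.server.command_handler import execute_query_fetch_all

    execute_query_fetch_all(
        "CREATE TABLE IF NOT EXISTS Demo (id INTEGER, name TEXT(100));"
    )
    execute_query_fetch_all("INSERT INTO Demo (id, name) VALUES (5, 'a.jpg');")
    execute_query_fetch_all("INSERT INTO Demo (id, name) VALUES (25, 'b.jpg');")
    execute_query_fetch_all("DELETE FROM Demo WHERE id < 20;")
    batch = execute_query_fetch_all("SELECT * FROM Demo;")  # only id 25 remains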