chore: reducing coverage loss #619

Merged
163 commits merged on Mar 23, 2023
Changes shown from 157 commits

Commits (163)
c9afc12
adding delete operation
aryan-rajoria Jan 22, 2023
34dfbf7
Adding Insert Statement
aryan-rajoria Jan 25, 2023
9fa9857
checkpoint
aryan-rajoria Jan 25, 2023
ebe26d3
supporting multiple entries
aryan-rajoria Jan 25, 2023
bc722dd
implemented for structured data error
aryan-rajoria Jan 25, 2023
0e29858
adding parser visitor for delete
aryan-rajoria Jan 25, 2023
c1a7864
delete executor
aryan-rajoria Jan 26, 2023
5ac631a
delete plan and rules
aryan-rajoria Jan 26, 2023
3238e95
adding delete to plan executor
aryan-rajoria Jan 26, 2023
02a1d28
change position of LogicalDelete
aryan-rajoria Jan 26, 2023
01181b5
logical delimeter
aryan-rajoria Jan 30, 2023
562a7ca
delete test case
aryan-rajoria Jan 30, 2023
9887732
adding test case
aryan-rajoria Feb 2, 2023
d2a1a3d
adding test case
aryan-rajoria Feb 2, 2023
f09c613
adding delete testcase
aryan-rajoria Feb 3, 2023
79a6168
adding predicate to delete executor
aryan-rajoria Feb 3, 2023
5ce1991
adding delete to Image storage
aryan-rajoria Feb 3, 2023
91d7b06
bug fix in delete
aryan-rajoria Feb 9, 2023
0aac934
fixing testcase
aryan-rajoria Feb 9, 2023
ee48803
adding test case for insert statement
aryan-rajoria Feb 9, 2023
fc2f243
remove order_by from statement_binder.py
aryan-rajoria Feb 9, 2023
343a4a2
better variable names, using Batch
aryan-rajoria Feb 9, 2023
121451f
error message for insert
aryan-rajoria Feb 9, 2023
5b47c15
removing order_by and limit from delete
aryan-rajoria Feb 10, 2023
8c75a5e
remove order_by and limit
aryan-rajoria Feb 10, 2023
6772cd0
use f-string
aryan-rajoria Feb 10, 2023
7a10d67
adding to changelog
aryan-rajoria Feb 10, 2023
1a4204f
removing commit messages
aryan-rajoria Feb 14, 2023
e96d3a4
formatting
aryan-rajoria Feb 14, 2023
640e7ed
fixing comments
aryan-rajoria Feb 14, 2023
cb50de3
formatting
aryan-rajoria Feb 14, 2023
533de9e
eva insert f32 values
aryan-rajoria Feb 15, 2023
e8081e4
Merge branch 'delete-operation' of github.com:Aryan-Rajoria/eva into …
jarulraj Feb 16, 2023
97bfac4
checkpoint
jarulraj Feb 16, 2023
0127ad0
checkpoint
jarulraj Feb 16, 2023
fc353c9
checkpoint
jarulraj Feb 16, 2023
10fd83a
checkpoint
jarulraj Feb 16, 2023
d559fe1
checkpoint
jarulraj Feb 16, 2023
445a710
checkpoint
jarulraj Feb 16, 2023
d246715
checkpoint
jarulraj Feb 16, 2023
17247fe
checkpoint
jarulraj Feb 16, 2023
8a2d5a3
fix: should delete range
aryan-rajoria Feb 16, 2023
365ae2f
delete multiple rows
aryan-rajoria Feb 16, 2023
88e74e3
udf bootstrap
aryan-rajoria Feb 16, 2023
0d4cde2
checkpoint
jarulraj Feb 16, 2023
a8918a7
checkpoint
jarulraj Feb 16, 2023
40bc613
checkpoint
jarulraj Feb 17, 2023
5a2faac
checkpoint
jarulraj Feb 17, 2023
5f4a802
checkpoint
jarulraj Feb 17, 2023
e952beb
checkpoint
jarulraj Feb 17, 2023
247b041
checkpoint
jarulraj Feb 17, 2023
08420d1
checkpoint
jarulraj Feb 17, 2023
536ae74
checkpoint
jarulraj Feb 17, 2023
d30fcbd
checkpoint
jarulraj Feb 17, 2023
31aa92d
checkpoint
jarulraj Feb 17, 2023
db930a4
checkpoint
jarulraj Feb 17, 2023
f76e856
checkpoint
jarulraj Feb 17, 2023
0ae4cd7
checkpoint
jarulraj Feb 17, 2023
c7c763e
checkpoint
jarulraj Feb 18, 2023
2711582
checkpoint
jarulraj Feb 18, 2023
86c0513
checkpoint
jarulraj Feb 18, 2023
4fcc190
checkpoint
jarulraj Feb 18, 2023
94d0ae0
checkpoint
jarulraj Feb 18, 2023
0701eb8
checkpoint
jarulraj Feb 18, 2023
b31ad91
checkpoint
jarulraj Feb 18, 2023
2bf76ff
checkpoint
jarulraj Feb 18, 2023
d72d778
checkpoint
jarulraj Feb 18, 2023
d74791c
checkpoint
jarulraj Feb 18, 2023
b783430
checkpoint
jarulraj Feb 18, 2023
2450485
checkpoint
jarulraj Feb 18, 2023
b78280e
checkpoint
jarulraj Feb 18, 2023
166aa32
checkpoint
jarulraj Feb 18, 2023
22d1386
Updated config.yml
jarulraj Feb 18, 2023
ba6bda9
checkpoint
jarulraj Feb 18, 2023
f9f4a2b
checkpoint
jarulraj Feb 18, 2023
429e513
checkpoint
jarulraj Feb 18, 2023
7e525a2
checkpoint
jarulraj Feb 18, 2023
1afe38a
checkpoint
jarulraj Feb 18, 2023
9744432
checkpoint
jarulraj Feb 18, 2023
cb859db
checkpoint
jarulraj Feb 19, 2023
acf36ce
checkpoint
jarulraj Feb 19, 2023
768a8a2
checkpoint
jarulraj Feb 19, 2023
32da4af
checkpoint
jarulraj Feb 19, 2023
13c7710
checkpoint
jarulraj Feb 19, 2023
ea2cee7
checkpoint
jarulraj Feb 19, 2023
eb56fcc
checkpoint
jarulraj Feb 19, 2023
ad339a6
checkpoint
jarulraj Feb 19, 2023
db1f388
checkpoint
jarulraj Feb 19, 2023
80bf59c
checkpoint
jarulraj Feb 19, 2023
a740112
checkpoint
jarulraj Feb 19, 2023
41c2051
checkpoint
jarulraj Feb 19, 2023
9fe2f81
checkpoint
jarulraj Feb 20, 2023
f37b92c
checkpoint
jarulraj Feb 20, 2023
46e2106
checkpoint
jarulraj Feb 20, 2023
35168e7
checkpoint
jarulraj Feb 20, 2023
e16eb3c
checkpoint
jarulraj Feb 20, 2023
101761e
Merge branch 'master' into coverage
jarulraj Feb 20, 2023
6732aef
checkpoint
jarulraj Mar 15, 2023
7f07e13
checkpoint
jarulraj Mar 16, 2023
cb27222
Merge branch 'master' into coverage
jarulraj Mar 17, 2023
f7f0274
checkpoint
jarulraj Mar 17, 2023
463183c
checkpoint
jarulraj Mar 17, 2023
b230ad9
checkpoint
jarulraj Mar 17, 2023
6ae22c8
checkpoint
jarulraj Mar 17, 2023
61f3ca4
checkpoint
jarulraj Mar 18, 2023
9834ebf
checkpoint
jarulraj Mar 18, 2023
51ad946
checkpoint
jarulraj Mar 18, 2023
b9e5ea6
checkpoint
jarulraj Mar 18, 2023
b9314fb
checkpoint
jarulraj Mar 19, 2023
43c04a4
checkpoint
jarulraj Mar 19, 2023
8e4a32f
checkpoint
jarulraj Mar 19, 2023
1481442
checkpoint
jarulraj Mar 19, 2023
3acbc20
checkpoint
jarulraj Mar 19, 2023
3611298
checkpoint
jarulraj Mar 19, 2023
f6b27a0
checkpoint
jarulraj Mar 19, 2023
11eb92d
checkpoint
jarulraj Mar 19, 2023
c1db3ce
checkpoint
jarulraj Mar 19, 2023
25f48a7
checkpoint
jarulraj Mar 19, 2023
562bcbd
checkpoint
jarulraj Mar 19, 2023
01a1bd4
checkpoint
jarulraj Mar 19, 2023
a1ec020
checkpoint
jarulraj Mar 19, 2023
307d3d4
checkpoint
jarulraj Mar 19, 2023
98733a6
checkpoint
jarulraj Mar 19, 2023
80292fc
checkpoint
jarulraj Mar 19, 2023
a80e44e
checkpoint
jarulraj Mar 19, 2023
27a2a47
checkpoint
jarulraj Mar 19, 2023
2494795
checkpoint
jarulraj Mar 19, 2023
a128b98
checkpoint
jarulraj Mar 19, 2023
7c4aec1
checkpoint
jarulraj Mar 19, 2023
94b892b
checkpoint
jarulraj Mar 19, 2023
b846e82
checkpoint
jarulraj Mar 20, 2023
d1c2c1c
checkpoint
jarulraj Mar 20, 2023
6d61eb1
checkpoint
jarulraj Mar 20, 2023
c774038
checkpoint
jarulraj Mar 20, 2023
5419356
checkpoint
jarulraj Mar 20, 2023
4c22deb
checkpoint
jarulraj Mar 20, 2023
93f6c58
checkpoint
jarulraj Mar 20, 2023
49e6f97
checkpoint
jarulraj Mar 20, 2023
1d82644
checkpoint
jarulraj Mar 20, 2023
8bfa3d5
checkpoint
jarulraj Mar 21, 2023
87dc9a9
checkpoint
jarulraj Mar 21, 2023
d9f5f3d
checkpoint
jarulraj Mar 21, 2023
20ef35d
checkpoint
jarulraj Mar 21, 2023
734fa8a
checkpoint
jarulraj Mar 21, 2023
162cc42
checkpoint
jarulraj Mar 21, 2023
508e4a7
checkpoint
jarulraj Mar 21, 2023
31dcd9a
checkpoint
jarulraj Mar 21, 2023
7ac35d9
checkpoint
jarulraj Mar 21, 2023
53c3fec
checkpoint
jarulraj Mar 21, 2023
356310b
checkpoint
jarulraj Mar 21, 2023
c5b6c77
checkpoint
jarulraj Mar 21, 2023
03c9b50
checkpoint
jarulraj Mar 21, 2023
212e001
checkpoint
jarulraj Mar 21, 2023
1cda24c
checkpoint
jarulraj Mar 21, 2023
bbff19b
checkpoint
jarulraj Mar 21, 2023
c104ec3
checkpoint
jarulraj Mar 21, 2023
36d1946
try to run tests in parallel
jarulraj Mar 22, 2023
f15e0a3
checkpoint
jarulraj Mar 22, 2023
bfb344d
checkpoint
jarulraj Mar 22, 2023
7d89836
checkpoint
jarulraj Mar 22, 2023
b586931
checkpoint
jarulraj Mar 22, 2023
8f43895
minor fix for ray to work
gaurav274 Mar 23, 2023
a57f720
ray fixes
gaurav274 Mar 23, 2023
3 changes: 0 additions & 3 deletions .coveragerc
@@ -14,6 +14,3 @@ exclude_lines =
     class LogicalExchangeToPhysical(Rule):
     class LogicalExchange(Operator):
-
-[html]
-show_contexts = True
8 changes: 4 additions & 4 deletions eva/binder/statement_binder.py
@xzdandy (Collaborator) commented on Mar 22, 2023:

Like eva/catalog/catalog_manager.py, shall we also replace logger.exception and raise BinderError with assert for consistency?

The PR author replied:

Yes, done.

@@ -98,10 +98,10 @@ def _bind_create_index_statement(self, node: CreateIndexStatement):
             col = [
                 col for col in table_ref_obj.columns if col.name == col_def.name
             ][0]
-            if not col.array_type == NdArrayType.FLOAT32:
-                raise BinderError("Index input needs to be float32.")
-            if not len(col.array_dimensions) == 2:
-                raise BinderError("Index input needs to be 2 dimensional.")
+            assert (
+                col.array_type == NdArrayType.FLOAT32
+            ), "Index input needs to be float32."
+            assert len(col.array_dimensions) == 2
         else:
             # Output of the UDF should be 2 dimension and float32 type.
             catalog_manager = CatalogManager()
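For reference, here is the pattern the thread settles on, as a self-contained sketch (the column value, NdArrayType, and BinderError are stand-ins; only the control flow is the point):

    import logging

    logger = logging.getLogger("eva")

    class BinderError(Exception):
        pass

    col_array_type = "FLOAT32"  # stand-in for col.array_type / NdArrayType.FLOAT32

    # Before: log, then raise a custom error.
    if not col_array_type == "FLOAT32":
        err = "Index input needs to be float32."
        logger.exception(err)
        raise BinderError(err)

    # After: a single assert carries the same condition and message.
    assert col_array_type == "FLOAT32", "Index input needs to be float32."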
20 changes: 9 additions & 11 deletions eva/catalog/catalog_manager.py
@@ -40,7 +40,6 @@
 from eva.parser.create_statement import ColumnDefinition
 from eva.parser.table_ref import TableInfo
 from eva.parser.types import FileFormatType
-from eva.utils.errors import CatalogError
 from eva.utils.generic_utils import generate_file_path, get_file_checksum
 from eva.utils.logging_manager import logger

@@ -380,14 +379,17 @@ def create_and_insert_multimedia_table_catalog_entry(
     Returns:
         TableCatalogEntry: newly inserted table catalog entry
     """
+    assert format_type in [
+        FileFormatType.VIDEO,
+        FileFormatType.IMAGE,
+    ], f"Format Type {format_type} is not supported"
+
     if format_type is FileFormatType.VIDEO:
         columns = get_video_table_column_definitions()
         table_type = TableType.VIDEO_DATA
     elif format_type is FileFormatType.IMAGE:
         columns = get_image_table_column_definitions()
         table_type = TableType.IMAGE_DATA
-    else:
-        raise CatalogError(f"Format Type {format_type} is not supported")

A reviewer (Member) commented on the removed else branch:

So, removing this will cause create_and_insert_table_catalog_entry to fail, but we won't know why. Is that acceptable, or am I missing something?

The PR author replied:

Added assert.

     return self.create_and_insert_table_catalog_entry(
         TableInfo(name), columns, table_type=table_type
@@ -407,10 +409,9 @@ def get_multimedia_metadata_table_catalog_entry(
     # use file_url as the metadata table name
     media_metadata_name = Path(input_table.file_url).stem
     obj = self.get_table_catalog_entry(media_metadata_name)
-    if not obj:
-        err = f"Table with name {media_metadata_name} does not exist in catalog"
-        logger.exception(err)
-        raise CatalogError(err)
+    assert (
+        obj is not None
+    ), f"Table with name {media_metadata_name} does not exist in catalog"

     return obj

@@ -432,10 +433,7 @@ def create_and_insert_multimedia_metadata_table_catalog_entry(
     # use file_url as the metadata table name
     media_metadata_name = Path(input_table.file_url).stem
     obj = self.get_table_catalog_entry(media_metadata_name)
-    if obj:
-        err_msg = f"Table with name {media_metadata_name} already exists"
-        logger.exception(err_msg)
-        raise CatalogError(err_msg)
+    assert obj is None, f"Table with name {media_metadata_name} already exists"

     columns = [ColumnDefinition("file_url", ColumnType.TEXT, None, None)]
     obj = self.create_and_insert_table_catalog_entry(
16 changes: 15 additions & 1 deletion eva/catalog/sql_config.py
@@ -12,6 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
+
 from sqlalchemy import create_engine, event
 from sqlalchemy.orm import scoped_session, sessionmaker

@@ -20,6 +22,17 @@
 IDENTIFIER_COLUMN = "_row_id"


+def prefix_worker_id(uri: str):
+    try:
+        worker_id = os.environ["PYTEST_XDIST_WORKER"]
+        base = "eva_catalog.db"
+        uri = uri.replace(base, str(worker_id) + "_" + base)
+    except KeyError:
+        # Single threaded mode
+        pass
+    return uri
+
+
 class SQLConfig:
     """Singleton class for configuring connection to the database.

@@ -47,7 +60,8 @@ def __init__(self):
"""
uri = ConfigurationManager().get_value("core", "catalog_database_uri")
# set echo=True to log SQL
self.engine = create_engine(uri)
updated_uri = prefix_worker_id(str(uri))
self.engine = create_engine(updated_uri)

if self.engine.url.get_backend_name() == "sqlite":
# enforce foreign key constraint and wal logging for sqlite
Expand Down
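For context, pytest-xdist sets the PYTEST_XDIST_WORKER environment variable (gw0, gw1, ...) in each test worker, so this change gives every worker a private SQLite catalog file and lets the suite run in parallel. A minimal sketch of the rewrite under that assumption (the URI value is a made-up default):

    import os

    def prefix_worker_id(uri: str):
        # Same logic as the diff above: prepend the worker id to the db file name.
        try:
            worker_id = os.environ["PYTEST_XDIST_WORKER"]
            base = "eva_catalog.db"
            uri = uri.replace(base, str(worker_id) + "_" + base)
        except KeyError:
            pass  # single-threaded mode: leave the URI untouched
        return uri

    os.environ["PYTEST_XDIST_WORKER"] = "gw3"  # normally set by pytest-xdist
    print(prefix_worker_id("sqlite:///eva_catalog.db"))
    # sqlite:///gw3_eva_catalog.db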
8 changes: 0 additions & 8 deletions eva/executor/abstract_executor.py
@@ -50,18 +50,10 @@ def children(self) -> List[AbstractExecutor]:
         """
         return self._children

-    @children.setter
-    def children(self, children: List["AbstractExecutor"]):
-        self._children = children
-
     @property
     def node(self) -> AbstractPlan:
         return self._node

-    @abstractmethod
-    def validate(self):
-        pass
-
     @abstractmethod
     def exec(self) -> Iterable[Batch]:
         """
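The net effect on the executor interface, as a self-contained sketch (the class name and node type are stand-ins for the real ones in eva/executor/abstract_executor.py; how _children gets populated elsewhere is assumed):

    from abc import ABC, abstractmethod
    from typing import Iterable, List

    class AbstractExecutorSketch(ABC):
        # After this PR, exec() is the only abstract hook: the never-implemented
        # validate() method and the unused children setter are gone.
        def __init__(self, node):
            self._node = node
            self._children: List["AbstractExecutorSketch"] = []

        @property
        def children(self) -> List["AbstractExecutorSketch"]:
            return self._children

        @property
        def node(self):
            return self._node

        @abstractmethod
        def exec(self) -> Iterable:
            ...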
33 changes: 12 additions & 21 deletions eva/executor/apply_and_merge_executor.py
@@ -15,10 +15,8 @@
 from typing import Iterator

 from eva.executor.abstract_executor import AbstractExecutor
-from eva.executor.executor_utils import ExecutorError
 from eva.models.storage.batch import Batch
 from eva.plan_nodes.apply_and_merge_plan import ApplyAndMergePlan
-from eva.utils.logging_manager import logger


class ApplyAndMergeExecutor(AbstractExecutor):
@@ -38,26 +36,19 @@ def __init__(self, node: ApplyAndMergePlan):
         self.do_unnest = node.do_unnest
         self.alias = node.alias

-    def validate(self):
-        pass
-
     def exec(self, *args, **kwargs) -> Iterator[Batch]:
         child_executor = self.children[0]
         for batch in child_executor.exec(**kwargs):
             res = self.func_expr.evaluate(batch)
-            try:
-                if not res.empty():
-                    if self.do_unnest:
-                        res.unnest()
-
-                    # Merge the results to the input.
-                    # This assumes that the batch index is preserved by the function
-                    # call. Since both the batch and the results are sorted, we could
-                    # perform a sorted merge, though the typical small size of the
-                    # batch and results should not significantly impact performance.
-                    merged_batch = Batch.join(batch, res)
-                    merged_batch.reset_index()
-                    yield merged_batch
-            except Exception as e:
-                logger.error(e)
-                raise ExecutorError(e)
+            if not res.empty():
+                if self.do_unnest:
+                    res.unnest()
+
+                # Merge the results to the input.
+                # This assumes that the batch index is preserved by the function
+                # call. Since both the batch and the results are sorted, we could
+                # perform a sorted merge, though the typical small size of the
+                # batch and results should not significantly impact performance.
+                merged_batch = Batch.join(batch, res)
+                merged_batch.reset_index()
+                yield merged_batch
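The merge comment above is the crux of this executor: UDF results are joined back to their source rows by batch index. A minimal pandas sketch of those semantics, assuming Batch.join wraps an index-aligned join (column names are invented):

    import pandas as pd

    batch = pd.DataFrame({"id": [1, 2, 3]})               # input rows
    res = pd.DataFrame({"label": ["cat", "dog", "car"]})  # per-row UDF output

    # Index-aligned join: row i of res lands on row i of batch.
    merged = batch.join(res).reset_index(drop=True)
    print(merged)
    #    id label
    # 0   1   cat
    # 1   2   dog
    # 2   3   car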
3 changes: 0 additions & 3 deletions eva/executor/create_executor.py
@@ -24,9 +24,6 @@ def __init__(self, node: CreatePlan):
         super().__init__(node)
         self.catalog = CatalogManager()

-    def validate(self):
-        pass
-
     def exec(self):
         if not handle_if_not_exists(self.node.table_info, self.node.if_not_exists):
             catalog_entry = self.catalog.create_and_insert_table_catalog_entry(
42 changes: 4 additions & 38 deletions eva/executor/create_index_executor.py
@@ -47,9 +47,6 @@ class CreateIndexExecutor(AbstractExecutor):
     def __init__(self, node: CreateIndexPlan):
         super().__init__(node)

-    def validate(self):
-        pass
-
     def exec(self):
         catalog_manager = CatalogManager()
         if catalog_manager.get_index_catalog_entry_by_name(self.node.name):
@@ -60,10 +57,12 @@ def exec(self):
         # Get the index type.
         index_type = self.node.index_type

-        if IndexType.is_faiss_index_type(index_type):
-            self._create_faiss_index()
-        else:
-            raise ExecutorError("Index type {} is not supported.".format(index_type))
+        assert IndexType.is_faiss_index_type(
+            index_type
+        ), "Index type {} is not supported.".format(index_type)
+
+        self._create_faiss_index()

A reviewer (Member) commented on the removed else branch:

Won't this result in saying the index is successfully created?

The PR author replied:

Added assert.

         yield Batch(
             pd.DataFrame(
@@ -78,39 +77,6 @@ def _get_index_save_path(self):
             / Path("{}_{}.index".format(self.node.index_type, self.node.name))
         )

-    # Comment out since Index IO is not needed for now.
-    # def _get_index_io_list(self, input_dim):
-    #     # Input dimension is inferred from the actual feature.
-    #     catalog_manager = CatalogManager()
-    #     input_index_io = catalog_manager.index_io(
-    #         "input_feature",
-    #         ColumnType.NDARRAY,
-    #         NdArrayType.FLOAT32,
-    #         [Dimension.ANYDIM, input_dim],
-    #         True,
-    #     )
-
-    #     # Output dimension depends on number of searched
-    #     # feature vectors and top N similar feature vectors.
-    #     # IndexIO has detailed documentation about input and
-    #     # output format of index.
-    #     id_index_io = catalog_manager.index_io(
-    #         "logical_id",
-    #         ColumnType.NDARRAY,
-    #         NdArrayType.INT64,
-    #         [Dimension.ANYDIM, Dimension.ANYDIM],
-    #         False,
-    #     )
-    #     distance_index_io = catalog_manager.index_io(
-    #         "distance",
-    #         ColumnType.NDARRAY,
-    #         NdArrayType.FLOAT32,
-    #         [Dimension.ANYDIM, Dimension.ANYDIM],
-    #         False,
-    #     )
-
-    #     return [input_index_io, id_index_io, distance_index_io]

     def _create_faiss_index(self):
         try:
             catalog_manager = CatalogManager()
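One nuance behind the reviewer's question: exec() is a generator, so the assert fires when the generator body is consumed, and no success Batch is ever yielded for an unsupported index type. A self-contained sketch of that behavior (the executor and index types are stand-ins):

    def exec_sketch(index_type):
        # Stand-in for CreateIndexExecutor.exec(): yields a success marker
        # only if the assert passes.
        assert index_type == "FAISS", "Index type {} is not supported.".format(index_type)
        yield "Index created"  # stand-in for Batch(pd.DataFrame(...))

    try:
        list(exec_sketch("HNSW"))  # consuming the generator runs its body
    except AssertionError as e:
        print(e)  # Index type HNSW is not supported.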
3 changes: 0 additions & 3 deletions eva/executor/create_mat_view_executor.py
@@ -28,9 +28,6 @@ def __init__(self, node: CreateMaterializedViewPlan):
         super().__init__(node)
         self.catalog = CatalogManager()

-    def validate(self):
-        pass
-
     def exec(self):
         """Create materialized view executor"""
         if not handle_if_not_exists(self.node.view, self.node.if_not_exists):
3 changes: 0 additions & 3 deletions eva/executor/create_udf_executor.py
@@ -28,9 +28,6 @@ class CreateUDFExecutor(AbstractExecutor):
     def __init__(self, node: CreateUDFPlan):
         super().__init__(node)

-    def validate(self):
-        pass
-
     def exec(self):
         """Create udf executor
73 changes: 27 additions & 46 deletions eva/executor/delete_executor.py
@@ -12,18 +12,17 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Generator, Iterator
+from typing import Iterator

 import pandas as pd

 from eva.catalog.catalog_manager import CatalogManager
 from eva.catalog.catalog_type import TableType
 from eva.executor.abstract_executor import AbstractExecutor
-from eva.executor.executor_utils import ExecutorError, apply_predicate
+from eva.executor.executor_utils import apply_predicate
 from eva.models.storage.batch import Batch
 from eva.plan_nodes.project_plan import ProjectPlan
 from eva.storage.storage_engine import StorageEngine
-from eva.utils.logging_manager import logger


class DeleteExecutor(AbstractExecutor):
@@ -34,53 +33,35 @@ def __init__(self, node: ProjectPlan):
         self.predicate = node.where_clause
         self.catalog = CatalogManager()

-    def validate(self):
-        pass
-
     def exec(self, **kwargs) -> Iterator[Batch]:
-        try:
-            table_catalog = self.node.table_ref.table.table_obj
-            storage_engine = StorageEngine.factory(table_catalog)
-
-            del_batch = Batch()
+        table_catalog = self.node.table_ref.table.table_obj
+        storage_engine = StorageEngine.factory(table_catalog)
+        del_batch = Batch()

-            if table_catalog.table_type == TableType.VIDEO_DATA:
-                raise NotImplementedError("DELETE only implemented for structured data")
-            elif table_catalog.table_type == TableType.IMAGE_DATA:
-                raise NotImplementedError("DELETE only implemented for structured data")
-            elif table_catalog.table_type == TableType.STRUCTURED_DATA:
-                del_batch = storage_engine.read(table_catalog)
-                del_batch = list(del_batch)[0]
+        assert (
+            table_catalog.table_type == TableType.STRUCTURED_DATA
+        ), "DELETE only implemented for structured data"

-                # Added because of inconsistency in col_alias in Structured data Batch project function
-                original_column_names = list(del_batch.frames.columns)
-                column_names = [
-                    f"{table_catalog.name.lower()}.{name}"
-                    for name in original_column_names
-                    if not name == "_row_id"
-                ]
-                column_names.insert(0, "_row_id")
-                del_batch.frames.columns = column_names
-                del_batch = apply_predicate(del_batch, self.predicate)
+        del_batch = storage_engine.read(table_catalog)
+        del_batch = list(del_batch)[0]

-            # All the batches that need to be deleted
+        # Added because of inconsistency in col_alias in Structured data Batch project function
+        original_column_names = list(del_batch.frames.columns)
+        column_names = [
+            f"{table_catalog.name.lower()}.{name}"
+            for name in original_column_names
+            if not name == "_row_id"
+        ]
+        column_names.insert(0, "_row_id")
+        del_batch.frames.columns = column_names
+        del_batch = apply_predicate(del_batch, self.predicate)

-            if table_catalog.table_type == TableType.VIDEO_DATA:
-                storage_engine.delete(table_catalog, del_batch)
-            elif table_catalog.table_type == TableType.IMAGE_DATA:
-                storage_engine.delete(table_catalog, del_batch)
-            elif table_catalog.table_type == TableType.STRUCTURED_DATA:
-                del_batch.frames.columns = original_column_names
-                table_needed = del_batch.frames[
-                    [f"{self.predicate.children[0].col_name}"]
-                ]
-                for num in range(len(del_batch)):
-                    storage_engine.delete(table_catalog, table_needed.iloc[num])
-                yield Batch(pd.DataFrame(["Deleted row"]))
-        except Exception as e:
-            logger.error(e)
-            raise ExecutorError(e)
+        # All the batches that need to be deleted
+        if table_catalog.table_type == TableType.STRUCTURED_DATA:

@xzdandy (Collaborator) commented on the remaining if check:

We do not need this if check; the previous assert ensures that table_catalog.table_type is TableType.STRUCTURED_DATA.

The PR author replied:

Yes, resolved.

+            del_batch.frames.columns = original_column_names
+            table_needed = del_batch.frames[[f"{self.predicate.children[0].col_name}"]]
+            for num in range(len(del_batch)):
+                storage_engine.delete(table_catalog, table_needed.iloc[num])

-    def __call__(self, **kwargs) -> Generator[Batch, None, None]:
-        yield from self.exec(**kwargs)
+        yield Batch(pd.DataFrame(["Deleted row"]))
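To make the new control flow concrete, here is a self-contained pandas sketch of the structured-data delete path (table contents and the WHERE predicate are invented; the storage-engine call is replaced by a print):

    import pandas as pd

    # Stand-in for the Batch read from storage; note the _row_id column.
    frames = pd.DataFrame({"_row_id": [0, 1, 2], "id": [10, 20, 30]})

    # Stand-in for apply_predicate(del_batch, self.predicate) with WHERE id > 15.
    del_frames = frames[frames["id"] > 15]

    # Mirror the per-row delete loop over the predicate column.
    table_needed = del_frames[["id"]]
    for num in range(len(del_frames)):
        print("storage_engine.delete row:", table_needed.iloc[num].to_dict())
    # storage_engine.delete row: {'id': 20}
    # storage_engine.delete row: {'id': 30}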