-
Notifications
You must be signed in to change notification settings - Fork 264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: reducing coverage loss #619
Changes from 150 commits
c9afc12
34dfbf7
9fa9857
ebe26d3
bc722dd
0e29858
c1a7864
5ac631a
3238e95
02a1d28
01181b5
562a7ca
9887732
d2a1a3d
f09c613
79a6168
5ce1991
91d7b06
0aac934
ee48803
fc2f243
343a4a2
121451f
5b47c15
8c75a5e
6772cd0
7a10d67
1a4204f
e96d3a4
640e7ed
cb50de3
533de9e
e8081e4
97bfac4
0127ad0
fc353c9
10fd83a
d559fe1
445a710
d246715
17247fe
8a2d5a3
365ae2f
88e74e3
0d4cde2
a8918a7
40bc613
5a2faac
5f4a802
e952beb
247b041
08420d1
536ae74
d30fcbd
31aa92d
db930a4
f76e856
0ae4cd7
c7c763e
2711582
86c0513
4fcc190
94d0ae0
0701eb8
b31ad91
2bf76ff
d72d778
d74791c
b783430
2450485
b78280e
166aa32
22d1386
ba6bda9
f9f4a2b
429e513
7e525a2
1afe38a
9744432
cb859db
acf36ce
768a8a2
32da4af
13c7710
ea2cee7
eb56fcc
ad339a6
db1f388
80bf59c
a740112
41c2051
9fe2f81
f37b92c
46e2106
35168e7
e16eb3c
101761e
6732aef
7f07e13
cb27222
f7f0274
463183c
b230ad9
6ae22c8
61f3ca4
9834ebf
51ad946
b9e5ea6
b9314fb
43c04a4
8e4a32f
1481442
3acbc20
3611298
f6b27a0
11eb92d
c1db3ce
25f48a7
562bcbd
01a1bd4
a1ec020
307d3d4
98733a6
80292fc
a80e44e
27a2a47
2494795
a128b98
7c4aec1
94b892b
b846e82
d1c2c1c
6d61eb1
c774038
5419356
4c22deb
93f6c58
49e6f97
1d82644
8bfa3d5
87dc9a9
d9f5f3d
20ef35d
734fa8a
162cc42
508e4a7
31dcd9a
7ac35d9
53c3fec
356310b
c5b6c77
03c9b50
212e001
1cda24c
bbff19b
c104ec3
36d1946
f15e0a3
bfb344d
7d89836
b586931
8f43895
a57f720
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,7 +40,6 @@ | |
from eva.parser.create_statement import ColumnDefinition | ||
from eva.parser.table_ref import TableInfo | ||
from eva.parser.types import FileFormatType | ||
from eva.utils.errors import CatalogError | ||
from eva.utils.generic_utils import generate_file_path, get_file_checksum | ||
from eva.utils.logging_manager import logger | ||
|
||
|
@@ -380,14 +379,17 @@ def create_and_insert_multimedia_table_catalog_entry( | |
Returns: | ||
TableCatalogEntry: newly inserted table catalog entry | ||
""" | ||
assert format_type in [ | ||
FileFormatType.VIDEO, | ||
FileFormatType.IMAGE, | ||
], f"Format Type {format_type} is not supported" | ||
|
||
if format_type is FileFormatType.VIDEO: | ||
columns = get_video_table_column_definitions() | ||
table_type = TableType.VIDEO_DATA | ||
elif format_type is FileFormatType.IMAGE: | ||
columns = get_image_table_column_definitions() | ||
table_type = TableType.IMAGE_DATA | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, removing this will result in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
raise CatalogError(f"Format Type {format_type} is not supported") | ||
|
||
return self.create_and_insert_table_catalog_entry( | ||
TableInfo(name), columns, table_type=table_type | ||
|
@@ -407,10 +409,9 @@ def get_multimedia_metadata_table_catalog_entry( | |
# use file_url as the metadata table name | ||
media_metadata_name = Path(input_table.file_url).stem | ||
obj = self.get_table_catalog_entry(media_metadata_name) | ||
if not obj: | ||
err = f"Table with name {media_metadata_name} does not exist in catalog" | ||
logger.exception(err) | ||
raise CatalogError(err) | ||
assert ( | ||
obj is not None | ||
), f"Table with name {media_metadata_name} does not exist in catalog" | ||
|
||
return obj | ||
|
||
|
@@ -432,10 +433,7 @@ def create_and_insert_multimedia_metadata_table_catalog_entry( | |
# use file_url as the metadata table name | ||
media_metadata_name = Path(input_table.file_url).stem | ||
obj = self.get_table_catalog_entry(media_metadata_name) | ||
if obj: | ||
err_msg = f"Table with name {media_metadata_name} already exists" | ||
logger.exception(err_msg) | ||
raise CatalogError(err_msg) | ||
assert obj is None, "Table with name {media_metadata_name} already exists" | ||
|
||
columns = [ColumnDefinition("file_url", ColumnType.TEXT, None, None)] | ||
obj = self.create_and_insert_table_catalog_entry( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,9 +47,6 @@ class CreateIndexExecutor(AbstractExecutor): | |
def __init__(self, node: CreateIndexPlan): | ||
super().__init__(node) | ||
|
||
def validate(self): | ||
pass | ||
|
||
def exec(self): | ||
catalog_manager = CatalogManager() | ||
if catalog_manager.get_index_catalog_entry_by_name(self.node.name): | ||
|
@@ -60,10 +57,12 @@ def exec(self): | |
# Get the index type. | ||
index_type = self.node.index_type | ||
|
||
assert IndexType.is_faiss_index_type( | ||
index_type | ||
), "Index type {} is not supported.".format(index_type) | ||
|
||
if IndexType.is_faiss_index_type(index_type): | ||
self._create_faiss_index() | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this result in saying the index is successfully created? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
raise ExecutorError("Index type {} is not supported.".format(index_type)) | ||
|
||
yield Batch( | ||
pd.DataFrame( | ||
|
@@ -78,39 +77,6 @@ def _get_index_save_path(self): | |
/ Path("{}_{}.index".format(self.node.index_type, self.node.name)) | ||
) | ||
|
||
# Comment out since Index IO is not needed for now. | ||
# def _get_index_io_list(self, input_dim): | ||
# # Input dimension is inferred from the actual feature. | ||
# catalog_manager = CatalogManager() | ||
# input_index_io = catalog_manager.index_io( | ||
# "input_feature", | ||
# ColumnType.NDARRAY, | ||
# NdArrayType.FLOAT32, | ||
# [Dimension.ANYDIM, input_dim], | ||
# True, | ||
# ) | ||
|
||
# # Output dimension depends on number of searched | ||
# # feature vectors and top N similar feature vectors. | ||
# # IndexIO has detailed documentation about input and | ||
# # output format of index. | ||
# id_index_io = catalog_manager.index_io( | ||
# "logical_id", | ||
# ColumnType.NDARRAY, | ||
# NdArrayType.INT64, | ||
# [Dimension.ANYDIM, Dimension.ANYDIM], | ||
# False, | ||
# ) | ||
# distance_index_io = catalog_manager.index_io( | ||
# "distance", | ||
# ColumnType.NDARRAY, | ||
# NdArrayType.FLOAT32, | ||
# [Dimension.ANYDIM, Dimension.ANYDIM], | ||
# False, | ||
# ) | ||
|
||
# return [input_index_io, id_index_io, distance_index_io] | ||
|
||
def _create_faiss_index(self): | ||
try: | ||
catalog_manager = CatalogManager() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,18 +12,17 @@ | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
from typing import Generator, Iterator | ||
from typing import Iterator | ||
|
||
import pandas as pd | ||
|
||
from eva.catalog.catalog_manager import CatalogManager | ||
from eva.catalog.catalog_type import TableType | ||
from eva.executor.abstract_executor import AbstractExecutor | ||
from eva.executor.executor_utils import ExecutorError, apply_predicate | ||
from eva.executor.executor_utils import apply_predicate | ||
from eva.models.storage.batch import Batch | ||
from eva.plan_nodes.project_plan import ProjectPlan | ||
from eva.storage.storage_engine import StorageEngine | ||
from eva.utils.logging_manager import logger | ||
|
||
|
||
class DeleteExecutor(AbstractExecutor): | ||
|
@@ -34,53 +33,35 @@ def __init__(self, node: ProjectPlan): | |
self.predicate = node.where_clause | ||
self.catalog = CatalogManager() | ||
|
||
def validate(self): | ||
pass | ||
|
||
def exec(self, **kwargs) -> Iterator[Batch]: | ||
try: | ||
table_catalog = self.node.table_ref.table.table_obj | ||
storage_engine = StorageEngine.factory(table_catalog) | ||
|
||
del_batch = Batch() | ||
table_catalog = self.node.table_ref.table.table_obj | ||
storage_engine = StorageEngine.factory(table_catalog) | ||
del_batch = Batch() | ||
|
||
if table_catalog.table_type == TableType.VIDEO_DATA: | ||
raise NotImplementedError("DELETE only implemented for structured data") | ||
elif table_catalog.table_type == TableType.IMAGE_DATA: | ||
raise NotImplementedError("DELETE only implemented for structured data") | ||
elif table_catalog.table_type == TableType.STRUCTURED_DATA: | ||
del_batch = storage_engine.read(table_catalog) | ||
del_batch = list(del_batch)[0] | ||
assert ( | ||
table_catalog.table_type == TableType.STRUCTURED_DATA | ||
), "DELETE only implemented for structured data" | ||
|
||
# Added because of inconsistency in col_alias in Structured data Batch project function | ||
original_column_names = list(del_batch.frames.columns) | ||
column_names = [ | ||
f"{table_catalog.name.lower()}.{name}" | ||
for name in original_column_names | ||
if not name == "_row_id" | ||
] | ||
column_names.insert(0, "_row_id") | ||
del_batch.frames.columns = column_names | ||
del_batch = apply_predicate(del_batch, self.predicate) | ||
del_batch = storage_engine.read(table_catalog) | ||
del_batch = list(del_batch)[0] | ||
|
||
# All the batches that need to be deleted | ||
# Added because of inconsistency in col_alias in Structured data Batch project function | ||
original_column_names = list(del_batch.frames.columns) | ||
column_names = [ | ||
f"{table_catalog.name.lower()}.{name}" | ||
for name in original_column_names | ||
if not name == "_row_id" | ||
] | ||
column_names.insert(0, "_row_id") | ||
del_batch.frames.columns = column_names | ||
del_batch = apply_predicate(del_batch, self.predicate) | ||
|
||
if table_catalog.table_type == TableType.VIDEO_DATA: | ||
storage_engine.delete(table_catalog, del_batch) | ||
elif table_catalog.table_type == TableType.IMAGE_DATA: | ||
storage_engine.delete(table_catalog, del_batch) | ||
elif table_catalog.table_type == TableType.STRUCTURED_DATA: | ||
del_batch.frames.columns = original_column_names | ||
table_needed = del_batch.frames[ | ||
[f"{self.predicate.children[0].col_name}"] | ||
] | ||
for num in range(len(del_batch)): | ||
storage_engine.delete(table_catalog, table_needed.iloc[num]) | ||
yield Batch(pd.DataFrame(["Deleted row"])) | ||
# All the batches that need to be deleted | ||
|
||
except Exception as e: | ||
logger.error(e) | ||
raise ExecutorError(e) | ||
if table_catalog.table_type == TableType.STRUCTURED_DATA: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not need this if check, the previous There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, resolved. |
||
del_batch.frames.columns = original_column_names | ||
table_needed = del_batch.frames[[f"{self.predicate.children[0].col_name}"]] | ||
for num in range(len(del_batch)): | ||
storage_engine.delete(table_catalog, table_needed.iloc[num]) | ||
|
||
def __call__(self, **kwargs) -> Generator[Batch, None, None]: | ||
yield from self.exec(**kwargs) | ||
yield Batch(pd.DataFrame(["Deleted row"])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like
eva/catalog/catalog_manager.py
, shall we also replacelogger.execption and raise BinderError
with assert for consistency?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, done