1
+ import logging
1
2
from datetime import datetime
2
3
from typing import Any , Callable , Dict , List , Optional , Sequence , Tuple
3
4
4
5
from pydantic .typing import Literal
6
+ from pymilvus import (
7
+ Collection ,
8
+ CollectionSchema ,
9
+ DataType ,
10
+ FieldSchema ,
11
+ connections ,
12
+ utility ,
13
+ )
5
14
6
15
from feast import Entity , RepoConfig
7
16
from feast .expediagroup .vectordb .vector_feature_view import VectorFeatureView
8
17
from feast .expediagroup .vectordb .vector_online_store import VectorOnlineStore
18
+ from feast .field import Field
9
19
from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
10
20
from feast .protos .feast .types .Value_pb2 import Value as ValueProto
11
21
from feast .repo_config import FeastConfigBaseModel
22
+ from feast .types import (
23
+ Array ,
24
+ FeastType ,
25
+ Float32 ,
26
+ Float64 ,
27
+ Int32 ,
28
+ Int64 ,
29
+ Invalid ,
30
+ String ,
31
+ )
32
+ from feast .usage import log_exceptions_and_usage
33
+
34
+ logger = logging .getLogger (__name__ )
12
35
13
36
14
37
class MilvusOnlineStoreConfig (FeastConfigBaseModel ):
@@ -17,13 +40,47 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel):
17
40
type : Literal ["milvus" ] = "milvus"
18
41
"""Online store type selector"""
19
42
43
+ alias : str = "default"
44
+ """ alias for milvus connection"""
45
+
20
46
host : str
21
47
""" the host URL """
22
48
49
+ username : str
50
+ """ username to connect to Milvus """
51
+
52
+ password : str
53
+ """ password to connect to Milvus """
54
+
23
55
port : int = 19530
24
56
""" the port to connect to a Milvus instance. Should be the one used for GRPC (default: 19530) """
25
57
26
58
59
+ class MilvusConnectionManager :
60
+ def __init__ (self , online_config : RepoConfig ):
61
+ self .online_config = online_config
62
+
63
+ def __enter__ (self ):
64
+ # Connecting to Milvus
65
+ logger .info (
66
+ f"Connecting to Milvus with alias { self .online_config .alias } and host { self .online_config .host } and default port { self .online_config .port } ."
67
+ )
68
+ connections .connect (
69
+ host = self .online_config .host ,
70
+ username = self .online_config .username ,
71
+ password = self .online_config .password ,
72
+ use_secure = True ,
73
+ )
74
+
75
+ def __exit__ (self , exc_type , exc_value , traceback ):
76
+ # Disconnecting from Milvus
77
+ logger .info ("Closing the connection to Milvus" )
78
+ connections .disconnect (self .online_config .alias )
79
+ logger .info ("Connection Closed" )
80
+ if exc_type is not None :
81
+ logger .error (f"An exception of type { exc_type } occurred: { exc_value } " )
82
+
83
+
27
84
class MilvusOnlineStore (VectorOnlineStore ):
28
85
def online_write_batch (
29
86
self ,
@@ -49,6 +106,7 @@ def online_read(
49
106
"to be implemented in https://jira.expedia.biz/browse/EAPC-7972"
50
107
)
51
108
109
+ @log_exceptions_and_usage (online_store = "milvus" )
52
110
def update (
53
111
self ,
54
112
config : RepoConfig ,
@@ -58,9 +116,41 @@ def update(
58
116
entities_to_keep : Sequence [Entity ],
59
117
partial : bool ,
60
118
):
61
- raise NotImplementedError (
62
- "to be implemented in https://jira.expedia.biz/browse/EAPC-7970"
63
- )
119
+ with MilvusConnectionManager (config .online_store ):
120
+ for table_to_keep in tables_to_keep :
121
+ collection_available = utility .has_collection (table_to_keep .name )
122
+ try :
123
+ if collection_available :
124
+ logger .info (f"Collection { table_to_keep .name } already exists." )
125
+ else :
126
+ schema = self ._convert_featureview_schema_to_milvus_readable (
127
+ table_to_keep .schema ,
128
+ table_to_keep .vector_field ,
129
+ table_to_keep .dimensions ,
130
+ )
131
+
132
+ collection = Collection (name = table_to_keep .name , schema = schema )
133
+ logger .info (f"Collection name is { collection .name } " )
134
+ logger .info (
135
+ f"Collection { table_to_keep .name } has been created successfully."
136
+ )
137
+ except Exception as e :
138
+ logger .error (f"Collection update failed due to { e } " )
139
+
140
+ for table_to_delete in tables_to_delete :
141
+ collection_available = utility .has_collection (table_to_delete .name )
142
+ try :
143
+ if collection_available :
144
+ utility .drop_collection (table_to_delete .name )
145
+ logger .info (
146
+ f"Collection { table_to_delete .name } has been deleted successfully."
147
+ )
148
+ else :
149
+ logger .warning (
150
+ f"Collection { table_to_delete .name } does not exist or is already deleted."
151
+ )
152
+ except Exception as e :
153
+ logger .error (f"Collection deletion failed due to { e } " )
64
154
65
155
def teardown (
66
156
self ,
@@ -71,3 +161,72 @@ def teardown(
71
161
raise NotImplementedError (
72
162
"to be implemented in https://jira.expedia.biz/browse/EAPC-7974"
73
163
)
164
+
165
+ def _convert_featureview_schema_to_milvus_readable (
166
+ self , feast_schema : List [Field ], vector_field , vector_field_dimensions
167
+ ) -> CollectionSchema :
168
+ """
169
+ Converting a schema understood by Feast to a schema that is readable by Milvus so that it
170
+ can be used when a collection is created in Milvus.
171
+
172
+ Parameters:
173
+ feast_schema (List[Field]): Schema stored in VectorFeatureView.
174
+
175
+ Returns:
176
+ (CollectionSchema): Schema readable by Milvus.
177
+
178
+ """
179
+ boolean_mapping_from_string = {"True" : True , "False" : False }
180
+ field_list = []
181
+ dimension = None
182
+
183
+ for field in feast_schema :
184
+ if field .name == vector_field :
185
+ field_name = vector_field
186
+ dimension = vector_field_dimensions
187
+ else :
188
+ field_name = field .name
189
+
190
+ data_type = self ._feast_to_milvus_data_type (field .dtype )
191
+
192
+ if field .tags :
193
+ description = field .tags .get ("description" , " " )
194
+ is_primary = boolean_mapping_from_string .get (
195
+ field .tags .get ("is_primary" , "False" )
196
+ )
197
+
198
+ # Appending the above converted values to construct a FieldSchema
199
+ field_list .append (
200
+ FieldSchema (
201
+ name = field_name ,
202
+ dtype = data_type ,
203
+ description = description ,
204
+ is_primary = is_primary ,
205
+ dim = dimension ,
206
+ )
207
+ )
208
+ # Returning a CollectionSchema which is a list of type FieldSchema.
209
+ return CollectionSchema (field_list )
210
+
211
+ def _feast_to_milvus_data_type (self , feast_type : FeastType ) -> DataType :
212
+ """
213
+ Mapping for converting Feast data type to a data type compatible wih Milvus.
214
+
215
+ Parameters:
216
+ feast_type (FeastType): This is a type associated with a Feature that is stored in a VectorFeatureView, readable with Feast.
217
+
218
+ Returns:
219
+ DataType : DataType associated with what Milvus can understand and associate its Feature types to
220
+ """
221
+
222
+ return {
223
+ Int32 : DataType .INT32 ,
224
+ Int64 : DataType .INT64 ,
225
+ Float32 : DataType .FLOAT ,
226
+ Float64 : DataType .DOUBLE ,
227
+ String : DataType .STRING ,
228
+ Invalid : DataType .UNKNOWN ,
229
+ Array (Float32 ): DataType .FLOAT_VECTOR ,
230
+ # TODO: Need to think about list of binaries and list of bytes
231
+ # FeastType.BYTES_LIST: DataType.BINARY_VECTOR
232
+ }.get (feast_type , None )
0 commit comments