@@ -134,6 +134,18 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
134
134
return kinesis_options_proto
135
135
136
136
137
+ _DATA_SOURCE_OPTIONS = {
138
+ DataSourceProto .SourceType .BATCH_FILE : "feast.infra.offline_stores.file_source.FileSource" ,
139
+ DataSourceProto .SourceType .BATCH_BIGQUERY : "feast.infra.offline_stores.bigquery_source.BigQuerySource" ,
140
+ DataSourceProto .SourceType .BATCH_REDSHIFT : "feast.infra.offline_stores.redshift_source.RedshiftSource" ,
141
+ DataSourceProto .SourceType .BATCH_SNOWFLAKE : "feast.infra.offline_stores.snowflake_source.SnowflakeSource" ,
142
+ DataSourceProto .SourceType .STREAM_KAFKA : "feast.data_source.KafkaSource" ,
143
+ DataSourceProto .SourceType .STREAM_KINESIS : "feast.data_source.KinesisSource" ,
144
+ DataSourceProto .SourceType .REQUEST_SOURCE : "feast.data_source.RequestDataSource" ,
145
+ DataSourceProto .SourceType .PUSH_SOURCE : "feast.data_source.PushSource" ,
146
+ }
147
+
148
+
137
149
class DataSource (ABC ):
138
150
"""
139
151
DataSource that can be used to source features.
@@ -210,13 +222,20 @@ def from_proto(data_source: DataSourceProto) -> Any:
210
222
Raises:
211
223
ValueError: The type of DataSource could not be identified.
212
224
"""
225
+ data_source_type = data_source .type
226
+ if not data_source_type or (
227
+ data_source_type
228
+ not in list (_DATA_SOURCE_OPTIONS .keys ())
229
+ + [DataSourceProto .SourceType .CUSTOM_SOURCE ]
230
+ ):
231
+ raise ValueError ("Could not identify the source type being added." )
213
232
214
- if data_source . data_source_class_type :
233
+ if data_source_type == DataSourceProto . SourceType . CUSTOM_SOURCE :
215
234
cls = get_data_source_class_from_type (data_source .data_source_class_type )
216
235
return cls .from_proto (data_source )
217
- raise ValueError (
218
- f"Could not identify the type for data source: { data_source . name } ."
219
- )
236
+
237
+ cls = get_data_source_class_from_type ( _DATA_SOURCE_OPTIONS [ data_source_type ])
238
+ return cls . from_proto ( data_source )
220
239
221
240
@abstractmethod
222
241
def to_proto (self ) -> DataSourceProto :
0 commit comments