Skip to content

Commit 6374634

Browse files
kevjumbaadchia
authored andcommitted
fix: Enforce kw args in datasources (#2567)
* Update Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update to keyword args Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Change kinesis to optional Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix review issues Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add unit tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix imports Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
1 parent dd8b854 commit 6374634

File tree

9 files changed

+271
-42
lines changed

9 files changed

+271
-42
lines changed

sdk/python/feast/data_source.py

+168-35
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ class DataSource(ABC):
186186

187187
def __init__(
188188
self,
189+
*,
189190
event_timestamp_column: Optional[str] = None,
190191
created_timestamp_column: Optional[str] = None,
191192
field_mapping: Optional[Dict[str, str]] = None,
@@ -354,11 +355,12 @@ def get_table_column_names_and_types(
354355

355356
def __init__(
356357
self,
357-
name: str,
358-
event_timestamp_column: str,
359-
bootstrap_servers: str,
360-
message_format: StreamFormat,
361-
topic: str,
358+
*args,
359+
name: Optional[str] = None,
360+
event_timestamp_column: Optional[str] = "",
361+
bootstrap_servers: Optional[str] = None,
362+
message_format: Optional[StreamFormat] = None,
363+
topic: Optional[str] = None,
362364
created_timestamp_column: Optional[str] = "",
363365
field_mapping: Optional[Dict[str, str]] = None,
364366
date_partition_column: Optional[str] = "",
@@ -368,22 +370,62 @@ def __init__(
368370
timestamp_field: Optional[str] = "",
369371
batch_source: Optional[DataSource] = None,
370372
):
373+
positional_attributes = [
374+
"name",
375+
"event_timestamp_column",
376+
"bootstrap_servers",
377+
"message_format",
378+
"topic",
379+
]
380+
_name = name
381+
_event_timestamp_column = event_timestamp_column
382+
_bootstrap_servers = bootstrap_servers or ""
383+
_message_format = message_format
384+
_topic = topic or ""
385+
386+
if args:
387+
warnings.warn(
388+
(
389+
"Kafka parameters should be specified as a keyword argument instead of a positional arg."
390+
"Feast 0.23+ will not support positional arguments to construct Kafka sources"
391+
),
392+
DeprecationWarning,
393+
)
394+
if len(args) > len(positional_attributes):
395+
raise ValueError(
396+
f"Only {', '.join(positional_attributes)} are allowed as positional args when defining "
397+
f"Kafka sources, for backwards compatibility."
398+
)
399+
if len(args) >= 1:
400+
_name = args[0]
401+
if len(args) >= 2:
402+
_event_timestamp_column = args[1]
403+
if len(args) >= 3:
404+
_bootstrap_servers = args[2]
405+
if len(args) >= 4:
406+
_message_format = args[3]
407+
if len(args) >= 5:
408+
_topic = args[4]
409+
410+
if _message_format is None:
411+
raise ValueError("Message format must be specified for Kafka source")
412+
print("Asdfasdf")
371413
super().__init__(
372-
event_timestamp_column=event_timestamp_column,
414+
event_timestamp_column=_event_timestamp_column,
373415
created_timestamp_column=created_timestamp_column,
374416
field_mapping=field_mapping,
375417
date_partition_column=date_partition_column,
376418
description=description,
377419
tags=tags,
378420
owner=owner,
379-
name=name,
421+
name=_name,
380422
timestamp_field=timestamp_field,
381423
)
382424
self.batch_source = batch_source
383425
self.kafka_options = KafkaOptions(
384-
bootstrap_servers=bootstrap_servers,
385-
message_format=message_format,
386-
topic=topic,
426+
bootstrap_servers=_bootstrap_servers,
427+
message_format=_message_format,
428+
topic=_topic,
387429
)
388430

389431
def __eq__(self, other):
@@ -472,32 +514,56 @@ class RequestSource(DataSource):
472514

473515
def __init__(
474516
self,
475-
name: str,
476-
schema: Union[Dict[str, ValueType], List[Field]],
517+
*args,
518+
name: Optional[str] = None,
519+
schema: Optional[Union[Dict[str, ValueType], List[Field]]] = None,
477520
description: Optional[str] = "",
478521
tags: Optional[Dict[str, str]] = None,
479522
owner: Optional[str] = "",
480523
):
481524
"""Creates a RequestSource object."""
482-
super().__init__(name=name, description=description, tags=tags, owner=owner)
483-
if isinstance(schema, Dict):
525+
positional_attributes = ["name", "schema"]
526+
_name = name
527+
_schema = schema
528+
if args:
529+
warnings.warn(
530+
(
531+
"Request source parameters should be specified as a keyword argument instead of a positional arg."
532+
"Feast 0.23+ will not support positional arguments to construct request sources"
533+
),
534+
DeprecationWarning,
535+
)
536+
if len(args) > len(positional_attributes):
537+
raise ValueError(
538+
f"Only {', '.join(positional_attributes)} are allowed as positional args when defining "
539+
f"feature views, for backwards compatibility."
540+
)
541+
if len(args) >= 1:
542+
_name = args[0]
543+
if len(args) >= 2:
544+
_schema = args[1]
545+
546+
super().__init__(name=_name, description=description, tags=tags, owner=owner)
547+
if not _schema:
548+
raise ValueError("Schema needs to be provided for Request Source")
549+
if isinstance(_schema, Dict):
484550
warnings.warn(
485551
"Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
486552
"Please use List[Field] instead for the schema",
487553
DeprecationWarning,
488554
)
489555
schemaList = []
490-
for key, valueType in schema.items():
556+
for key, valueType in _schema.items():
491557
schemaList.append(
492558
Field(name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[valueType])
493559
)
494560
self.schema = schemaList
495-
elif isinstance(schema, List):
496-
self.schema = schema
561+
elif isinstance(_schema, List):
562+
self.schema = _schema
497563
else:
498564
raise Exception(
499565
"Schema type must be either dictionary or list, not "
500-
+ str(type(schema))
566+
+ str(type(_schema))
501567
)
502568

503569
def validate(self, config: RepoConfig):
@@ -643,12 +709,13 @@ def get_table_query_string(self) -> str:
643709

644710
def __init__(
645711
self,
646-
name: str,
647-
event_timestamp_column: str,
648-
created_timestamp_column: str,
649-
record_format: StreamFormat,
650-
region: str,
651-
stream_name: str,
712+
*args,
713+
name: Optional[str] = None,
714+
event_timestamp_column: Optional[str] = "",
715+
created_timestamp_column: Optional[str] = "",
716+
record_format: Optional[StreamFormat] = None,
717+
region: Optional[str] = "",
718+
stream_name: Optional[str] = "",
652719
field_mapping: Optional[Dict[str, str]] = None,
653720
date_partition_column: Optional[str] = "",
654721
description: Optional[str] = "",
@@ -657,10 +724,53 @@ def __init__(
657724
timestamp_field: Optional[str] = "",
658725
batch_source: Optional[DataSource] = None,
659726
):
727+
positional_attributes = [
728+
"name",
729+
"event_timestamp_column",
730+
"created_timestamp_column",
731+
"record_format",
732+
"region",
733+
"stream_name",
734+
]
735+
_name = name
736+
_event_timestamp_column = event_timestamp_column
737+
_created_timestamp_column = created_timestamp_column
738+
_record_format = record_format
739+
_region = region or ""
740+
_stream_name = stream_name or ""
741+
if args:
742+
warnings.warn(
743+
(
744+
"Kinesis parameters should be specified as a keyword argument instead of a positional arg."
745+
"Feast 0.23+ will not support positional arguments to construct kinesis sources"
746+
),
747+
DeprecationWarning,
748+
)
749+
if len(args) > len(positional_attributes):
750+
raise ValueError(
751+
f"Only {', '.join(positional_attributes)} are allowed as positional args when defining "
752+
f"kinesis sources, for backwards compatibility."
753+
)
754+
if len(args) >= 1:
755+
_name = args[0]
756+
if len(args) >= 2:
757+
_event_timestamp_column = args[1]
758+
if len(args) >= 3:
759+
_created_timestamp_column = args[2]
760+
if len(args) >= 4:
761+
_record_format = args[3]
762+
if len(args) >= 5:
763+
_region = args[4]
764+
if len(args) >= 6:
765+
_stream_name = args[5]
766+
767+
if _record_format is None:
768+
raise ValueError("Record format must be specified for kinesis source")
769+
660770
super().__init__(
661-
name=name,
662-
event_timestamp_column=event_timestamp_column,
663-
created_timestamp_column=created_timestamp_column,
771+
name=_name,
772+
event_timestamp_column=_event_timestamp_column,
773+
created_timestamp_column=_created_timestamp_column,
664774
field_mapping=field_mapping,
665775
date_partition_column=date_partition_column,
666776
description=description,
@@ -670,7 +780,7 @@ def __init__(
670780
)
671781
self.batch_source = batch_source
672782
self.kinesis_options = KinesisOptions(
673-
record_format=record_format, region=region, stream_name=stream_name
783+
record_format=_record_format, region=_region, stream_name=_stream_name
674784
)
675785

676786
def __eq__(self, other):
@@ -725,9 +835,9 @@ class PushSource(DataSource):
725835

726836
def __init__(
727837
self,
728-
*,
729-
name: str,
730-
batch_source: DataSource,
838+
*args,
839+
name: Optional[str] = None,
840+
batch_source: Optional[DataSource] = None,
731841
description: Optional[str] = "",
732842
tags: Optional[Dict[str, str]] = None,
733843
owner: Optional[str] = "",
@@ -744,10 +854,33 @@ def __init__(
744854
maintainer.
745855
746856
"""
747-
super().__init__(name=name, description=description, tags=tags, owner=owner)
748-
self.batch_source = batch_source
749-
if not self.batch_source:
750-
raise ValueError(f"batch_source is needed for push source {self.name}")
857+
positional_attributes = ["name", "batch_source"]
858+
_name = name
859+
_batch_source = batch_source
860+
if args:
861+
warnings.warn(
862+
(
863+
"Push source parameters should be specified as a keyword argument instead of a positional arg."
864+
"Feast 0.23+ will not support positional arguments to construct push sources"
865+
),
866+
DeprecationWarning,
867+
)
868+
if len(args) > len(positional_attributes):
869+
raise ValueError(
870+
f"Only {', '.join(positional_attributes)} are allowed as positional args when defining "
871+
f"push sources, for backwards compatibility."
872+
)
873+
if len(args) >= 1:
874+
_name = args[0]
875+
if len(args) >= 2:
876+
_batch_source = args[1]
877+
878+
super().__init__(name=_name, description=description, tags=tags, owner=owner)
879+
if not _batch_source:
880+
raise ValueError(
881+
f"batch_source parameter is needed for push source {self.name}"
882+
)
883+
self.batch_source = _batch_source
751884

752885
def __eq__(self, other):
753886
if not isinstance(other, PushSource):

sdk/python/feast/feature_view.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def __init__(
137137
ValueError: A field mapping conflicts with an Entity or a Feature.
138138
"""
139139

140-
positional_attributes = ["name, entities, ttl"]
140+
positional_attributes = ["name", "entities", "ttl"]
141141

142142
_name = name
143143
_entities = entities

sdk/python/feast/infra/offline_stores/bigquery_source.py

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
class BigQuerySource(DataSource):
1717
def __init__(
1818
self,
19+
*,
1920
event_timestamp_column: Optional[str] = "",
2021
table: Optional[str] = None,
2122
created_timestamp_column: Optional[str] = "",

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class SparkSourceFormat(Enum):
3030
class SparkSource(DataSource):
3131
def __init__(
3232
self,
33+
*,
3334
name: Optional[str] = None,
3435
table: Optional[str] = None,
3536
query: Optional[str] = None,

sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py

-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ def __init__(
8888
table: Optional[str] = None,
8989
created_timestamp_column: Optional[str] = "",
9090
field_mapping: Optional[Dict[str, str]] = None,
91-
date_partition_column: Optional[str] = None,
9291
query: Optional[str] = None,
9392
name: Optional[str] = None,
9493
description: Optional[str] = "",

sdk/python/feast/infra/offline_stores/file_source.py

+22-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
class FileSource(DataSource):
2121
def __init__(
2222
self,
23-
path: str,
23+
*args,
24+
path: Optional[str] = None,
2425
event_timestamp_column: Optional[str] = "",
2526
file_format: Optional[FileFormat] = None,
2627
created_timestamp_column: Optional[str] = "",
@@ -58,13 +59,31 @@ def __init__(
5859
>>> from feast import FileSource
5960
>>> file_source = FileSource(path="my_features.parquet", timestamp_field="event_timestamp")
6061
"""
61-
if path is None:
62+
positional_attributes = ["path"]
63+
_path = path
64+
if args:
65+
if args:
66+
warnings.warn(
67+
(
68+
"File Source parameters should be specified as a keyword argument instead of a positional arg."
69+
"Feast 0.23+ will not support positional arguments to construct File sources"
70+
),
71+
DeprecationWarning,
72+
)
73+
if len(args) > len(positional_attributes):
74+
raise ValueError(
75+
f"Only {', '.join(positional_attributes)} are allowed as positional args when defining "
76+
f"File sources, for backwards compatibility."
77+
)
78+
if len(args) >= 1:
79+
_path = args[0]
80+
if _path is None:
6281
raise ValueError(
6382
'No "path" argument provided. Please set "path" to the location of your file source.'
6483
)
6584
self.file_options = FileOptions(
6685
file_format=file_format,
67-
uri=path,
86+
uri=_path,
6887
s3_endpoint_override=s3_endpoint_override,
6988
)
7089

sdk/python/feast/infra/offline_stores/redshift_source.py

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
class RedshiftSource(DataSource):
1717
def __init__(
1818
self,
19+
*,
1920
event_timestamp_column: Optional[str] = "",
2021
table: Optional[str] = None,
2122
schema: Optional[str] = None,

sdk/python/feast/infra/offline_stores/snowflake_source.py

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
class SnowflakeSource(DataSource):
1616
def __init__(
1717
self,
18+
*,
1819
database: Optional[str] = None,
1920
warehouse: Optional[str] = None,
2021
schema: Optional[str] = None,

0 commit comments

Comments
 (0)