1
1
import os
2
- import time
3
2
import uuid
4
3
from datetime import datetime
5
4
16
15
from feast .feature import Feature
17
16
from feast .feature_table import FeatureTable
18
17
from feast .value_type import ValueType
18
+ from feast .wait import wait_retry_backoff
19
19
20
20
DIR_PATH = os .path .dirname (os .path .realpath (__file__ ))
21
21
PROJECT_NAME = "basic_" + uuid .uuid4 ().hex .upper ()[0 :6 ]
@@ -236,6 +236,7 @@ def test_get_list_alltypes(
236
236
assert actual_list_feature_table == alltypes_featuretable
237
237
238
238
239
+ @pytest .mark .bq
239
240
def test_ingest (
240
241
client : Client ,
241
242
customer_entity : Entity ,
@@ -255,12 +256,31 @@ def test_ingest(
255
256
client .apply_feature_table (bq_featuretable )
256
257
client .ingest (bq_featuretable , bq_dataset , timeout = 120 )
257
258
258
- # Give time to allow data to propagate to BQ table
259
- time .sleep (15 )
260
-
259
+ from google .api_core .exceptions import NotFound
261
260
from google .cloud import bigquery
262
261
263
262
bq_client = bigquery .Client (project = gcp_project )
263
+
264
+ # Poll BQ for table until the table has been created
265
+ def try_get_table ():
266
+ table_exist = False
267
+ table_resp = None
268
+ try :
269
+ table_resp = bq_client .get_table (bq_table_id )
270
+
271
+ if table_resp and table_resp .table_id == bq_table_id .split ("." )[- 1 ]:
272
+ table_exist = True
273
+ except NotFound :
274
+ pass
275
+
276
+ return table_resp , table_exist
277
+
278
+ wait_retry_backoff (
279
+ retry_fn = try_get_table ,
280
+ timeout_secs = 30 ,
281
+ timeout_msg = "Timed out trying to get bigquery table" ,
282
+ )
283
+
264
284
query_string = f"SELECT * FROM `{ bq_table_id } `"
265
285
266
286
job = bq_client .query (query_string )
0 commit comments