
Commit b542df5

oavdeev authored and khorshuheng committed
add get and list jobs interface to launcher and implement it for EMR
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
1 parent: d6bf41e

5 files changed: +133 −23 lines

sdk/python/feast/pyspark/abc.py (+8)

@@ -470,3 +470,11 @@ def stage_dataframe(
             FileSource: representing the uploaded dataframe.
         """
         raise NotImplementedError
+
+    @abc.abstractmethod
+    def get_job_by_id(self, job_id: str) -> SparkJob:
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+        raise NotImplementedError
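
The two new abstract methods give every launcher a uniform job-lookup interface. Below is a minimal usage sketch, assuming both names are importable from feast.pyspark.abc as the file path suggests; the helper functions themselves are illustrative, not part of the commit.

from typing import List

from feast.pyspark.abc import JobLauncher, SparkJob


def print_jobs(launcher: JobLauncher, include_terminated: bool = False) -> None:
    # With include_terminated=False, only jobs still active on the backend
    # are expected back from list_jobs.
    jobs: List[SparkJob] = launcher.list_jobs(include_terminated=include_terminated)
    for job in jobs:
        print(job.get_id(), job.get_status())


def find_job(launcher: JobLauncher, job_id: str) -> SparkJob:
    # The EMR implementation raises KeyError when no job matches the id.
    return launcher.get_job_by_id(job_id)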

sdk/python/feast/pyspark/launchers/aws/emr.py (+81 −6)

@@ -5,6 +5,7 @@

 import boto3
 import pandas
+from botocore.config import Config as BotoConfig

 from feast.data_format import ParquetFormat
 from feast.data_source import FileSource
@@ -14,6 +15,7 @@
     JobLauncher,
     RetrievalJob,
     RetrievalJobParameters,
+    SparkJob,
     SparkJobFailure,
     SparkJobStatus,
     StreamIngestionJob,
@@ -22,13 +24,19 @@

 from .emr_utils import (
     FAILED_STEP_STATES,
+    HISTORICAL_RETRIEVAL_JOB_TYPE,
     IN_PROGRESS_STEP_STATES,
+    OFFLINE_TO_ONLINE_JOB_TYPE,
+    STREAM_TO_ONLINE_JOB_TYPE,
     SUCCEEDED_STEP_STATES,
     TERMINAL_STEP_STATES,
     EmrJobRef,
+    JobInfo,
     _cancel_job,
     _get_job_state,
     _historical_retrieval_step,
+    _job_ref_to_str,
+    _list_jobs,
     _load_new_cluster_template,
     _random_string,
     _s3_upload,
@@ -50,7 +58,7 @@ def __init__(self, emr_client, job_ref: EmrJobRef):
         self._emr_client = emr_client

     def get_id(self) -> str:
-        return f'{self._job_ref.cluster_id}:{self._job_ref.step_id or ""}'
+        return _job_ref_to_str(self._job_ref)

     def get_status(self) -> SparkJobStatus:
         emr_state = _get_job_state(self._emr_client, self._job_ref)
@@ -164,7 +172,10 @@ def __init__(
         self._region = region

     def _emr_client(self):
-        return boto3.client("emr", region_name=self._region)
+
+        # Use an increased number of retries since DescribeStep calls have a pretty low rate limit.
+        config = BotoConfig(retries={"max_attempts": 10, "mode": "standard"})
+        return boto3.client("emr", region_name=self._region, config=config)

     def _submit_emr_job(self, step: Dict[str, Any]) -> EmrJobRef:
         """
@@ -211,15 +222,15 @@ def historical_feature_retrieval(
         )

         step = _historical_retrieval_step(
-            pyspark_script_path, args=job_params.get_arguments()
+            pyspark_script_path,
+            args=job_params.get_arguments(),
+            output_file_uri=job_params.get_destination_path(),
         )

         job_ref = self._submit_emr_job(step)

         return EmrRetrievalJob(
-            self._emr_client(),
-            job_ref,
-            os.path.join(job_params.get_destination_path()),
+            self._emr_client(), job_ref, job_params.get_destination_path(),
         )

     def offline_to_online_ingestion(
@@ -297,3 +308,67 @@ def stage_dataframe(
             file_format=ParquetFormat(),
             file_url=file_url,
         )
+
+    def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
+        if job_info.job_type == HISTORICAL_RETRIEVAL_JOB_TYPE:
+            assert job_info.output_file_uri is not None
+            return EmrRetrievalJob(
+                emr_client=self._emr_client(),
+                job_ref=job_info.job_ref,
+                output_file_uri=job_info.output_file_uri,
+            )
+        elif job_info.job_type == OFFLINE_TO_ONLINE_JOB_TYPE:
+            return EmrBatchIngestionJob(
+                emr_client=self._emr_client(), job_ref=job_info.job_ref,
+            )
+        elif job_info.job_type == STREAM_TO_ONLINE_JOB_TYPE:
+            return EmrStreamIngestionJob(
+                emr_client=self._emr_client(), job_ref=job_info.job_ref,
+            )
+        else:
+            # We should never get here
+            raise ValueError(f"Unknown job type {job_info.job_type}")
+
+    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+        """
+        List EMR jobs.
+
+        Args:
+            include_terminated: whether to include terminated jobs.
+
+        Returns:
+            A list of SparkJob instances.
+        """
+
+        jobs = _list_jobs(
+            emr_client=self._emr_client(),
+            job_type=None,
+            table_name=None,
+            active_only=not include_terminated,
+        )
+
+        result = []
+        for job_info in jobs:
+            result.append(self._job_from_job_info(job_info))
+        return result
+
+    def get_job_by_id(self, job_id: str) -> SparkJob:
+        """
+        Find an EMR job by its string id. Note that this will also return terminated jobs.
+
+        Raises:
+            KeyError if the job is not found.
+        """
+        # FIXME: this doesn't have to be a linear search but that'll do for now
+        jobs = _list_jobs(
+            emr_client=self._emr_client(),
+            job_type=None,
+            table_name=None,
+            active_only=False,
+        )
+
+        for job_info in jobs:
+            if _job_ref_to_str(job_info.job_ref) == job_id:
+                return self._job_from_job_info(job_info)
+        else:
+            raise KeyError(f"Job not found {job_id}")
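
As the EMR implementation shows, job ids round-trip through _job_ref_to_str, which encodes a reference as "emr:<cluster_id>:<step_id>", with an empty step id for on-demand clusters. A hypothetical inverse (not part of this commit) makes the format concrete:

from typing import NamedTuple, Optional


class EmrJobRef(NamedTuple):
    # Mirrors the EmrJobRef defined in emr_utils.py below.
    cluster_id: str
    step_id: Optional[str]


def job_ref_from_str(job_id: str) -> EmrJobRef:
    # Inverse of _job_ref_to_str: "emr:<cluster_id>:<step_id or empty>".
    prefix, cluster_id, step_id = job_id.split(":", 2)
    if prefix != "emr":
        raise ValueError(f"Not an EMR job id: {job_id}")
    # An empty step id maps back to None (on-demand clusters have a single step).
    return EmrJobRef(cluster_id, step_id or None)


assert job_ref_from_str("emr:j-ABC123:") == EmrJobRef("j-ABC123", None)
assert job_ref_from_str("emr:j-ABC123:s-XYZ789") == EmrJobRef("j-ABC123", "s-XYZ789")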

sdk/python/feast/pyspark/launchers/aws/emr_utils.py (+28 −16)
@@ -165,15 +165,27 @@ def _sync_offline_to_online_step(
     }


+class EmrJobRef(NamedTuple):
+    """EMR job reference. step_id can be None when using on-demand clusters; in that
+    case each cluster has only one step."""
+
+    cluster_id: str
+    step_id: Optional[str]
+
+
+def _job_ref_to_str(job_ref: EmrJobRef) -> str:
+    return ":".join(["emr", job_ref.cluster_id, job_ref.step_id or ""])
+
+
 class JobInfo(NamedTuple):
+    job_ref: EmrJobRef
     job_type: str
-    cluster_id: str
-    step_id: str
-    table_name: str
     state: str
+    table_name: Optional[str]
+    output_file_uri: Optional[str]


-def list_jobs(
+def _list_jobs(
     emr_client, job_type: Optional[str], table_name: Optional[str], active_only=True
 ) -> List[JobInfo]:
     """
@@ -212,6 +224,10 @@ def list_jobs(
         ) or props.get("feast.step_metadata.offline_to_online.table_name")
         step_job_type = props["feast.step_metadata.job_type"]

+        output_file_uri = props.get(
+            "feast.step_metadata.historical_retrieval.output_file_uri"
+        )
+
         if table_name and step_table_name != table_name:
             continue

@@ -221,32 +237,24 @@
             res.append(
                 JobInfo(
                     job_type=step_job_type,
-                    cluster_id=cluster_id,
-                    step_id=step["Id"],
+                    job_ref=EmrJobRef(cluster_id, step["Id"]),
                     state=step["Status"]["State"],
                     table_name=step_table_name,
+                    output_file_uri=output_file_uri,
                 )
             )
     return res


 def _get_stream_to_online_job(emr_client, table_name: str) -> List[JobInfo]:
-    return list_jobs(
+    return _list_jobs(
         emr_client,
         job_type=STREAM_TO_ONLINE_JOB_TYPE,
         table_name=table_name,
         active_only=True,
     )


-class EmrJobRef(NamedTuple):
-    """ EMR job reference. step_id can be None when using on-demand clusters, in that case each
-    cluster has only one step """
-
-    cluster_id: str
-    step_id: Optional[str]
-
-
 def _get_first_step_id(emr_client, cluster_id: str) -> str:
     response = emr_client.list_steps(ClusterId=cluster_id,)
     assert len(response["Steps"]) == 1
@@ -329,7 +337,7 @@ def _upload_dataframe(s3prefix: str, df: pandas.DataFrame) -> str:


 def _historical_retrieval_step(
-    pyspark_script_path: str, args: List[str],
+    pyspark_script_path: str, args: List[str], output_file_uri: str,
 ) -> Dict[str, Any]:

     return {
@@ -340,6 +348,10 @@ def _historical_retrieval_step(
                     "Key": "feast.step_metadata.job_type",
                     "Value": HISTORICAL_RETRIEVAL_JOB_TYPE,
                 },
+                {
+                    "Key": "feast.step_metadata.historical_retrieval.output_file_uri",
+                    "Value": output_file_uri,
+                },
             ],
             "Args": ["spark-submit", pyspark_script_path] + args,
             "Jar": "command-runner.jar",

sdk/python/feast/pyspark/launchers/gcloud/dataproc.py (+8 −1)

@@ -1,6 +1,6 @@
 import os
 import uuid
-from typing import cast
+from typing import List, cast
 from urllib.parse import urlparse

 from google.api_core.operation import Operation
@@ -14,6 +14,7 @@
     JobLauncher,
     RetrievalJob,
     RetrievalJobParameters,
+    SparkJob,
     SparkJobFailure,
     SparkJobParameters,
     SparkJobStatus,
@@ -173,3 +174,9 @@ def stage_dataframe(
         self, df, event_timestamp_column: str, created_timestamp_column: str,
     ):
         raise NotImplementedError
+
+    def get_job_by_id(self, job_id: str) -> SparkJob:
+        raise NotImplementedError
+
+    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+        raise NotImplementedError

sdk/python/feast/pyspark/launchers/standalone/local.py (+8)

@@ -3,6 +3,7 @@
 import subprocess
 import uuid
 from contextlib import closing
+from typing import List

 import requests
 from requests.exceptions import RequestException
@@ -13,6 +14,7 @@
     JobLauncher,
     RetrievalJob,
     RetrievalJobParameters,
+    SparkJob,
     SparkJobFailure,
     SparkJobParameters,
     SparkJobStatus,
@@ -226,3 +228,9 @@ def stage_dataframe(
         self, df, event_timestamp_column: str, created_timestamp_column: str,
     ):
         raise NotImplementedError
+
+    def get_job_by_id(self, job_id: str) -> SparkJob:
+        raise NotImplementedError
+
+    def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
+        raise NotImplementedError
