Skip to content

Commit f2d4db5

Browse files
Chen Zhilingfeast-ci-bot
Chen Zhiling
authored andcommitted
Async job management (#361)
* Async job management * Add feature set status for safe ingestion * Make job update timeout configurable * Set job update timeout * Change JobInfo to Job, move Job object instantiation to JobManagers * Increase kafka wait time * Remove Info from method * Update feature set yamls to follow new format * Change toSpec to toProto, refactor job start signature, change Apply to take full FeatureSet object * Erase all traces of jobInfo * Remove status and created timestamp from constructor * Remove queue buffer limit
1 parent bc4b974 commit f2d4db5

File tree

58 files changed

+1931
-1473
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1931
-1473
lines changed

.prow/scripts/test-end-to-end-batch.sh

+4-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.prope
7676
sleep 5
7777
tail -n10 /var/log/zookeeper.log
7878
nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &> /var/log/kafka.log 2>&1 &
79-
sleep 10
79+
sleep 20
8080
tail -n10 /var/log/kafka.log
8181

8282
echo "
@@ -108,6 +108,8 @@ feast:
108108
jobs:
109109
runner: DirectRunner
110110
options: {}
111+
updates:
112+
timeoutSeconds: 240
111113
metrics:
112114
enabled: false
113115
@@ -141,7 +143,7 @@ EOF
141143
nohup java -jar core/target/feast-core-0.3.2-SNAPSHOT.jar \
142144
--spring.config.location=file:///tmp/core.application.yml \
143145
&> /var/log/feast-core.log &
144-
sleep 30
146+
sleep 35
145147
tail -n10 /var/log/feast-core.log
146148
echo "
147149
============================================================

.prow/scripts/test-end-to-end.sh

+4-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.prope
5959
sleep 5
6060
tail -n10 /var/log/zookeeper.log
6161
nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &> /var/log/kafka.log 2>&1 &
62-
sleep 10
62+
sleep 20
6363
tail -n10 /var/log/kafka.log
6464

6565
echo "
@@ -91,6 +91,8 @@ feast:
9191
jobs:
9292
runner: DirectRunner
9393
options: {}
94+
updates:
95+
timeoutSeconds: 240
9496
metrics:
9597
enabled: false
9698
@@ -124,7 +126,7 @@ EOF
124126
nohup java -jar core/target/feast-core-0.3.2-SNAPSHOT.jar \
125127
--spring.config.location=file:///tmp/core.application.yml \
126128
&> /var/log/feast-core.log &
127-
sleep 30
129+
sleep 35
128130
tail -n10 /var/log/feast-core.log
129131

130132
echo "

core/src/main/java/feast/core/config/FeastProperties.java

+8
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ public static class JobProperties {
3737
private String runner;
3838
private Map<String, String> options;
3939
private MetricsProperties metrics;
40+
private JobUpdatesProperties updates;
41+
}
42+
43+
@Getter
44+
@Setter
45+
public static class JobUpdatesProperties {
46+
47+
private long timeoutSeconds;
4048
}
4149

4250
@Getter

core/src/main/java/feast/core/config/JobConfig.java

+8-48
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,12 @@
2323
import com.google.api.services.dataflow.DataflowScopes;
2424
import com.google.common.base.Strings;
2525
import feast.core.config.FeastProperties.JobProperties;
26+
import feast.core.config.FeastProperties.JobUpdatesProperties;
2627
import feast.core.job.JobManager;
27-
import feast.core.job.JobMonitor;
28-
import feast.core.job.NoopJobMonitor;
2928
import feast.core.job.Runner;
3029
import feast.core.job.dataflow.DataflowJobManager;
31-
import feast.core.job.dataflow.DataflowJobMonitor;
3230
import feast.core.job.direct.DirectJobRegistry;
3331
import feast.core.job.direct.DirectRunnerJobManager;
34-
import feast.core.job.direct.DirectRunnerJobMonitor;
3532
import java.io.IOException;
3633
import java.security.GeneralSecurityException;
3734
import java.util.HashMap;
@@ -54,7 +51,7 @@ public class JobConfig {
5451
@Bean
5552
@Autowired
5653
public JobManager getJobManager(
57-
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) throws Exception {
54+
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) {
5855

5956
JobProperties jobProperties = feastProperties.getJobs();
6057
Runner runner = Runner.fromString(jobProperties.getRunner());
@@ -97,52 +94,15 @@ public JobManager getJobManager(
9794
}
9895
}
9996

100-
/** Get a Job Monitor given the runner type and dataflow configuration. */
101-
@Bean
102-
public JobMonitor getJobMonitor(
103-
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) throws Exception {
104-
105-
JobProperties jobProperties = feastProperties.getJobs();
106-
Runner runner = Runner.fromString(jobProperties.getRunner());
107-
Map<String, String> jobOptions = jobProperties.getOptions();
108-
109-
switch (runner) {
110-
case DATAFLOW:
111-
if (Strings.isNullOrEmpty(jobOptions.getOrDefault("region", null))
112-
|| Strings.isNullOrEmpty(jobOptions.getOrDefault("project", null))) {
113-
log.warn(
114-
"Project and location of the Dataflow runner is not configured, will not do job monitoring");
115-
return new NoopJobMonitor();
116-
}
117-
try {
118-
GoogleCredential credential =
119-
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
120-
Dataflow dataflow =
121-
new Dataflow(
122-
GoogleNetHttpTransport.newTrustedTransport(),
123-
JacksonFactory.getDefaultInstance(),
124-
credential);
125-
126-
return new DataflowJobMonitor(
127-
dataflow, jobOptions.get("project"), jobOptions.get("region"));
128-
} catch (IOException e) {
129-
log.error(
130-
"Unable to find credential required for Dataflow monitoring API: {}", e.getMessage());
131-
} catch (GeneralSecurityException e) {
132-
log.error("Security exception while ");
133-
} catch (Exception e) {
134-
log.error("Unable to initialize DataflowJobMonitor", e);
135-
}
136-
case DIRECT:
137-
return new DirectRunnerJobMonitor(directJobRegistry);
138-
default:
139-
return new NoopJobMonitor();
140-
}
141-
}
142-
14397
/** Get a direct job registry */
14498
@Bean
14599
public DirectJobRegistry directJobRegistry() {
146100
return new DirectJobRegistry();
147101
}
102+
103+
/** Extracts job update options from feast core options. */
104+
@Bean
105+
public JobUpdatesProperties jobUpdatesProperties(FeastProperties feastProperties) {
106+
return feastProperties.getJobs().getUpdates();
107+
}
148108
}

core/src/main/java/feast/core/dao/FeatureSetRepository.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>
3636
List<FeatureSet> findByName(String name);
3737

3838
// find all versions of featureSets with names matching the regex
39-
@Query(nativeQuery = true, value = "SELECT * FROM feature_sets "
40-
+ "WHERE name LIKE ?1 ORDER BY name ASC, version ASC")
39+
@Query(
40+
nativeQuery = true,
41+
value = "SELECT * FROM feature_sets " + "WHERE name LIKE ?1 ORDER BY name ASC, version ASC")
4142
List<FeatureSet> findByNameWithWildcardOrderByNameAscVersionAsc(String name);
4243

4344
// find all feature sets and order by name and version
4445
List<FeatureSet> findAllByOrderByNameAscVersionAsc();
45-
4646
}

core/src/main/java/feast/core/dao/JobInfoRepository.java core/src/main/java/feast/core/dao/JobRepository.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@
1616
*/
1717
package feast.core.dao;
1818

19-
import feast.core.model.JobInfo;
19+
import feast.core.model.Job;
2020
import feast.core.model.JobStatus;
2121
import java.util.Collection;
2222
import java.util.List;
2323
import org.springframework.data.jpa.repository.JpaRepository;
2424
import org.springframework.stereotype.Repository;
2525

26-
/** JPA repository supplying JobInfo objects keyed by ID. */
26+
/** JPA repository supplying Job objects keyed by ID. */
2727
@Repository
28-
public interface JobInfoRepository extends JpaRepository<JobInfo, String> {
29-
List<JobInfo> findByStatusNotIn(Collection<JobStatus> statuses);
28+
public interface JobRepository extends JpaRepository<Job, String> {
29+
List<Job> findByStatusNotIn(Collection<JobStatus> statuses);
3030

31-
List<JobInfo> findBySourceIdAndStoreName(String sourceId, String storeName);
31+
List<Job> findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName);
3232
}

core/src/main/java/feast/core/dao/MetricsRepository.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@
2323

2424
@Repository
2525
public interface MetricsRepository extends JpaRepository<Metrics, Long> {
26-
List<Metrics> findByJobInfo_Id(String id);
26+
List<Metrics> findByJob_Id(String id);
2727
}

core/src/main/java/feast/core/job/NoopJobMonitor.java core/src/main/java/feast/core/dao/SourceRepository.java

+5-10
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,10 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package feast.core.job;
17+
package feast.core.dao;
1818

19-
import feast.core.model.JobInfo;
20-
import feast.core.model.JobStatus;
19+
import feast.core.model.Source;
20+
import org.springframework.data.jpa.repository.JpaRepository;
2121

22-
public class NoopJobMonitor implements JobMonitor {
23-
24-
@Override
25-
public JobStatus getJobStatus(JobInfo job) {
26-
return JobStatus.UNKNOWN;
27-
}
28-
}
22+
/** JPA repository supplying Source objects keyed by id. */
23+
public interface SourceRepository extends JpaRepository<Source, String> {}

core/src/main/java/feast/core/grpc/CoreServiceImpl.java

+2-67
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package feast.core.grpc;
1818

19-
import com.google.common.collect.Lists;
2019
import com.google.protobuf.InvalidProtocolBufferException;
2120
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
2221
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
@@ -28,43 +27,28 @@
2827
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
2928
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
3029
import feast.core.CoreServiceProto.ListStoresRequest;
31-
import feast.core.CoreServiceProto.ListStoresRequest.Filter;
3230
import feast.core.CoreServiceProto.ListStoresResponse;
3331
import feast.core.CoreServiceProto.UpdateStoreRequest;
3432
import feast.core.CoreServiceProto.UpdateStoreResponse;
35-
import feast.core.CoreServiceProto.UpdateStoreResponse.Status;
36-
import feast.core.FeatureSetProto.FeatureSetSpec;
37-
import feast.core.SourceProto;
38-
import feast.core.StoreProto.Store;
39-
import feast.core.StoreProto.Store.Subscription;
4033
import feast.core.exception.RetrievalException;
4134
import feast.core.grpc.interceptors.MonitoringInterceptor;
42-
import feast.core.service.JobCoordinatorService;
4335
import feast.core.service.SpecService;
4436
import io.grpc.StatusRuntimeException;
4537
import io.grpc.stub.StreamObserver;
46-
import java.util.HashSet;
47-
import java.util.Set;
48-
import java.util.stream.Collectors;
4938
import lombok.extern.slf4j.Slf4j;
5039
import org.lognet.springboot.grpc.GRpcService;
5140
import org.springframework.beans.factory.annotation.Autowired;
52-
import org.springframework.transaction.annotation.Transactional;
5341

54-
/**
55-
* Implementation of the feast core GRPC service.
56-
*/
42+
/** Implementation of the feast core GRPC service. */
5743
@Slf4j
5844
@GRpcService(interceptors = {MonitoringInterceptor.class})
5945
public class CoreServiceImpl extends CoreServiceImplBase {
6046

6147
private SpecService specService;
62-
private JobCoordinatorService jobCoordinatorService;
6348

6449
@Autowired
65-
public CoreServiceImpl(SpecService specService, JobCoordinatorService jobCoordinatorService) {
50+
public CoreServiceImpl(SpecService specService) {
6651
this.specService = specService;
67-
this.jobCoordinatorService = jobCoordinatorService;
6852
}
6953

7054
@Override
@@ -118,31 +102,6 @@ public void applyFeatureSet(
118102
ApplyFeatureSetRequest request, StreamObserver<ApplyFeatureSetResponse> responseObserver) {
119103
try {
120104
ApplyFeatureSetResponse response = specService.applyFeatureSet(request.getFeatureSet());
121-
ListStoresResponse stores = specService.listStores(Filter.newBuilder().build());
122-
for (Store store : stores.getStoreList()) {
123-
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
124-
for (Subscription subscription : store.getSubscriptionsList()) {
125-
featureSetSpecs.addAll(
126-
specService
127-
.listFeatureSets(
128-
ListFeatureSetsRequest.Filter.newBuilder()
129-
.setFeatureSetName(subscription.getName())
130-
.setFeatureSetVersion(subscription.getVersion())
131-
.build())
132-
.getFeatureSetsList());
133-
}
134-
if (!featureSetSpecs.isEmpty() && featureSetSpecs.contains(response.getFeatureSet())) {
135-
// We use the response featureSet source because it contains the information
136-
// about whether to default to the default feature stream or not
137-
SourceProto.Source source = response.getFeatureSet().getSource();
138-
featureSetSpecs =
139-
featureSetSpecs.stream()
140-
.filter(fs -> fs.getSource().equals(source))
141-
.collect(Collectors.toSet());
142-
jobCoordinatorService.startOrUpdateJob(
143-
Lists.newArrayList(featureSetSpecs), source, store);
144-
}
145-
}
146105
responseObserver.onNext(response);
147106
responseObserver.onCompleted();
148107
} catch (Exception e) {
@@ -158,30 +117,6 @@ public void updateStore(
158117
UpdateStoreResponse response = specService.updateStore(request);
159118
responseObserver.onNext(response);
160119
responseObserver.onCompleted();
161-
162-
if (!response.getStatus().equals(Status.NO_CHANGE)) {
163-
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
164-
Store store = response.getStore();
165-
for (Subscription subscription : store.getSubscriptionsList()) {
166-
featureSetSpecs.addAll(
167-
specService
168-
.listFeatureSets(
169-
ListFeatureSetsRequest.Filter.newBuilder()
170-
.setFeatureSetName(subscription.getName())
171-
.setFeatureSetVersion(subscription.getVersion())
172-
.build())
173-
.getFeatureSetsList());
174-
}
175-
if (featureSetSpecs.size() == 0) {
176-
return;
177-
}
178-
featureSetSpecs.stream()
179-
.collect(Collectors.groupingBy(FeatureSetSpec::getSource))
180-
.entrySet()
181-
.stream()
182-
.forEach(
183-
kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), kv.getKey(), store));
184-
}
185120
} catch (Exception e) {
186121
log.error("Exception has occurred in UpdateStore method: ", e);
187122
responseObserver.onError(e);

core/src/main/java/feast/core/job/JobManager.java

+16-12
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
*/
1717
package feast.core.job;
1818

19-
import feast.core.FeatureSetProto.FeatureSetSpec;
20-
import feast.core.StoreProto.Store;
21-
import feast.core.model.JobInfo;
22-
import java.util.List;
19+
import feast.core.model.Job;
20+
import feast.core.model.JobStatus;
2321

2422
public interface JobManager {
2523

@@ -33,25 +31,31 @@ public interface JobManager {
3331
/**
3432
* Start an import job.
3533
*
36-
* @param name of job to run
37-
* @param featureSets list of featureSets to be populated by the job
38-
* @param sink Store to sink features to
39-
* @return runner specific job id
34+
* @param job job to start
35+
* @return Job
4036
*/
41-
String startJob(String name, List<FeatureSetSpec> featureSets, Store sink);
37+
Job startJob(Job job);
4238

4339
/**
4440
* Update already running job with new set of features to ingest.
4541
*
46-
* @param jobInfo jobInfo of target job to change
47-
* @return job runner specific job id
42+
* @param job job of target job to change
43+
* @return Job
4844
*/
49-
String updateJob(JobInfo jobInfo);
45+
Job updateJob(Job job);
5046

5147
/**
5248
* Abort a job given runner-specific job ID.
5349
*
5450
* @param extId runner specific job id.
5551
*/
5652
void abortJob(String extId);
53+
54+
/**
55+
* Get status of a job given runner-specific job ID.
56+
*
57+
* @param job job.
58+
* @return job status.
59+
*/
60+
JobStatus getJobStatus(Job job);
5761
}

0 commit comments

Comments
 (0)