Skip to content

Commit d579424

Browse files
author
zhilingc
committed
Add uniqueness constraint to FeatureSets, fix tests
1 parent fd9a9d7 commit d579424

File tree

6 files changed

+23
-11
lines changed

6 files changed

+23
-11
lines changed

core/src/main/java/feast/core/job/JobUpdateTask.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package feast.core.job;
1818

19+
import com.google.common.collect.Sets;
1920
import feast.core.log.Action;
2021
import feast.core.log.AuditLogger;
2122
import feast.core.log.Resource;
@@ -35,7 +36,6 @@
3536
import java.util.concurrent.Future;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.TimeoutException;
38-
import java.util.stream.Collectors;
3939
import lombok.Getter;
4040
import lombok.extern.slf4j.Slf4j;
4141

@@ -102,14 +102,8 @@ public Job call() {
102102
}
103103

104104
boolean featureSetsChangedFor(Job job) {
105-
Set<String> existingFeatureSetsPopulatedByJob =
106-
job.getFeatureSets().stream()
107-
.map(fs -> fs.getProject() + "/" + fs.getName())
108-
.collect(Collectors.toSet());
109-
Set<String> newFeatureSetsPopulatedByJob =
110-
featureSets.stream()
111-
.map(fs -> fs.getProject() + "/" + fs.getName())
112-
.collect(Collectors.toSet());
105+
Set<FeatureSet> existingFeatureSetsPopulatedByJob = Sets.newHashSet(job.getFeatureSets());
106+
Set<FeatureSet> newFeatureSetsPopulatedByJob = Sets.newHashSet(featureSets);
113107

114108
return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob);
115109
}

core/src/main/java/feast/core/model/Entity.java

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
public class Entity {
3333
@EmbeddedId private EntityReference reference;
3434

35+
/** Data type of the entity. String representation of {@link ValueType} * */
3536
private String type;
3637

3738
public Entity() {}

core/src/main/java/feast/core/model/FeatureSet.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
@Getter
3434
@Setter
3535
@javax.persistence.Entity
36-
@Table(name = "feature_sets")
36+
@Table(
37+
name = "feature_sets",
38+
uniqueConstraints = @UniqueConstraint(columnNames = {"name", "version", "project_name"}))
3739
public class FeatureSet extends AbstractTimestampEntity implements Comparable<FeatureSet> {
3840

3941
// Id of the featureSet, defined as project/feature_set_name:feature_set_version

core/src/main/java/feast/core/service/JobCoordinatorService.java

+1
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ public Optional<Job> getJob(Source source, Store store) {
195195
return Optional.of(jobs.get(0));
196196
}
197197

198+
// TODO: optimize this to make less calls to the database.
198199
private List<FeatureSet> featureSetsFromProto(List<FeatureSetProto.FeatureSet> protos) {
199200
return protos.stream()
200201
.map(FeatureSetProto.FeatureSet::getSpec)

core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.mockito.Mockito.when;
2626
import static org.mockito.MockitoAnnotations.initMocks;
2727

28+
import com.google.common.collect.Lists;
2829
import com.google.protobuf.InvalidProtocolBufferException;
2930
import feast.core.CoreServiceProto.ListFeatureSetsRequest.Filter;
3031
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
@@ -194,6 +195,13 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep
194195
when(specService.listStores(any()))
195196
.thenReturn(ListStoresResponse.newBuilder().addStore(store).build());
196197

198+
for (FeatureSetProto.FeatureSet fs : Lists.newArrayList(featureSet1, featureSet2)) {
199+
FeatureSetSpec spec = fs.getSpec();
200+
when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion(
201+
spec.getName(), spec.getProject(), spec.getVersion()))
202+
.thenReturn(FeatureSet.fromProto(fs));
203+
}
204+
197205
when(jobManager.startJob(argThat(new JobMatcher(expectedInput)))).thenReturn(expected);
198206
when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW);
199207

@@ -318,6 +326,12 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException {
318326
when(jobManager.startJob(argThat(new JobMatcher(expectedInput1)))).thenReturn(expected1);
319327
when(jobManager.startJob(argThat(new JobMatcher(expectedInput2)))).thenReturn(expected2);
320328
when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW);
329+
for (FeatureSetProto.FeatureSet fs : Lists.newArrayList(featureSet1, featureSet2)) {
330+
FeatureSetSpec spec = fs.getSpec();
331+
when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion(
332+
spec.getName(), spec.getProject(), spec.getVersion()))
333+
.thenReturn(FeatureSet.fromProto(fs));
334+
}
321335

322336
JobCoordinatorService jcs =
323337
new JobCoordinatorService(

protos/feast/core/FeatureSet.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ message EntitySpec {
6969
// Name of the entity.
7070
string name = 1;
7171

72-
// Value type of the feature.
72+
// Value type of the entity.
7373
feast.types.ValueType.Enum value_type = 2;
7474
}
7575

0 commit comments

Comments
 (0)