diff --git a/serving/pom.xml b/serving/pom.xml
index c15881030e2..be573be45c5 100644
--- a/serving/pom.xml
+++ b/serving/pom.xml
@@ -250,6 +250,12 @@
com.fasterxml.jackson.dataformat
jackson-dataformat-yaml
+
+
+ com.github.kstyrc
+ embedded-redis
+ test
+
diff --git a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java b/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java
index 2afbdaf90d1..4c6b652c46e 100644
--- a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java
+++ b/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java
@@ -16,40 +16,55 @@
*/
package feast.serving.configuration;
-import feast.core.StoreProto.Store;
-import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
+import feast.serving.FeastProperties;
import feast.serving.service.JobService;
import feast.serving.service.NoopJobService;
import feast.serving.service.RedisBackedJobService;
import feast.serving.specs.CachedSpecService;
+import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
@Configuration
public class JobServiceConfig {
+ public static final String DEFAULT_REDIS_MAX_CONN = "8";
+ public static final String DEFAULT_REDIS_MAX_IDLE = "8";
+ public static final String DEFAULT_REDIS_MAX_WAIT_MILLIS = "50";
+
@Bean
- public JobService jobService(Store jobStore, CachedSpecService specService) {
+ public JobService jobService(FeastProperties feastProperties, CachedSpecService specService) {
if (!specService.getStore().getType().equals(StoreType.BIGQUERY)) {
return new NoopJobService();
}
-
- switch (jobStore.getType()) {
+ StoreType storeType = StoreType.valueOf(feastProperties.getJobs().getStoreType());
+ Map storeOptions = feastProperties.getJobs().getStoreOptions();
+ switch (storeType) {
case REDIS:
- RedisConfig redisConfig = jobStore.getRedisConfig();
- Jedis jedis = new Jedis(redisConfig.getHost(), redisConfig.getPort());
- return new RedisBackedJobService(jedis);
+ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+ jedisPoolConfig.setMaxTotal(
+ Integer.parseInt(storeOptions.getOrDefault("max-conn", DEFAULT_REDIS_MAX_CONN)));
+ jedisPoolConfig.setMaxIdle(
+ Integer.parseInt(storeOptions.getOrDefault("max-idle", DEFAULT_REDIS_MAX_IDLE)));
+ jedisPoolConfig.setMaxWaitMillis(
+ Integer.parseInt(
+ storeOptions.getOrDefault("max-wait-millis", DEFAULT_REDIS_MAX_WAIT_MILLIS)));
+ JedisPool jedisPool =
+ new JedisPool(
+ jedisPoolConfig,
+ storeOptions.get("host"),
+ Integer.parseInt(storeOptions.get("port")));
+ return new RedisBackedJobService(jedisPool);
case INVALID:
case BIGQUERY:
case CASSANDRA:
case UNRECOGNIZED:
default:
throw new IllegalArgumentException(
- String.format(
- "Unsupported store type '%s' for job store name '%s'",
- jobStore.getType(), jobStore.getName()));
+ String.format("Unsupported store type '%s' for job store", storeType));
}
}
}
diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java
index ea6dbc6ef71..3cc115978a3 100644
--- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java
+++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java
@@ -22,12 +22,9 @@
import com.google.cloud.storage.StorageOptions;
import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.BigQueryConfig;
-import feast.core.StoreProto.Store.Builder;
import feast.core.StoreProto.Store.RedisConfig;
-import feast.core.StoreProto.Store.StoreType;
import feast.core.StoreProto.Store.Subscription;
import feast.serving.FeastProperties;
-import feast.serving.FeastProperties.JobProperties;
import feast.serving.service.BigQueryServingService;
import feast.serving.service.JobService;
import feast.serving.service.NoopJobService;
@@ -47,18 +44,6 @@ public class ServingServiceConfig {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(ServingServiceConfig.class);
- @Bean(name = "JobStore")
- public Store jobStoreDefinition(FeastProperties feastProperties) {
- JobProperties jobProperties = feastProperties.getJobs();
- if (feastProperties.getJobs().getStoreType().equals("")) {
- return Store.newBuilder().build();
- }
- Map options = jobProperties.getStoreOptions();
- Builder storeDefinitionBuilder =
- Store.newBuilder().setType(StoreType.valueOf(jobProperties.getStoreType()));
- return setStoreConfig(storeDefinitionBuilder, options);
- }
-
private Store setStoreConfig(Store.Builder builder, Map options) {
switch (builder.getType()) {
case REDIS:
diff --git a/serving/src/main/java/feast/serving/service/BigQueryServingService.java b/serving/src/main/java/feast/serving/service/BigQueryServingService.java
index 0743245d164..f23cbbe64ad 100644
--- a/serving/src/main/java/feast/serving/service/BigQueryServingService.java
+++ b/serving/src/main/java/feast/serving/service/BigQueryServingService.java
@@ -267,13 +267,12 @@ private TableId generateUUIDs(Table loadedEntityTable) {
}
private Job waitForJob(Job queryJob) throws InterruptedException {
- Job completedJob = queryJob.waitFor(
- RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
- RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
+ Job completedJob =
+ queryJob.waitFor(
+ RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
+ RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
if (completedJob == null) {
- throw Status.INTERNAL
- .withDescription("Job no longer exists")
- .asRuntimeException();
+ throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
} else if (completedJob.getStatus().getError() != null) {
throw Status.INTERNAL
.withDescription("Job failed: " + completedJob.getStatus().getError())
diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java
index 7bfce552254..230e20cd782 100644
--- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java
+++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java
@@ -23,6 +23,8 @@
import org.joda.time.Duration;
import org.slf4j.Logger;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.exceptions.JedisConnectionException;
// TODO: Do rate limiting, currently if clients call get() or upsert()
// and an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait
@@ -31,40 +33,53 @@
public class RedisBackedJobService implements JobService {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisBackedJobService.class);
- private final Jedis jedis;
+ private final JedisPool jedisPool;
// Remove job state info after "defaultExpirySeconds" to prevent filling up Redis memory
// and since users normally don't require info about relatively old jobs.
private final int defaultExpirySeconds = (int) Duration.standardDays(1).getStandardSeconds();
- public RedisBackedJobService(Jedis jedis) {
- this.jedis = jedis;
+ public RedisBackedJobService(JedisPool jedisPool) {
+ this.jedisPool = jedisPool;
}
@Override
public Optional get(String id) {
- String json = jedis.get(id);
- if (json == null) {
- return Optional.empty();
- }
+ Jedis jedis = null;
Job job = null;
- Builder builder = Job.newBuilder();
try {
+ jedis = jedisPool.getResource();
+ String json = jedis.get(id);
+ if (json == null) {
+ return Optional.empty();
+ }
+ Builder builder = Job.newBuilder();
JsonFormat.parser().merge(json, builder);
job = builder.build();
+ } catch (JedisConnectionException e) {
+ log.error(String.format("Failed to connect to the redis instance: %s", e));
} catch (Exception e) {
log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage()));
+ } finally {
+ if (jedis != null) {
+ jedis.close();
+ }
}
-
return Optional.ofNullable(job);
}
@Override
public void upsert(Job job) {
+ Jedis jedis = null;
try {
+ jedis = jedisPool.getResource();
jedis.set(job.getId(), JsonFormat.printer().omittingInsignificantWhitespace().print(job));
jedis.expire(job.getId(), defaultExpirySeconds);
} catch (Exception e) {
log.error(String.format("Failed to upsert job: %s", e.getMessage()));
+ } finally {
+ if (jedis != null) {
+ jedis.close();
+ }
}
}
}
diff --git a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
index a96c2bc8d73..61103af1092 100644
--- a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
+++ b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
@@ -336,13 +336,12 @@ private FieldValueList getTimestampLimits(String entityTableName) {
}
private Job waitForJob(Job queryJob) throws InterruptedException {
- Job completedJob = queryJob.waitFor(
- RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
- RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
+ Job completedJob =
+ queryJob.waitFor(
+ RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
+ RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
if (completedJob == null) {
- throw Status.INTERNAL
- .withDescription("Job no longer exists")
- .asRuntimeException();
+ throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
} else if (completedJob.getStatus().getError() != null) {
throw Status.INTERNAL
.withDescription("Job failed: " + completedJob.getStatus().getError())
@@ -350,5 +349,4 @@ private Job waitForJob(Job queryJob) throws InterruptedException {
}
return completedJob;
}
-
}
diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml
index 072787492f0..96713c80287 100644
--- a/serving/src/main/resources/application.yml
+++ b/serving/src/main/resources/application.yml
@@ -47,6 +47,10 @@ feast:
# store-options:
# host: localhost
# port: 6379
+ # Optionally, you can configure the connection pool with the following items:
+ # max-conn: 8
+ # max-idle: 8
+ # max-wait-millis: 50
store-options: {}
grpc:
diff --git a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java
new file mode 100644
index 00000000000..9247375f59e
--- /dev/null
+++ b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java
@@ -0,0 +1,60 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.service;
+
+import java.io.IOException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.embedded.RedisServer;
+
+public class RedisBackedJobServiceTest {
+ private static String REDIS_HOST = "localhost";
+ private static int REDIS_PORT = 51235;
+ private RedisServer redis;
+
+ @Before
+ public void setUp() throws IOException {
+ redis = new RedisServer(REDIS_PORT);
+ redis.start();
+ }
+
+ @After
+ public void teardown() {
+ redis.stop();
+ }
+
+ @Test
+ public void shouldRecoverIfRedisConnectionIsLost() {
+ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+ jedisPoolConfig.setMaxTotal(1);
+ jedisPoolConfig.setMaxWaitMillis(10);
+ JedisPool jedisPool = new JedisPool(jedisPoolConfig, REDIS_HOST, REDIS_PORT);
+ RedisBackedJobService jobService = new RedisBackedJobService(jedisPool);
+ jobService.get("does not exist");
+ redis.stop();
+ try {
+ jobService.get("does not exist");
+ } catch (Exception e) {
+ // pass, this should fail, and return a broken connection to the pool
+ }
+ redis.start();
+ jobService.get("does not exist");
+ }
+}