diff --git a/serving/pom.xml b/serving/pom.xml index c15881030e2..be573be45c5 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -250,6 +250,12 @@ com.fasterxml.jackson.dataformat jackson-dataformat-yaml + + + com.github.kstyrc + embedded-redis + test + diff --git a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java b/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java index 2afbdaf90d1..4c6b652c46e 100644 --- a/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/JobServiceConfig.java @@ -16,40 +16,55 @@ */ package feast.serving.configuration; -import feast.core.StoreProto.Store; -import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; +import feast.serving.FeastProperties; import feast.serving.service.JobService; import feast.serving.service.NoopJobService; import feast.serving.service.RedisBackedJobService; import feast.serving.specs.CachedSpecService; +import java.util.Map; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; @Configuration public class JobServiceConfig { + public static final String DEFAULT_REDIS_MAX_CONN = "8"; + public static final String DEFAULT_REDIS_MAX_IDLE = "8"; + public static final String DEFAULT_REDIS_MAX_WAIT_MILLIS = "50"; + @Bean - public JobService jobService(Store jobStore, CachedSpecService specService) { + public JobService jobService(FeastProperties feastProperties, CachedSpecService specService) { if (!specService.getStore().getType().equals(StoreType.BIGQUERY)) { return new NoopJobService(); } - - switch (jobStore.getType()) { + StoreType storeType = StoreType.valueOf(feastProperties.getJobs().getStoreType()); + Map storeOptions = feastProperties.getJobs().getStoreOptions(); + switch (storeType) { case REDIS: - RedisConfig redisConfig = jobStore.getRedisConfig(); - Jedis jedis = new Jedis(redisConfig.getHost(), redisConfig.getPort()); - return new RedisBackedJobService(jedis); + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMaxTotal( + Integer.parseInt(storeOptions.getOrDefault("max-conn", DEFAULT_REDIS_MAX_CONN))); + jedisPoolConfig.setMaxIdle( + Integer.parseInt(storeOptions.getOrDefault("max-idle", DEFAULT_REDIS_MAX_IDLE))); + jedisPoolConfig.setMaxWaitMillis( + Integer.parseInt( + storeOptions.getOrDefault("max-wait-millis", DEFAULT_REDIS_MAX_WAIT_MILLIS))); + JedisPool jedisPool = + new JedisPool( + jedisPoolConfig, + storeOptions.get("host"), + Integer.parseInt(storeOptions.get("port"))); + return new RedisBackedJobService(jedisPool); case INVALID: case BIGQUERY: case CASSANDRA: case UNRECOGNIZED: default: throw new IllegalArgumentException( - String.format( - "Unsupported store type '%s' for job store name '%s'", - jobStore.getType(), jobStore.getName())); + String.format("Unsupported store type '%s' for job store", storeType)); } } } diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java index ea6dbc6ef71..3cc115978a3 100644 --- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java +++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java @@ -22,12 +22,9 @@ import com.google.cloud.storage.StorageOptions; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.BigQueryConfig; -import feast.core.StoreProto.Store.Builder; import feast.core.StoreProto.Store.RedisConfig; -import feast.core.StoreProto.Store.StoreType; import feast.core.StoreProto.Store.Subscription; import feast.serving.FeastProperties; -import feast.serving.FeastProperties.JobProperties; import feast.serving.service.BigQueryServingService; import feast.serving.service.JobService; import feast.serving.service.NoopJobService; @@ -47,18 +44,6 @@ public class ServingServiceConfig { private static final Logger log = org.slf4j.LoggerFactory.getLogger(ServingServiceConfig.class); - @Bean(name = "JobStore") - public Store jobStoreDefinition(FeastProperties feastProperties) { - JobProperties jobProperties = feastProperties.getJobs(); - if (feastProperties.getJobs().getStoreType().equals("")) { - return Store.newBuilder().build(); - } - Map options = jobProperties.getStoreOptions(); - Builder storeDefinitionBuilder = - Store.newBuilder().setType(StoreType.valueOf(jobProperties.getStoreType())); - return setStoreConfig(storeDefinitionBuilder, options); - } - private Store setStoreConfig(Store.Builder builder, Map options) { switch (builder.getType()) { case REDIS: diff --git a/serving/src/main/java/feast/serving/service/BigQueryServingService.java b/serving/src/main/java/feast/serving/service/BigQueryServingService.java index 0743245d164..f23cbbe64ad 100644 --- a/serving/src/main/java/feast/serving/service/BigQueryServingService.java +++ b/serving/src/main/java/feast/serving/service/BigQueryServingService.java @@ -267,13 +267,12 @@ private TableId generateUUIDs(Table loadedEntityTable) { } private Job waitForJob(Job queryJob) throws InterruptedException { - Job completedJob = queryJob.waitFor( - RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)), - RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs))); + Job completedJob = + queryJob.waitFor( + RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)), + RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs))); if (completedJob == null) { - throw Status.INTERNAL - .withDescription("Job no longer exists") - .asRuntimeException(); + throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException(); } else if (completedJob.getStatus().getError() != null) { throw Status.INTERNAL .withDescription("Job failed: " + completedJob.getStatus().getError()) diff --git a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java index 7bfce552254..230e20cd782 100644 --- a/serving/src/main/java/feast/serving/service/RedisBackedJobService.java +++ b/serving/src/main/java/feast/serving/service/RedisBackedJobService.java @@ -23,6 +23,8 @@ import org.joda.time.Duration; import org.slf4j.Logger; import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.exceptions.JedisConnectionException; // TODO: Do rate limiting, currently if clients call get() or upsert() // and an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait @@ -31,40 +33,53 @@ public class RedisBackedJobService implements JobService { private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisBackedJobService.class); - private final Jedis jedis; + private final JedisPool jedisPool; // Remove job state info after "defaultExpirySeconds" to prevent filling up Redis memory // and since users normally don't require info about relatively old jobs. private final int defaultExpirySeconds = (int) Duration.standardDays(1).getStandardSeconds(); - public RedisBackedJobService(Jedis jedis) { - this.jedis = jedis; + public RedisBackedJobService(JedisPool jedisPool) { + this.jedisPool = jedisPool; } @Override public Optional get(String id) { - String json = jedis.get(id); - if (json == null) { - return Optional.empty(); - } + Jedis jedis = null; Job job = null; - Builder builder = Job.newBuilder(); try { + jedis = jedisPool.getResource(); + String json = jedis.get(id); + if (json == null) { + return Optional.empty(); + } + Builder builder = Job.newBuilder(); JsonFormat.parser().merge(json, builder); job = builder.build(); + } catch (JedisConnectionException e) { + log.error(String.format("Failed to connect to the redis instance: %s", e)); } catch (Exception e) { log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage())); + } finally { + if (jedis != null) { + jedis.close(); + } } - return Optional.ofNullable(job); } @Override public void upsert(Job job) { + Jedis jedis = null; try { + jedis = jedisPool.getResource(); jedis.set(job.getId(), JsonFormat.printer().omittingInsignificantWhitespace().print(job)); jedis.expire(job.getId(), defaultExpirySeconds); } catch (Exception e) { log.error(String.format("Failed to upsert job: %s", e.getMessage())); + } finally { + if (jedis != null) { + jedis.close(); + } } } } diff --git a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java index a96c2bc8d73..61103af1092 100644 --- a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java +++ b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java @@ -336,13 +336,12 @@ private FieldValueList getTimestampLimits(String entityTableName) { } private Job waitForJob(Job queryJob) throws InterruptedException { - Job completedJob = queryJob.waitFor( - RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())), - RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs()))); + Job completedJob = + queryJob.waitFor( + RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())), + RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs()))); if (completedJob == null) { - throw Status.INTERNAL - .withDescription("Job no longer exists") - .asRuntimeException(); + throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException(); } else if (completedJob.getStatus().getError() != null) { throw Status.INTERNAL .withDescription("Job failed: " + completedJob.getStatus().getError()) @@ -350,5 +349,4 @@ private Job waitForJob(Job queryJob) throws InterruptedException { } return completedJob; } - } diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 072787492f0..96713c80287 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -47,6 +47,10 @@ feast: # store-options: # host: localhost # port: 6379 + # Optionally, you can configure the connection pool with the following items: + # max-conn: 8 + # max-idle: 8 + # max-wait-millis: 50 store-options: {} grpc: diff --git a/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java new file mode 100644 index 00000000000..9247375f59e --- /dev/null +++ b/serving/src/test/java/feast/serving/service/RedisBackedJobServiceTest.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.service; + +import java.io.IOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; +import redis.embedded.RedisServer; + +public class RedisBackedJobServiceTest { + private static String REDIS_HOST = "localhost"; + private static int REDIS_PORT = 51235; + private RedisServer redis; + + @Before + public void setUp() throws IOException { + redis = new RedisServer(REDIS_PORT); + redis.start(); + } + + @After + public void teardown() { + redis.stop(); + } + + @Test + public void shouldRecoverIfRedisConnectionIsLost() { + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMaxTotal(1); + jedisPoolConfig.setMaxWaitMillis(10); + JedisPool jedisPool = new JedisPool(jedisPoolConfig, REDIS_HOST, REDIS_PORT); + RedisBackedJobService jobService = new RedisBackedJobService(jedisPool); + jobService.get("does not exist"); + redis.stop(); + try { + jobService.get("does not exist"); + } catch (Exception e) { + // pass, this should fail, and return a broken connection to the pool + } + redis.start(); + jobService.get("does not exist"); + } +}