Skip to content

Commit dcaccaa

Browse files
Chen ZhilingShu Heng
Chen Zhiling
authored and
Shu Heng
committed
Change RedisBackedJobService to use a connection pool (#439)
* Change job service connection to use a connection pool * Close connection * Remove isConnected condition * Add simple test to test that pool recovers broken connections * Catch JedisExceptions separately * Catch only JedisConnectionException
1 parent 17828c5 commit dcaccaa

File tree

8 files changed

+131
-49
lines changed

8 files changed

+131
-49
lines changed

serving/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,12 @@
250250
<groupId>com.fasterxml.jackson.dataformat</groupId>
251251
<artifactId>jackson-dataformat-yaml</artifactId>
252252
</dependency>
253+
254+
<dependency>
255+
<groupId>com.github.kstyrc</groupId>
256+
<artifactId>embedded-redis</artifactId>
257+
<scope>test</scope>
258+
</dependency>
253259
</dependencies>
254260

255261
<profiles>

serving/src/main/java/feast/serving/configuration/JobServiceConfig.java

+27-12
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,55 @@
1616
*/
1717
package feast.serving.configuration;
1818

19-
import feast.core.StoreProto.Store;
20-
import feast.core.StoreProto.Store.RedisConfig;
2119
import feast.core.StoreProto.Store.StoreType;
20+
import feast.serving.FeastProperties;
2221
import feast.serving.service.JobService;
2322
import feast.serving.service.NoopJobService;
2423
import feast.serving.service.RedisBackedJobService;
2524
import feast.serving.specs.CachedSpecService;
25+
import java.util.Map;
2626
import org.springframework.context.annotation.Bean;
2727
import org.springframework.context.annotation.Configuration;
28-
import redis.clients.jedis.Jedis;
28+
import redis.clients.jedis.JedisPool;
29+
import redis.clients.jedis.JedisPoolConfig;
2930

3031
@Configuration
3132
public class JobServiceConfig {
3233

34+
public static final String DEFAULT_REDIS_MAX_CONN = "8";
35+
public static final String DEFAULT_REDIS_MAX_IDLE = "8";
36+
public static final String DEFAULT_REDIS_MAX_WAIT_MILLIS = "50";
37+
3338
@Bean
34-
public JobService jobService(Store jobStore, CachedSpecService specService) {
39+
public JobService jobService(FeastProperties feastProperties, CachedSpecService specService) {
3540
if (!specService.getStore().getType().equals(StoreType.BIGQUERY)) {
3641
return new NoopJobService();
3742
}
38-
39-
switch (jobStore.getType()) {
43+
StoreType storeType = StoreType.valueOf(feastProperties.getJobs().getStoreType());
44+
Map<String, String> storeOptions = feastProperties.getJobs().getStoreOptions();
45+
switch (storeType) {
4046
case REDIS:
41-
RedisConfig redisConfig = jobStore.getRedisConfig();
42-
Jedis jedis = new Jedis(redisConfig.getHost(), redisConfig.getPort());
43-
return new RedisBackedJobService(jedis);
47+
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
48+
jedisPoolConfig.setMaxTotal(
49+
Integer.parseInt(storeOptions.getOrDefault("max-conn", DEFAULT_REDIS_MAX_CONN)));
50+
jedisPoolConfig.setMaxIdle(
51+
Integer.parseInt(storeOptions.getOrDefault("max-idle", DEFAULT_REDIS_MAX_IDLE)));
52+
jedisPoolConfig.setMaxWaitMillis(
53+
Integer.parseInt(
54+
storeOptions.getOrDefault("max-wait-millis", DEFAULT_REDIS_MAX_WAIT_MILLIS)));
55+
JedisPool jedisPool =
56+
new JedisPool(
57+
jedisPoolConfig,
58+
storeOptions.get("host"),
59+
Integer.parseInt(storeOptions.get("port")));
60+
return new RedisBackedJobService(jedisPool);
4461
case INVALID:
4562
case BIGQUERY:
4663
case CASSANDRA:
4764
case UNRECOGNIZED:
4865
default:
4966
throw new IllegalArgumentException(
50-
String.format(
51-
"Unsupported store type '%s' for job store name '%s'",
52-
jobStore.getType(), jobStore.getName()));
67+
String.format("Unsupported store type '%s' for job store", storeType));
5368
}
5469
}
5570
}

serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java

-15
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,9 @@
2222
import com.google.cloud.storage.StorageOptions;
2323
import feast.core.StoreProto.Store;
2424
import feast.core.StoreProto.Store.BigQueryConfig;
25-
import feast.core.StoreProto.Store.Builder;
2625
import feast.core.StoreProto.Store.RedisConfig;
27-
import feast.core.StoreProto.Store.StoreType;
2826
import feast.core.StoreProto.Store.Subscription;
2927
import feast.serving.FeastProperties;
30-
import feast.serving.FeastProperties.JobProperties;
3128
import feast.serving.service.BigQueryServingService;
3229
import feast.serving.service.JobService;
3330
import feast.serving.service.NoopJobService;
@@ -47,18 +44,6 @@ public class ServingServiceConfig {
4744

4845
private static final Logger log = org.slf4j.LoggerFactory.getLogger(ServingServiceConfig.class);
4946

50-
@Bean(name = "JobStore")
51-
public Store jobStoreDefinition(FeastProperties feastProperties) {
52-
JobProperties jobProperties = feastProperties.getJobs();
53-
if (feastProperties.getJobs().getStoreType().equals("")) {
54-
return Store.newBuilder().build();
55-
}
56-
Map<String, String> options = jobProperties.getStoreOptions();
57-
Builder storeDefinitionBuilder =
58-
Store.newBuilder().setType(StoreType.valueOf(jobProperties.getStoreType()));
59-
return setStoreConfig(storeDefinitionBuilder, options);
60-
}
61-
6247
private Store setStoreConfig(Store.Builder builder, Map<String, String> options) {
6348
switch (builder.getType()) {
6449
case REDIS:

serving/src/main/java/feast/serving/service/BigQueryServingService.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -267,13 +267,12 @@ private TableId generateUUIDs(Table loadedEntityTable) {
267267
}
268268

269269
private Job waitForJob(Job queryJob) throws InterruptedException {
270-
Job completedJob = queryJob.waitFor(
271-
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
272-
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
270+
Job completedJob =
271+
queryJob.waitFor(
272+
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs)),
273+
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs)));
273274
if (completedJob == null) {
274-
throw Status.INTERNAL
275-
.withDescription("Job no longer exists")
276-
.asRuntimeException();
275+
throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
277276
} else if (completedJob.getStatus().getError() != null) {
278277
throw Status.INTERNAL
279278
.withDescription("Job failed: " + completedJob.getStatus().getError())

serving/src/main/java/feast/serving/service/RedisBackedJobService.java

+24-9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.joda.time.Duration;
2424
import org.slf4j.Logger;
2525
import redis.clients.jedis.Jedis;
26+
import redis.clients.jedis.JedisPool;
27+
import redis.clients.jedis.exceptions.JedisConnectionException;
2628

2729
// TODO: Do rate limiting, currently if clients call get() or upsert()
2830
// and an exceedingly high rate e.g. they wrap job reload in a while loop with almost no wait
@@ -31,40 +33,53 @@
3133
public class RedisBackedJobService implements JobService {
3234

3335
private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisBackedJobService.class);
34-
private final Jedis jedis;
36+
private final JedisPool jedisPool;
3537
// Remove job state info after "defaultExpirySeconds" to prevent filling up Redis memory
3638
// and since users normally don't require info about relatively old jobs.
3739
private final int defaultExpirySeconds = (int) Duration.standardDays(1).getStandardSeconds();
3840

39-
public RedisBackedJobService(Jedis jedis) {
40-
this.jedis = jedis;
41+
public RedisBackedJobService(JedisPool jedisPool) {
42+
this.jedisPool = jedisPool;
4143
}
4244

4345
@Override
4446
public Optional<Job> get(String id) {
45-
String json = jedis.get(id);
46-
if (json == null) {
47-
return Optional.empty();
48-
}
47+
Jedis jedis = null;
4948
Job job = null;
50-
Builder builder = Job.newBuilder();
5149
try {
50+
jedis = jedisPool.getResource();
51+
String json = jedis.get(id);
52+
if (json == null) {
53+
return Optional.empty();
54+
}
55+
Builder builder = Job.newBuilder();
5256
JsonFormat.parser().merge(json, builder);
5357
job = builder.build();
58+
} catch (JedisConnectionException e) {
59+
log.error(String.format("Failed to connect to the redis instance: %s", e));
5460
} catch (Exception e) {
5561
log.error(String.format("Failed to parse JSON for Feast job: %s", e.getMessage()));
62+
} finally {
63+
if (jedis != null) {
64+
jedis.close();
65+
}
5666
}
57-
5867
return Optional.ofNullable(job);
5968
}
6069

6170
@Override
6271
public void upsert(Job job) {
72+
Jedis jedis = null;
6373
try {
74+
jedis = jedisPool.getResource();
6475
jedis.set(job.getId(), JsonFormat.printer().omittingInsignificantWhitespace().print(job));
6576
jedis.expire(job.getId(), defaultExpirySeconds);
6677
} catch (Exception e) {
6778
log.error(String.format("Failed to upsert job: %s", e.getMessage()));
79+
} finally {
80+
if (jedis != null) {
81+
jedis.close();
82+
}
6883
}
6984
}
7085
}

serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -336,19 +336,17 @@ private FieldValueList getTimestampLimits(String entityTableName) {
336336
}
337337

338338
private Job waitForJob(Job queryJob) throws InterruptedException {
339-
Job completedJob = queryJob.waitFor(
340-
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
341-
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
339+
Job completedJob =
340+
queryJob.waitFor(
341+
RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
342+
RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
342343
if (completedJob == null) {
343-
throw Status.INTERNAL
344-
.withDescription("Job no longer exists")
345-
.asRuntimeException();
344+
throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
346345
} else if (completedJob.getStatus().getError() != null) {
347346
throw Status.INTERNAL
348347
.withDescription("Job failed: " + completedJob.getStatus().getError())
349348
.asRuntimeException();
350349
}
351350
return completedJob;
352351
}
353-
354352
}

serving/src/main/resources/application.yml

+4
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ feast:
4747
# store-options:
4848
# host: localhost
4949
# port: 6379
50+
# Optionally, you can configure the connection pool with the following items:
51+
# max-conn: 8
52+
# max-idle: 8
53+
# max-wait-millis: 50
5054
store-options: {}
5155

5256
grpc:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.serving.service;
18+
19+
import java.io.IOException;
20+
import org.junit.After;
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
import redis.clients.jedis.JedisPool;
24+
import redis.clients.jedis.JedisPoolConfig;
25+
import redis.embedded.RedisServer;
26+
27+
public class RedisBackedJobServiceTest {
28+
private static String REDIS_HOST = "localhost";
29+
private static int REDIS_PORT = 51235;
30+
private RedisServer redis;
31+
32+
@Before
33+
public void setUp() throws IOException {
34+
redis = new RedisServer(REDIS_PORT);
35+
redis.start();
36+
}
37+
38+
@After
39+
public void teardown() {
40+
redis.stop();
41+
}
42+
43+
@Test
44+
public void shouldRecoverIfRedisConnectionIsLost() {
45+
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
46+
jedisPoolConfig.setMaxTotal(1);
47+
jedisPoolConfig.setMaxWaitMillis(10);
48+
JedisPool jedisPool = new JedisPool(jedisPoolConfig, REDIS_HOST, REDIS_PORT);
49+
RedisBackedJobService jobService = new RedisBackedJobService(jedisPool);
50+
jobService.get("does not exist");
51+
redis.stop();
52+
try {
53+
jobService.get("does not exist");
54+
} catch (Exception e) {
55+
// pass, this should fail, and return a broken connection to the pool
56+
}
57+
redis.start();
58+
jobService.get("does not exist");
59+
}
60+
}

0 commit comments

Comments
 (0)