Skip to content

Commit

Permalink
Impl. configuration of application loop and user service cache refres…
Browse files Browse the repository at this point in the history
…h intervals

Needed for efficient e2e testing.
  • Loading branch information
pvannierop committed Nov 19, 2024
1 parent 39ca6dc commit e50c3cc
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig.FITBIT_USERS_CONFIG;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -45,9 +46,12 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector {

@Override
public void start(Map<String, String> props) {
logger.info("Starting Fitbit source connector");
super.start(props);
executor = Executors.newSingleThreadScheduledExecutor();

Duration applicationLoopInterval = config.getApplicationLoopInterval();

executor.scheduleAtFixedRate(() -> {
if (repository.hasPendingUpdates()) {
try {
Expand All @@ -66,7 +70,7 @@ public void start(Map<String, String> props) {
} else {
logger.info("No pending updates found. Not attempting to refresh users.");
}
}, 0, 5, TimeUnit.MINUTES);
}, 0, applicationLoopInterval.toSeconds(), TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Stream
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toKotlinDuration

@Suppress("unused")
class ServiceUserRepository : UserRepository {
Expand Down Expand Up @@ -92,8 +92,12 @@ class ServiceUserRepository : UserRepository {
clientSecret = config.fitbitUserRepositoryClientSecret,
)

val refreshDuration = config.userCacheRefreshInterval.toKotlinDuration()
userCache = CachedSet(
CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes),
CacheConfig(
refreshDuration = refreshDuration,
retryDuration = if (refreshDuration > 1.minutes) 1.minutes else refreshDuration,
),
) {
makeRequest<Users> { url("users?source-type=FitBit") }
.users
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,4 @@ private <T> T makeRequest(Request request, ObjectReader reader) throws IOExcepti
throw new IOException("Failed to make request to user repository", ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@
public class RestSourceConnectorConfig extends AbstractConfig {
public static final Pattern COLON_PATTERN = Pattern.compile(":");

public static final String APPLICATION_LOOP_INTERVAL_CONFIG = "application.loop.interval.ms";
private static final String APPLICATION_LOOP_INTERVAL_DOC = "How often to perform the main application loop.";
private static final String APPLICATION_LOOP_INTERVAL_DISPLAY = "Application loop interval";
private static final Long APPLICATION_LOOP_INTERVAL_DEFAULT = 300000L; // 5 minutes

private static final String SOURCE_POLL_INTERVAL_CONFIG = "rest.source.poll.interval.ms";
private static final String SOURCE_POLL_INTERVAL_DOC = "How often to poll the source URL.";
private static final String SOURCE_POLL_INTERVAL_DISPLAY = "Polling interval";
private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L;
private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L; // 1 minute

static final String SOURCE_URL_CONFIG = "rest.source.base.url";
private static final String SOURCE_URL_DOC = "Base URL for REST source connector.";
Expand Down Expand Up @@ -81,6 +86,11 @@ public class RestSourceConnectorConfig extends AbstractConfig {
"Class to be used to generate REST requests";
private static final String REQUEST_GENERATOR_DISPLAY = "Request generator class";

public static final String USER_CACHE_REFRESH_INTERVAL_CONFIG = "user.cache.refresh.interval.ms";
private static final String USER_CACHE_REFRESH_INTERVAL_DOC = "How often to poll for new user registrations.";
private static final String USER_CACHE_REFRESH_INTERVAL_DISPLAY = "Refresh interval";
private static final Long USER_CACHE_REFRESH_INTERVAL_DEFAULT = 3600000L; // 1 hour

private final TopicSelector topicSelector;
private final PayloadToSourceRecordConverter payloadToSourceRecordConverter;
private final RequestGenerator requestGenerator;
Expand Down Expand Up @@ -171,7 +181,36 @@ public static ConfigDef conf() {
++orderInGroup,
Width.SHORT,
REQUEST_GENERATOR_DISPLAY)
;

.define(APPLICATION_LOOP_INTERVAL_CONFIG,
Type.LONG,
APPLICATION_LOOP_INTERVAL_DEFAULT,
Importance.LOW,
APPLICATION_LOOP_INTERVAL_DOC,
group,
++orderInGroup,
Width.SHORT,
APPLICATION_LOOP_INTERVAL_DISPLAY)

.define(USER_CACHE_REFRESH_INTERVAL_CONFIG,
Type.LONG,
USER_CACHE_REFRESH_INTERVAL_DEFAULT,
Importance.LOW,
USER_CACHE_REFRESH_INTERVAL_DOC,
group,
++orderInGroup,
Width.SHORT,
USER_CACHE_REFRESH_INTERVAL_DISPLAY

);
}

public Duration getApplicationLoopInterval() {
return Duration.ofMillis(this.getLong(APPLICATION_LOOP_INTERVAL_CONFIG));
}

public Duration getUserCacheRefreshInterval() {
return Duration.ofMillis(this.getLong(USER_CACHE_REFRESH_INTERVAL_CONFIG));
}

public Duration getPollInterval() {
Expand Down

0 comments on commit e50c3cc

Please sign in to comment.