Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl. configuration of application loop and user cache refresh durations #152

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig.FITBIT_USERS_CONFIG;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -45,9 +46,12 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector {

@Override
public void start(Map<String, String> props) {
logger.info("Starting Fitbit source connector");
super.start(props);
executor = Executors.newSingleThreadScheduledExecutor();

Duration applicationLoopInterval = config.getApplicationLoopInterval();

executor.scheduleAtFixedRate(() -> {
if (repository.hasPendingUpdates()) {
try {
Expand All @@ -66,7 +70,7 @@ public void start(Map<String, String> props) {
} else {
logger.info("No pending updates found. Not attempting to refresh users.");
}
}, 0, 5, TimeUnit.MINUTES);
}, 0, applicationLoopInterval.toSeconds(), TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
protected static final TemporalAmount ONE_SECOND = SECONDS.getDuration();
protected static final TemporalAmount ONE_MINUTE = MINUTES.getDuration();

private static final Logger logger = LoggerFactory.getLogger(FitbitSleepRoute.class);
private static final Logger logger = LoggerFactory.getLogger(FitbitPollingRoute.class);

/** Committed offsets. */
private Map<String, Instant> offsets;
Expand Down Expand Up @@ -164,7 +164,7 @@ public void requestEmpty(RestRequest request) {
@Override
public void requestFailed(RestRequest request, Response response) {
if (response != null && response.code() == 429) {
User user = ((FitbitRestRequest)request).getUser();
User user = ((FitbitRestRequest) request).getUser();
tooManyRequestsForUser.add(user);
String cooldownString = response.header("Retry-After");
Duration cooldown = getTooManyRequestsCooldown();
Expand All @@ -179,6 +179,8 @@ public void requestFailed(RestRequest request, Response response) {
lastPollPerUser.put(user.getId(), backOff);
logger.info("Too many requests for user {}. Backing off until {}",
user, backOff.plus(getPollIntervalPerUser()));
} else if (response != null) {
logger.warn("Failed to make request {}. Response is: {}", request, response);
} else {
logger.warn("Failed to make request {}", request);
}
Expand All @@ -197,8 +199,11 @@ public Stream<FitbitRestRequest> requests() {
lastPoll = Instant.now();
try {
return userRepository.stream()
// Collect Instant of nextPoll for each user
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
// Keep users where the lastPoll is later than the nextPoll for the user (i.e., user needs to be polled)
.filter(u -> lastPoll.isAfter(u.getValue()))
// Sort users by nextPoll (old to new?)
.sorted(Map.Entry.comparingByValue())
.flatMap(u -> this.createRequests(u.getKey()))
.filter(Objects::nonNull);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Stream
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toKotlinDuration

@Suppress("unused")
class ServiceUserRepository : UserRepository {
Expand Down Expand Up @@ -92,8 +92,12 @@ class ServiceUserRepository : UserRepository {
clientSecret = config.fitbitUserRepositoryClientSecret,
)

val refreshDuration = config.userCacheRefreshInterval.toKotlinDuration()
userCache = CachedSet(
CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes),
CacheConfig(
refreshDuration = refreshDuration,
retryDuration = if (refreshDuration > 1.minutes) 1.minutes else refreshDuration,
),
) {
makeRequest<Users> { url("users?source-type=FitBit") }
.users
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,4 @@ private <T> T makeRequest(Request request, ObjectReader reader) throws IOExcepti
throw new IOException("Failed to make request to user repository", ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@
public class RestSourceConnectorConfig extends AbstractConfig {
public static final Pattern COLON_PATTERN = Pattern.compile(":");

public static final String APPLICATION_LOOP_INTERVAL_CONFIG = "application.loop.interval.ms";
private static final String APPLICATION_LOOP_INTERVAL_DOC = "How often to perform the main application loop.";
private static final String APPLICATION_LOOP_INTERVAL_DISPLAY = "Application loop interval";
private static final Long APPLICATION_LOOP_INTERVAL_DEFAULT = 300000L; // 5 minutes

private static final String SOURCE_POLL_INTERVAL_CONFIG = "rest.source.poll.interval.ms";
private static final String SOURCE_POLL_INTERVAL_DOC = "How often to poll the source URL.";
private static final String SOURCE_POLL_INTERVAL_DISPLAY = "Polling interval";
private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L;
private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L; // 1 minute

static final String SOURCE_URL_CONFIG = "rest.source.base.url";
private static final String SOURCE_URL_DOC = "Base URL for REST source connector.";
Expand Down Expand Up @@ -81,6 +86,11 @@ public class RestSourceConnectorConfig extends AbstractConfig {
"Class to be used to generate REST requests";
private static final String REQUEST_GENERATOR_DISPLAY = "Request generator class";

public static final String USER_CACHE_REFRESH_INTERVAL_CONFIG = "user.cache.refresh.interval.ms";
private static final String USER_CACHE_REFRESH_INTERVAL_DOC = "How often to poll for new user registrations.";
private static final String USER_CACHE_REFRESH_INTERVAL_DISPLAY = "Refresh interval";
private static final Long USER_CACHE_REFRESH_INTERVAL_DEFAULT = 3600000L; // 1 hour

private final TopicSelector topicSelector;
private final PayloadToSourceRecordConverter payloadToSourceRecordConverter;
private final RequestGenerator requestGenerator;
Expand Down Expand Up @@ -171,7 +181,36 @@ public static ConfigDef conf() {
++orderInGroup,
Width.SHORT,
REQUEST_GENERATOR_DISPLAY)
;

.define(APPLICATION_LOOP_INTERVAL_CONFIG,
Type.LONG,
APPLICATION_LOOP_INTERVAL_DEFAULT,
Importance.LOW,
APPLICATION_LOOP_INTERVAL_DOC,
group,
++orderInGroup,
Width.SHORT,
APPLICATION_LOOP_INTERVAL_DISPLAY)

.define(USER_CACHE_REFRESH_INTERVAL_CONFIG,
Type.LONG,
USER_CACHE_REFRESH_INTERVAL_DEFAULT,
Importance.LOW,
USER_CACHE_REFRESH_INTERVAL_DOC,
group,
++orderInGroup,
Width.SHORT,
USER_CACHE_REFRESH_INTERVAL_DISPLAY

);
}

public Duration getApplicationLoopInterval() {
return Duration.ofMillis(this.getLong(APPLICATION_LOOP_INTERVAL_CONFIG));
}

public Duration getUserCacheRefreshInterval() {
return Duration.ofMillis(this.getLong(USER_CACHE_REFRESH_INTERVAL_CONFIG));
}

public Duration getPollInterval() {
Expand Down
Loading