From e7aec43958121cbe193dd2860c1daa2ddb6301e5 Mon Sep 17 00:00:00 2001 From: pvannierop Date: Tue, 19 Nov 2024 16:14:57 +0100 Subject: [PATCH] Fix incorrect class name in logger definition --- .../connect/rest/fitbit/route/FitbitPollingRoute.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java index 53e17b97..e2573143 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java @@ -100,7 +100,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute { protected static final TemporalAmount ONE_SECOND = SECONDS.getDuration(); protected static final TemporalAmount ONE_MINUTE = MINUTES.getDuration(); - private static final Logger logger = LoggerFactory.getLogger(FitbitSleepRoute.class); + private static final Logger logger = LoggerFactory.getLogger(FitbitPollingRoute.class); /** Committed offsets. */ private Map offsets; @@ -164,7 +164,7 @@ public void requestEmpty(RestRequest request) { @Override public void requestFailed(RestRequest request, Response response) { if (response != null && response.code() == 429) { - User user = ((FitbitRestRequest)request).getUser(); + User user = ((FitbitRestRequest) request).getUser(); tooManyRequestsForUser.add(user); String cooldownString = response.header("Retry-After"); Duration cooldown = getTooManyRequestsCooldown(); @@ -179,6 +179,8 @@ public void requestFailed(RestRequest request, Response response) { lastPollPerUser.put(user.getId(), backOff); logger.info("Too many requests for user {}. Backing off until {}", user, backOff.plus(getPollIntervalPerUser())); + } else if (response != null) { + logger.warn("Failed to make request {}. Response is: {}", request, response); } else { logger.warn("Failed to make request {}", request); } @@ -197,8 +199,11 @@ public Stream requests() { lastPoll = Instant.now(); try { return userRepository.stream() + // Collect Instant of nextPoll for each user .map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u))) + // Keep users where the lastPoll is later than the nextPoll for the user (i.e., user needs to be polled) .filter(u -> lastPoll.isAfter(u.getValue())) + // Sort users by nextPoll (old to new?) .sorted(Map.Entry.comparingByValue()) .flatMap(u -> this.createRequests(u.getKey())) .filter(Objects::nonNull);