Skip to content

Commit

Permalink
DefaultServiceDiscoveryRetryStrategy may emit duplicated SD events (#…
Browse files Browse the repository at this point in the history
…1586)

Motivation:

When `DefaultServiceDiscoveryRetryStrategy` retains addresses till
success and recovers from the previous SD error, it doesn't check which
of new addresses were previously retained. As the result, it may
propagate duplicated "available" events to `RoundRobinLoadBalancer`.
RRLB does not support duplicated entries. It may lead to retaining
duplicated addresses after they become unavailable, causing failures
of requests.

Modifications:

- Filter out previously retained events when we recover from SD-error;
- Do not return duplicated `unavailable` events on subsequent SD-errors;
- Enhance tests to catch these use-cases;

Result:

`DefaultServiceDiscoveryRetryStrategyTest` does not emit duplicated SD
events for the same availability state => RRLB does not retain duplicated
entries.
  • Loading branch information
idelpivnitskiy committed May 28, 2021
1 parent dc8972f commit 5556e22
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ public Publisher<Collection<E>> apply(final Publisher<Collection<E>> sdEvents) {
return defer(() -> {
EventsCache<ResolvedAddress, E> eventsCache = new EventsCache<>(flipAvailability);
if (retainAddressesTillSuccess) {
return sdEvents.map(eventsCache::consume)
return sdEvents.map(eventsCache::consumeAndFilter)
.beforeOnError(__ -> eventsCache.errorSeen())
.retryWhen(retryStrategy);
}

return sdEvents.map(eventsCache::consume)
return sdEvents.map(eventsCache::consumeAndFilter)
.onErrorResume(cause -> {
final Collection<E> events = eventsCache.errorSeen();
return events == null ? failed(cause) : Publisher.from(events.stream()
Expand Down Expand Up @@ -226,13 +226,13 @@ private static final class EventsCache<R, E extends ServiceDiscovererEvent<R>> {
Collection<E> errorSeen() {
if (retainedAddresses == NONE_RETAINED) {
retainedAddresses = new HashMap<>(activeAddresses);
activeAddresses.clear();
return retainedAddresses.isEmpty() ? null : retainedAddresses.values();
}
activeAddresses.clear();
return retainedAddresses.isEmpty() ? null : retainedAddresses.values();
return null; // We already returned retainedAddresses for previous error, return null to avoid duplicates
}

@Nullable
Collection<E> consume(final Collection<E> events) {
Collection<E> consumeAndFilter(final Collection<E> events) {
if (retainedAddresses == NONE_RETAINED) {
for (E e : events) {
if (e.isAvailable()) {
Expand All @@ -246,27 +246,25 @@ Collection<E> consume(final Collection<E> events) {

// we have seen an error, replace cache with new addresses and deactivate the ones which are not present
// in the new list.
List<E> toReturn = new ArrayList<>(activeAddresses.size() + retainedAddresses.size());
for (E event : events) {
final R address = event.address();
if (event.isAvailable()) {
activeAddresses.put(address, event);
} else {
activeAddresses.remove(address);
if (retainedAddresses.remove(address) == null) {
toReturn.add(event);
}
} else if (activeAddresses.remove(address) != null || retainedAddresses.remove(address) != null) {
toReturn.add(event);
}
}

List<E> toReturn = new ArrayList<>(activeAddresses.values());
for (R address : activeAddresses.keySet()) {
retainedAddresses.remove(address);
for (E event : retainedAddresses.values()) {
toReturn.add(flipAvailability.apply(event));
}

if (!retainedAddresses.isEmpty()) {
for (E evt : retainedAddresses.values()) {
toReturn.add(flipAvailability.apply(evt));
}
}
retainedAddresses = noneRetained();
return toReturn.isEmpty() ? null : toReturn;
return toReturn;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public abstract HttpClientBuilder<U, R, SDE> serviceDiscoverer(
*
* @param retryStrategy a retry strategy to retry errors emitted by {@link ServiceDiscoverer}.
* @return {@code this}.
* @see DefaultServiceDiscoveryRetryStrategy.Builder
*/
public abstract HttpClientBuilder<U, R, SDE> retryServiceDiscoveryErrors(
ServiceDiscoveryRetryStrategy<R, SDE> retryStrategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,36 @@ public void overlapAddressPostRetry() throws Exception {

final DefaultServiceDiscovererEvent<String> evt1 = sendUpAndVerifyReceive(state, "addr1", sdEvents);
final DefaultServiceDiscovererEvent<String> evt2 = sendUpAndVerifyReceive(state, "addr2", sdEvents);
final DefaultServiceDiscovererEvent<String> evt3 = sendUpAndVerifyReceive(state, "addr3", sdEvents);

sdEvents = triggerRetry(state, sdEvents);

verifyNoEventsReceived(state);

final DefaultServiceDiscovererEvent<String> evt3 = new DefaultServiceDiscovererEvent<>("addr3", true);
sdEvents.onNext(asList(evt2, evt3));
final DefaultServiceDiscovererEvent<String> evt4 = new DefaultServiceDiscovererEvent<>("addr4", true);
sdEvents.onNext(asList(evt2, evt4));

final List<Collection<ServiceDiscovererEvent<String>>> items = state.subscriber.takeOnNext(1);
assertThat("Unexpected items received.", items, hasSize(1));
assertThat("Unexpected event received", items.get(0),
containsInAnyOrder(flipAvailable(evt1), evt2, evt3));
containsInAnyOrder(evt4, flipAvailable(evt1), flipAvailable(evt3)));
}

@Test
public void sameAddressPostRetry() throws Exception {
State state = new State(true);
TestPublisher<Collection<ServiceDiscovererEvent<String>>> sdEvents = state.pubs.take();
final String addr = "addr1";
final DefaultServiceDiscovererEvent<String> evt1 = sendUpAndVerifyReceive(state, addr, sdEvents);

sdEvents = triggerRetry(state, sdEvents);
verifyNoEventsReceived(state);

final DefaultServiceDiscovererEvent<String> evt1Un = new DefaultServiceDiscovererEvent<>(addr, false);
sdEvents.onNext(asList(evt1, evt1Un, evt1));
final List<Collection<ServiceDiscovererEvent<String>>> items = state.subscriber.takeOnNext(1);
assertThat("Unexpected items received.", items, hasSize(1));
assertThat("Unexpected event received", items.get(0), contains(evt1Un, evt1));
}

@Test
Expand Down Expand Up @@ -179,6 +197,23 @@ public void noRetainErrorWithNoAddresses() throws Exception {
sendUpAndVerifyReceive(state, "addr1", sdEvents);
}

@Test
public void noRetainActiveAddressesTwoSequentialErrors() throws Exception {
State state = new State(false);
TestPublisher<Collection<ServiceDiscovererEvent<String>>> sdEvents = state.pubs.take();
final DefaultServiceDiscovererEvent<String> evt1 = sendUpAndVerifyReceive(state, "addr1", sdEvents);
final DefaultServiceDiscovererEvent<String> evt2 = sendUpAndVerifyReceive(state, "addr2", sdEvents);

sdEvents = triggerRetry(state, sdEvents);
final List<Collection<ServiceDiscovererEvent<String>>> items = state.subscriber.takeOnNext(1);
assertThat("Unexpected items received.", items, hasSize(1));
assertThat("Unexpected event received", items.get(0),
containsInAnyOrder(flipAvailable(evt1), flipAvailable(evt2)));

triggerRetry(state, sdEvents);
verifyNoEventsReceived(state);
}

private void verifyNoEventsReceived(final State state) {
assertThat("Unexpected event received", state.subscriber.pollOnNext(10, MILLISECONDS), is(nullValue()));
}
Expand Down

0 comments on commit 5556e22

Please sign in to comment.