Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjusting ServiceDiscovererEvent contract #1906

Merged
merged 14 commits into from
Nov 15, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.EXPIRED;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource;
Expand Down Expand Up @@ -106,9 +108,13 @@ public DefaultPartitionedClientGroup(final Function<PartitionAttributes, Client>
this.unknownPartitionClient = unknownPartitionClient;
this.partitionMap = partitionMapFactory.newPartitionMap(event ->
new Partition<>(event, closedPartitionClient.apply(event)));
toSource(psdEvents.groupToMany(event -> event.isAvailable() ?
partitionMap.add(event.partitionAddress()).iterator() :
partitionMap.remove(event.partitionAddress()).iterator(), psdMaxQueueSize))
toSource(psdEvents
.groupToMany(event -> UNAVAILABLE.equals(event.status()) ?
partitionMap.remove(event.partitionAddress()).iterator()
// EXPIRED events neither add or remove new partitions so it's safe to call add
// as it will just return current partitions.
: partitionMap.add(event.partitionAddress()).iterator(),
psdMaxQueueSize))
.subscribe(new GroupedByPartitionSubscriber(clientFactory));
}

Expand Down Expand Up @@ -166,11 +172,12 @@ public Publisher<Collection<ServiceDiscovererEvent<R>>> discover(final U ignored

@Override
public boolean test(PSDE evt) {
if (EXPIRED.equals(evt.status())) {
return false;
}
MutableInt counter = addressCount.computeIfAbsent(evt.address(), __ -> new MutableInt());
boolean acceptEvent;
if (evt.isAvailable()) {
acceptEvent = ++counter.value == 1;
} else {
if (UNAVAILABLE.equals(evt.status())) {
acceptEvent = --counter.value == 0;
if (acceptEvent) {
// If address is unavailable and no more add events are pending stop tracking and
Expand All @@ -181,6 +188,8 @@ public boolean test(PSDE evt) {
partition.closeNow();
}
}
} else {
acceptEvent = ++counter.value == 1;
}
return acceptEvent;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.RandomAccess;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static java.util.Collections.binarySearch;

/**
Expand All @@ -45,27 +46,32 @@ private ServiceDiscovererUtils() {
* @param newActiveAddresses The new list of active addresses.<b>This list must be modifiable</b> as it will be
* sorted with {@link List#sort(Comparator)}.
* @param comparator A comparator for the addresses and to use for binary searches.
* @param reporter A reporter for the numbers of available and unavailable events.
* @param reporter A reporter for the numbers of available and missing events.
* @param <T> The type of address.
* @param missingRecordStatus {@link ServiceDiscovererEvent.Status} to use for created
* {@link ServiceDiscovererEvent} when address present in current list but not in the new one.
* @return A list of {@link ServiceDiscovererEvent}s which represents the changes between
* {@code currentActiveAddresses} and {@code newActiveAddresses}, or {@code null} if there are no changes.
*/
@Nullable
public static <T> List<ServiceDiscovererEvent<T>> calculateDifference(List<? extends T> currentActiveAddresses,
List<? extends T> newActiveAddresses,
Comparator<T> comparator,
@Nullable TwoIntsConsumer reporter) {
public static <T> List<ServiceDiscovererEvent<T>> calculateDifference(
List<? extends T> currentActiveAddresses,
List<? extends T> newActiveAddresses,
Comparator<T> comparator,
@Nullable TwoIntsConsumer reporter,
ServiceDiscovererEvent.Status missingRecordStatus) {
// First sort the newAddresses so we can use binary search.
newActiveAddresses.sort(comparator);

// Calculate additions (in newAddresses, not in activeAddresses).
List<ServiceDiscovererEvent<T>> availableEvents =
relativeComplement(true, currentActiveAddresses, newActiveAddresses, comparator, null);
relativeComplement(currentActiveAddresses, newActiveAddresses, comparator, null, AVAILABLE);
// Store nAvailable now because the List may be updated on the next step.
final int nAvailable = availableEvents == null ? 0 : availableEvents.size();
// Calculate removals (in activeAddresses, not in newAddresses).
List<ServiceDiscovererEvent<T>> allEvents =
relativeComplement(false, newActiveAddresses, currentActiveAddresses, comparator, availableEvents);
relativeComplement(newActiveAddresses, currentActiveAddresses, comparator, availableEvents,
missingRecordStatus);

reportEvents(reporter, allEvents, nAvailable);
return allEvents;
Expand All @@ -89,32 +95,32 @@ private static <T> void reportEvents(@Nullable final TwoIntsConsumer reporter,
* {@code sortedA}).
* <p>
* See <a href="https://en.wikipedia.org/wiki/Venn_diagram#Overview">Set Mathematics</a>.
* @param available Will be used for {@link ServiceDiscovererEvent#isAvailable()} for each
* {@link ServiceDiscovererEvent} in the returned {@link List}.
* @param sortedA A sorted {@link List} of which no elements be present in the return value.
* @param sortedB A sorted {@link List} of which elements in this set that are not in {@code sortedA} will be in the
* return value.
* @param comparator Used for binary searches on {@code sortedA} for each element in {@code sortedB}.
* @param result List to append new results to.
* @param status {@link ServiceDiscovererEvent.Status} to use for created {@link ServiceDiscovererEvent}
* in the {@code result}.
* @param <T> The type of resolved address.
* @return the relative complement of {@code sortedA} and {@code sortedB} (elements in {@code sortedB} and not in
* {@code sortedA}).
*/
@Nullable
private static <T> List<ServiceDiscovererEvent<T>> relativeComplement(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 0.42 we should move the whole ServiceDiscovererUtils to dns module as pkg-private. Not sure why it had to be in client-api-internal as public.

boolean available, List<? extends T> sortedA, List<? extends T> sortedB, Comparator<T> comparator,
@Nullable List<ServiceDiscovererEvent<T>> result) {
List<? extends T> sortedA, List<? extends T> sortedB, Comparator<T> comparator,
@Nullable List<ServiceDiscovererEvent<T>> result, ServiceDiscovererEvent.Status status) {
if (sortedB instanceof RandomAccess) {
for (int i = 0; i < sortedB.size(); ++i) {
final T valueB = sortedB.get(i);
if (binarySearch(sortedA, valueB, comparator) < 0) {
if (result == null) {
result = new ArrayList<>(4);
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
} else if (comparator.compare(valueB, result.get(result.size() - 1).address()) != 0) {
// make sure we don't include duplicates. the input lists are sorted and we process in order so
// we verify the previous entry is not a duplicate.
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
}
}
}
Expand All @@ -123,11 +129,11 @@ private static <T> List<ServiceDiscovererEvent<T>> relativeComplement(
if (binarySearch(sortedA, valueB, comparator) < 0) {
if (result == null) {
result = new ArrayList<>(4);
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
} else if (comparator.compare(valueB, result.get(result.size() - 1).address()) != 0) {
// make sure we don't include duplicates. the input lists are sorted and we process in order so
// we verify the previous entry is not a duplicate.
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.client.api;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -23,26 +25,44 @@
*/
public final class DefaultServiceDiscovererEvent<T> implements ServiceDiscovererEvent<T> {
private final T address;
private final boolean available;
private final Status status;

/**
* Create a new instance.
* @param address The address returned by {@link #address()}.
* @param available Value returned by {@link #available}.
* @param available Value used to determine {@link #status()}.
* @deprecated Use
* {@link #DefaultServiceDiscovererEvent(Object, io.servicetalk.client.api.ServiceDiscovererEvent.Status)}.
*/
@Deprecated
public DefaultServiceDiscovererEvent(T address, boolean available) {
this.address = requireNonNull(address);
this.available = available;
this.status = available ? AVAILABLE : UNAVAILABLE;
}

/**
* Create a new instance.
* @param address The address returned by {@link #address()}.
* @param status Value returned by {@link #status()}.
*/
public DefaultServiceDiscovererEvent(T address, Status status) {
this.address = requireNonNull(address);
this.status = requireNonNull(status);
}

@Override
public T address() {
return address;
}

@Override
public Status status() {
return status;
}

@Override
public boolean isAvailable() {
return available;
return AVAILABLE.equals(status);
}

@Override
Expand All @@ -53,27 +73,23 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

final DefaultServiceDiscovererEvent<?> that = (DefaultServiceDiscovererEvent<?>) o;

if (available != that.available) {
return false;
}
return address.equals(that.address);
return status.equals(that.status) && address.equals(that.address);
}

@Override
public int hashCode() {
int result = address.hashCode();
result = 31 * result + (available ? 1 : 0);
result = 31 * result + status.hashCode();
return result;
}

@Override
public String toString() {
return "DefaultServiceDiscovererEvent{" +
"address=" + address +
", available=" + available +
", status=" + status.getName() +
", available=" + isAvailable() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
package io.servicetalk.client.api;

import java.util.Locale;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;

/**
* Notification from the Service Discovery system that availability for an address has changed.
* @param <ResolvedAddress> the type of address after resolution.
Expand All @@ -26,10 +31,113 @@ public interface ServiceDiscovererEvent<ResolvedAddress> {
*/
ResolvedAddress address();

/**
* {@link Status Status} of the event instructing the {@link ServiceDiscoverer} what actions
* to take upon the associated {@link #address() address}.
* <p>
* Note, the default implementation calls {@link #isAvailable()} to allow frictionless adoption, but once the
* implementing class removes the override for the deprecated method {@link #isAvailable()},
* it will be also necessary to override {@link #status()}.
* @return {@link Status Status} of the associated {@link #address()}.
*/
default Status status() {
return isAvailable() ? AVAILABLE : UNAVAILABLE;
}

/**
* Determine if {@link #address()} is now available or unavailable.
* @return {@code true} if {@link #address()} is now available or false if the {@link #address()} is now
* unavailable.
* @deprecated Implement and use {@link #status()}. This method will be removed.
*/
boolean isAvailable();
@Deprecated
default boolean isAvailable() {
throw new UnsupportedOperationException("Migrate to status() method. This method may be implemented" +
" temporarily until migration to status() is complete.");
}

/**
* Status provided by the {@link ServiceDiscoverer} system that guides the actions of {@link LoadBalancer} upon the
* bound {@link ServiceDiscovererEvent#address()} (via {@link ServiceDiscovererEvent}).
*/
final class Status {

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is available for use in connection establishment.
*/
public static final Status AVAILABLE = new Status("available");

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is not available for use and all currently established
* connections should be closed.
*/
public static final Status UNAVAILABLE = new Status("unavailable");

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is expired and should not be used for establishing
* new connections. It doesn't necessarily mean that the host should not be used in traffic routing over already
* established connections as long as they are kept open by the remote peer. The implementations can have
* different policies in that regard.
*/
public static final Status EXPIRED = new Status("expired");

private final String name;

private Status(final String name) {
if (name.isEmpty()) {
throw new IllegalArgumentException("Status name cannot be empty");
}
this.name = name.toLowerCase(Locale.ENGLISH);
}

/**
* Returns an {@link Status} for the specified name.
* @param name the status name.
* @return {@link Status} representing the status for given name.
*/
public static Status of(final String name) {
switch (name.toLowerCase(Locale.ENGLISH)) {
case "available":
return AVAILABLE;
case "unavailable":
return UNAVAILABLE;
case "expired":
return EXPIRED;
default:
return new Status(name);
}
}

/**
* Returns the name of this status.
* @return name of this status.
*/
public String getName() {
return name;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ServiceDiscovererEvent.Status)) {
return false;
}
final Status that = (Status) o;
return name.equals(that.name);
}

@Override
public int hashCode() {
return name.hashCode();
}

@Override
public String toString() {
return "Status{" +
"name='" + name + '\'' +
'}';
}
}
}
1 change: 1 addition & 0 deletions servicetalk-dns-discovery-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
testImplementation project(":servicetalk-concurrent-test-internal")
testImplementation "commons-lang:commons-lang:$commonsLangVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.junit.jupiter:junit-jupiter-params"
testImplementation "org.apache.directory.server:apacheds-protocol-dns:$apacheDirectoryServerVersion"
testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testImplementation "org.mockito:mockito-core:$mockitoCoreVersion"
Expand Down
Loading