Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjusting ServiceDiscovererEvent contract #1906

Merged
merged 14 commits into from
Nov 15, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscoveryStatus.AVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource;
Expand Down Expand Up @@ -106,7 +107,8 @@ public DefaultPartitionedClientGroup(final Function<PartitionAttributes, Client>
this.unknownPartitionClient = unknownPartitionClient;
this.partitionMap = partitionMapFactory.newPartitionMap(event ->
new Partition<>(event, closedPartitionClient.apply(event)));
toSource(psdEvents.groupToMany(event -> event.isAvailable() ?
// TODO: consider other events than AVAILABLE and UNAVAILABLE
toSource(psdEvents.groupToMany(event -> event.status() == AVAILABLE ?
partitionMap.add(event.partitionAddress()).iterator() :
partitionMap.remove(event.partitionAddress()).iterator(), psdMaxQueueSize))
.subscribe(new GroupedByPartitionSubscriber(clientFactory));
Expand Down Expand Up @@ -168,7 +170,8 @@ public Publisher<Collection<ServiceDiscovererEvent<R>>> discover(final U ignored
public boolean test(PSDE evt) {
MutableInt counter = addressCount.computeIfAbsent(evt.address(), __ -> new MutableInt());
boolean acceptEvent;
if (evt.isAvailable()) {
// TODO: consider other events than AVAILABLE and UNAVAILABLE
if (evt.status() == AVAILABLE) {
acceptEvent = ++counter.value == 1;
} else {
acceptEvent = --counter.value == 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.client.api.ServiceDiscoveryStatus;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.RandomAccess;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscoveryStatus.AVAILABLE;
import static io.servicetalk.client.api.ServiceDiscoveryStatus.UNAVAILABLE;
import static java.util.Collections.binarySearch;

/**
Expand Down Expand Up @@ -89,8 +92,8 @@ private static <T> void reportEvents(@Nullable final TwoIntsConsumer reporter,
* {@code sortedA}).
* <p>
* See <a href="https://en.wikipedia.org/wiki/Venn_diagram#Overview">Set Mathematics</a>.
* @param available Will be used for {@link ServiceDiscovererEvent#isAvailable()} for each
* {@link ServiceDiscovererEvent} in the returned {@link List}.
* @param available Will be used to map to {@link ServiceDiscoveryStatus#AVAILABLE} in
* {@link ServiceDiscovererEvent#status()} for each {@link ServiceDiscovererEvent} in the returned {@link List}.
* @param sortedA A sorted {@link List} of which no elements be present in the return value.
* @param sortedB A sorted {@link List} of which elements in this set that are not in {@code sortedA} will be in the
* return value.
Expand All @@ -104,17 +107,18 @@ private static <T> void reportEvents(@Nullable final TwoIntsConsumer reporter,
private static <T> List<ServiceDiscovererEvent<T>> relativeComplement(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 0.42 we should move the whole ServiceDiscovererUtils to dns module as pkg-private. Not sure why it had to be in client-api-internal as public.

boolean available, List<? extends T> sortedA, List<? extends T> sortedB, Comparator<T> comparator,
@Nullable List<ServiceDiscovererEvent<T>> result) {
ServiceDiscoveryStatus status = available ? AVAILABLE : UNAVAILABLE;
if (sortedB instanceof RandomAccess) {
for (int i = 0; i < sortedB.size(); ++i) {
final T valueB = sortedB.get(i);
if (binarySearch(sortedA, valueB, comparator) < 0) {
if (result == null) {
result = new ArrayList<>(4);
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
} else if (comparator.compare(valueB, result.get(result.size() - 1).address()) != 0) {
// make sure we don't include duplicates. the input lists are sorted and we process in order so
// we verify the previous entry is not a duplicate.
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
}
}
}
Expand All @@ -123,11 +127,11 @@ private static <T> List<ServiceDiscovererEvent<T>> relativeComplement(
if (binarySearch(sortedA, valueB, comparator) < 0) {
if (result == null) {
result = new ArrayList<>(4);
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
} else if (comparator.compare(valueB, result.get(result.size() - 1).address()) != 0) {
// make sure we don't include duplicates. the input lists are sorted and we process in order so
// we verify the previous entry is not a duplicate.
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package io.servicetalk.client.api;

import java.util.Objects;

import static io.servicetalk.client.api.ServiceDiscoveryStatus.AVAILABLE;
import static io.servicetalk.client.api.ServiceDiscoveryStatus.UNAVAILABLE;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -24,15 +28,30 @@
public final class DefaultServiceDiscovererEvent<T> implements ServiceDiscovererEvent<T> {
private final T address;
private final boolean available;
private final ServiceDiscoveryStatus status;

/**
* Create a new instance.
* @param address The address returned by {@link #address()}.
* @param available Value returned by {@link #available}.
* @deprecated Use {@link #DefaultServiceDiscovererEvent(Object, ServiceDiscoveryStatus)}.
*/
@Deprecated
public DefaultServiceDiscovererEvent(T address, boolean available) {
this.address = requireNonNull(address);
this.available = available;
this.status = available ? AVAILABLE : UNAVAILABLE;
}

/**
* Create a new instance.
* @param address The address returned by {@link #address()}.
* @param status Value returned by {@link #status()}.
*/
public DefaultServiceDiscovererEvent(T address, ServiceDiscoveryStatus status) {
this.address = requireNonNull(address);
this.status = requireNonNull(status);
this.available = status == AVAILABLE;
}

@Override
Expand All @@ -41,8 +60,8 @@ public T address() {
}

@Override
public boolean isAvailable() {
return available;
public ServiceDiscoveryStatus status() {
return status;
}

@Override
Expand All @@ -53,26 +72,20 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

final DefaultServiceDiscovererEvent<?> that = (DefaultServiceDiscovererEvent<?>) o;

if (available != that.available) {
return false;
}
return address.equals(that.address);
return address.equals(that.address) && status.equals(that.status);
}

@Override
public int hashCode() {
int result = address.hashCode();
result = 31 * result + (available ? 1 : 0);
return result;
return Objects.hash(address, status);
}

@Override
public String toString() {
return "DefaultServiceDiscovererEvent{" +
"address=" + address +
", status=" + status +
", available=" + available +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.client.api;

import static io.servicetalk.client.api.ServiceDiscoveryStatus.AVAILABLE;

/**
* Notification from the Service Discovery system that availability for an address has changed.
* @param <ResolvedAddress> the type of address after resolution.
Expand All @@ -26,10 +28,25 @@ public interface ServiceDiscovererEvent<ResolvedAddress> {
*/
ResolvedAddress address();

/**
* {@link ServiceDiscoveryStatus Status} of the event instructing the {@link ServiceDiscoverer} what actions
* to take upon the associated {@link #address() address}.
* @return {@link ServiceDiscoveryStatus Status} of the associated {@link #address()}.
*/
default ServiceDiscoveryStatus status() {
throw new UnsupportedOperationException("Method status is not supported by " + getClass().getName());
}

/**
* Determine if {@link #address()} is now available or unavailable.
* @return {@code true} if {@link #address()} is now available or false if the {@link #address()} is now
* unavailable.
* @deprecated Use {@link #status()}. This method will be removed, but in the transition period its default
* implementation calls {{@link #status()}} to determine availability – implementors of this interface need to
* override {@link #status()}.
*/
boolean isAvailable();
@Deprecated
default boolean isAvailable() {
return status() == AVAILABLE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.client.api;

import java.util.Objects;

/**
* Status provided by the Service Discovery system that guides the actions of {@link ServiceDiscoverer} upon the
* bound {@link ServiceDiscovererEvent#address()} (via {@link ServiceDiscovererEvent}).
*/
public final class ServiceDiscoveryStatus {

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is available for use in connection establishment.
*/
public static final ServiceDiscoveryStatus AVAILABLE = new ServiceDiscoveryStatus("available");

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is not available for use in connection establishment.
*/
public static final ServiceDiscoveryStatus UNAVAILABLE = new ServiceDiscoveryStatus("unavailable");

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is expired and should not be used
* for connection establishment. It doesn't necessarily mean that the host should not be used in traffic routing
* over already established connections. The implementations can have different policies in that regard.
*/
public static final ServiceDiscoveryStatus EXPIRED = new ServiceDiscoveryStatus("expired");

private final String name;

private ServiceDiscoveryStatus(final String name) {
if (name.isEmpty()) {
throw new IllegalArgumentException("ServiceDiscoveryStatus name cannot be empty");
}
this.name = name;
}

/**
* Returns an {@link ServiceDiscoveryStatus} for the specified name.
* @param name the status name.
* @return {@link ServiceDiscoveryStatus} representing the status for given name.
*/
public static ServiceDiscoveryStatus of(final String name) {
switch (name) {
case "available":
return AVAILABLE;
case "unavailable":
return UNAVAILABLE;
case "expired":
return EXPIRED;
default:
return new ServiceDiscoveryStatus(name);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ServiceDiscoveryStatus)) {
return false;
}
final ServiceDiscoveryStatus that = (ServiceDiscoveryStatus) o;
return name.equals(that.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.client.api.ServiceDiscoveryStatus;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
Expand Down Expand Up @@ -68,6 +69,8 @@

import static io.netty.handler.codec.dns.DefaultDnsRecordDecoder.decodeName;
import static io.netty.handler.codec.dns.DnsRecordType.SRV;
import static io.servicetalk.client.api.ServiceDiscoveryStatus.AVAILABLE;
import static io.servicetalk.client.api.ServiceDiscoveryStatus.UNAVAILABLE;
import static io.servicetalk.client.api.internal.ServiceDiscovererUtils.calculateDifference;
import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
Expand Down Expand Up @@ -219,7 +222,8 @@ public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> dnsSrvQu
.flatMapConcatIterable(identity())
.flatMapMerge(srvEvent -> {
assertInEventloop();
if (srvEvent.isAvailable()) {
// TODO: consider other events than AVAILABLE and UNAVAILABLE
if (srvEvent.status() == AVAILABLE) {
return defer(() -> {
final ARecordPublisher aPublisher =
new ARecordPublisher(srvEvent.address().hostName(), discoveryObserver);
Expand Down Expand Up @@ -708,11 +712,11 @@ private List<ServiceDiscovererEvent<T>> generateInactiveEvent() {
final List<ServiceDiscovererEvent<T>> events = new ArrayList<>(activeAddresses.size());
if (activeAddresses instanceof RandomAccess) {
for (int i = 0; i < activeAddresses.size(); ++i) {
events.add(new DefaultServiceDiscovererEvent<>(activeAddresses.get(i), false));
events.add(new DefaultServiceDiscovererEvent<>(activeAddresses.get(i), UNAVAILABLE));
}
} else {
for (final T address : activeAddresses) {
events.add(new DefaultServiceDiscovererEvent<>(address, false));
events.add(new DefaultServiceDiscovererEvent<>(address, UNAVAILABLE));
}
}
activeAddresses = emptyList();
Expand All @@ -728,21 +732,22 @@ private static Publisher<? extends Collection<ServiceDiscovererEvent<InetSocketA
ArrayList<ServiceDiscovererEvent<InetSocketAddress>> mappedEvents = new ArrayList<>(events.size());
for (ServiceDiscovererEvent<InetAddress> event : events) {
InetSocketAddress addr = new InetSocketAddress(event.address(), port);
if (event.isAvailable()) {
final ServiceDiscoveryStatus status = event.status();
if (status == AVAILABLE) {
Integer count = availableAddresses.get(addr);
if (count == null) {
mappedEvents.add(new DefaultServiceDiscovererEvent<>(addr, true));
mappedEvents.add(new DefaultServiceDiscovererEvent<>(addr, status));
availableAddresses.put(addr, 1);
} else {
availableAddresses.put(addr, count + 1);
}
} else {
} else { // TODO: consider other events than UNAVAILABLE
Integer count = availableAddresses.get(addr);
if (count == null) {
throw new IllegalStateException("null count for: " + addr);
}
if (count == 1) {
mappedEvents.add(new DefaultServiceDiscovererEvent<>(addr, false));
mappedEvents.add(new DefaultServiceDiscovererEvent<>(addr, status));
availableAddresses.remove(addr);
} else {
availableAddresses.put(addr, count - 1);
Expand Down Expand Up @@ -864,8 +869,8 @@ public T address() {
}

@Override
public boolean isAvailable() {
return false;
public ServiceDiscoveryStatus status() {
return UNAVAILABLE;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ static <T, R> List<ServiceDiscovererEvent<R>> mapEventList(final Collection<Serv
final Function<T, R> mapper) {
List<ServiceDiscovererEvent<R>> result = new ArrayList<>(original.size());
for (ServiceDiscovererEvent<T> evt : original) {
result.add(new DefaultServiceDiscovererEvent<>(mapper.apply(evt.address()), evt.isAvailable()));
result.add(new DefaultServiceDiscovererEvent<>(mapper.apply(evt.address()), evt.status()));
}
return result;
}
Expand Down
Loading