Skip to content

Commit

Permalink
XdsTestServer: add --xds_server_mode
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiitk committed Oct 17, 2024
1 parent ad4d9f9 commit f347251
Showing 1 changed file with 80 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.gcp.csm.observability.CsmObservability;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
Expand Down Expand Up @@ -82,6 +83,7 @@ public final class XdsTestServer {
private int port = 8080;
private int maintenancePort = 8080;
private boolean secureMode = false;
private boolean xdsServerMode = false;
private boolean enableCsmObservability;
private String serverId = "java_server";
private HealthStatusManager health;
Expand Down Expand Up @@ -142,7 +144,10 @@ void parseArgs(String[] args) {
maintenancePort = Integer.valueOf(value);
} else if ("secure_mode".equals(key)) {
secureMode = Boolean.parseBoolean(value);
} else if ("enable_csm_observability".equals(key)) {
} else if ("xds_server_mode".equals(key)) {
xdsServerMode = Boolean.parseBoolean(value);
}
else if ("enable_csm_observability".equals(key)) {
enableCsmObservability = Boolean.valueOf(value);
} else if ("server_id".equals(key)) {
serverId = value;
Expand All @@ -163,6 +168,9 @@ void parseArgs(String[] args) {
+ maintenancePort);
usage = true;
}
if (secureMode) {
xdsServerMode = true;
}

if (usage) {
XdsTestServer s = new XdsTestServer();
Expand All @@ -179,6 +187,9 @@ void parseArgs(String[] args) {
+ " port and maintenance_port should be different for secure mode."
+ "\n Default: "
+ s.secureMode
+ "\n --xds_server_mode=BOOLEAN Start in xDS Server mode."
+ "\n Default: "
+ s.xdsServerMode
+ "\n --enable_csm_observability=BOOL Enable CSM observability reporting. Default: "
+ s.enableCsmObservability
+ "\n --server_id=STRING server ID for response."
Expand Down Expand Up @@ -211,74 +222,93 @@ void start() throws Exception {
logger.log(Level.SEVERE, "Failed to get host", e);
throw new RuntimeException(e);
}

health = new HealthStatusManager();
ServerServiceDefinition testServiceInterceptor = ServerInterceptors.intercept(
new TestServiceImpl(serverId, host),
new TestInfoInterceptor(host));
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();

if (secureMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("Secure mode only supports IPV4_IPV6 address type");
}
maintenanceServer =
Grpc.newServerBuilderForPort(maintenancePort, InsecureServerCredentials.create())
Grpc.newServerBuilderForPort(maintenancePort, insecureServerCreds)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
maintenanceServer.start();
server =
XdsServerBuilder.forPort(
port, XdsServerCredentials.create(InsecureServerCredentials.create()))
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
server = XdsServerBuilder.forPort(port, XdsServerCredentials.create(insecureServerCreds))
.addService(testServiceInterceptor)
.build();
server.start();
} else {
ServerBuilder<?> serverBuilder;
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
InetSocketAddress localV4Address = new InetSocketAddress("127.0.0.1", port);
serverBuilder = NettyServerBuilder.forAddress(
localV4Address, insecureServerCreds);
if (v4Address != null && !v4Address.equals(localV4Address) ) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
InetSocketAddress localV6Address = new InetSocketAddress("::1", port);
serverBuilder = NettyServerBuilder.forAddress(localV6Address, insecureServerCreds);
for (SocketAddress address : v6Addresses) {
if (!address.equals(localV6Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(address);
}
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}

logger.info("Starting server on port " + port + " with address type " + addressType);
health.setStatus("", ServingStatus.SERVING);
return;
}

server =
serverBuilder
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
if (xdsServerMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("xDS Server mode only supports IPV4_IPV6 address type");
}
server = XdsServerBuilder.forPort(port, XdsServerCredentials.create(insecureServerCreds))
.addService(testServiceInterceptor)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
server.start();
maintenanceServer = null;
health.setStatus("", ServingStatus.SERVING);
return;
}

ServerBuilder<?> serverBuilder;
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
InetSocketAddress localV4Address = new InetSocketAddress("127.0.0.1", port);
serverBuilder = NettyServerBuilder.forAddress(
localV4Address, insecureServerCreds);
if (v4Address != null && !v4Address.equals(localV4Address) ) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
InetSocketAddress localV6Address = new InetSocketAddress("::1", port);
serverBuilder = NettyServerBuilder.forAddress(localV6Address, insecureServerCreds);
for (SocketAddress address : v6Addresses) {
if (!address.equals(localV6Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(address);
}
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}

logger.info("Starting server on port " + port + " with address type " + addressType);

server =
serverBuilder
.addService(testServiceInterceptor)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
server.start();
maintenanceServer = null;
health.setStatus("", ServingStatus.SERVING);
}

Expand Down

0 comments on commit f347251

Please sign in to comment.