Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes SubscribeToLogs example to work with authenticated channel #3400

Merged
merged 6 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.client.impl.SessionImpl;
import io.deephaven.client.impl.SessionImplConfig;
import io.deephaven.proto.DeephavenChannel;
import io.deephaven.proto.DeephavenChannelImpl;
import io.grpc.Channel;
import io.grpc.ManagedChannel;

Expand All @@ -21,6 +22,9 @@ public interface SessionImplModule {
@Binds
Channel bindsManagedChannel(ManagedChannel managedChannel);

@Binds
DeephavenChannel bindsDeephavenChannelImpl(DeephavenChannelImpl deephavenChannelImpl);

@Provides
static SessionImpl session(DeephavenChannel channel, ScheduledExecutorService scheduler) {
return SessionImplConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ class SubscribeToLogs extends SessionExampleBase {

@Override
protected void execute(SessionFactory sessionFactory) throws Exception {
try (final Session _session = sessionFactory.newSession()) {
try (final Session session = sessionFactory.newSession()) {
final Builder builder = LogSubscriptionRequest.newBuilder();
if (levels != null) {
for (String level : levels) {
builder.addLevels(level);
}
}
final Iterator<LogSubscriptionData> logs = sessionFactory
final Iterator<LogSubscriptionData> logs = session
.channel()
.consoleBlocking()
.subscribeToLogs(builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.deephaven.client.impl;

import io.deephaven.proto.DeephavenChannel;

import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -50,4 +52,11 @@ public interface Session
CompletableFuture<Void> release(ExportId exportId);

// ----------------------------------------------------------

/**
* The authenticated channel.
*
* @return the authenticated channel
*/
DeephavenChannel channel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@
*/
package io.deephaven.client.impl;

import io.deephaven.proto.DeephavenChannel;

import java.util.concurrent.CompletableFuture;

public interface SessionFactory {

Session newSession();

CompletableFuture<? extends Session> newSessionFuture();

DeephavenChannel channel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import com.google.protobuf.ByteString;
import io.deephaven.client.impl.script.Changes;
import io.deephaven.proto.DeephavenChannel;
import io.deephaven.proto.DeephavenChannelMixin;
import io.deephaven.proto.backplane.grpc.AddTableRequest;
import io.deephaven.proto.backplane.grpc.AddTableResponse;
import io.deephaven.proto.backplane.grpc.ApplicationServiceGrpc.ApplicationServiceStub;
Expand Down Expand Up @@ -35,6 +37,7 @@
import io.grpc.CallCredentials;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
Expand All @@ -46,7 +49,6 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -185,7 +187,7 @@ public void onCompleted() {
}
}

private final ScheduledExecutorService executor;
private final SessionImplConfig config;
private final SessionServiceStub sessionService;
private final ConsoleServiceStub consoleService;
private final ObjectServiceStub objectService;
Expand All @@ -194,22 +196,15 @@ public void onCompleted() {
private final Handler handler;
private final ExportTicketCreator exportTicketCreator;
private final ExportStates states;

private volatile AuthenticationInfo auth;

private final boolean delegateToBatch;
private final boolean mixinStacktrace;
private final Duration executeTimeout;
private final Duration closeTimeout;
private final TableHandleManagerSerial serialManager;
private final TableHandleManagerBatch batchManager;

private SessionImpl(SessionImplConfig config, Handler handler, AuthenticationInfo auth) {

CallCredentials credentials = new SessionCallCredentials();
SessionCallCredentials credentials = new SessionCallCredentials();
this.auth = Objects.requireNonNull(auth);
this.handler = Objects.requireNonNull(handler);
this.executor = config.executor();
this.config = Objects.requireNonNull(config);
this.sessionService = config.channel().session().withCallCredentials(credentials);
this.consoleService = config.channel().console().withCallCredentials(credentials);
this.objectService = config.channel().object().withCallCredentials(credentials);
Expand All @@ -218,12 +213,8 @@ private SessionImpl(SessionImplConfig config, Handler handler, AuthenticationInf
this.exportTicketCreator = new ExportTicketCreator();
this.states = new ExportStates(this, sessionService, config.channel().table().withCallCredentials(credentials),
exportTicketCreator);
this.delegateToBatch = config.delegateToBatch();
this.mixinStacktrace = config.mixinStacktrace();
this.executeTimeout = config.executeTimeout();
this.closeTimeout = config.closeTimeout();
this.serialManager = TableHandleManagerSerial.of(this);
this.batchManager = TableHandleManagerBatch.of(this, mixinStacktrace);
this.batchManager = TableHandleManagerBatch.of(this, config.mixinStacktrace());
}

public AuthenticationInfo auth() {
Expand Down Expand Up @@ -272,7 +263,7 @@ public CompletableFuture<FetchedObject> fetchObject(String type, HasTicketId tic
@Override
public void close() {
try {
closeFuture().get(closeTimeout.toNanos(), TimeUnit.NANOSECONDS);
closeFuture().get(config.closeTimeout().toNanos(), TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted waiting for session close");
Expand All @@ -294,7 +285,7 @@ public CompletableFuture<Void> closeFuture() {

@Override
protected TableHandleManager delegate() {
return delegateToBatch ? batchManager : serialManager;
return config.delegateToBatch() ? batchManager : serialManager;
}

@Override
Expand All @@ -304,7 +295,7 @@ public TableHandleManager batch() {

@Override
public TableHandleManager batch(boolean mixinStacktrace) {
if (this.mixinStacktrace == mixinStacktrace) {
if (this.config.mixinStacktrace() == mixinStacktrace) {
return batchManager;
}
return TableHandleManagerBatch.of(this, mixinStacktrace);
Expand All @@ -328,6 +319,11 @@ public CompletableFuture<Void> release(ExportId exportId) {
return observer.future;
}

@Override
public DeephavenChannel channel() {
return new DeephavenChannelWithCredentials();
}

@Override
public CompletableFuture<Void> addToInputTable(HasTicketId destination, HasTicketId source) {
final AddTableRequest request = AddTableRequest.newBuilder()
Expand Down Expand Up @@ -359,7 +355,7 @@ public Cancel subscribeToFields(Listener listener) {
}

public ScheduledExecutorService executor() {
return executor;
return config.executor();
}

public long batchCount() {
Expand All @@ -376,11 +372,11 @@ private void scheduleRefreshSessionToken(HandshakeResponse response) {
now + response.getTokenExpirationDelayMillis() / 3,
response.getTokenDeadlineTimeMillis() - response.getTokenExpirationDelayMillis() / 10);
final long refreshDelayMs = Math.max(targetRefreshTime - now, 0);
executor.schedule(SessionImpl.this::refreshSessionToken, refreshDelayMs, TimeUnit.MILLISECONDS);
config.executor().schedule(SessionImpl.this::refreshSessionToken, refreshDelayMs, TimeUnit.MILLISECONDS);
}

private void scheduleRefreshSessionTokenNow() {
executor.schedule(SessionImpl.this::refreshSessionToken, 0, TimeUnit.MILLISECONDS);
config.executor().schedule(SessionImpl.this::refreshSessionToken, 0, TimeUnit.MILLISECONDS);
}

private void refreshSessionToken() {
Expand Down Expand Up @@ -613,13 +609,13 @@ public Ticket ticket() {

@Override
public Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code).get(executeTimeout.toNanos(), TimeUnit.NANOSECONDS);
return executeCodeFuture(code).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public Changes executeScript(Path path)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScriptFuture(path).get(executeTimeout.toNanos(), TimeUnit.NANOSECONDS);
return executeScriptFuture(path).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
Expand Down Expand Up @@ -647,7 +643,7 @@ public CompletableFuture<Void> closeFuture() {
@Override
public void close() {
try {
closeFuture().get(closeTimeout.toNanos(), TimeUnit.NANOSECONDS);
closeFuture().get(config.closeTimeout().toNanos(), TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted waiting for console close");
Expand Down Expand Up @@ -818,4 +814,15 @@ public void onCompleted() {
listener.onCompleted();
}
}

private final class DeephavenChannelWithCredentials extends DeephavenChannelMixin {
public DeephavenChannelWithCredentials() {
super(config.channel());
}

@Override
protected <S extends AbstractStub<S>> S mixin(S stub) {
return stub.withCallCredentials(new SessionCallCredentials());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.client.impl.ExportRequest.Listener;
import io.deephaven.proto.DeephavenChannel;
import io.deephaven.proto.DeephavenChannelImpl;
import io.deephaven.proto.backplane.grpc.BatchTableRequest;
import io.deephaven.proto.backplane.grpc.BatchTableRequest.Operation;
import io.deephaven.proto.backplane.grpc.ExportedTableCreationResponse;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void setUp() throws IOException {
ManagedChannel channel = grpcCleanup
.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());

DeephavenChannel deephavenChannel = new DeephavenChannel(channel);
DeephavenChannel deephavenChannel = new DeephavenChannelImpl(channel);

states = new ExportStates(deephavenChannel.session(), deephavenChannel.table(), new ExportTicketCreator());
}
Expand Down
Loading