Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Ticket/Descriptor error messages for all gRPC usages #1174

Merged
merged 3 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private void blockUntilShutdown() throws InterruptedException {
private int nextId = 1;

private void startConsole() {
consoleTicket = ExportTicketHelper.exportIdToTicket(nextId++);
consoleTicket = ExportTicketHelper.wrapExportIdInTicket(nextId++);
consoleServiceGrpc.startConsole(StartConsoleRequest.newBuilder()
.setResultId(consoleTicket)
.setSessionType(sessionType)
Expand Down Expand Up @@ -193,7 +193,7 @@ private void awaitCommand() {
firstTable.ifPresent(table -> {
log.debug().append("A table was created: ").append(table.toString()).endl();
tableServiceGrpc.fetchTable(FetchTableRequest.newBuilder()
.setResultId(ExportTicketHelper.exportIdToTicket(nextId++))
.setResultId(ExportTicketHelper.wrapExportIdInTicket(nextId++))
.setSourceId(TableReference.newBuilder()
.setTicket(ScopeTicketResolver.ticketForName(table.getTitle())))
.build(),
Expand Down Expand Up @@ -255,7 +255,8 @@ private void onExportedTableCreationResponse(final ExportedTableCreationResponse
final LogEntry entry = log.info().append("Received ExportedTableCreationResponse for {");

if (result.getResultId().hasTicket()) {
entry.append("exportId: ").append(ExportTicketHelper.ticketToExportId(result.getResultId().getTicket()));
entry.append("exportId: ")
.append(ExportTicketHelper.ticketToExportId(result.getResultId().getTicket(), "resultId"));
} else {
entry.append("batchOffset: ").append(result.getResultId().getBatchOffset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.deephaven.grpc_api.util.ExportTicketHelper;
import io.deephaven.grpc_api.util.Scheduler;
import io.deephaven.grpc_api_client.table.BarrageTable;
import io.deephaven.grpc_api_client.util.BarrageProtoUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.log.LogEntry;
import io.deephaven.io.logger.Logger;
Expand All @@ -31,7 +30,6 @@
import io.deephaven.proto.backplane.grpc.ExportedTableUpdatesRequest;
import io.deephaven.proto.backplane.grpc.HandshakeRequest;
import io.deephaven.proto.backplane.grpc.HandshakeResponse;
import io.deephaven.proto.backplane.grpc.ReleaseResponse;
import io.deephaven.proto.backplane.grpc.SelectOrUpdateRequest;
import io.deephaven.proto.backplane.grpc.SessionServiceGrpc;
import io.deephaven.proto.backplane.grpc.TableReference;
Expand Down Expand Up @@ -128,8 +126,8 @@ private int nextExportId() {
return ++nextTableId;
}

final Ticket exportTable = ExportTicketHelper.exportIdToTicket(nextExportId());
final Ticket putResultTicket = ExportTicketHelper.exportIdToTicket(nextExportId());
final Ticket exportTable = ExportTicketHelper.wrapExportIdInTicket(nextExportId());
final Ticket putResultTicket = ExportTicketHelper.wrapExportIdInTicket(nextExportId());

BarrageTable resultTable;
BarrageClientSubscription resultSub;
Expand Down Expand Up @@ -171,7 +169,7 @@ private void runScript() {
.build());

flightService.getSchema(
ExportTicketHelper.ticketToDescriptor(exportTable),
ExportTicketHelper.ticketToDescriptor(exportTable, "exportTable"),
new ResponseBuilder<Flight.SchemaResult>()
.onError(this::onError)
.onNext(this::onSchemaResult)
Expand Down Expand Up @@ -218,7 +216,7 @@ public void onUpdate(final Update update) {
resultTable.listenForUpdates(listener);

resultSub = new BarrageClientSubscription(
ExportTicketHelper.toReadableString(exportTable),
ExportTicketHelper.toReadableString(exportTable, "exportTable"),
serverChannel, BarrageClientSubscription.makeRequest(null, columns),
new BarrageStreamReader(), resultTable);
}
Expand All @@ -238,7 +236,8 @@ private void onExportedTableCreationResponse(final ExportedTableCreationResponse
final LogEntry entry = log.info().append("Received ExportedTableCreationResponse for {");

if (result.getResultId().hasTicket()) {
entry.append("exportId: ").append(ExportTicketHelper.ticketToExportId(result.getResultId().getTicket()));
entry.append("exportId: ")
.append(ExportTicketHelper.ticketToExportId(result.getResultId().getTicket(), "resultId"));
} else {
entry.append("batchOffset: ").append(result.getResultId().getBatchOffset());
}
Expand Down Expand Up @@ -270,7 +269,7 @@ private void onExportedTableCreationResponse(final ExportedTableCreationResponse

private void onExportNotificationMessage(final String prefix, final ExportNotification notification) {
final LogEntry entry = log.info().append(prefix).append("Received ExportNotification: {id: ")
.append(ExportTicketHelper.ticketToExportId(notification.getTicket()))
.append(ExportTicketHelper.ticketToExportId(notification.getTicket(), "ticket"))
.append(", state: ").append(notification.getExportState().toString());

if (!notification.getContext().isEmpty()) {
Expand All @@ -286,7 +285,7 @@ private void onTableUpdate(final ExportedTableUpdateMessage msg) {
log.info().append("Received ExportedTableUpdatedMessage:").endl();

final LogEntry entry = log.info().append("\tid=")
.append(ExportTicketHelper.ticketToExportId(msg.getExportId()))
.append(ExportTicketHelper.ticketToExportId(msg.getExportId(), "exportId"))
.append(" size=").append(msg.getSize());

if (!msg.getUpdateFailureMessage().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ protected void after() {
@Test
public void testSimpleEmptyTableDoGet() {
Flight.Ticket simpleTableTicket = ExportTicketHelper.exportIdToArrowTicket(1);
currentSession.newExport(simpleTableTicket)
currentSession.newExport(simpleTableTicket, "test")
.submit(() -> TableTools.emptyTable(10).update("I=i"));

FlightStream stream = client.getStream(new Ticket(simpleTableTicket.getTicket().toByteArray()));
Expand Down Expand Up @@ -340,7 +340,7 @@ public void testExportTicketVisibility() {
// flight ticket can be resolved).
final Flight.Ticket ticket = ExportTicketHelper.exportIdToArrowTicket(1);
final Table table = TableTools.emptyTable(10).update("I = i");
currentSession.newExport(ticket).submit(() -> table);
currentSession.newExport(ticket, "test").submit(() -> table);

// test fetch info from export ticket
final FlightInfo info = client.getInfo(FlightDescriptor.path("export", "1"));
Expand Down Expand Up @@ -377,7 +377,7 @@ private void assertSchemaMatchesTable(Schema schema, Table table) {
private void assertRoundTripDataEqual(Table deephavenTable) throws InterruptedException, ExecutionException {
// bind the table in the session
Flight.Ticket dhTableTicket = ExportTicketHelper.exportIdToArrowTicket(nextTicket++);
currentSession.newExport(dhTableTicket).submit(() -> deephavenTable);
currentSession.newExport(dhTableTicket, "test").submit(() -> deephavenTable);

// fetch with DoGet
FlightStream stream = client.getStream(new Ticket(dhTableTicket.getTicket().toByteArray()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,66 +51,75 @@ public synchronized void onApplicationLoad(final ApplicationState app) {
}

@Override
public <T> SessionState.ExportObject<T> resolve(final @Nullable SessionState session, final ByteBuffer ticket) {
return resolve(appFieldIdFor(ticket));
public <T> SessionState.ExportObject<T> resolve(
final @Nullable SessionState session, final ByteBuffer ticket, final String logId) {
return resolve(appFieldIdFor(ticket, logId), logId);
}

@Override
public <T> SessionState.ExportObject<T> resolve(final @Nullable SessionState session,
final Flight.FlightDescriptor descriptor) {
return resolve(appFieldIdFor(descriptor));
public <T> SessionState.ExportObject<T> resolve(
final @Nullable SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
return resolve(appFieldIdFor(descriptor, logId), logId);
}

private <T> SessionState.ExportObject<T> resolve(final AppFieldId id) {
private <T> SessionState.ExportObject<T> resolve(final AppFieldId id, final String logId) {
if (id.app == null) {
throw new IllegalStateException("Cannot resolve field without knowing the application");
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': field '" + getLogNameFor(id)
+ "' does not belong to an application");
}
final Field<Object> field = id.app.getField(id.fieldName);
if (field == null) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "ticket not found: " + getLogNameFor(id));
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': field '" + getLogNameFor(id) + "' not found");
}
// noinspection unchecked
return SessionState.wrapAsExport((T) field.value());
}

@Override
public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(final @Nullable SessionState session,
final Flight.FlightDescriptor descriptor) {
final AppFieldId id = appFieldIdFor(descriptor);
public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
final @Nullable SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
final AppFieldId id = appFieldIdFor(descriptor, logId);
if (id.app == null) {
throw new IllegalStateException("Cannot resolve field without knowing the application");
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': field does not belong to an application");
}

final Flight.FlightInfo info;
synchronized (id.app) {
Field<?> field = id.app.getField(id.fieldName);
if (field == null) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "no flight found: " + getLogNameFor(id));
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': field '" + getLogNameFor(id) + "' not found");
}
Object value = field.value();
if (value instanceof Table) {
info = TicketRouter.getFlightInfo((Table) value, descriptor, flightTicketForName(id.app, id.fieldName));
} else {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "flight not found: " + getLogNameFor(id));
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': field '" + getLogNameFor(id) + "' is not a flight");
}
}

return SessionState.wrapAsExport(info);
}

@Override
public <T> SessionState.ExportBuilder<T> publish(SessionState session, ByteBuffer ticket) {
public <T> SessionState.ExportBuilder<T> publish(
SessionState session, ByteBuffer ticket, final String logId) {
throw new UnsupportedOperationException("applications cannot be published to");
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public <T> SessionState.ExportBuilder<T> publish(SessionState session, Flight.FlightDescriptor descriptor) {
public <T> SessionState.ExportBuilder<T> publish(
final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
throw new UnsupportedOperationException("applications cannot be published to");
}

@Override
public String getLogNameFor(final ByteBuffer ticket) {
return getLogNameFor(appFieldIdFor(ticket));
public String getLogNameFor(final ByteBuffer ticket, final String logId) {
return getLogNameFor(appFieldIdFor(ticket, logId));
}

private String getLogNameFor(final AppFieldId id) {
Expand Down Expand Up @@ -176,9 +185,10 @@ public static Flight.FlightDescriptor descriptorForName(final ApplicationState a
.build();
}

private AppFieldId appFieldIdFor(final ByteBuffer ticket) {
private AppFieldId appFieldIdFor(final ByteBuffer ticket, final String logId) {
if (ticket == null) {
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Ticket not supplied");
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': ticket not supplied");
}

final String ticketAsString;
Expand All @@ -189,7 +199,7 @@ private AppFieldId appFieldIdFor(final ByteBuffer ticket) {
ticketAsString = decoder.decode(ticket).toString();
} catch (CharacterCodingException e) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
"Cannot parse ticket: failed to decode: " + e.getMessage());
"Could not resolve '" + logId + "': failed to decode: " + e.getMessage());
} finally {
ticket.position(initialPosition);
ticket.limit(initialLimit);
Expand All @@ -199,45 +209,49 @@ private AppFieldId appFieldIdFor(final ByteBuffer ticket) {
final int endOfAppId = ticketAsString.indexOf('/', endOfRoute + 1);
final int endOfFieldSegment = ticketAsString.indexOf('/', endOfAppId + 1);
if (endOfAppId == -1 || endOfFieldSegment == -1) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION, "Ticket does conform to expected format");
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': ticket does conform to expected format");
}
final String appId = ticketAsString.substring(endOfRoute + 1, endOfAppId);
final String fieldName = ticketAsString.substring(endOfFieldSegment + 1);

final ApplicationState app = applicationMap.get(appId);
if (app == null) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
"No application exists with the identifier: " + appId);
"Could not resolve '" + logId + "': no application exists with the identifier: " + appId);
}

return AppFieldId.from(app, fieldName);
}

private AppFieldId appFieldIdFor(final Flight.FlightDescriptor descriptor) {
private AppFieldId appFieldIdFor(final Flight.FlightDescriptor descriptor, final String logId) {
if (descriptor == null) {
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Descriptor not supplied");
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': descriptor not supplied");
}

if (descriptor.getType() != Flight.FlightDescriptor.DescriptorType.PATH) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION, "Cannot parse descriptor: not a path");
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': only flight paths are supported");
}

// current structure: a/app_id/f/field_name
if (descriptor.getPathCount() != 4) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
"Cannot parse descriptor: unexpected path length (found: "
"Could not resolve '" + logId + "': unexpected path length (found: "
+ TicketRouterHelper.getLogNameFor(descriptor) + ", expected: 4)");
}

final String appId = descriptor.getPath(1);
final ApplicationState app = applicationMap.get(appId);
if (app == null) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
"No application exists with the identifier: " + appId);
"Could not resolve '" + logId + "': no application exists with the identifier: " + appId);
}

if (!descriptor.getPath(2).equals(FIELD_PATH_SEGMENT)) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "Path is not an application field.");
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': path is not an application field");
}

return AppFieldId.from(app, descriptor.getPath(3));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void onNext(final InputStream request) {
"Only one descriptor definition allowed");
}
resultExportBuilder = ticketRouter
.<Table>publish(session, mi.descriptor)
.<Table>publish(session, mi.descriptor, "Flight.Descriptor")
.onError(observer);
manage(resultExportBuilder.getExport());
}
Expand Down Expand Up @@ -428,7 +428,7 @@ public void onNext(final InputStream request) {
preExportSubscriptions = new ArrayDeque<>();
preExportSubscriptions.add(subscriptionRequest);
final SessionState.ExportObject<Object> parent =
ticketRouter.resolve(session, subscriptionRequest.ticketAsByteBuffer());
ticketRouter.resolve(session, subscriptionRequest.ticketAsByteBuffer(), "ticket");

onExportResolvedContinuation = session.nonExport()
.require(parent)
Expand Down Expand Up @@ -462,7 +462,7 @@ private synchronized void onExportResolved(final SessionState.ExportObject<Objec
}
} else {
GrpcUtil.safelyError(listener, Code.FAILED_PRECONDITION, "Ticket ("
+ ExportTicketHelper.toReadableString(subscriptionRequest.ticketAsByteBuffer())
+ ExportTicketHelper.toReadableString(subscriptionRequest.ticketAsByteBuffer(), "ticket")
+ ") is not a subscribable table.");
return;
}
Expand Down Expand Up @@ -491,7 +491,7 @@ private synchronized void onExportResolved(final SessionState.ExportObject<Objec

/**
* Update the existing subscription to match the new request.
*
*
* @param subscriptionRequest the requested view change
*/
private void apply(final BarrageSubscriptionRequest subscriptionRequest) {
Expand Down
Loading