Skip to content

Commit

Permalink
Separate ExecutingStatement response from QueuedStatement
Browse files Browse the repository at this point in the history
  • Loading branch information
prithvip committed Aug 15, 2024
1 parent 0c273df commit 6072d37
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@
import com.facebook.presto.operator.OperatorInfo;
import com.facebook.presto.resourcemanager.ForResourceManager;
import com.facebook.presto.resourcemanager.ResourceManagerProxy;
import com.facebook.presto.server.protocol.ExecutingQueryResponseProvider;
import com.facebook.presto.server.protocol.ExecutingStatementResource;
import com.facebook.presto.server.protocol.LocalExecutingQueryResponseProvider;
import com.facebook.presto.server.protocol.LocalQueryProvider;
import com.facebook.presto.server.protocol.QueryBlockingRateLimiter;
import com.facebook.presto.server.protocol.QueuedStatementResource;
Expand Down Expand Up @@ -187,6 +189,7 @@ protected void setup(Binder binder)
newExporter(binder).export(QueryBlockingRateLimiter.class).withGeneratedName();

binder.bind(LocalQueryProvider.class).in(Scopes.SINGLETON);
binder.bind(ExecutingQueryResponseProvider.class).to(LocalExecutingQueryResponseProvider.class).in(Scopes.SINGLETON);

jaxrsBinder(binder).bind(TaskInfoResource.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server.protocol;

import com.facebook.presto.dispatcher.DispatchInfo;
import com.facebook.presto.spi.QueryId;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import java.util.Optional;

public interface ExecutingQueryResponseProvider
{
/**
* Generally, the Presto protocol redirects the client from QueuedStatementResource
* to ExecutingStatementResource once the query has been de-queued and begun execution.
* But this redirect might add too much latency for certain very low latency use cases.
* This interface allows for a response from ExecutingStatementResource to be wired into
* QueuedStatementResource, so that the client can receive results directly from the
* QueuedStatement endpoint, without having to be redirected to the ExecutingStatement endpoint.
*
* This interface is required for https://github.com/prestodb/presto/issues/23455
*
* @param queryId query id
* @param slug nonce to protect the query
* @param dispatchInfo information about state of the query
* @param uriInfo endpoint URI
* @param xPrestoPrefixUrl prefix URL, that is useful if a proxy is being used
* @param scheme HTTP scheme
* @param maxWait duration to wait for query results
* @param targetResultSize target result size of first response
* @param compressionEnabled enable compression
* @param nestedDataSerializationEnabled enable nested data serialization
* @param binaryResults generate results in binary format, rather than JSON
* @return the ExecutingStatement's Response, if available
*/
Optional<ListenableFuture<Response>> waitForExecutingResponse(
QueryId queryId,
String slug,
DispatchInfo dispatchInfo,
UriInfo uriInfo,
String xPrestoPrefixUrl,
String scheme,
Duration maxWait,
DataSize targetResultSize,
boolean compressionEnabled,
boolean nestedDataSerializationEnabled,
boolean binaryResults);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server.protocol;

import com.facebook.presto.dispatcher.DispatchInfo;
import com.facebook.presto.spi.QueryId;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import javax.inject.Inject;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import java.util.Optional;

import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;

public class LocalExecutingQueryResponseProvider
implements ExecutingQueryResponseProvider
{
private final LocalQueryProvider queryProvider;

@Inject
public LocalExecutingQueryResponseProvider(LocalQueryProvider queryProvider)
{
this.queryProvider = requireNonNull(queryProvider, "queryProvider is null");
}

@Override
public Optional<ListenableFuture<Response>> waitForExecutingResponse(
QueryId queryId,
String slug,
DispatchInfo dispatchInfo,
UriInfo uriInfo,
String xPrestoPrefixUrl,
String scheme,
Duration maxWait,
DataSize targetResultSize,
boolean compressionEnabled,
boolean nestedDataSerializationEnabled,
boolean binaryResults)
{
Query query;
try {
query = queryProvider.getQuery(queryId, slug);
}
catch (WebApplicationException e) {
return Optional.empty();
}
return Optional.of(transform(
query.waitForResults(0, uriInfo, scheme, maxWait, targetResultSize, binaryResults),
results -> QueryResourceUtil.toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
directExecutor()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.Futures.transformAsync;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
Expand All @@ -116,7 +115,7 @@ public class QueuedStatementResource
private static final Duration NO_DURATION = new Duration(0, MILLISECONDS);

private final DispatchManager dispatchManager;
private final LocalQueryProvider queryResultsProvider;
private final ExecutingQueryResponseProvider executingQueryResponseProvider;

private final Executor responseExecutor;
private final ScheduledExecutorService timeoutExecutor;
Expand All @@ -137,15 +136,15 @@ public class QueuedStatementResource
public QueuedStatementResource(
DispatchManager dispatchManager,
DispatchExecutor executor,
LocalQueryProvider queryResultsProvider,
ExecutingQueryResponseProvider executingQueryResponseProvider,
SqlParserOptions sqlParserOptions,
ServerConfig serverConfig,
TracerProviderManager tracerProviderManager,
SessionPropertyManager sessionPropertyManager,
QueryBlockingRateLimiter queryRateLimiter)
{
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
this.queryResultsProvider = queryResultsProvider;
this.executingQueryResponseProvider = requireNonNull(executingQueryResponseProvider, "executingQueryResponseProvider is null");
this.sqlParserOptions = requireNonNull(sqlParserOptions, "sqlParserOptions is null");
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
this.nestedDataSerializationEnabled = requireNonNull(serverConfig, "serverConfig is null").isNestedDataSerializationEnabled();
Expand Down Expand Up @@ -221,7 +220,7 @@ public Response postStatement(
sqlParserOptions,
tracerProviderManager.getTracerProvider(),
Optional.of(sessionPropertyManager));
Query query = new Query(statement, sessionContext, dispatchManager, queryResultsProvider, 0);
Query query = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0);
queries.put(query.getQueryId(), query);

return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
Expand Down Expand Up @@ -258,7 +257,7 @@ public Response retryFailedQuery(
"-- retry query " + queryId + "; attempt: " + retryCount + "\n" + failedQuery.getQuery(),
failedQuery.getSessionContext(),
dispatchManager,
queryResultsProvider,
executingQueryResponseProvider,
retryCount);

retriedQueries.putIfAbsent(queryId, query);
Expand Down Expand Up @@ -453,7 +452,7 @@ private static final class Query
private final String query;
private final SessionContext sessionContext;
private final DispatchManager dispatchManager;
private final LocalQueryProvider queryProvider;
private final ExecutingQueryResponseProvider executingQueryResponseProvider;
private final QueryId queryId;
private final String slug = "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
private final AtomicLong lastToken = new AtomicLong();
Expand All @@ -462,12 +461,12 @@ private static final class Query
@GuardedBy("this")
private ListenableFuture<?> querySubmissionFuture;

public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, LocalQueryProvider queryResultsProvider, int retryCount)
public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, ExecutingQueryResponseProvider executingQueryResponseProvider, int retryCount)
{
this.query = requireNonNull(query, "query is null");
this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
this.queryProvider = requireNonNull(queryResultsProvider, "queryExecutor is null");
this.executingQueryResponseProvider = requireNonNull(executingQueryResponseProvider, "executingQueryResponseProvider is null");
this.queryId = dispatchManager.createQueryId();
this.retryCount = retryCount;
}
Expand Down Expand Up @@ -565,7 +564,15 @@ public synchronized QueryResults getInitialQueryResults(UriInfo uriInfo, String
binaryResults);
}

public ListenableFuture<Response> toResponse(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, Duration maxWait, boolean compressionEnabled, boolean nestedDataSerializationEnabled, boolean binaryResults)
public ListenableFuture<Response> toResponse(
long token,
UriInfo uriInfo,
String xForwardedProto,
String xPrestoPrefixUrl,
Duration maxWait,
boolean compressionEnabled,
boolean nestedDataSerializationEnabled,
boolean binaryResults)
{
long lastToken = this.lastToken.get();
// token should be the last token or the next token
Expand Down Expand Up @@ -597,23 +604,28 @@ public ListenableFuture<Response> toResponse(long token, UriInfo uriInfo, String
.build()));
}

if (!waitForDispatched().isDone()) {
return immediateFuture(withCompressionConfiguration(Response.ok(createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled).build());
if (waitForDispatched().isDone()) {
Optional<ListenableFuture<Response>> executingQueryResponse = executingQueryResponseProvider.waitForExecutingResponse(
queryId,
slug,
dispatchInfo.get(),
uriInfo,
xPrestoPrefixUrl,
getScheme(xForwardedProto, uriInfo),
maxWait,
TARGET_RESULT_SIZE,
compressionEnabled,
nestedDataSerializationEnabled,
binaryResults);

if (executingQueryResponse.isPresent()) {
return executingQueryResponse.get();
}
}

com.facebook.presto.server.protocol.Query query;
try {
query = queryProvider.getQuery(queryId, slug);
}
catch (WebApplicationException e) {
return immediateFuture(withCompressionConfiguration(Response.ok(createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled).build());
}
// If this future completes successfully, the next URI will redirect to the executing statement endpoint.
// Hence it is safe to hardcode the token to be 0.
return transform(
query.waitForResults(0, uriInfo, getScheme(xForwardedProto, uriInfo), maxWait, TARGET_RESULT_SIZE, binaryResults),
results -> QueryResourceUtil.toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled),
directExecutor());
return immediateFuture(withCompressionConfiguration(Response.ok(
createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled)
.build());
}

public synchronized void cancel()
Expand Down

0 comments on commit 6072d37

Please sign in to comment.