Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bump azure core dependency to mitigate woodstox CVE #15432

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
87b8e6f
Add Redshift tests
dain Dec 11, 2022
0312463
Add Redshift schema, table, and column length checks
dain Dec 11, 2022
7eab69a
Implement proper type mapping for Redshift
dain Dec 11, 2022
1e8887e
Implement Redshift DELETE
dain Dec 11, 2022
e65585d
Add Redshift statistics
dain Dec 11, 2022
c225b8f
Add Redshift pushdown
dain Dec 10, 2022
be62233
Test SET PATH support by clients
wendigo Dec 7, 2022
dfe33c7
Fix HTTP_Status on OAuth2 refresh token redirect
huberty89 Dec 7, 2022
c056bc7
Fix recording of projection metrics
raunaqmorarka Dec 10, 2022
a46a510
Fix dereference operations for union type in Hive Connector
leetcode-1533 Dec 2, 2022
0459735
Cleanup PartitioningExchanger
pettyjamesm Dec 9, 2022
1788829
Fix table name in TestDropTableTask
krvikash Dec 12, 2022
3c1c1fa
Document examples for datetime functions
rigogsilva Dec 12, 2022
58bd159
Decode path as URI in Delta Lake connector
jkylling Nov 25, 2022
35da905
Remove unused method from AstBuilder
ebyhr Dec 13, 2022
ce7b57f
Refactor BigQuery connector
ebyhr Nov 18, 2022
a660133
Fix projection pushdown when unsupported column exists in BigQuery
ebyhr Nov 19, 2022
cd8eac6
Update Iceberg to 1.1.0
Fokko Nov 17, 2022
b9d26a7
Document Top-N pushdown
findinpath Jul 5, 2021
f5b5fb8
Remove unnecessary override for `getTableProperties` method
findinpath Dec 14, 2022
ec8a8fd
Fix formatting
kasiafi Dec 8, 2022
7026432
Add requiredColumns field to TableFunctionAnalysis
kasiafi Nov 30, 2022
c3ee15f
Analyze table function's required input columns
kasiafi Nov 30, 2022
87f18c7
Add refresh interval for DB resource group manager
Chaho12 Dec 14, 2022
b7ed5ee
Fixing typo in SQL Server table function example syntax
mfzarko Dec 15, 2022
4a65f20
Disable preventive GC
Chaho12 Dec 13, 2022
ac661b9
Decide number of clients based on average request size of client
radek-kondziolka Dec 15, 2022
e6512e6
Extract getting snapshotId from Iceberg metadata by types
mx123 Dec 15, 2022
5b37085
Use enhanced switch statement
mx123 Dec 14, 2022
4bf5d5e
Remove redundant scope parameter
lukasz-walkiewicz Dec 13, 2022
d2a5a38
Add test create Atop connector
mx123 Dec 15, 2022
22a524b
Add Delta Lake views docs
mdesmet Dec 15, 2022
e33b3ac
Fix error in Iceberg predicate pushdown to Parquet files
alexjo2144 Dec 14, 2022
8839cdc
Add materialized view GRACE PERIOD syntax
findepi Nov 18, 2022
d2c77ad
Report task input and output distribution in EXPLAIN ANALYZE VERBOSE
sopel39 Dec 2, 2022
1aeb8e4
Report if stage might be skewed in EXPLAIN ANALYZE
sopel39 Dec 2, 2022
f38ff0a
bump azure core dependency to mitigate woodstox CVE
tomrijntjes Dec 16, 2022
a4f92c2
remove trino exchange version pin; assume it is inferred
tomrijntjes Dec 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@

import okhttp3.OkHttpClient;

import java.util.Optional;
import java.util.Set;

/**
 * Static factory for {@link StatementClient} instances. This block contained diff
 * residue (both the old three-argument StatementClientV1 call and its replacement);
 * it is resolved here to the post-change version with the capability-aware overload.
 */
public final class StatementClientFactory
{
    private StatementClientFactory() {}

    /**
     * Creates a statement client using the default client capabilities
     * (Optional.empty() lets StatementClientV1 fall back to all known capabilities).
     */
    public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query)
    {
        return new StatementClientV1(httpClient, session, query, Optional.empty());
    }

    /**
     * Creates a statement client advertising an explicit capability set.
     *
     * @param clientCapabilities capability names to advertise; empty means "use defaults"
     */
    public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
    {
        return new StatementClientV1(httpClient, session, query, clientCapabilities);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.net.HttpHeaders.ACCEPT_ENCODING;
import static com.google.common.net.HttpHeaders.USER_AGENT;
import static io.trino.client.JsonCodec.jsonCodec;
Expand All @@ -56,6 +57,7 @@
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static java.util.Arrays.stream;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -93,7 +95,7 @@ class StatementClientV1

private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
requireNonNull(httpClient, "httpClient is null");
requireNonNull(session, "session is null");
Expand All @@ -107,7 +109,9 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst();
this.clientCapabilities = Joiner.on(",").join(ClientCapabilities.values());
this.clientCapabilities = Joiner.on(",").join(clientCapabilities.orElseGet(() -> stream(ClientCapabilities.values())
.map(Enum::name)
.collect(toImmutableSet())));
this.compressionDisabled = session.isCompressionDisabled();

Request request = buildQueryRequest(session, query);
Expand Down
2 changes: 2 additions & 0 deletions core/docker/default/etc/jvm.config
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@
# Improve AES performance for S3, etc. on ARM64 (JDK-8271567)
-XX:+UnlockDiagnosticVMOptions
-XX:+UseAESCTRIntrinsics
# Disable Preventive GC for performance reasons (JDK-8293861)
-XX:-G1UsePreventiveGC
1 change: 1 addition & 0 deletions core/trino-main/src/main/java/io/trino/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ private SessionBuilder(Session session)
this.remoteUserAddress = session.remoteUserAddress.orElse(null);
this.userAgent = session.userAgent.orElse(null);
this.clientInfo = session.clientInfo.orElse(null);
this.clientCapabilities = ImmutableSet.copyOf(session.clientCapabilities);
this.clientTags = ImmutableSet.copyOf(session.clientTags);
this.start = session.start;
this.systemProperties.putAll(session.systemProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -139,12 +138,6 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return builder.buildOrThrow();
}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
{
return new ConnectorTableProperties();
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ public ListenableFuture<Void> execute(
parameterLookup,
true);

if (statement.getGracePeriod().isPresent()) {
// Should fail in analysis
throw new UnsupportedOperationException();
}
MaterializedViewDefinition definition = new MaterializedViewDefinition(
sql,
session.getCatalog(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.http.client.HttpClient;
Expand All @@ -36,9 +37,9 @@
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class DirectExchangeClient
@GuardedBy("this")
private boolean noMoreLocations;

private final ConcurrentMap<URI, HttpPageBufferClient> allClients = new ConcurrentHashMap<>();
private final Map<URI, HttpPageBufferClient> allClients = new ConcurrentHashMap<>();

@GuardedBy("this")
private final Deque<HttpPageBufferClient> queuedClients = new LinkedList<>();
Expand Down Expand Up @@ -260,38 +261,56 @@ public synchronized void close()
}
}

private synchronized void scheduleRequestIfNecessary()
@VisibleForTesting
synchronized int scheduleRequestIfNecessary()
{
if ((buffer.isFinished() || buffer.isFailed()) && completedClients.size() == allClients.size()) {
return;
return 0;
}

long neededBytes = buffer.getRemainingCapacityInBytes();
if (neededBytes <= 0) {
return;
return 0;
}

int clientCount = (int) ((1.0 * neededBytes / averageBytesPerRequest) * concurrentRequestMultiplier);
clientCount = Math.max(clientCount, 1);

int pendingClients = allClients.size() - queuedClients.size() - completedClients.size();
clientCount -= pendingClients;
long reservedBytesForScheduledClients = allClients.values().stream()
.filter(client -> !queuedClients.contains(client) && !completedClients.contains(client))
.mapToLong(HttpPageBufferClient::getAverageRequestSizeInBytes)
.sum();
long projectedBytesToBeRequested = 0;
int clientCount = 0;

for (HttpPageBufferClient client : queuedClients) {
if (projectedBytesToBeRequested >= neededBytes * concurrentRequestMultiplier - reservedBytesForScheduledClients) {
break;
}
projectedBytesToBeRequested += client.getAverageRequestSizeInBytes();
clientCount++;
}
for (int i = 0; i < clientCount; i++) {
HttpPageBufferClient client = queuedClients.poll();
if (client == null) {
// no more clients available
return;
}
client.scheduleRequest();
}
return clientCount;
}

// Returns a future describing whether this client is blocked; blocking state is
// delegated entirely to the underlying exchange buffer.
public ListenableFuture<Void> isBlocked()
{
return buffer.isBlocked();
}

// Test-only accessor: returns the live deque (not a defensive copy) of clients
// waiting to have a request scheduled.
@VisibleForTesting
Deque<HttpPageBufferClient> getQueuedClients()
{
return queuedClients;
}

// Test-only accessor: returns the live map (not a defensive copy) of all clients
// keyed by their location URI.
@VisibleForTesting
Map<URI, HttpPageBufferClient> getAllClients()
{
return allClients;
}

private boolean addPages(HttpPageBufferClient client, List<Slice> pages)
{
checkState(!completedClients.contains(client), "client is already marked as completed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.io.LittleEndianDataInputStream;
Expand Down Expand Up @@ -145,6 +146,9 @@ public interface ClientCallback
@GuardedBy("this")
private String taskInstanceId;

// it is synchronized on `this` for update
private volatile long averageRequestSizeInBytes;

private final AtomicLong rowsReceived = new AtomicLong();
private final AtomicInteger pagesReceived = new AtomicInteger();

Expand All @@ -153,6 +157,7 @@ public interface ClientCallback

private final AtomicInteger requestsScheduled = new AtomicInteger();
private final AtomicInteger requestsCompleted = new AtomicInteger();
private final AtomicInteger requestsSucceeded = new AtomicInteger();
private final AtomicInteger requestsFailed = new AtomicInteger();

private final Executor pageBufferClientCallbackExecutor;
Expand Down Expand Up @@ -251,6 +256,7 @@ else if (completed) {
requestsScheduled.get(),
requestsCompleted.get(),
requestsFailed.get(),
requestsSucceeded.get(),
httpRequestState);
}

Expand All @@ -259,6 +265,11 @@ public TaskId getRemoteTaskId()
return remoteTaskId;
}

// Volatile read of the running average response size; per the field's declaration,
// updates happen under synchronization on `this` elsewhere in the class.
public long getAverageRequestSizeInBytes()
{
return averageRequestSizeInBytes;
}

public synchronized boolean isRunning()
{
return future != null;
Expand Down Expand Up @@ -434,6 +445,8 @@ public Void handle(Request request, Response response)
}
}
requestsCompleted.incrementAndGet();
long responseSize = pages.stream().mapToLong(Slice::length).sum();
requestSucceeded(responseSize);

synchronized (HttpPageBufferClient.this) {
// client is complete, acknowledge it by sending it a delete in the next request
Expand Down Expand Up @@ -485,6 +498,14 @@ public void onFailure(Throwable t)
}, pageBufferClientCallbackExecutor);
}

// Records one successful request and folds its response size into the running
// average request size: AVG_n = (AVG_(n-1) * (n-1) + VALUE_n) / n.
@VisibleForTesting
synchronized void requestSucceeded(long responseSize)
{
    int successfulRequests = requestsSucceeded.incrementAndGet();
    // Weighted total of all responses seen so far, accumulated in double precision.
    double weightedTotal = 1.0 * averageRequestSizeInBytes * (successfulRequests - 1) + responseSize;
    // The cast is applied before the division (matching the original expression's
    // precedence), so this is a truncating long division.
    averageRequestSizeInBytes = ((long) weightedTotal) / successfulRequests;
}

private synchronized void destroyTaskResults()
{
HttpResponseFuture<StatusResponse> resultFuture = httpClient.executeAsync(prepareDelete().setUri(location).build(), createStatusResponseHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class PageBufferClientStatus
private final int requestsScheduled;
private final int requestsCompleted;
private final int requestsFailed;
private final int requestsSucceeded;
private final String httpRequestState;

@JsonCreator
Expand All @@ -50,6 +51,7 @@ public PageBufferClientStatus(@JsonProperty("uri") URI uri,
@JsonProperty("requestsScheduled") int requestsScheduled,
@JsonProperty("requestsCompleted") int requestsCompleted,
@JsonProperty("requestsFailed") int requestsFailed,
@JsonProperty("requestsSucceeded") int requestsSucceeded,
@JsonProperty("httpRequestState") String httpRequestState)
{
this.uri = uri;
Expand All @@ -62,6 +64,7 @@ public PageBufferClientStatus(@JsonProperty("uri") URI uri,
this.requestsScheduled = requestsScheduled;
this.requestsCompleted = requestsCompleted;
this.requestsFailed = requestsFailed;
this.requestsSucceeded = requestsSucceeded;
this.httpRequestState = httpRequestState;
}

Expand Down Expand Up @@ -125,6 +128,12 @@ public int getRequestsFailed()
return requestsFailed;
}

// Count of requests recorded as succeeded, serialized into the JSON status payload.
@JsonProperty
public int getRequestsSucceeded()
{
return requestsSucceeded;
}

@JsonProperty
public String getHttpRequestState()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.trino.spi.Page;
import it.unimi.dsi.fastutil.ints.IntArrayList;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.List;
Expand Down Expand Up @@ -58,18 +57,7 @@ public PartitioningExchanger(
@Override
public void accept(Page page)
{
Consumer<Page> wholePagePartition = partitionPageOrFindWholePagePartition(page, partitionedPagePreparer.apply(page));
if (wholePagePartition != null) {
// whole input page will go to this partition, compact the input page to avoid over-retaining memory and to
// match the behavior of sub-partitioned pages that copy positions out
page.compact();
sendPageToPartition(wholePagePartition, page);
}
}

@Nullable
private Consumer<Page> partitionPageOrFindWholePagePartition(Page page, Page partitionPage)
{
Page partitionPage = partitionedPagePreparer.apply(page);
// assign each row to a partition. The assignment lists are all expected to be cleared by the previous iterations
for (int position = 0; position < partitionPage.getPositionCount(); position++) {
int partition = partitionFunction.getPartition(partitionPage, position);
Expand All @@ -89,22 +77,19 @@ private Consumer<Page> partitionPageOrFindWholePagePartition(Page page, Page par
int[] positions = positionsList.elements();
positionsList.clear();

Page pageSplit;
if (partitionSize == page.getPositionCount()) {
// entire page will be sent to this partition, compact and send the page after releasing the lock
return buffers.get(partition);
// whole input page will go to this partition, compact the input page to avoid over-retaining memory and to
// match the behavior of sub-partitioned pages that copy positions out
page.compact();
pageSplit = page;
}
Page pageSplit = page.copyPositions(positions, 0, partitionSize);
sendPageToPartition(buffers.get(partition), pageSplit);
else {
pageSplit = page.copyPositions(positions, 0, partitionSize);
}
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffers.get(partition).accept(pageSplit);
}
// No single partition receives the entire input page
return null;
}

// This is safe to call without synchronizing because the partition buffers are themselves threadsafe
private void sendPageToPartition(Consumer<Page> buffer, Page pageSplit)
{
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffer.accept(pageSplit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,15 @@ private ProcessBatchResult processBatch(int batchSize)
}
else {
if (pageProjectWork == null) {
Page inputPage = projection.getInputChannels().getInputChannels(page);
expressionProfiler.start();
pageProjectWork = projection.project(session, yieldSignal, inputPage, positionsBatch);
long projectionTimeNanos = expressionProfiler.stop(positionsBatch.size());
metrics.recordProjectionTime(projectionTimeNanos);
pageProjectWork = projection.project(session, yieldSignal, projection.getInputChannels().getInputChannels(page), positionsBatch);
}
if (!pageProjectWork.process()) {

expressionProfiler.start();
boolean finished = pageProjectWork.process();
long projectionTimeNanos = expressionProfiler.stop(positionsBatch.size());
metrics.recordProjectionTime(projectionTimeNanos);

if (!finished) {
return ProcessBatchResult.processBatchYield();
}
previouslyComputedResults[i] = pageProjectWork.getResult();
Expand Down
Loading