Implement Parallel Partition Pruning for Glue Hive Metastore #1465

Merged
merged 1 commit into from
Nov 20, 2019
Conversation

anoopj
Member

@anoopj anoopj commented Sep 6, 2019

This change parallelizes the partition fetch for the Glue metastore by
splitting the partitions into non-overlapping segments[1]. This can speed
up query planning by up to an order of magnitude.

[1] https://docs.aws.amazon.com/glue/latest/webapi/API_Segment.html

@cla-bot cla-bot bot added the cla-signed label Sep 6, 2019
@anoopj anoopj requested review from findepi and electrum and removed request for findepi September 6, 2019 21:45

@Inject
- public GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig)
+ public GlueHiveMetastore(HdfsEnvironment hdfsEnvironment, GlueHiveMetastoreConfig glueConfig,
+ @ForGlueHiveMetastore Executor executor)
Member

nit: per code style, if there are too many params to fit on one line, we put each one on a separate line:

public GlueHiveMetastore(
    HdfsEnvironment hdfsEnvironment, 
    GlueHiveMetastoreConfig glueConfig,
    @ForGlueHiveMetastore Executor executor)

Member Author

Sure.

Member

This still needs updating

{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hdfsContext = new HdfsContext(new ConnectorIdentity(DEFAULT_METASTORE_USER, Optional.empty(), Optional.empty()));
this.glueClient = requireNonNull(glueClient, "glueClient is null");
this.defaultDir = glueConfig.getDefaultWarehouseDir();
this.catalogId = glueConfig.getCatalogId().orElse(null);
this.numSegments = glueConfig.getNumGlueSegments();
this.executor = executor;
Member

requireNonNull(executor, "executor is null");

Member Author

Sure.

CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(executor);
List<Segment> segments = IntStream.range(0, numSegments)
.mapToObj(s -> new Segment().withSegmentNumber(s).withTotalSegments(numSegments))
.collect(toList());
Member

we prefer immutable collections, so: toImmutableList()

however, in this case you don't need the list at all:

.forEach(segment -> completionService.submit(() -> ....);

Member Author

OK

@@ -31,6 +32,8 @@
private Optional<String> awsAccessKey = Optional.empty();
private Optional<String> awsSecretKey = Optional.empty();
private Optional<String> catalogId = Optional.empty();
private int numGlueSegments = 5;
Member

we don't use abbreviations like "num"

maybe getPartitionThreads ?

@@ -31,6 +32,8 @@
private Optional<String> awsAccessKey = Optional.empty();
private Optional<String> awsSecretKey = Optional.empty();
private Optional<String> catalogId = Optional.empty();
private int numGlueSegments = 5;
private int maxGlueGetPartitionThreads = 50;
Member

  • no need for "Glue" here
  • add "total" to emphasize this is total cap, not per invocation:

maxGetPartitionTotalThreads ?

Also, what is the rationale for 50 as the default?
What's the default request limit for Glue GetPartition call?
What's the typical duration of a call?

Also, should this be off by default?

Member Author

Will change variable names. Typically the call is expected to take hundreds of milliseconds. 5 concurrent segments is conservative enough to be on by default.

return numGlueSegments;
}

@Config("hive.metastore.glue.num-segments")
Member

This needs to be renamed appropriately, see comment at field name.

{
return new BoundedExecutor(
newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")),
hiveConfig.getMaxGlueGetPartitionThreads());
Member

@findepi findepi Sep 8, 2019

When hiveConfig.getMaxGlueGetPartitionThreads() == 1 we could return directExecutor() here.

Member Author

Sure

Future<List<Partition>> futurePartitions = completionService.take();
partitions.addAll(futurePartitions.get());
}
return partitions;
Member

Sort partitions before returning. We want planning to be deterministic (as much as possible).

Member

When sorting, please do this outside of try block, so that try-catch encompasses as little as possible.

Member Author

OK. Will do the sorting based on partition values.

}

@Override
protected ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName)
Member

Adding this method may change number of test methods effectively run in TestHiveGlueMetastore. Please extract to a separate commit.

Member

This change will cause all of the tests that depend on existing tables to fail (they are currently skipped by the method in the super class). What is the reason for this change?

Member Author

I found that the getPartitions tests are getting skipped without this change. What is the best way to test them?

Member

It's complicated. Travis invokes presto-hive-hadoop2/bin/run_hive_tests.sh which

  • starts up HDFS and a Hive metastore in Docker
  • runs the presto-hive/src/test/sql/create-test.sql Hive script to create test tables
  • runs the AbstractTestHive tests against this environment

The purpose is to create tables in various ways using Hive, then make sure that Presto can read them. That's where the "existing tables" part comes from -- they already exist when the tests run.

It's fine to leave this out, since there are plenty of existing tests that exercise the metastore partition calls. I verified this by running TestHiveInMemoryMetastore with logging on the various getPartition* metastore calls.

@findepi
Member

findepi commented Sep 8, 2019

@anoopj technically, this looks decent.
Instead of one bigger call, we're now making 5 smaller calls in parallel and the PR introduces the necessary plumbing for this.
However, I don't understand yet what problem it is solving.

Are we improving Glue throughput this way?
Why isn't Glue service doing its work in parallel on its own, if this improves its performance?
Is it safe to enable this by default? (eg API rate limits, see https://groups.google.com/d/msg/presto-users/pWFEADyLNUc/pQ70_eDKAAAJ)

@anoopj
Member Author

anoopj commented Sep 12, 2019

@findepi

> @anoopj technically, this looks decent.
> Instead of one bigger call, we're now making 5 smaller calls in parallel and the PR introduces the necessary plumbing for this.
> However, I don't understand yet what problem it is solving.
> Are we improving Glue throughput this way?

This change can improve query planning time on heavily partitioned tables because we can do the scan of the partitions in parallel. For large tables with millions of partitions, a query can be stuck in the planning phase for dozens of minutes. My tests show that the query planning time can be improved by up to an order of magnitude.

> Why isn't Glue service doing its work in parallel on its own, if this improves its performance?

Please note that Glue GetPartitions is a paginated API and it returns only a set of partitions. So even if the Glue service did parallel reads across segments, it is not likely to help clients because they would be making the same number of calls anyway to the service.

> Is it safe to enable this by default? (eg API rate limits, see https://groups.google.com/d/msg/presto-users/pWFEADyLNUc/pQ70_eDKAAAJ)

I think 5 is a conservative number and should be a safe default. For what it's worth, this is the default followed by Spark and Hive on EMR. (search for segments on the doc page)

We can add some documentation in Presto about the new setting and advise Presto users to either adjust this setting if they run into throttling or contact AWS to raise the throttling limits.

@findepi
Member

findepi commented Sep 13, 2019

> Please note that Glue GetPartitions is a paginated API and it returns only a set of partitions. So even if the Glue service did parallel reads across segments, it is not likely to help clients because they would be making the same number of calls anyway to the service.

good point.

Member

@findepi findepi left a comment

some more minor comments

@anoopj let me know when you AC.

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForGlueHiveMetastore
Member

nit

Suggested change
public @interface ForGlueHiveMetastore
public @interface ForGlueHiveMetastore {}

Member Author

OK

@@ -654,7 +697,8 @@ public void dropColumn(String databaseName, String tableName, String columnName)
.withDatabaseName(databaseName)
.withTableName(tableName)
.withExpression(expression)
- .withNextToken(nextToken));
+ .withNextToken(nextToken)
+ .withSegment(segment));
Member

it's hierarchical: next page (token) within a segment, so move withSegment before withNextToken

Member Author

OK
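
To illustrate the hierarchy mentioned in this thread (pages within a segment), here is a minimal JDK-only sketch. `Page`, `fetchSegment`, and the `pageFetcher` callback are hypothetical stand-ins for the Glue request and response types, not the SDK API; the real code would pass the segment and token through `withSegment(...)` and `withNextToken(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class SegmentPaginationSketch
{
    // Hypothetical page of results: items plus an opaque token for the next page (null when done).
    static final class Page
    {
        final List<String> items;
        final String nextToken;

        Page(List<String> items, String nextToken)
        {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    // pageFetcher is an assumed callback taking (segment, nextToken) and returning one page.
    static List<String> fetchSegment(int segment, BiFunction<Integer, String, Page> pageFetcher)
    {
        List<String> partitions = new ArrayList<>();
        String nextToken = null;
        do {
            // The segment stays fixed; only the token advances from page to page.
            Page page = pageFetcher.apply(segment, nextToken);
            partitions.addAll(page.items);
            nextToken = page.nextToken;
        }
        while (nextToken != null);
        return partitions;
    }

    public static void main(String[] args)
    {
        // Fake fetcher that serves exactly two pages per segment.
        BiFunction<Integer, String, Page> fetcher = (segment, token) ->
                token == null
                        ? new Page(List.of("segment" + segment + "-page0"), "token1")
                        : new Page(List.of("segment" + segment + "-page1"), null);
        System.out.println(fetchSegment(0, fetcher));
    }
}
```

Each segment keeps its own token chain, which is why the segment is the outer level and the token the inner one.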


public class TestHiveGlueMetastore
extends AbstractTestHiveLocal
{
public TestHiveGlueMetastore()
{
- super("test_glue" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
+ super("test_glue_" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
Member

Unrelated, so best to separate commit or drop the change.

@anoopj
Member Author

anoopj commented Sep 14, 2019

@findepi Updated the PR incorporating the review comments.

@findepi
Member

findepi commented Sep 14, 2019

I have some doubts about intuitiveness of the configuration.

  • for hiveConfig.getMaxGlueGetPartitionThreads() == N, we have at most N threads calling Glue, regardless of number of queries being planned. This is both: parallelization and form of throttling (in case of multiple queries)
  • for hiveConfig.getMaxGlueGetPartitionThreads() == 1 it could be expected that we have (at most) 1 thread calling Glue.
    Instead, we have as many threads as the number of queries being planned, because no thread pool is being used.

This may be confusing to administrators. Also, we don't have an option to configure a "1 thread throttle". At least in theory, this might be an issue in an organization with multiple clusters hitting Glue API limits.

I think we could use 0 as a special value (no thread pool). And any value >0 to configure the pool.
Any better options?

@electrum what's your thinking?

@anoopj
Member Author

anoopj commented Sep 15, 2019

Currently when hiveConfig.getMaxGlueGetPartitionThreads() == 1, we are using a direct executor. Why don't we just use a threadpool of size 1 so that the behavior is consistent?

@electrum
Member

Let's rename the config to GlueMaxParallelPartitionThreads and keep the special behavior for 1 being a direct executor. I notice we bypass the executor for totalSegments == 1, so it's already not a global limit. It's probably confusing no matter what we do, so we can just document the behavior.

Member

@electrum electrum left a comment

A few minor comments. Overall code looks good.

@Provides
@Singleton
@ForGlueHiveMetastore
public Executor createCachingHiveMetastoreExecutor(HiveCatalogName catalogName, GlueHiveMetastoreConfig hiveConfig)
Member

This name seems wrong, since it's not for caching. It could just be createExecutor, since the scope is the Glue metastore module.

Member Author

Sure thing.

Member

This still needs updating

Member Author

Sorry for missing that one.


// Do parallel partition fetch.
CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(executor);
IntStream.range(0, totalSegments)
Member

I think this would be easier to read as a traditional for loop

Member Author

Sure.
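
For readers following the thread, the shape being discussed (one task per segment submitted in a plain for loop, results collected from a `CompletionService`, sorting done outside the try block) can be sketched with the JDK alone. This is an illustration under stated assumptions, not the merged code: `fetchSegment` is a hypothetical stand-in for a segmented GetPartitions call, partitions are modeled as plain strings, and `IllegalStateException` takes the place of `PrestoException`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SegmentedFetchSketch
{
    // Hypothetical stand-in for one segmented call:
    // segment s of n returns every element whose index i satisfies i % n == s.
    static List<String> fetchSegment(List<String> all, int segment, int totalSegments)
    {
        List<String> result = new ArrayList<>();
        for (int i = segment; i < all.size(); i += totalSegments) {
            result.add(all.get(i));
        }
        return result;
    }

    static List<String> fetchAll(List<String> all, int totalSegments, Executor executor)
    {
        CompletionService<List<String>> completionService = new ExecutorCompletionService<>(executor);
        // Submit one task per segment using a traditional for loop.
        for (int i = 0; i < totalSegments; i++) {
            int segment = i; // effectively final copy for the lambda
            completionService.submit(() -> fetchSegment(all, segment, totalSegments));
        }

        List<String> partitions = new ArrayList<>();
        try {
            // Results arrive in completion order, which is not deterministic.
            for (int i = 0; i < totalSegments; i++) {
                partitions.addAll(completionService.take().get());
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while getting partitions", e);
        }
        catch (ExecutionException e) {
            throw new IllegalStateException("Exception when getting partitions", e);
        }
        // Sort outside the try block so the returned order is deterministic.
        partitions.sort(Comparator.naturalOrder());
        return partitions;
    }

    public static void main(String[] args)
    {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            System.out.println(fetchAll(List.of("ds=2016-01-02", "ds=2016-01-01"), 5, executor));
        }
        finally {
            executor.shutdown();
        }
    }
}
```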

}
throw new PrestoException(HIVE_METASTORE_ERROR, "Exception when getting partitions", e);
}
Collections.sort(partitions, Comparator.comparing(p -> join(",", p.getValues())));
Member

We shouldn't need to convert to string here, as List<String> is naturally comparable with the same semantics

Member

Also this should probably be partitions.sort() rather than Collections.sort(partitions)

Member Author

Will change to partitions.sort(). Not using the string would require me to write a custom comparator that compares List since List is not Comparable. Something like:

partitions.sort((p1, p2) -> {
    List<String> values1 = p1.getValues();
    List<String> values2 = p2.getValues();
    if (values1.size() != values2.size()) {
        return values1.size() - values2.size();
    }
    for (int i = 0; i < values1.size(); i++) {
        int c = values1.get(i).compareTo(values2.get(i));
        if (c != 0) {
            return c;
        }
    }
    return 0;
});

Does that sound reasonable?

Member

partitions.sort(com.google.common.collect.Ordering.natural().lexicographical())

(there should be a way to do this without guava... but i didn't find it.)
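
As an aside, a Guava-free version is short enough to write by hand. The sketch below is an assumption about what such a helper could look like; the name `lexicographical` and the class are made up for illustration and are not part of the JDK. Unlike a size-first comparison, a list that is a prefix of another sorts before it. For partition values of equal length the two approaches order elements the same way.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class LexicographicalSketch
{
    // Compares two lists element by element; the first difference decides.
    static <T> Comparator<List<T>> lexicographical(Comparator<? super T> elementComparator)
    {
        return (left, right) -> {
            Iterator<T> l = left.iterator();
            Iterator<T> r = right.iterator();
            while (l.hasNext() && r.hasNext()) {
                int c = elementComparator.compare(l.next(), r.next());
                if (c != 0) {
                    return c;
                }
            }
            // One list is a prefix of the other (or they are equal): the shorter one sorts first.
            return Boolean.compare(l.hasNext(), r.hasNext());
        };
    }

    public static void main(String[] args)
    {
        List<List<String>> values = new ArrayList<>();
        values.add(List.of("2016-01-02", "us"));
        values.add(List.of("2016-01-01", "us"));
        values.add(List.of("2016-01-01", "eu"));
        values.sort(lexicographical(Comparator.<String>naturalOrder()));
        System.out.println(values);
    }
}
```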

assertTrue(partitionNames.isPresent());
assertEquals(partitionNames.get(), Arrays.asList("ds=2016-01-01", "ds=2016-01-02"));
}
catch (Exception ex) {
Member

This catch block is not needed -- any exception thrown by the test method will fail the test.

Member Author

Sure. Had to add throws to the parent class method too.

Member

Yep, throws Exception is very common for test methods (and only for test methods)

@ForGlueHiveMetastore
public Executor createCachingHiveMetastoreExecutor(HiveCatalogName catalogName, GlueHiveMetastoreConfig hiveConfig)
{
return hiveConfig.getTotalGetPartitionThreads() == 1 ?
Member

use ordinary if

if (hiveConfig.getTotalGetPartitionThreads() == 1) {
    return directExecutor();
}

Member Author

OK
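
A JDK-only sketch of the provider logic under review may help make the special case concrete. Two substitutions are assumptions made for self-containment: `Runnable::run` stands in for Guava's `directExecutor()`, and a fixed-size daemon thread pool stands in for the `BoundedExecutor` over a cached pool used in the PR. The class name and thread name are illustrative.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

final class GlueExecutorSketch
{
    // Stand-in for directExecutor(): runs each task on the calling thread.
    static final Executor DIRECT = Runnable::run;

    static Executor createExecutor(int threads)
    {
        if (threads < 1) {
            throw new IllegalArgumentException("threads must be at least 1");
        }
        if (threads == 1) {
            // No pool: every caller does its own work, so this is not a global cap.
            return DIRECT;
        }
        // Shared pool: at most `threads` tasks run concurrently across all callers.
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable, "hive-glue-sketch");
            thread.setDaemon(true);
            return thread;
        });
    }

    public static void main(String[] args)
    {
        createExecutor(1).execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
    }
}
```

With a value of 1 the work runs on whichever thread calls the metastore, so concurrency is then bounded only by the number of concurrent callers.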

}

@Config("hive.metastore.glue.total-get-partition-threads")
@ConfigDescription("Number of threads for parallel partition fetches from Glue")
Member

@electrum how to say that 1 is a special value here?

Member

The difference is subtle and complicated to explain. I think the description here is fine, though we could document it in the main Hive documentation.

return totalSegments;
}

@Config("hive.metastore.glue.total-segments")
Member

i don't think "total" should be here.

@electrum, hive.metastore.glue.partitions-segments ?

Member Author

Please let me know if this needs a change.

Member

Agreed, let's change this as @findepi suggested. The "total segments" makes sense in terms of the parameter in the API call (since it is saying X of Y), but "partition segments" seems better for a configuration name.

@anoopj
Member Author

anoopj commented Sep 21, 2019

@electrum @findepi Responded to your feedback and have some questions. Please take a look.

@findepi
Member

findepi commented Sep 24, 2019

@anoopj a new thought.
I guess that the most common case is queries that hit only 1-2 partitions.
In that case, you will still issue 5 requests even though 1 would be enough.
Maybe we should consider a mixed strategy: request first page of partitions.
If there is a next page, go parallel (possibly discarding the first page).
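
A rough sketch of this mixed strategy, to make the trade-off concrete. Everything here is hypothetical: `PartitionSource`, `Page`, and `getPartitions` are illustrative names, partitions are plain strings, and `CompletableFuture` is used only for brevity. It is not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class MixedStrategySketch
{
    // Hypothetical page: items plus a token that is null when there are no more pages.
    static final class Page
    {
        final List<String> items;
        final String nextToken;

        Page(List<String> items, String nextToken)
        {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    // Hypothetical client abstraction, not part of the Glue SDK.
    interface PartitionSource
    {
        Page fetchFirstPage();

        List<String> fetchSegment(int segment, int totalSegments);
    }

    static List<String> getPartitions(PartitionSource source, int totalSegments, Executor executor)
    {
        Page first = source.fetchFirstPage();
        if (first.nextToken == null) {
            // Small result: a single request was enough.
            return new ArrayList<>(first.items);
        }
        // Large result: discard the first page and fetch every segment in parallel.
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        for (int i = 0; i < totalSegments; i++) {
            int segment = i;
            futures.add(CompletableFuture.supplyAsync(() -> source.fetchSegment(segment, totalSegments), executor));
        }
        List<String> partitions = new ArrayList<>();
        for (CompletableFuture<List<String>> future : futures) {
            partitions.addAll(future.join());
        }
        return partitions;
    }
}
```

The cost in the large case is one extra, discarded request before the parallel fetch starts.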

@anoopj
Member Author

anoopj commented Sep 25, 2019

@findepi My recommendation is to keep it simple and use parallel calls because a typical query usually spans way more than 5 partitions. Also, the default throttling of Glue is way higher and should allow several concurrent queries (and can be raised by contacting AWS).

@electrum
Member

I agree with keeping the code simple, but I also agree with @findepi that we could be a bit smarter about parallel reads. One idea is to have a minPartitionsPerSegment, which defaults to a smallish number like 5. Then we could do

total = min(((partitionCount - 1) / minPartitionsPerSegment) + 1, totalSegments);

This would limit parallelism for small numbers of partitions while adding minimal complexity.
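
A worked version of the formula, as a sketch. It assumes the partition count is known up front and non-negative; the class and method names are illustrative only. The expression is a ceiling division capped at `totalSegments`: with `minPartitionsPerSegment = 5` and `totalSegments = 5`, 1 partition gives 1 segment, 6 partitions give 2, and 2000 partitions give 5.

```java
final class SegmentCountSketch
{
    // ceil(partitionCount / minPartitionsPerSegment), capped at totalSegments.
    static int effectiveSegments(int partitionCount, int minPartitionsPerSegment, int totalSegments)
    {
        return Math.min(((partitionCount - 1) / minPartitionsPerSegment) + 1, totalSegments);
    }

    public static void main(String[] args)
    {
        for (int count : new int[] {1, 5, 6, 12, 2000}) {
            System.out.println(count + " partitions -> " + effectiveSegments(count, 5, 5) + " segment(s)");
        }
    }
}
```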

@sopel39
Member

sopel39 commented Sep 27, 2019

> One idea is to have a minPartitionsPerSegment, which defaults to a smallish number like 5

Maybe minPartitionsPerSegment should be equal to max page size (when paginating) of Glue request.

@anoopj
Member Author

anoopj commented Sep 27, 2019

@electrum @findepi @sopel39

The challenge here is that we don't know the number of matching partitions a priori. It can vary with the number of partitions in the table and how selective the filters are. Please let me know if I misunderstood.

@anoopj
Member Author

anoopj commented Oct 1, 2019

@electrum @findepi

Ping

@findepi
Member

findepi commented Oct 1, 2019

Sorry, i missed last update.

> a typical query usually spans way more than 5 partitions

If this is an assumption, then we have divergent assumptions here.
If this is a fact, can you please elaborate?

> The challenge here is that we don't know what the number of matching partitions a-priori.

I am aware. This is why i proposed to go parallel only after first call.

Maybe we try to impl that and see how much we sacrifice on code simplicity?
e.g. a commit on top of the existing PR would help

@anoopj
Member Author

anoopj commented Oct 7, 2019

@findepi @electrum @sopel39

I'm more worried about making an unnecessary API call before switching to parallel partition pruning. In my experience, partitioned tables were typically partitioned on a set of keys plus a time-series field, and many queries hit several partitions. Some queries spent more time in planning than in execution, hence this PR.

If there are concerns, we could make this serial by default.

Member

@electrum electrum left a comment

If no one has strong objections, I think this is good to merge with the current behavior. It's a win in the case of many partitions, and for the few partitions case, it likely doesn't add enough overhead to matter. Otherwise, I think we'd need to collect real stats to compare the different cases in various scenarios.

@anoopj can you rebase and address all of the existing comments so that we can get this merged?

@electrum
Member

electrum commented Oct 9, 2019

@dain and I were looking at this and noticed a big inefficiency in the interaction between Presto and the Glue API design. Presto accesses partition information in two phases:

  1. All matching partition names are fetched during planning. This needs to be fast, since it happens before the execution starts. The TupleDomain is translated into a metastore filter expression, then the list of partition names is filtered on the Presto side using a black box Predicate (not limited to what TupleDomain can represent).

  2. During execution, as needed, the full partition metadata is fetched for specific partition names. This happens at runtime because the full metadata can be large and fetching it can be slow.

The problem is that the Glue API only allows fetching full partition metadata -- there is no efficient way to fetch just the partition names. So for the Glue metastore implementation, we fetch the full partition metadata during planning, throw away everything but the name, then fetch the metadata again during execution.

If the Glue API had a way to fetch just partition names, we might not need this segmented fetching, since listing names would (hopefully) be significantly faster.

@anoopj
Member Author

anoopj commented Oct 10, 2019

@david @dain @findepi

That is a good observation and we are aware that this is suboptimal. We currently are working on adding a flag to the Glue GetPartitions API such that it returns only the partition names and the expensive parts like the schema won't have to be read. This would reduce the latency and also allow more partitions to be returned. When this is available in the public AWS APIs and SDK, we can send another PR to make use of that.

I don't think it would obviate the parallel/segmented calls though, since there could be heavily partitioned tables and queries that need to read a lot of partitions. Maybe we could adjust the defaults to be even lower than 5 based on some tests.

I'm a bit overbooked this week and will update the PR with feedback next week hopefully.

@anoopj
Member Author

anoopj commented Oct 23, 2019

@electrum @findepi

Updated the PR incorporating feedback. I've also tested this on a Presto cluster. On a table with about 2000 partitions, getPartitions used to take 2766 ms. With this change, it drops to 650 ms (with hive.metastore.glue.partitions-segments set to 5, which is the default).

On heavily partitioned tables, this can result in an order of magnitude improvement.

@anoopj
Member Author

anoopj commented Nov 1, 2019

@electrum @findepi

Ping.

Member

@electrum electrum left a comment

A few minor comments, otherwise looks good. There is one minor fix needed for the Travis build.

@@ -136,21 +143,28 @@
private static final String WILDCARD_EXPRESSION = "";
private static final int BATCH_GET_PARTITION_MAX_PAGE_SIZE = 1000;
private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100;
private static final Comparator<Partition> PARTITION_COMPARATOR =
Comparator.comparing(p -> p.getValues(), Comparators.lexicographical(String.CASE_INSENSITIVE_ORDER));
Member

Nit: use a method reference and static import

private static final Comparator<Partition> PARTITION_COMPARATOR =
        comparing(Partition::getValues, lexicographical(CASE_INSENSITIVE_ORDER));

Member Author

@anoopj anoopj Nov 5, 2019

Will change.


@@ -959,7 +959,7 @@ public void testListUnknownSchema()
}

@Test
- public void testGetPartitions()
+ public void testGetPartitions() throws Exception
Member

Nit: put throws clause on next line

Member Author

OK

createDummyPartitionedTable(tablePartitionFormat, CREATE_TABLE_COLUMNS_PARTITIONED);
Optional<List<String>> partitionNames = getMetastoreClient().getPartitionNames(HIVE_CONTEXT, tablePartitionFormat.getSchemaName(), tablePartitionFormat.getTableName());
assertTrue(partitionNames.isPresent());
assertEquals(partitionNames.get(), Arrays.asList("ds=2016-01-01", "ds=2016-01-02"));
Member

Nit: use ImmutableList.of()

Member Author

Will do.

@@ -41,7 +55,8 @@ protected HiveMetastore createMetastore(File tempDir)
GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig();
glueConfig.setDefaultWarehouseDir(tempDir.toURI().toString());

return new GlueHiveMetastore(HDFS_ENVIRONMENT, glueConfig);
Executor executor = new BoundedExecutor(newCachedThreadPool(daemonThreadsNamed("hive-glue-%s")), 10);
Member

Create a shared cached thread pool for the test class using @BeforeClass / @AfterClass so that we shut it down, otherwise we can run out of JVM threads when running many tests. See TestThrottledAsyncQueue for an example.

Member Author

Will do.

@@ -54,6 +56,8 @@ public void testExplicitPropertyMapping()
.put("hive.metastore.glue.aws-credentials-provider", "custom")
.put("hive.metastore.glue.catalogid", "0123456789")
.put("hive.metastore.glue.use-instance-credentials", "true")
.put("hive.metastore.glue.total-segments", "10")
Member

These need to be updated to match the new names. This is the cause of the Travis CI failure.

Member Author

Fixed. Sorry for missing that.

@anoopj
Member Author

anoopj commented Nov 5, 2019

@electrum

Updated the PR with feedback.

@electrum electrum merged commit c6b34ec into trinodb:master Nov 20, 2019
@electrum
Member

Merged, thanks!

@electrum electrum mentioned this pull request Nov 24, 2019
7 tasks
@martint martint added this to the 326 milestone Nov 27, 2019
findepi added a commit that referenced this pull request Jun 24, 2020
Document the new configurations introduced as part of
#1465 and a few other configs
introduced over time.

Co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com>

5 participants