Skip to content

Commit

Permalink
Fix flaky pinot tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Oct 18, 2022
1 parent 61c1b5a commit d29ed40
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_AMBIGUOUS_TABLE_NAME;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNABLE_TO_FIND_BROKER;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNCLASSIFIED_ERROR;
import static io.trino.plugin.pinot.PinotMetadata.SCHEMA_NAME;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
Expand All @@ -117,6 +118,8 @@ public class PinotClient
private static final String ROUTING_TABLE_API_TEMPLATE = "debug/routingTable/%s";
private static final String TIME_BOUNDARY_API_TEMPLATE = "debug/timeBoundary/%s";
private static final String QUERY_URL_PATH = "query/sql";
private static final int DEFAULT_HTTP_RETRY_COUNT = 10;
private static final int DEFAULT_RETRY_INTERVAL = 1000;

private final List<URI> controllerUrls;
private final HttpClient httpClient;
Expand Down Expand Up @@ -270,18 +273,21 @@ public List<String> getTables()

protected Multimap<String, String> getAllTables()
{
List<String> allTables = sendHttpGetToControllerJson(GET_ALL_TABLES_API_TEMPLATE, tablesJsonCodec).getTables();
ImmutableListMultimap.Builder<String, String> builder = ImmutableListMultimap.builder();
for (String table : allTables) {
builder.put(table.toLowerCase(ENGLISH), table);
}
return builder.build();
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
List<String> allTables =
sendHttpGetToControllerJson(GET_ALL_TABLES_API_TEMPLATE, tablesJsonCodec).getTables();
ImmutableListMultimap.Builder<String, String> builder = ImmutableListMultimap.builder();
for (String table : allTables) {
builder.put(table.toLowerCase(ENGLISH), table);
}
return builder.build();
});
}

public Schema getTableSchema(String table)
throws Exception
{
return sendHttpGetToControllerJson(format(TABLE_SCHEMA_API_TEMPLATE, table), schemaJsonCodec);
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> sendHttpGetToControllerJson(format(TABLE_SCHEMA_API_TEMPLATE, table), schemaJsonCodec));
}

public List<String> getPinotTableNames()
Expand Down Expand Up @@ -361,63 +367,73 @@ public List<InstancesInBroker> getBrokers()
@VisibleForTesting
public List<String> getAllBrokersForTable(String table)
{
ArrayList<String> brokers = sendHttpGetToControllerJson(format(TABLE_INSTANCES_API_TEMPLATE, table), brokersForTableJsonCodec)
.getBrokers().stream()
.flatMap(broker -> broker.getInstances().stream())
.distinct()
.map(brokerToParse -> {
Matcher matcher = BROKER_PATTERN.matcher(brokerToParse);
if (matcher.matches() && matcher.groupCount() == 2) {
return pinotHostMapper.getBrokerHost(matcher.group(1), matcher.group(2));
}
throw new PinotException(
PINOT_UNABLE_TO_FIND_BROKER,
Optional.empty(),
format("Cannot parse %s in the broker instance", brokerToParse));
})
.collect(Collectors.toCollection(ArrayList::new));
Collections.shuffle(brokers);
return ImmutableList.copyOf(brokers);
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
ArrayList<String> brokers = sendHttpGetToControllerJson(format(TABLE_INSTANCES_API_TEMPLATE, table), brokersForTableJsonCodec)
.getBrokers().stream()
.flatMap(broker -> broker.getInstances().stream())
.distinct()
.map(brokerToParse -> {
Matcher matcher = BROKER_PATTERN.matcher(brokerToParse);
if (matcher.matches() && matcher.groupCount() == 2) {
return pinotHostMapper.getBrokerHost(matcher.group(1), matcher.group(2));
}
throw new PinotException(
PINOT_UNABLE_TO_FIND_BROKER,
Optional.empty(),
format("Cannot parse %s in the broker instance", brokerToParse));
})
.collect(Collectors.toCollection(ArrayList::new));
Collections.shuffle(brokers);
return ImmutableList.copyOf(brokers);
});
}

public String getBrokerHost(String table)
{
try {
List<String> brokers = brokersForTableCache.get(table);
if (brokers.isEmpty()) {
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), "No valid brokers found for " + table);
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
try {
List<String> brokers = brokersForTableCache.get(table);
if (brokers.isEmpty()) {
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(),
"No valid brokers found for " + table, true);
}
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}
catch (ExecutionException e) {
Throwable throwable = e.getCause();
if (throwable instanceof PinotException) {
throw (PinotException) throwable;
catch (ExecutionException e) {
Throwable throwable = e.getCause();
if (throwable instanceof PinotException) {
throw (PinotException) throwable;
}
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(),
"Error when getting brokers for table " + table, true, throwable);
}
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), "Error when getting brokers for table " + table, throwable);
}
});
}

public Map<String, Map<String, List<String>>> getRoutingTableForTable(String tableName)
{
Map<String, Map<String, List<String>>> routingTable = sendHttpGetToBrokerJson(tableName, format(ROUTING_TABLE_API_TEMPLATE, tableName), ROUTING_TABLE_CODEC);
ImmutableMap.Builder<String, Map<String, List<String>>> routingTableMap = ImmutableMap.builder();
for (Map.Entry<String, Map<String, List<String>>> entry : routingTable.entrySet()) {
String tableNameWithType = entry.getKey();
if (!entry.getValue().isEmpty() && tableName.equals(extractRawTableName(tableNameWithType))) {
ImmutableMap.Builder<String, List<String>> segmentBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> segmentEntry : entry.getValue().entrySet()) {
if (!segmentEntry.getValue().isEmpty()) {
segmentBuilder.put(segmentEntry.getKey(), segmentEntry.getValue());
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
Map<String, Map<String, List<String>>> routingTable =
sendHttpGetToBrokerJson(tableName, format(ROUTING_TABLE_API_TEMPLATE, tableName),
ROUTING_TABLE_CODEC);
ImmutableMap.Builder<String, Map<String, List<String>>> routingTableMap = ImmutableMap.builder();
for (Map.Entry<String, Map<String, List<String>>> entry : routingTable.entrySet()) {
String tableNameWithType = entry.getKey();
if (!entry.getValue().isEmpty() && tableName.equals(extractRawTableName(tableNameWithType))) {
ImmutableMap.Builder<String, List<String>> segmentBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> segmentEntry : entry.getValue().entrySet()) {
if (!segmentEntry.getValue().isEmpty()) {
segmentBuilder.put(segmentEntry.getKey(), segmentEntry.getValue());
}
}
Map<String, List<String>> segmentMap = segmentBuilder.buildOrThrow();
if (!segmentMap.isEmpty()) {
routingTableMap.put(tableNameWithType, segmentMap);
}
}
Map<String, List<String>> segmentMap = segmentBuilder.buildOrThrow();
if (!segmentMap.isEmpty()) {
routingTableMap.put(tableNameWithType, segmentMap);
}
}
}
return routingTableMap.buildOrThrow();
return routingTableMap.buildOrThrow();
});
}

public static class TimeBoundary
Expand Down Expand Up @@ -458,16 +474,19 @@ public Optional<String> getOfflineTimePredicate()

public TimeBoundary getTimeBoundaryForTable(String table)
{
try {
return sendHttpGetToBrokerJson(table, format(TIME_BOUNDARY_API_TEMPLATE, table), timeBoundaryJsonCodec);
}
catch (Exception e) {
String[] errorMessageSplits = e.getMessage().split(" ");
if (errorMessageSplits.length >= 4 && errorMessageSplits[3].equalsIgnoreCase(TIME_BOUNDARY_NOT_FOUND_ERROR_CODE)) {
return timeBoundaryJsonCodec.fromJson("{}");
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
try {
return sendHttpGetToBrokerJson(table, format(TIME_BOUNDARY_API_TEMPLATE, table), timeBoundaryJsonCodec);
}
throw e;
}
catch (Exception e) {
String[] errorMessageSplits = e.getMessage().split(" ");
if (errorMessageSplits.length >= 4 && errorMessageSplits[3].equalsIgnoreCase(
TIME_BOUNDARY_NOT_FOUND_ERROR_CODE)) {
return timeBoundaryJsonCodec.fromJson("{}");
}
throw e;
}
});
}

public static class QueryRequest
Expand Down Expand Up @@ -560,7 +579,8 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin
throw new PinotException(
PINOT_EXCEPTION,
Optional.of(query.getQuery()),
format("Query %s encountered exception %s", query.getQuery(), processingExceptionMessage));
format("Query %s encountered exception %s", query.getQuery(), processingExceptionMessage),
true);
}
if (response.getNumServersQueried() == 0 || response.getNumServersResponded() == 0 || response.getNumServersQueried() > response.getNumServersResponded()) {
throw new PinotInsufficientServerResponseException(query, response.getNumServersResponded(), response.getNumServersQueried());
Expand Down Expand Up @@ -623,19 +643,38 @@ public static ResultsIterator fromResultTable(BrokerResponseNative brokerRespons
}

public static <T> T doWithRetries(int retries, Function<Integer, T> caller)
{
return doWithRetries(retries, caller, DEFAULT_RETRY_INTERVAL);
}

public static <T> T doWithRetries(int retries, Function<Integer, T> caller, int retryInterval)
{
PinotException firstError = null;
checkState(retries > 0, "Invalid num of retries %s", retries);
for (int i = 0; i < retries; ++i) {
try {
return caller.apply(i);
}
catch (PinotException e) {
if (firstError == null) {
firstError = e;
catch (Exception e) {
if (e instanceof PinotException pinotException) {
if (firstError == null) {
firstError = pinotException;
}
if (!pinotException.isRetryable()) {
throw pinotException;
}
}
else {
if (firstError == null) {
firstError = new PinotException(PINOT_UNCLASSIFIED_ERROR, Optional.empty(),
"Unexpected exception", e);
}
}
try {
Thread.sleep(retryInterval);
}
if (!e.isRetryable()) {
throw e;
catch (InterruptedException ex) {
// Sleep interrupted, ignore
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testcontainers.shaded.org.bouncycastle.util.encoders.Hex;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
Expand All @@ -73,6 +74,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -145,6 +147,37 @@ protected boolean isLatestVersion()
return getPinotImageName().equals(PINOT_LATEST_IMAGE_NAME);
}

@Override
@BeforeClass
public void init()
throws Exception
{
super.init();
// Ensure test tables are available in Pinot with expected number of rows.
validateTableRows(ALL_TYPES_TABLE, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES);
validateTableRows(MIXED_CASE_COLUMN_NAMES_TABLE, 4);
validateTableRows(MIXED_CASE_DISTINCT_TABLE, 4);
validateTableRows(TOO_MANY_ROWS_TABLE, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES + 1);
validateTableRows(TOO_MANY_BROKER_ROWS_TABLE, MAX_ROWS_PER_SPLIT_FOR_BROKER_QUERIES + 1);
validateTableRows(MIXED_CASE_TABLE_NAME, 4);
validateTableRows(JSON_TABLE, 7);
validateTableRows(JSON_TYPE_TABLE, 3);
validateTableRows(RESERVED_KEYWORD_TABLE, 2);
validateTableRows(QUOTES_IN_COLUMN_NAME_TABLE, 2);
validateTableRows(DUPLICATE_VALUES_IN_COLUMNS_TABLE, 5);
validateTableRows("region", getQueryRunner().execute("SELECT * FROM tpch.tiny.region").getRowCount());
validateTableRows("nation", getQueryRunner().execute("SELECT * FROM tpch.tiny.nation").getRowCount());
}

private void validateTableRows(String tableName, int expectedRows)
{
assertQueryEventually(
getQueryRunner().getDefaultSession(),
"SELECT COUNT(*) FROM " + tableName,
"VALUES '" + expectedRows + "'",
new io.airlift.units.Duration(10, TimeUnit.SECONDS));
}

@Override
protected QueryRunner createQueryRunner()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

Expand Down Expand Up @@ -85,51 +86,50 @@ public class TestingPinotCluster
private final GenericContainer<?> server;
private final GenericContainer<?> zookeeper;
private final HttpClient httpClient;
private final Closer closer = Closer.create();
private final boolean secured;

public TestingPinotCluster(Network network, boolean secured, String pinotImageName)
{
httpClient = closer.register(new JettyHttpClient());
httpClient = new JettyHttpClient();
zookeeper = new GenericContainer<>(parse("zookeeper:3.5.6"))
.withStartupAttempts(3)
.withStartupTimeout(Duration.ofMinutes(2))
.withNetwork(network)
.withNetworkAliases(ZOOKEEPER_INTERNAL_HOST)
.withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT))
.withExposedPorts(ZOOKEEPER_PORT);
closer.register(zookeeper::stop);

String controllerConfig = secured ? "/var/pinot/controller/config/pinot-controller-secured.conf" : "/var/pinot/controller/config/pinot-controller.conf";
controller = new GenericContainer<>(parse(pinotImageName))
.withStartupAttempts(3)
.withStartupTimeout(Duration.ofMinutes(2))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-controller", "/var/pinot/controller/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-controller-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartController", "-configFileName", controllerConfig)
.withNetworkAliases("pinot-controller", "localhost")
.withExposedPorts(CONTROLLER_PORT);
closer.register(controller::stop);

String brokerConfig = secured ? "/var/pinot/broker/config/pinot-broker-secured.conf" : "/var/pinot/broker/config/pinot-broker.conf";
broker = new GenericContainer<>(parse(pinotImageName))
.withStartupAttempts(3)
.withStartupTimeout(Duration.ofMinutes(2))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-broker", "/var/pinot/broker/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-broker-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartBroker", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", brokerConfig)
.withNetworkAliases("pinot-broker", "localhost")
.withExposedPorts(BROKER_PORT);
closer.register(broker::stop);

server = new GenericContainer<>(parse(pinotImageName))
.withStartupAttempts(3)
.withStartupTimeout(Duration.ofMinutes(2))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-server", "/var/pinot/server/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartServer", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/server/config/pinot-server.conf")
.withNetworkAliases("pinot-server", "localhost")
.withExposedPorts(SERVER_PORT, SERVER_ADMIN_PORT, GRPC_PORT);
closer.register(server::stop);

this.secured = secured;
}
Expand All @@ -146,7 +146,13 @@ public void start()
public void close()
throws IOException
{
closer.close();
try (Closer closer = Closer.create()) {
closer.register(zookeeper::stop);
closer.register(controller::stop);
closer.register(broker::stop);
closer.register(server::stop);
closer.register(httpClient);
}
}

private static String getZookeeperInternalHostPort()
Expand Down

0 comments on commit d29ed40

Please sign in to comment.