Skip to content

Commit

Permalink
[native] Replace fixed test worker port with ephemeral ports
Browse files Browse the repository at this point in the history
Previously the listener ports for the native works in the E2E tests
was hard coded to be 1234 + worker count.
The change looks in the OS for an available ephemeral port
and uses this value when spawning the native workers.

The native worker must is deferring some configuration until the
port selection by the OS is known.
  • Loading branch information
czentgr committed Oct 15, 2024
1 parent 6efcf0f commit 7d6e5ca
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 38 deletions.
2 changes: 1 addition & 1 deletion presto-docs/src/main/sphinx/installation/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ These properties require some explanation:

* ``http-server.http.port``:
Specifies the port for the HTTP server. Presto uses HTTP for all
communication, internal and external.
communication, internal and external. If the value is set to 0 an ephemeral port is used.

* ``query.max-memory``:
The maximum amount of distributed memory that a query may use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ To enable SSL/TLS for Presto internal communication, do the following:
http-server.https.keystore.path=<keystore path>
http-server.https.keystore.key=<keystore password>
Note, setting the `http-server.https.port` to 0 results in the use of an ephemeral port.

6. Change the discovery uri to HTTPS.

.. code-block:: none
Expand Down
101 changes: 67 additions & 34 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,35 +290,37 @@ void PrestoServer::run() {
address_);

initializeCoordinatorDiscoverer();
if (coordinatorDiscoverer_ != nullptr) {
announcer_ = std::make_unique<Announcer>(
address_,
httpsPort.has_value(),
httpsPort.has_value() ? httpsPort.value() : httpPort,
coordinatorDiscoverer_,
nodeVersion_,
environment_,
nodeId_,
nodeLocation_,
systemConfig->prestoNativeSidecar(),
catalogNames,
systemConfig->announcementMaxFrequencyMs(),
sslContext_);
updateAnnouncerDetails();
announcer_->start();

uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
heartbeatManager_ = std::make_unique<PeriodicHeartbeatManager>(
auto startAnnouncerAndHeartbeatManager = [&](bool useHttps, int port) {
if (coordinatorDiscoverer_ != nullptr) {
announcer_ = std::make_unique<Announcer>(
address_,
httpsPort.has_value() ? httpsPort.value() : httpPort,
useHttps,
port,
coordinatorDiscoverer_,
sslContext_,
[server = this]() { return server->fetchNodeStatus(); },
heartbeatFrequencyMs);
heartbeatManager_->start();
nodeVersion_,
environment_,
nodeId_,
nodeLocation_,
systemConfig->prestoNativeSidecar(),
catalogNames,
systemConfig->announcementMaxFrequencyMs(),
sslContext_);
updateAnnouncerDetails();
announcer_->start();

uint64_t heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
heartbeatManager_ = std::make_unique<PeriodicHeartbeatManager>(
address_,
port,
coordinatorDiscoverer_,
sslContext_,
[server = this]() { return server->fetchNodeStatus(); },
heartbeatFrequencyMs);
heartbeatManager_->start();
}
}
}
};

const bool reusePort = SystemConfig::instance()->httpServerReusePort();
auto httpConfig =
Expand Down Expand Up @@ -481,14 +483,16 @@ void PrestoServer::run() {
});
}

std::string taskUri;
if (httpsPort.has_value()) {
taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value());
} else {
taskUri = fmt::format(kTaskUriFormat, kHttp, address_, httpPort);
}
auto setTaskUri = [&](bool useHttps, int port) {
std::string taskUri;
if (useHttps) {
taskUri = fmt::format(kTaskUriFormat, kHttps, address_, port);
} else {
taskUri = fmt::format(kTaskUriFormat, kHttp, address_, port);
}
taskManager_->setBaseUri(taskUri);
};

taskManager_->setBaseUri(taskUri);
taskManager_->setNodeId(nodeId_);
taskManager_->setOldTaskCleanUpMs(systemConfig->oldTaskCleanUpMs());

Expand Down Expand Up @@ -575,7 +579,36 @@ void PrestoServer::run() {

// Start everything. After the return from the following call we are shutting
// down.
httpServer_->start(getHttpServerFilters());
httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) {
const auto addresses = server->addresses();
for (auto address : addresses) {
PRESTO_STARTUP_LOG(INFO) << fmt::format(
"Server listening at {}:{} - https {}",
address.address.getIPAddress().str(),
address.address.getPort(),
address.sslConfigs.size() != 0);
// We could be bound to both http and https ports.
// If set, we must use the https port and skip http.
if (httpsPort.has_value() && address.sslConfigs.size() == 0) {
continue;
}
startAnnouncerAndHeartbeatManager(
httpsPort.has_value(), address.address.getPort());
setTaskUri(httpsPort.has_value(), address.address.getPort());
break;
}
if (coordinatorDiscoverer_ != nullptr) {
VELOX_CHECK(
announcer_ != nullptr,
"The announcer is expected to have been created but wasn't.");
const auto heartbeatFrequencyMs = systemConfig->heartbeatFrequencyMs();
if (heartbeatFrequencyMs > 0) {
VELOX_CHECK(
heartbeatManager_ != nullptr,
"The heartbeat manager is expected to have been created but wasn't.");
}
}
});

if (announcer_ != nullptr) {
PRESTO_SHUTDOWN_LOG(INFO) << "Stopping announcer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,12 @@ public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLaunc
Files.createDirectories(dir);
Path tempDirectoryPath = Files.createTempDirectory(dir, "worker");
log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString());
int port = 1234 + workerIndex;

// Write config file
// Write config file - use an ephemeral port for the worker.
String configProperties = format("discovery.uri=%s%n" +
"presto.version=testversion%n" +
"system-memory-gb=4%n" +
"http-server.http.port=%d", discoveryUri, port);
"http-server.http.port=0%n", discoveryUri);

if (remoteFunctionServerUds.isPresent()) {
String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();
Expand Down

0 comments on commit 7d6e5ca

Please sign in to comment.