Skip to content

Commit

Permalink
[CELEBORN-1440] Support baseline implementation of Celeborn Dashboard…
Browse files Browse the repository at this point in the history
… Server
  • Loading branch information
SteNicholas committed Jun 21, 2024
1 parent a9d1d64 commit f32fbc0
Show file tree
Hide file tree
Showing 23 changed files with 1,003 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class RpcNameConstants {
public static String WORKER_EP = "WorkerEndpoint";
public static String WORKER_INTERNAL_EP = "WorkerInternalEndpoint";

// For Web
public static String WEB_SYS = "Web";

// Web Endpoint Name
public static String WEB_EP = "WebEndpoint";

// For LifecycleManager
public static String LIFECYCLE_MANAGER_SYS = "LifecycleManager";
public static String LIFECYCLE_MANAGER_MASTER_SYS = "LifecycleManagerMasterSys";
Expand Down
11 changes: 11 additions & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ enum MessageType {
BATCH_OPEN_STREAM = 80;
BATCH_OPEN_STREAM_RESPONSE = 81;
REPORT_WORKER_DECOMMISSION = 82;
MASTER_GROUP_REQUEST = 83;
MASTER_GROUP_RESPONSE = 84;
}

enum StreamType {
Expand Down Expand Up @@ -797,3 +799,12 @@ message PbReportWorkerDecommission {
repeated PbWorkerInfo workers = 1;
string requestId = 2;
}

// Request for the master's group information. Carries no fields; the master
// answers with a PbMasterGroupResponse (MessageType MASTER_GROUP_REQUEST /
// MASTER_GROUP_RESPONSE).
message PbMasterGroupRequest {
}

// Reply to PbMasterGroupRequest. The master sends this with no fields set
// when HA is disabled.
message PbMasterGroupResponse {
// Identifier of the master group, as a string.
string groupId = 1;
// Address of the current leader peer of the group.
string masterLeader = 2;
// String form of each commit-info entry reported for the group.
repeated string commits = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,22 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).microsecond
def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE)

// //////////////////////////////////////////////////////
// Web //
// //////////////////////////////////////////////////////

// Host for the web service; the "<localhost>" placeholder in the configured
// value is replaced with Utils.localHostName(this).
def webHost: String = get(WEB_HOST).replace("<localhost>", Utils.localHostName(this))
// Host for the web HTTP server; same "<localhost>" placeholder substitution as webHost.
def webHttpHost: String =
get(WEB_HTTP_HOST).replace("<localhost>", Utils.localHostName(this))
// Port for the web service (celeborn.web.port).
def webPort: Int = get(WEB_PORT)
// Port for the web HTTP server (celeborn.web.http.port).
def webHttpPort: Int = get(WEB_HTTP_PORT)

// Upper bound on threads in the web HTTP worker thread pool.
def webHttpMaxWorkerThreads: Int = get(WEB_HTTP_MAX_WORKER_THREADS)

// Web HTTP server stop timeout; the entry is declared with TimeUnit.MILLISECONDS.
def webHttpStopTimeout: Long = get(WEB_HTTP_STOP_TIMEOUT)

// Web HTTP server idle timeout; the entry is declared with TimeUnit.MILLISECONDS.
def webHttpIdleTimeout: Long = get(WEB_HTTP_IDLE_TIMEOUT)

// //////////////////////////////////////////////////////
// Metrics System //
// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -5363,10 +5379,68 @@ object CelebornConf extends Logging {

val LOG_CELEBORN_CONF_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.logConf.enabled")
.categories("master", "worker")
.categories("master", "worker", "web")
.version("0.5.0")
.doc("When `true`, log the CelebornConf for debugging purposes.")
.booleanConf
.createWithDefault(false)

// Hostname the web service binds to. The default is the literal placeholder
// "<localhost>", which CelebornConf.webHost replaces with the local host name.
val WEB_HOST: ConfigEntry[String] =
buildConf("celeborn.web.host")
.categories("web")
.version("0.6.0")
.doc("Hostname for web to bind.")
.stringConf
.createWithDefaultString("<localhost>")

// Host of the web HTTP server. The default is the literal placeholder
// "<localhost>", which CelebornConf.webHttpHost replaces with the local host name.
val WEB_HTTP_HOST: ConfigEntry[String] =
buildConf("celeborn.web.http.host")
.categories("web")
.version("0.6.0")
.doc("Web's http host.")
.stringConf
.createWithDefaultString("<localhost>")

// Port the web service binds to; defaults to 9090.
// Accepted range is 1024 (inclusive) to 65535 (exclusive).
// NOTE(review): 65535 is a valid port number but is rejected by this check —
// confirm whether the exclusive upper bound is intentional.
val WEB_PORT: ConfigEntry[Int] =
buildConf("celeborn.web.port")
.categories("web")
.version("0.6.0")
.doc("Port for web to bind.")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9090)

// Port of the web HTTP server; defaults to 9091.
// Accepted range is 1024 (inclusive) to 65535 (exclusive).
// NOTE(review): 65535 is a valid port number but is rejected by this check —
// confirm whether the exclusive upper bound is intentional.
val WEB_HTTP_PORT: ConfigEntry[Int] =
buildConf("celeborn.web.http.port")
.categories("web")
.version("0.6.0")
.doc("Web's http port.")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9091)

// Maximum size of the web HTTP worker thread pool; must be strictly positive,
// defaults to 200.
val WEB_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.web.http.maxWorkerThreads")
.categories("web")
.version("0.6.0")
.doc("Maximum number of threads in the web http worker thread pool.")
.intConf
.checkValue(_ > 0, "Must be positive.")
.createWithDefault(200)

// Stop timeout of the web HTTP server. Declared as a time config with
// TimeUnit.MILLISECONDS; the default is given as the string "5s".
val WEB_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.web.http.stopTimeout")
.categories("web")
.version("0.6.0")
.doc("Web http server stop timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")

// Idle timeout of the web HTTP server. Declared as a time config with
// TimeUnit.MILLISECONDS; the default is given as the string "30s".
val WEB_HTTP_IDLE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.web.http.idleTimeout")
.categories("web")
.version("0.6.0")
.doc("Web http server idle timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,25 @@ object ControlMessages extends Logging {
.build()
}

/**
 * Factory for the field-less [[PbMasterGroupRequest]] message.
 */
object MasterGroupRequest {

  /** Builds a fresh, empty [[PbMasterGroupRequest]]. */
  def apply(): PbMasterGroupRequest =
    PbMasterGroupRequest
      .newBuilder()
      .build()
}

/**
 * Factory for [[PbMasterGroupResponse]].
 *
 * All parameters are optional. When `groupId` is null an empty response is built
 * (this is what the master replies with when HA is disabled). When `groupId` is
 * present, `masterLeader` and `commits` are each copied only if non-null, so a
 * group whose leader is currently unknown can still be reported.
 */
object MasterGroupResponse {

  /**
   * @param groupId      identifier of the master group, or null for an empty response
   * @param masterLeader address of the group leader, or null when no leader is known
   * @param commits      commit-info strings of the group, or null when unavailable
   * @return the built response; fields whose argument was null are left unset
   */
  def apply(
      groupId: String = null,
      masterLeader: String = null,
      commits: util.List[String] = null): PbMasterGroupResponse = {
    val builder = PbMasterGroupResponse.newBuilder()
    if (groupId != null) {
      builder.setGroupId(groupId)
      // Generated protobuf builders throw NullPointerException when handed null,
      // so the optional fields must be guarded individually.
      if (masterLeader != null) {
        builder.setMasterLeader(masterLeader)
      }
      if (commits != null) {
        builder.addAllCommits(commits)
      }
    }
    builder.build()
  }
}

/**
* ==========================================
* handled by worker
Expand Down Expand Up @@ -917,6 +936,12 @@ object ControlMessages extends Logging {

case pb: PbApplicationMetaRequest =>
new TransportMessage(MessageType.APPLICATION_META_REQUEST, pb.toByteArray)

case pb: PbMasterGroupRequest =>
new TransportMessage(MessageType.MASTER_GROUP_REQUEST, pb.toByteArray)

case pb: PbMasterGroupResponse =>
new TransportMessage(MessageType.MASTER_GROUP_RESPONSE, pb.toByteArray)
}

// TODO change return type to GeneratedMessageV3
Expand Down Expand Up @@ -1294,6 +1319,12 @@ object ControlMessages extends Logging {

case APPLICATION_META_REQUEST_VALUE =>
PbApplicationMetaRequest.parseFrom(message.getPayload)

case MASTER_GROUP_REQUEST_VALUE =>
PbMasterGroupRequest.parseFrom(message.getPayload)

case MASTER_GROUP_RESPONSE_VALUE =>
PbMasterGroupResponse.parseFrom(message.getPayload)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ object Utils extends Logging {
}
}

private var customHostname: Option[String] = sys.env.get("CELEBORN_LOCAL_HOSTNAME")
var customHostname: Option[String] = sys.env.get("CELEBORN_LOCAL_HOSTNAME")

// for testing
def setCustomHostname(hostname: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class ConfigurationSuite extends AnyFunSuite {
generateConfigurationMarkdown("worker")
}

// Runs the configuration-markdown helper for the "web" category.
// NOTE(review): presumably this generates or verifies docs/configuration/web.md,
// like the sibling "worker" and "quota" cases — confirm against
// generateConfigurationMarkdown.
test("docs - configuration/web.md") {
generateConfigurationMarkdown("web")
}

test("docs - configuration/quota.md") {
generateConfigurationMarkdown("quota")
}
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ start="<!--begin-include-->"
end="<!--end-include-->"
!}

### Web

{!
include-markdown "./web.md"
start="<!--begin-include-->"
end="<!--end-include-->"
!}

### Client

Expand Down
30 changes: 30 additions & 0 deletions docs/configuration/web.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

<!--begin-include-->
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf for debugging purposes. | 0.5.0 | |
| celeborn.web.host | &lt;localhost&gt; | false | Hostname for web to bind. | 0.6.0 | |
| celeborn.web.http.host | &lt;localhost&gt; | false | Web's http host. | 0.6.0 | |
| celeborn.web.http.idleTimeout | 30s | false | Web http server idle timeout. | 0.6.0 | |
| celeborn.web.http.maxWorkerThreads | 200 | false | Maximum number of threads in the web http worker thread pool. | 0.6.0 | |
| celeborn.web.http.port | 9091 | false | Web's http port. | 0.6.0 | |
| celeborn.web.http.stopTimeout | 5s | false | Web http server stop timeout. | 0.6.0 | |
| celeborn.web.port | 9090 | false | Port for web to bind. | 0.6.0 | |
<!--end-include-->
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ private[celeborn] class Master(
case pb: PbApplicationMetaRequest =>
// This request is from a worker
executeWithLeaderChecker(context, handleRequestForApplicationMeta(context, pb))

case pb: PbMasterGroupRequest =>
logDebug(s"Received master group.")
handleMasterGroup(context)
}

private def timeoutDeadWorkers(): Unit = {
Expand Down Expand Up @@ -1212,6 +1216,20 @@ private[celeborn] class Master(
sb.append("\n").toString()
}

/**
 * Replies to a master-group request with the group id, the leader address and
 * the commit infos of the master group.
 *
 * When HA is disabled an empty [[MasterGroupResponse]] is sent. When HA is enabled
 * but no leader can be determined, the leader address is sent as an empty string.
 *
 * @param context RPC context used to send the reply
 */
private def handleMasterGroup(context: RpcCallContext): Unit = {
  if (conf.haEnabled) {
    val groupInfo = statusSystem.asInstanceOf[HAMasterMetaManager].getRatisServer.getGroupInfo
    val leaderInfo = getLeader(groupInfo.getRoleInfoProto)
    // Never hand null to the response builder: generated protobuf setters throw
    // NullPointerException on null, which would turn "leader unknown" into a failed
    // RPC. An empty string is what the receiver reads for an unset string field.
    val leaderAddress = if (leaderInfo == null) "" else leaderInfo.getAddress
    val commits = groupInfo.getCommitInfos.asScala.map(_.toString).toList.asJava
    context.reply(MasterGroupResponse(
      groupInfo.getGroup.getGroupId.toString,
      leaderAddress,
      commits))
  } else {
    context.reply(MasterGroupResponse())
  }
}

override def getWorkerInfo: String = {
val sb = new StringBuilder
sb.append("====================== Workers Info in Master =========================\n")
Expand Down Expand Up @@ -1329,26 +1347,26 @@ private[celeborn] class Master(
isActive
}

/**
 * Resolves the leader peer from a role-info record.
 *
 * @param roleInfo role information of this peer; may be null
 * @return this peer itself when its role is LEADER; otherwise the peer id taken
 *         from the follower info's leader info; null when `roleInfo` or its
 *         follower info is null
 */
private def getLeader(roleInfo: RaftProtos.RoleInfoProto): RaftProtos.RaftPeerProto = {
  if (roleInfo == null) {
    null
  } else if (roleInfo.getRole == RaftPeerRole.LEADER) {
    // This peer is the leader, so it describes itself.
    roleInfo.getSelf
  } else {
    val follower = roleInfo.getFollowerInfo
    if (follower == null) null else follower.getLeaderInfo.getId
  }
}

private def getMasterGroupInfoInternal: String = {
if (conf.haEnabled) {
val sb = new StringBuilder
val groupInfo = statusSystem.asInstanceOf[HAMasterMetaManager].getRatisServer.getGroupInfo
sb.append(s"group id: ${groupInfo.getGroup.getGroupId.getUuid}\n")

def getLeader(roleInfo: RaftProtos.RoleInfoProto): RaftProtos.RaftPeerProto = {
if (roleInfo == null) {
return null
}
if (roleInfo.getRole == RaftPeerRole.LEADER) {
return roleInfo.getSelf
}
val followerInfo = roleInfo.getFollowerInfo
if (followerInfo == null) {
return null
}
followerInfo.getLeaderInfo.getId
}

val leader = getLeader(groupInfo.getRoleInfoProto)
if (leader == null) {
sb.append("leader not found\n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MasterArguments(args: Array[String], conf: CelebornConf) {
parse(args.toList)

// 2nd parse from environment variables
_host = _host.orElse(sys.env.get("CELEBORN_LOCAL_HOSTNAME"))
_host = _host.orElse(Utils.customHostname)

// 3rd read from configuration file
_propertiesFile = Some(Utils.loadDefaultCelebornProperties(conf, _propertiesFile.orNull))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,6 @@ abstract class HttpService extends Service with Logging {
private def config(configKey: String, configVal: String, maxKeyLength: Int): String =
s"${configKey.padTo(maxKeyLength + 10, " ").mkString}$configVal\n"

def getWorkerInfo: String

def getThreadDump: String = {
val sb = new StringBuilder
sb.append(
Expand All @@ -151,11 +149,13 @@ abstract class HttpService extends Service with Logging {
sb.toString()
}

def getShuffleList: String
def getWorkerInfo: String = throw new UnsupportedOperationException()

def getShuffleList: String = throw new UnsupportedOperationException()

def getApplicationList: String
def getApplicationList: String = throw new UnsupportedOperationException()

def listTopDiskUseApps: String
def listTopDiskUseApps: String = throw new UnsupportedOperationException()

def getMasterGroupInfo: String = throw new UnsupportedOperationException()

Expand Down Expand Up @@ -213,6 +213,8 @@ abstract class HttpService extends Service with Logging {
conf.masterHttpHost
case Service.WORKER =>
conf.workerHttpHost
case Service.WEB =>
conf.webHttpHost
}
}

Expand All @@ -222,6 +224,8 @@ abstract class HttpService extends Service with Logging {
conf.masterHttpPort
case Service.WORKER =>
conf.workerHttpPort
case Service.WEB =>
conf.webHttpPort
}
}

Expand All @@ -231,6 +235,8 @@ abstract class HttpService extends Service with Logging {
conf.masterHttpMaxWorkerThreads
case Service.WORKER =>
conf.workerHttpMaxWorkerThreads
case Service.WEB =>
conf.webHttpMaxWorkerThreads
}
}

Expand All @@ -240,6 +246,8 @@ abstract class HttpService extends Service with Logging {
conf.masterHttpStopTimeout
case Service.WORKER =>
conf.workerHttpStopTimeout
case Service.WEB =>
conf.webHttpStopTimeout
}
}

Expand All @@ -249,6 +257,8 @@ abstract class HttpService extends Service with Logging {
conf.masterHttpIdleTimeout
case Service.WORKER =>
conf.workerHttpIdleTimeout
case Service.WEB =>
conf.webHttpIdleTimeout
}
}

Expand Down
Loading

0 comments on commit f32fbc0

Please sign in to comment.