Add LIGHTER_ZOMBIE_TIMEOUT to configure zombie timeout
Minutis committed Oct 5, 2024
1 parent 3efd081 commit 5b64da6
Showing 6 changed files with 49 additions and 31 deletions.
39 changes: 20 additions & 19 deletions docs/configuration.md
@@ -4,25 +4,26 @@ Lighter can be configured by using environment variables. Currently, Lighter sup

## Global properties

-| Property | Description | Default |
-|----------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------|
-| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
-| LIGHTER_MAX_STARTING_JOBS | Max starting Batch jobs in parallel | 5 |
-| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on the Lighter UI | http://localhost/spark-history/ |
-| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs (Grafana, Graylog, etc.) used on the Lighter UI. Allowed placeholders: `{{id}}`, `{{appId}}`, `{{createdTs}}` | |
-| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
-| LIGHTER_URL | URL which can be used to access Lighter form Spark Job | http://lighter.spark:8080 |
-| LIGHTER_SESSION_TIMEOUT_INTERVAL | `java.time.Duration` representing session lifetime (from last statement creation). Use `0m` value to disable | 90m |
-| LIGHTER_SESSION_TIMEOUT_ACTIVE | Should Lighter kill sessions with waiting statements (obsolete when `LIGHTER_SESSION_TIMEOUT_INTERVAL` is `0m`) | false |
-| LIGHTER_SESSION_SCHEDULE_INTERVAL | `java.time.Duration` representing the interval at which a task is triggered to initiate scheduled sessions | 1m |
-| LIGHTER_SESSION_TRACK_RUNNING_INTERVAL | `java.time.Duration` representing the interval at which a task is triggered to process and update running session state | 2m |
-| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
-| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
-| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
-| LIGHTER_STORAGE_JDBC_DRIVER_CLASS_NAME | JDBC driver class name | org.h2.Driver |
-| LIGHTER_BATCH_DEFAULT_CONF | Default `conf` props for batch applications (JSON)<sup>*</sup> | |
-| LIGHTER_SESSION_DEFAULT_CONF | Default `conf` props for session applications (JSON) | |
-| LIGHTER_SESSION_PERMANENT_SESSIONS | List of configurations for [permanent sessions](./permanent_sessions.md) | "[]" |
+| Property | Description | Default |
+|----------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------|
+| LIGHTER_MAX_RUNNING_JOBS | Max running Batch jobs in parallel | 5 |
+| LIGHTER_MAX_STARTING_JOBS | Max starting Batch jobs in parallel | 5 |
+| LIGHTER_SPARK_HISTORY_SERVER_URL | Spark history server URL used on the Lighter UI | http://localhost/spark-history/ |
+| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs (Grafana, Graylog, etc.) used on the Lighter UI. Allowed placeholders: `{{id}}`, `{{appId}}`, `{{createdTs}}` | |
+| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
+| LIGHTER_URL | URL which can be used to access Lighter form Spark Job | http://lighter.spark:8080 |
+| LIGHTER_ZOMBIE_INTERVAL | How long for Lighter to try to fetch the status of the job before marking it as a zombie. (For jobs that "disappear" before Lighter could determine their final status) | 30m |
+| LIGHTER_SESSION_TIMEOUT_INTERVAL | `java.time.Duration` representing session lifetime (from last statement creation). Use `0m` value to disable | 90m |
+| LIGHTER_SESSION_TIMEOUT_ACTIVE | Should Lighter kill sessions with waiting statements (obsolete when `LIGHTER_SESSION_TIMEOUT_INTERVAL` is `0m`) | false |
+| LIGHTER_SESSION_SCHEDULE_INTERVAL | `java.time.Duration` representing the interval at which a task is triggered to initiate scheduled sessions | 1m |
+| LIGHTER_SESSION_TRACK_RUNNING_INTERVAL | `java.time.Duration` representing the interval at which a task is triggered to process and update running session state | 2m |
+| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
+| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
+| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
+| LIGHTER_STORAGE_JDBC_DRIVER_CLASS_NAME | JDBC driver class name | org.h2.Driver |
+| LIGHTER_BATCH_DEFAULT_CONF | Default `conf` props for batch applications (JSON)<sup>*</sup> | |
+| LIGHTER_SESSION_DEFAULT_CONF | Default `conf` props for session applications (JSON) | |
+| LIGHTER_SESSION_PERMANENT_SESSIONS | List of configurations for [permanent sessions](./permanent_sessions.md) | "[]" |

<sup>*</sup> default configs will be merged with configss provided in submit request, if property is defined in submit request, default will be ignored.
Example of `LIGHTER_BATCH_DEFAULT_CONF`: `{"spark.kubernetes.driverEnv.TEST1":"test1"}`.
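
Note on duration values: the table lists intervals in a shorthand such as `30m` or `90m`, which the application receives as `java.time.Duration`. The conversion is done by the framework and is not shown in this commit, so the accepted formats are not confirmed here. The Java sketch below only illustrates what such a mapping could look like; `DurationShorthand` and its `parse` method are hypothetical names and are not part of Lighter.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; Lighter relies on its framework for this conversion.
final class DurationShorthand {
    // Assumed format: a whole number followed by one unit letter (s, m, h or d).
    private static final Pattern SHORTHAND = Pattern.compile("^(\\d+)([smhd])$");

    static Duration parse(String value) {
        Matcher matcher = SHORTHAND.matcher(value.trim().toLowerCase());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + value);
        }
        long amount = Long.parseLong(matcher.group(1));
        switch (matcher.group(2)) {
            case "s": return Duration.ofSeconds(amount);
            case "m": return Duration.ofMinutes(amount);
            case "h": return Duration.ofHours(amount);
            default:  return Duration.ofDays(amount); // only "d" can reach this branch
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("30m"));            // prints PT30M
        System.out.println(parse("2h").toMinutes()); // prints 120
    }
}
```

Under that assumption, the documented default of `30m` corresponds to `Duration.ofMinutes(30)`, the same value the test factory in this commit passes explicitly.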
@@ -4,6 +4,7 @@
import static org.slf4j.LoggerFactory.getLogger;

import com.exacaster.lighter.backend.Backend;
+import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.log.Log;
import com.exacaster.lighter.log.LogService;
import com.exacaster.lighter.storage.ApplicationStorage;
@@ -20,11 +21,13 @@ public class ApplicationStatusHandler {
private final ApplicationStorage applicationStorage;
private final Backend backend;
private final LogService logService;
+private final AppConfiguration conf;

-public ApplicationStatusHandler(ApplicationStorage applicationStorage, Backend backend, LogService logService) {
+public ApplicationStatusHandler(ApplicationStorage applicationStorage, Backend backend, LogService logService, AppConfiguration conf) {
this.applicationStorage = applicationStorage;
this.backend = backend;
this.logService = logService;
+this.conf = conf;
}

public Application processApplicationStarting(Application application) {
@@ -85,13 +88,13 @@ private ApplicationState trackStatus(Application app, ApplicationInfo info) {

private ApplicationState checkZombie(Application app) {
LOG.info("No info for {}", app);
-if (app.getContactedAt() != null && app.getContactedAt().isBefore(LocalDateTime.now().minusMinutes(30))) {
+if (app.getContactedAt() != null && app.getContactedAt().isBefore(LocalDateTime.now().minus(conf.getZombieInterval()))) {
LOG.info("Assuming zombie ({})", app.getId());
applicationStorage.saveApplication(ApplicationBuilder.builder(app)
.setState(ApplicationState.ERROR)
.build());
logService.save(new Log(app.getId(),
-"Application was not reachable for 10 minutes, so we assume something went wrong"));
+"Application was not reachable for " + conf.getZombieInterval().toMinutes() + " minutes, so we assume something went wrong"));
return ApplicationState.ERROR;
}
return app.getState();
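
Note on the hunk above: the removed code compared against a hard-coded 30 minutes while its log message said 10 minutes. The new code takes both the threshold and the message from `conf.getZombieInterval()`. The following is a minimal standalone sketch of that comparison, assuming only `java.time`; `ZombieCheckSketch`, `isZombie`, and `message` are illustrative names, not Lighter classes or methods.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Illustrative only: isolates the time comparison used by checkZombie so it can be tested with a fixed clock.
final class ZombieCheckSketch {
    // A null contactedAt (job never reached) is not treated as a zombie, mirroring the null check in the diff.
    static boolean isZombie(LocalDateTime contactedAt, LocalDateTime now, Duration zombieInterval) {
        return contactedAt != null && contactedAt.isBefore(now.minus(zombieInterval));
    }

    // Same message shape as the new log line; toMinutes() truncates, so 90 seconds is reported as 1 minute.
    static String message(Duration zombieInterval) {
        return "Application was not reachable for " + zombieInterval.toMinutes()
                + " minutes, so we assume something went wrong";
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 10, 5, 12, 0);
        Duration interval = Duration.ofMinutes(30);
        System.out.println(isZombie(now.minusMinutes(31), now, interval)); // true
        System.out.println(isZombie(now.minusMinutes(30), now, interval)); // false: isBefore is strict
        System.out.println(isZombie(null, now, interval));                 // false
        System.out.println(message(Duration.ofSeconds(90)));
    }
}
```

Passing `now` as a parameter is a choice made for this sketch so the boundary cases are deterministic; the committed code calls `LocalDateTime.now()` directly. If intervals shorter than a minute are ever configured, the logged value would read `0 minutes` because of the truncation in `toMinutes()`.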
@@ -34,6 +34,7 @@ public class AppConfiguration {
private final Integer pyGatewayPort;
@JsonProperty(access = Access.WRITE_ONLY)
private final String url;
+private final Duration zombieInterval;
private final SessionConfiguration sessionConfiguration;
private final Map<String, String> batchDefaultConf;
private final Map<String, String> sessionDefaultConf;
@@ -45,6 +46,7 @@ public AppConfiguration(Integer maxRunningJobs,
@Nullable String externalLogsUrlTemplate,
Integer pyGatewayPort,
String url,
+Duration zombieInterval,
SessionConfiguration sessionConfiguration,
@MapFormat(transformation = FLAT, keyFormat = RAW)
@Nullable Map<String, String> batchDefaultConf,
@@ -55,6 +57,7 @@ public AppConfiguration(Integer maxRunningJobs,
this.externalLogsUrlTemplate = externalLogsUrlTemplate;
this.pyGatewayPort = pyGatewayPort;
this.url = url;
+this.zombieInterval = zombieInterval;
this.sessionConfiguration = sessionConfiguration;
this.batchDefaultConf = ofNullable(batchDefaultConf).orElse(Map.of());
this.sessionDefaultConf = ofNullable(sessionDefaultConf).orElse(Map.of());
@@ -84,6 +87,10 @@ public String getUrl() {
return url;
}

+public Duration getZombieInterval() {
+return zombieInterval;
+}
+
public SessionConfiguration getSessionConfiguration() {
return sessionConfiguration;
}
@@ -98,14 +105,18 @@ public Map<String, String> getSessionDefaultConf() {

@Override
public String toString() {
-return new StringJoiner(", ", AppConfiguration.class.getSimpleName() + "[", "]")
-.add("maxRunningJobs=" + maxRunningJobs)
-.add("maxStartingJobs=" + maxStartingJobs)
-.add("sparkHistoryServerUrl=" + sparkHistoryServerUrl)
-.add("sessionConfiguration=" + sessionConfiguration)
-.add("batchDefaultConf=" + batchDefaultConf)
-.add("sessionDefaultConf=" + sessionDefaultConf)
-.toString();
+return "AppConfiguration{" +
+"maxRunningJobs=" + maxRunningJobs +
+", maxStartingJobs=" + maxStartingJobs +
+", sparkHistoryServerUrl='" + sparkHistoryServerUrl + '\'' +
+", externalLogsUrlTemplate='" + externalLogsUrlTemplate + '\'' +
+", pyGatewayPort=" + pyGatewayPort +
+", url='" + url + '\'' +
+", zombieInterval=" + zombieInterval +
+", sessionConfiguration=" + sessionConfiguration +
+", batchDefaultConf=" + batchDefaultConf +
+", sessionDefaultConf=" + sessionDefaultConf +
+'}';
}

@Introspected
1 change: 1 addition & 0 deletions server/src/main/resources/application.yml
@@ -5,6 +5,7 @@ lighter:
frontend-path: file:${FRONTEND_PATH:../frontend/build}
py-gateway-port: 25333
url: http://lighter.spark:8080
+zombie-interval: 30m
session:
timeout-interval: 90m
timeout-active: false
@@ -9,6 +9,7 @@ import spock.lang.Subject

import java.time.LocalDateTime

+import static com.exacaster.lighter.test.Factories.appConfiguration
import static com.exacaster.lighter.test.Factories.newSession

class ApplicationStatusHandlerTest extends Specification {
@@ -21,7 +22,7 @@ class ApplicationStatusHandlerTest extends Specification {
LogService logService = Mock()

@Subject
-ApplicationStatusHandler handler = new ApplicationStatusHandler(storage, backend, logService)
+ApplicationStatusHandler handler = new ApplicationStatusHandler(storage, backend, logService, appConfiguration())

def cleanup() {
storage.cleanup()
@@ -90,6 +90,7 @@ class Factories {
null,
5432,
"http://lighter:8080",
+Duration.ofMinutes(30),
new AppConfiguration.SessionConfiguration(Duration.ofMinutes(20), false,
[new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]
, Duration.ofMinutes(1), Duration.ofMinutes(2)),