Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parametrise session properties #655

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ Lighter can be configured by using environment variables. Currently, Lighter sup
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs (Grafana, Graylog, etc.) used on the Lighter UI. Allowed placeholders: `{{id}}`, `{{appId}}`, `{{createdTs}}` | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
| LIGHTER_URL | URL which can be used to access Lighter form Spark Job | http://lighter.spark:8080 |
| LIGHTER_SESSION_TIMEOUT_MINUTES | Session lifetime in minutes (from last statement creation). Use negative value to disable | 90 |
| LIGHTER_SESSION_TIMEOUT_ACTIVE | Should Lighter kill sessions with waiting statements (obsolete when `LIGHTER_SESSION_TIMEOUT_MINUTES` is negative) | false |
| LIGHTER_SESSION_TIMEOUT_INTERVAL | `java.time.Duration` representing session lifetime (from last statement creation). Use `0m` value to disable | 90m |
| LIGHTER_SESSION_TIMEOUT_ACTIVE | Should Lighter kill sessions with waiting statements (obsolete when `LIGHTER_SESSION_TIMEOUT_INTERVAL` is `0m`) | false |
| LIGHTER_SESSION_SCHEDULE_INTERVAL | `java.time.Duration` representing the interval at which a task is triggered to initiate scheduled sessions | 1m |
| LIGHTER_SESSION_TRACK_RUNNING_INTERVAL | `java.time.Duration` representing the interval at which a task is triggered to process and update running session state | 2m |
Minutis marked this conversation as resolved.
Show resolved Hide resolved
| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void keepPermanentSessions() throws InterruptedException {
}

@SchedulerLock(name = "processScheduledSessions")
@Scheduled(fixedRate = "1m")
@Scheduled(fixedRate = "${lighter.session.schedule-interval}")
public void processScheduledSessions() throws InterruptedException {
assertLocked();
var waitables = sessionService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 10).stream()
Expand All @@ -93,7 +93,7 @@ private Waitable launchSession(Application session) {
}

@SchedulerLock(name = "trackRunningSessions", lockAtMostFor = "1m")
@Scheduled(fixedRate = "2m")
@Scheduled(fixedRate = "${lighter.session.track-running-interval}")
public void trackRunning() {
assertLocked();
var running = sessionService.fetchRunning();
Expand All @@ -110,14 +110,14 @@ public void trackRunning() {
public void handleTimeout() {
assertLocked();
var sessionConfiguration = appConfiguration.getSessionConfiguration();
var timeout = sessionConfiguration.getTimeoutMinutes();
if (timeout != null && timeout > 0) {
var timeoutInterval = sessionConfiguration.getTimeoutInterval();
if (timeoutInterval != null && !timeoutInterval.isZero()) {
sessionService.fetchRunning()
.stream()
.filter(s -> isNotPermanent(sessionConfiguration, s))
.filter(s -> sessionConfiguration.shouldTimeoutActive() || !sessionService.isActive(s))
.filter(s -> sessionService.lastUsed(s.getId()).isBefore(LocalDateTime.now().minusMinutes(timeout)))
.peek(s -> LOG.info("Killing because of timeout {}, session: {}", timeout, s))
.filter(s -> sessionService.lastUsed(s.getId()).isBefore(LocalDateTime.now().minus(timeoutInterval)))
.peek(s -> LOG.info("Killing because of timeout {}, session: {}", timeoutInterval, s))
.forEach(sessionService::killOne);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.MapFormat;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
Expand Down Expand Up @@ -140,21 +142,27 @@ public String toString() {
@ConfigurationProperties("session")
public static class SessionConfiguration {

private final Integer timeoutMinutes;
private final Duration timeoutInterval;
private final Boolean timeoutActive;
private final List<PermanentSession> permanentSessions;
private final Duration scheduleInterval;
private final Duration trackRunningInterval;

@ConfigurationInject
public SessionConfiguration(@Nullable Integer timeoutMinutes,
public SessionConfiguration(@Nullable Duration timeoutInterval,
Boolean timeoutActive,
List<PermanentSession> permanentSessions) {
this.timeoutMinutes = timeoutMinutes;
List<PermanentSession> permanentSessions,
Duration scheduleInterval,
Duration trackRunningInterval) {
this.timeoutInterval = timeoutInterval;
this.timeoutActive = timeoutActive;
this.permanentSessions = permanentSessions;
this.scheduleInterval = scheduleInterval;
this.trackRunningInterval = trackRunningInterval;
}

public Integer getTimeoutMinutes() {
return timeoutMinutes;
public Duration getTimeoutInterval() {
return timeoutInterval;
}

public boolean shouldTimeoutActive() {
Expand All @@ -165,11 +173,21 @@ public List<PermanentSession> getPermanentSessions() {
return permanentSessions;
}

public Duration getScheduleInterval() {
return scheduleInterval;
}

public Duration getTrackRunningInterval() {
return trackRunningInterval;
}

@Override
public String toString() {
return new StringJoiner(", ", SessionConfiguration.class.getSimpleName() + "[", "]")
.add("timeoutMinutes=" + timeoutMinutes)
.add("timeoutMinutes=" + timeoutInterval)
.add("permanentSessions=" + permanentSessions)
.add("scheduleIntervalSeconds=" + scheduleInterval)
.add("trackRunningInterval=" + trackRunningInterval)
.toString();
}
}
Expand Down
4 changes: 3 additions & 1 deletion server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ lighter:
py-gateway-port: 25333
url: http://lighter.spark:8080
session:
timeout-minutes: 90
timeout-interval: 90m
timeout-active: false
permanent-sessions: []
schedule-interval: 1m
track-running-interval: 2m
kubernetes:
enabled: false
master: k8s://kubernetes.default.svc.cluster.local:443
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SessionHandlerTest extends Specification {
def "kills timeouted sessions"() {
given:
def oldSession = newSession()
service.lastUsed(oldSession.id) >> LocalDateTime.now().minusMinutes(conf.sessionConfiguration.timeoutMinutes + 1)
service.lastUsed(oldSession.id) >> LocalDateTime.now() - conf.sessionConfiguration.timeoutInterval.plusMinutes(1)

def newSession = app()
service.lastUsed(newSession.id) >> newSession.createdAt
Expand All @@ -62,7 +62,7 @@ class SessionHandlerTest extends Specification {
def "preserves active timeouted sessions"() {
given:
def oldSession = newSession()
service.lastUsed(oldSession.id) >> LocalDateTime.now().minusMinutes(conf.sessionConfiguration.timeoutMinutes + 1)
service.lastUsed(oldSession.id) >> LocalDateTime.now() - conf.sessionConfiguration.timeoutInterval.plusMinutes(1)
service.isActive(oldSession) >> true

1 * service.fetchRunning() >> [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class AppConfigurationTest extends Specification {
def "binds properties form yaml"() {
expect:
appConfiguration.maxRunningJobs == 5
appConfiguration.sessionConfiguration.timeoutMinutes == 90
appConfiguration.sessionConfiguration.timeoutInterval.toMinutes() == 90
appConfiguration.sessionConfiguration.permanentSessions.size() == 1
appConfiguration.sessionConfiguration.permanentSessions.get(0).id == "permanentId1"
appConfiguration.sessionConfiguration.permanentSessions.get(0).submitParams.conf == [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.exacaster.lighter.backend.kubernetes.KubernetesProperties
import com.exacaster.lighter.configuration.AppConfiguration
import com.exacaster.lighter.application.SubmitParams

import java.time.Duration
import java.time.LocalDateTime

class Factories {
Expand Down Expand Up @@ -62,8 +63,9 @@ class Factories {
null,
5432,
"http://lighter:8080",
new AppConfiguration.SessionConfiguration(20, false,
[new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]),
new AppConfiguration.SessionConfiguration(Duration.ofMinutes(20), false,
[new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]
, Duration.ofMinutes(1), Duration.ofMinutes(2)),
["spark.kubernetes.driverEnv.TEST": "test"],
["spark.kubernetes.driverEnv.TEST": "test"]
)
Expand Down