Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1923] Add retention for lease arbiter table #3792

Merged
merged 6 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "gobblin_multi_active_scheduler_constants_store";
public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "gobblin_scheduler_lease_determination_store";
public static final String SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
public static final int DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 500000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

500 seconds? seems way too low... at least for debugging. I'd look at more like 72 hours

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating to ~ 80 hours (300,000,000 millis). I was thinking of changing the units but we use millis for everything else so to avoid confusion between other constants I'll stay with millis.

// Refers to the event we originally tried to acquire a lease which achieved `consensus` among participants through
// the database
public static final String SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY = "preservedConsensusEventTimeMillis";
Expand All @@ -116,7 +118,7 @@ public class ConfigurationKeys {
public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;
public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;

// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,14 @@ protected interface CheckedFunction<T, R> {
private final String constantsTableName;
private final int epsilon;
private final int linger;
private final int retention;
umustafi marked this conversation as resolved.
Show resolved Hide resolved
private String thisTableRetentionStatement;
private String thisTableGetInfoStatement;
private String thisTableGetInfoStatementForReminder;
private String thisTableSelectAfterInsertStatement;
private String thisTableAcquireLeaseIfMatchingAllStatement;
private String thisTableAcquireLeaseIfFinishedStatement;

// TODO: define retention on this table
/*
Notes:
- Set `event_timestamp` default value to turn off timestamp auto-updates for row modifications which alters this col
Expand All @@ -110,9 +111,13 @@ protected interface CheckedFunction<T, R> {
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, "
+ "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), "
+ "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, "
+ "event_timestamp TIMESTAMP NOT NULL, "
+ "lease_acquisition_timestamp TIMESTAMP NULL, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as migrating this schema... will it require manual intervention to either drop or alter table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I believe we'll have to drop that table. I will do that before deploying.

+ "PRIMARY KEY (flow_group,flow_name,flow_action))";
// Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed
// since retention >> linger
private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < "
+ "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
Expand Down Expand Up @@ -182,6 +187,9 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
this.linger = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.retention = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
Expand All @@ -204,6 +212,14 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
}
initializeConstantsTable();

Thread retentionThread = new Thread(new Runnable() {
umustafi marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void run() {
runRetentionOnArbitrationTable();
}
});
retentionThread.start();

log.info("MysqlMultiActiveLeaseArbiter initialized");
}

Expand All @@ -221,6 +237,31 @@ private void initializeConstantsTable() throws IOException {
}, true);
}

/**
* Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
*/
private void runRetentionOnArbitrationTable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit but can we add the code for starting a thread within this method? Due to how it's implemented as an infinite loop we would generally always want this function to be run asynchronously

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems worth abstracting into a utility along the lines of "run this arbitrary SQL in a STPE using interval T"

you decide right now whether to do or merely "TODO" ;)

while (true) {
try {
Thread.sleep(10000);
umustafi marked this conversation as resolved.
Show resolved Hide resolved
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
int i = 0;
retentionStatement.setInt(++i, retention);
umustafi marked this conversation as resolved.
Show resolved Hide resolved
int numRowsDeleted = retentionStatement.executeUpdate();
if (numRowsDeleted != 0) {
log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table",
numRowsDeleted);
}
return numRowsDeleted;
}, true);
} catch (InterruptedException | IOException e) {
log.warn("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
umustafi marked this conversation as resolved.
Show resolved Hide resolved
+ "affect our system performance. Examine exception: ", e);
}
}
}

@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
Expand Down