-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-1923] Add retention for lease arbiter table #3792
Changes from 2 commits
65abea8
2933333
ba04633
ad4eaae
a23bc7b
ab2b0de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,13 +87,14 @@ protected interface CheckedFunction<T, R> { | |
private final String constantsTableName; | ||
private final int epsilon; | ||
private final int linger; | ||
private final int retention; | ||
umustafi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private String thisTableRetentionStatement; | ||
private String thisTableGetInfoStatement; | ||
private String thisTableGetInfoStatementForReminder; | ||
private String thisTableSelectAfterInsertStatement; | ||
private String thisTableAcquireLeaseIfMatchingAllStatement; | ||
private String thisTableAcquireLeaseIfFinishedStatement; | ||
|
||
// TODO: define retention on this table | ||
/* | ||
Notes: | ||
- Set `event_timestamp` default value to turn off timestamp auto-updates for row modifications which alters this col | ||
|
@@ -110,9 +111,13 @@ protected interface CheckedFunction<T, R> { | |
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" | ||
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" | ||
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, " | ||
+ "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), " | ||
+ "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, " | ||
+ "event_timestamp TIMESTAMP NOT NULL, " | ||
+ "lease_acquisition_timestamp TIMESTAMP NULL, " | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As far as migrating this schema... will it require manual intervention to either There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I believe we'll have to drop that table. I will do that before deploying. |
||
+ "PRIMARY KEY (flow_group,flow_name,flow_action))"; | ||
// Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed | ||
// since retention >> linger | ||
private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < " | ||
+ "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)"; | ||
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s " | ||
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))"; | ||
// Only insert epsilon and linger values from config if this table does not already contain pre-existing values. | ||
|
@@ -182,6 +187,9 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { | |
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS); | ||
this.linger = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, | ||
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS); | ||
this.retention = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY, | ||
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS); | ||
this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName); | ||
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, | ||
this.constantsTableName); | ||
this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER, | ||
|
@@ -204,6 +212,14 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { | |
} | ||
initializeConstantsTable(); | ||
|
||
Thread retentionThread = new Thread(new Runnable() { | ||
umustafi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
@Override | ||
public void run() { | ||
runRetentionOnArbitrationTable(); | ||
} | ||
}); | ||
retentionThread.start(); | ||
|
||
log.info("MysqlMultiActiveLeaseArbiter initialized"); | ||
} | ||
|
||
|
@@ -221,6 +237,31 @@ private void initializeConstantsTable() throws IOException { | |
}, true); | ||
} | ||
|
||
/** | ||
* Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config. | ||
*/ | ||
private void runRetentionOnArbitrationTable() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Small nit, but can we add the code for starting a thread within this method? Because it's implemented as an infinite loop, we would generally always want this function to be run asynchronously. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Seems worth abstracting into a utility along the lines of "run this arbitrary SQL in a STPE using interval T". You decide right now whether to do it or merely "TODO" ;) |
||
while (true) { | ||
try { | ||
Thread.sleep(10000); | ||
umustafi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
withPreparedStatement(thisTableRetentionStatement, | ||
retentionStatement -> { | ||
int i = 0; | ||
retentionStatement.setInt(++i, retention); | ||
umustafi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
int numRowsDeleted = retentionStatement.executeUpdate(); | ||
if (numRowsDeleted != 0) { | ||
log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table", | ||
numRowsDeleted); | ||
} | ||
return numRowsDeleted; | ||
}, true); | ||
} catch (InterruptedException | IOException e) { | ||
log.warn("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and " | ||
umustafi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
+ "affect our system performance. Examine exception: ", e); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis, | ||
boolean isReminderEvent) throws IOException { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
500 seconds? Seems way too low... at least for debugging. I'd look at something more like 72 hours.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updating to ~80 hours (300,000,000 millis). I was thinking of changing the units, but we use millis for everything else, so to avoid confusion with the other constants I'll stay with millis.