diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 81608655a0e..6d36d9ff3e3 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -101,6 +101,8 @@ public class ConfigurationKeys { public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "gobblin_multi_active_scheduler_constants_store"; public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table"; public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "gobblin_scheduler_lease_determination_store"; + public static final String SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis"; + public static final long DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 3 * 24 * 60 * 60 * 1000; // (3 days in ms) // Refers to the event we originally tried to acquire a lease which achieved `consensus` among participants through // the database public static final String SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY = "preservedConsensusEventTimeMillis"; @@ -116,7 +118,7 @@ public class ConfigurationKeys { public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis"; public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000; public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis"; - public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000; + public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000; // Job executor thread pool size public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = 
"jobexecutor.threadpool.size"; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index a7c0351859f..4b4b9c8374c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -30,6 +30,8 @@ import java.util.Calendar; import java.util.Optional; import java.util.TimeZone; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.sql.DataSource; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -85,15 +87,16 @@ protected interface CheckedFunction { protected final DataSource dataSource; private final String leaseArbiterTableName; private final String constantsTableName; - private final int epsilon; - private final int linger; + private final int epsilonMillis; + private final int lingerMillis; + private final long retentionPeriodMillis; + private String thisTableRetentionStatement; private String thisTableGetInfoStatement; private String thisTableGetInfoStatementForReminder; private String thisTableSelectAfterInsertStatement; private String thisTableAcquireLeaseIfMatchingAllStatement; private String thisTableAcquireLeaseIfFinishedStatement; - // TODO: define retention on this table /* Notes: - Set `event_timestamp` default value to turn off timestamp auto-updates for row modifications which alters this col @@ -110,9 +113,13 @@ protected interface CheckedFunction { private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, " - + "event_timestamp TIMESTAMP(3) DEFAULT 
CURRENT_TIMESTAMP(3), " - + "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, " + + "event_timestamp TIMESTAMP(3) NOT NULL, " + + "lease_acquisition_timestamp TIMESTAMP(3) NULL, " + "PRIMARY KEY (flow_group,flow_name,flow_action))"; + // Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed + // since retention >> linger + private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < " + + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)"; private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s " + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))"; // Only insert epsilon and linger values from config if this table does not contain a pre-existing values already. @@ -137,7 +144,8 @@ protected interface CheckedFunction { + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 " + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 " + "ELSE 3 END as lease_validity_status, linger, " - + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY; + + "CONVERT_TZ(CURRENT_TIMESTAMP(3), @@session.time_zone, '+00:00') as utc_current_timestamp FROM %s, %s " + + WHERE_CLAUSE_TO_MATCH_KEY; // Same as query above, except that isWithinEpsilon is True if the reminder event timestamp (provided by caller) is // OLDER than or equal to the db event_timestamp and within epsilon away from it. 
protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = "SELECT " @@ -147,7 +155,8 @@ protected interface CheckedFunction { + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 " + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 " + "ELSE 3 END as lease_validity_status, linger, " - + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY; + + "CONVERT_TZ(CURRENT_TIMESTAMP(3), @@session.time_zone, '+00:00') as utc_current_timestamp FROM %s, %s " + + WHERE_CLAUSE_TO_MATCH_KEY; // Insert or update row to acquire lease if values have not changed since the previous read // Need to define three separate statements to handle cases where row does not exist or has null values to check protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, flow_name, " @@ -162,8 +171,9 @@ protected interface CheckedFunction { + WHERE_CLAUSE_TO_MATCH_ROW; // Complete lease acquisition if values have not changed since lease was acquired protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = "UPDATE %s SET " - + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW; - protected static final Calendar UTC_CAL = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + + "event_timestamp=event_timestamp, lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW; + private static final ThreadLocal UTC_CAL = + ThreadLocal.withInitial(() -> Calendar.getInstance(TimeZone.getTimeZone("UTC"))); @Inject public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { @@ -178,10 +188,13 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE); this.constantsTableName = ConfigUtils.getString(config, ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY, 
ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE); - this.epsilon = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, + this.epsilonMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS); - this.linger = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, + this.lingerMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS); + this.retentionPeriodMillis = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS); + this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName); this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, this.constantsTableName); this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER, @@ -203,7 +216,7 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { throw new IOException("Table creation failure for " + leaseArbiterTableName, e); } initializeConstantsTable(); - + runRetentionOnArbitrationTable(); log.info("MysqlMultiActiveLeaseArbiter initialized"); } @@ -215,12 +228,41 @@ private void initializeConstantsTable() throws IOException { String insertConstantsStatement = String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName); withPreparedStatement(insertConstantsStatement, insertStatement -> { int i = 0; - insertStatement.setInt(++i, epsilon); - insertStatement.setInt(++i, linger); + insertStatement.setInt(++i, epsilonMillis); + insertStatement.setInt(++i, lingerMillis); return insertStatement.executeUpdate(); }, true); } + /** + * Periodically deletes all rows in 
the table with event_timestamp older than the retention period defined by config. + * // TODO: create a utility to run a SQL command in a ScheduledThreadPoolExecutor (STPE) using interval T + */ + private void runRetentionOnArbitrationTable() { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + Runnable retentionTask = () -> { + try { + Thread.sleep(10000); + withPreparedStatement(thisTableRetentionStatement, + retentionStatement -> { + retentionStatement.setLong(1, retentionPeriodMillis); + int numRowsDeleted = retentionStatement.executeUpdate(); + if (numRowsDeleted != 0) { + log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table", + numRowsDeleted); + } + return numRowsDeleted; + }, true); + } catch (InterruptedException | IOException e) { + log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and " + "affect our system performance. Examine exception: ", e); + } + }; + + // Run retention thread every 4 hours (6 times a day) + executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS); + } + @Override public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis, boolean isReminderEvent) throws IOException { @@ -340,12 +382,12 @@ protected Optional getExistingEventInfo(DagActionStore.DagAc protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException { try { // Extract values from result set - Timestamp dbEventTimestamp = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL); - Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL); + Timestamp dbEventTimestamp = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()); + Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get()); boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon"); int leaseValidityStatus =
resultSet.getInt("lease_validity_status"); int dbLinger = resultSet.getInt("linger"); - Timestamp dbCurrentTimestamp = resultSet.getTimestamp("utc_current_timestamp", UTC_CAL); + Timestamp dbCurrentTimestamp = resultSet.getTimestamp("utc_current_timestamp", UTC_CAL.get()); return new GetEventInfoResult(dbEventTimestamp, dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus, dbLinger, dbCurrentTimestamp); } catch (SQLException e) { @@ -423,14 +465,14 @@ protected static SelectInfoResult createSelectInfoResult(ResultSet resultSet) th throw new IOException("Expected resultSet containing row information for the lease that was attempted but " + "received nothing."); } - if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL) == null) { + if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()) == null) { throw new IOException("event_timestamp should never be null (it is always set to current timestamp)"); } - long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL).getTime(); + long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()).getTime(); // Lease acquisition timestamp is null if another participant has completed the lease Optional leaseAcquisitionTimeMillis = - resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL) == null ? Optional.empty() : - Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL).getTime()); + resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get()) == null ? 
Optional.empty() : + Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get()).getTime()); int dbLinger = resultSet.getInt("linger"); return new SelectInfoResult(eventTimeMillis, leaseAcquisitionTimeMillis, dbLinger); } catch (SQLException e) { @@ -526,10 +568,10 @@ protected static void completeUpdatePreparedStatement(PreparedStatement statemen statement.setString(++i, flowAction.getFlowActionType().toString()); // Values that may be needed depending on the insert statement if (needEventTimeCheck) { - statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL); + statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL.get()); } if (needLeaseAcquisitionTimeCheck) { - statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL); + statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL.get()); } } @@ -546,8 +588,8 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status) updateStatement.setString(++i, flowGroup); updateStatement.setString(++i, flowName); updateStatement.setString(++i, flowActionType.toString()); - updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL); - updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL); + updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL.get()); + updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get()); int numRowsUpdated = updateStatement.executeUpdate(); if (numRowsUpdated == 0) { log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because " diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java index ab5faee8ca0..4f639e04a4f 100644 --- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java @@ -173,7 +173,6 @@ private DagAction getDagActionWithRetry(String flowGroup, String flowName, Strin rs.close(); } } - } @Override