Skip to content

Commit

Permalink
Pipe IT: Added flush for assertData on targetEnv to avoid IoTV2 batch…
Browse files Browse the repository at this point in the history
… mode lead to large delay (apache#14707)
  • Loading branch information
Caideyipi authored Jan 16, 2025
1 parent c3df7d5 commit 6cb9035
Showing 1 changed file with 75 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,8 @@ public static void assertDataEventuallyOnEnv(
String expectedHeader,
Set<String> expectedResSet,
long timeoutSeconds) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1032,6 +1034,13 @@ public static void assertDataEventuallyOnEnv(
.untilAsserted(
() -> {
try {
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand All @@ -1055,6 +1064,8 @@ public static void assertDataSizeEventuallyOnEnv(
final int size,
final long timeoutSeconds,
final String dataBaseName) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (final Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1069,6 +1080,13 @@ public static void assertDataSizeEventuallyOnEnv(
if (dataBaseName != null) {
statement.execute("use " + dataBaseName);
}
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
if (sql != null && !sql.isEmpty()) {
TestUtils.assertResultSetSize(executeQueryWithRetry(statement, sql), size);
}
Expand All @@ -1088,6 +1106,8 @@ public static void assertDataEventuallyOnEnv(
final Set<String> expectedResSet,
final long timeoutSeconds,
final Consumer<String> handleFailure) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1099,6 +1119,13 @@ public static void assertDataEventuallyOnEnv(
.untilAsserted(
() -> {
try {
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand Down Expand Up @@ -1147,6 +1174,8 @@ public static void assertDataEventuallyOnEnv(
final long timeoutSeconds,
final String databaseName,
final Consumer<String> handleFailure) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1161,6 +1190,13 @@ public static void assertDataEventuallyOnEnv(
if (databaseName != null) {
statement.execute("use " + databaseName);
}
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
if (sql != null && !sql.isEmpty()) {
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
Expand Down Expand Up @@ -1189,6 +1225,8 @@ public static void assertDataEventuallyOnEnv(

public static void assertDataEventuallyOnEnv(
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult, long timeoutSeconds) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1200,6 +1238,13 @@ public static void assertDataEventuallyOnEnv(
.untilAsserted(
() -> {
try {
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertSingleResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeaderWithResult);
} catch (Exception e) {
Expand Down Expand Up @@ -1245,6 +1290,8 @@ public static void assertDataAlwaysOnEnv(
Set<String> expectedResSet,
long consistentSeconds,
final String database) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection =
env.getConnection(
Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT);
Expand All @@ -1261,6 +1308,14 @@ public static void assertDataAlwaysOnEnv(
if (Objects.nonNull(database)) {
statement.execute("use " + database);
}
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0]
&& System.currentTimeMillis() - startTime > consistentSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand All @@ -1280,9 +1335,10 @@ public static void assertDataAlwaysOnEnv(
Set<String> expectedResSet,
long consistentSeconds,
Consumer<String> handleFailure) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
Expand All @@ -1291,6 +1347,14 @@ public static void assertDataAlwaysOnEnv(
.failFast(
() -> {
try {
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0]
&& System.currentTimeMillis() - startTime > consistentSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand Down Expand Up @@ -1319,6 +1383,8 @@ public static void assertDataAlwaysOnEnv(
long consistentSeconds,
String database,
Consumer<String> handleFailure) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1333,6 +1399,14 @@ public static void assertDataAlwaysOnEnv(
if (database != null) {
statement.execute("use " + database);
}
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0]
&& System.currentTimeMillis() - startTime > consistentSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand Down

0 comments on commit 6cb9035

Please sign in to comment.