Skip to content

Commit

Permalink
[Feature] [api env] Add job-level configuration for checkpoint timeou…
Browse files Browse the repository at this point in the history
…t. (apache#5222)
  • Loading branch information
ic4y authored Aug 28, 2023
1 parent 5160c08 commit 3c13275
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 19 deletions.
16 changes: 13 additions & 3 deletions docs/en/connector-v2/sink/Console.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,24 @@ Used to send data to Console. Both support streaming and batch mode.

## Options

| name | type | required | default value |
|----------------|------|----------|---------------|
| common-options | | no | - |
| name | type | required | default value |
|--------------------|---------|----------|---------------|
| common-options | | no | - |
| log.print.data | boolean | no | yes |
| log.print.delay.ms | int | no | 0 |

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details

### log.print.data

Flag to determine whether data should be printed in the logs. The default value is `true`.

### log.print.delay.ms

Delay in milliseconds between printing each data item to the logs. The default value is `0`.

## Example

simple:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public interface EnvCommonOptions {
.withDescription(
"The interval (in milliseconds) between two consecutive checkpoints.");

Option<Long> CHECKPOINT_TIMEOUT =
Options.key("checkpoint.timeout")
.longType()
.noDefaultValue()
.withDescription("The timeout (in milliseconds) for a checkpoint.");

Option<String> JARS =
Options.key("jars")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static OptionRule getEnvOptionRules() {
CommonOptions.PARALLELISM,
EnvCommonOptions.JARS,
EnvCommonOptions.CHECKPOINT_INTERVAL,
EnvCommonOptions.CHECKPOINT_TIMEOUT,
EnvCommonOptions.CUSTOM_PARAMETERS)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -30,13 +31,20 @@
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;

@NoArgsConstructor
@AutoService(SeaTunnelSink.class)
public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private SeaTunnelRowType seaTunnelRowType;
private boolean isPrintData = true;
private int delayMs = 0;

public ConsoleSink(SeaTunnelRowType seaTunnelRowType) {
public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig options) {
this.seaTunnelRowType = seaTunnelRowType;
this.isPrintData = options.get(LOG_PRINT_DATA);
this.delayMs = options.get(LOG_PRINT_DELAY);
}

@Override
Expand All @@ -51,7 +59,7 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
return new ConsoleSinkWriter(seaTunnelRowType, context);
return new ConsoleSinkWriter(seaTunnelRowType, context, isPrintData, delayMs);
}

@Override
Expand All @@ -60,5 +68,8 @@ public String getPluginName() {
}

@Override
public void prepare(Config pluginConfig) {}
public void prepare(Config pluginConfig) {
this.isPrintData = ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DATA);
this.delayMs = ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DELAY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.console.sink;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand All @@ -27,6 +30,21 @@

@AutoService(Factory.class)
public class ConsoleSinkFactory implements TableSinkFactory {

public static final Option<Boolean> LOG_PRINT_DATA =
Options.key("log.print.data")
.booleanType()
.defaultValue(true)
.withDescription(
"Flag to determine whether data should be printed in the logs.");

public static final Option<Integer> LOG_PRINT_DELAY =
Options.key("log.print.delay.ms")
.intType()
.defaultValue(0)
.withDescription(
"Delay in milliseconds between printing each data item to the logs.");

@Override
public String factoryIdentifier() {
return "Console";
Expand All @@ -39,7 +57,10 @@ public OptionRule optionRule() {

@Override
public TableSink createSink(TableFactoryContext context) {
ReadonlyConfig options = context.getOptions();
return () ->
new ConsoleSink(context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
new ConsoleSink(
context.getCatalogTable().getTableSchema().toPhysicalRowDataType(),
options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -44,9 +45,18 @@ public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private final SinkWriter.Context context;
private final DataTypeChangeEventHandler dataTypeChangeEventHandler;

public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType, SinkWriter.Context context) {
boolean isPrintData = true;
int delayMs = 0;

public ConsoleSinkWriter(
SeaTunnelRowType seaTunnelRowType,
SinkWriter.Context context,
boolean isPrintData,
int delayMs) {
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
this.isPrintData = isPrintData;
this.delayMs = delayMs;
this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
log.info("output rowType: {}", fieldsInfo(seaTunnelRowType));
}
Expand All @@ -66,13 +76,23 @@ public void write(SeaTunnelRow element) {
for (int i = 0; i < fieldTypes.length; i++) {
arr[i] = fieldToString(fieldTypes[i], fields[i]);
}
log.info(
"subtaskIndex={} rowIndex={}: SeaTunnelRow#tableId={} SeaTunnelRow#kind={} : {}",
context.getIndexOfSubtask(),
rowCounter.incrementAndGet(),
element.getTableId(),
element.getRowKind(),
StringUtils.join(arr, ", "));
if (isPrintData) {
log.info(
"subtaskIndex={} rowIndex={}: SeaTunnelRow#tableId={} SeaTunnelRow#kind={} : {}",
context.getIndexOfSubtask(),
rowCounter.incrementAndGet(),
element.getTableId(),
element.getRowKind(),
StringUtils.join(arr, ", "));
}
if (delayMs > 0) {
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SeaTunnelException(e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void setUp() {
String[] fieldNames = {};
SeaTunnelDataType<?>[] fieldTypes = {};
SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null);
consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null, true, 0);
}

private Object fieldToStringTest(SeaTunnelDataType<?> dataType, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ private void setCheckpoint() {
}
}

if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
long timeout = config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
checkpointConfig.setCheckpointTimeout(timeout);
} else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
checkpointConfig.setCheckpointTimeout(timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ private void setCheckpoint() {
}
}

if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
long timeout = config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
checkpointConfig.setCheckpointTimeout(timeout);
} else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
checkpointConfig.setCheckpointTimeout(timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public enum CheckpointCloseReason {
PIPELINE_END("Pipeline turn to end state."),
CHECKPOINT_EXPIRED(
"Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml"),
"Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml or jobConfig env."),
CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ private CheckpointConfig createJobCheckpointConfig(
Long.parseLong(
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString()));
}
if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
jobCheckpointConfig.setCheckpointTimeout(
Long.parseLong(
jobEnv.get(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()).toString()));
}
return jobCheckpointConfig;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.checkpoint;

import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.hazelcast.internal.serialization.Data;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

public class CheckpointTimeOutTest extends AbstractSeaTunnelServerTest {

public static String CONF_PATH = "stream_fake_to_console_checkpointTimeOut.conf";
public static long JOB_ID = System.currentTimeMillis();

@Test
public void testJobLevelCheckpointTimeOut() {
startJob(JOB_ID, CONF_PATH);

await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertTrue(
server.getCoordinatorService()
.getJobStatus(JOB_ID)
.equals(JobStatus.RUNNING));
});

await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertTrue(
server.getCoordinatorService()
.getJobStatus(JOB_ID)
.equals(JobStatus.FAILED));
});
}

private void startJob(Long jobid, String path) {
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);

JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
jobid,
"Test",
false,
nodeEngine.getSerializationService().toData(testLogicalDag),
testLogicalDag.getJobConfig(),
Collections.emptyList());

Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobid, data);
voidPassiveCompletableFuture.join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 1000
checkpoint.timeout = 100
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake1"
row.num = 1000
split.num = 100
split.read-interval = 3000
parallelism = 1
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}
}

transform {
}

sink {
console {
log.print.delay.ms=5000
}
}

0 comments on commit 3c13275

Please sign in to comment.