Skip to content

Commit

Permalink
[feature][st-engine] Add jobHistory (#3191)
Browse files Browse the repository at this point in the history
* [feature][st-engine] add jobHistory

* [feature][st-engine] add jobHistory

* [feature][st-engine] add jobHistory

* [feature][st-engine] add jobHistory

* [feature][st-engine] add jobHistory

* [feature][st-engine] add jobHistory

* [feature][st-engine] add jobHistory

* [feature][st-engine] add jobHistory

* [bugfix][connector-v2] make jdbc it test single concurrent

* [bugfix][connector-v2] make jdbc it test single concurrent

* [bugfix][connector-v2] make jdbc it test single concurrent
  • Loading branch information
ic4y authored Nov 8, 2022
1 parent d9519d6 commit 5bfd508
Show file tree
Hide file tree
Showing 27 changed files with 1,048 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@

public abstract class AbstractCommandArgs implements CommandArgs {

@Parameter(names = {"-c", "--config"},
description = "Config file",
required = true)
private String configFile;

@Parameter(names = {"-i", "--variable"},
description = "variable substitution, such as -i city=beijing, or -i date=20190318")
private List<String> variables = Collections.emptyList();
Expand All @@ -56,14 +51,6 @@ public abstract class AbstractCommandArgs implements CommandArgs {
*/
private List<String> originalParameters;

public String getConfigFile() {
return configFile;
}

public void setConfigFile(String configFile) {
this.configFile = configFile;
}

public List<String> getVariables() {
return variables;
}
Expand Down Expand Up @@ -112,4 +99,12 @@ public DeployMode getDeployMode() {
throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
}

public String getConfigFile() {
throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
}

public void setConfigFile(String configFile) {
throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;

import com.beust.jcommander.Parameter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -44,6 +45,12 @@ public void getConfigPath() throws URISyntaxException {
}

private static class SparkCommandArgs extends AbstractCommandArgs {

@Parameter(names = {"-c", "--config"},
description = "Config file",
required = true)
private String configFile;

private DeployMode deployMode;

public void setDeployMode(DeployMode deployMode) {
Expand All @@ -53,5 +60,15 @@ public void setDeployMode(DeployMode deployMode) {
public DeployMode getDeployMode() {
return deployMode;
}

@Override
public String getConfigFile() {
return this.configFile;
}

@Override
public void setConfigFile(String configFile) {
this.configFile = configFile;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@

public class FlinkCommandArgs extends AbstractCommandArgs {

@Parameter(names = {"-c", "--config"},
description = "Config file",
required = true)
private String configFile;

@Parameter(names = {"-r", "--run-mode"},
converter = RunModeConverter.class,
description = "job run mode, run or run-application")
Expand All @@ -50,6 +55,16 @@ public void setRunMode(FlinkRunMode runMode) {
this.runMode = runMode;
}

@Override
public String getConfigFile() {
return this.configFile;
}

@Override
public void setConfigFile(String configFile) {
this.configFile = configFile;
}

/**
* Used to convert the run mode string to the enum value.
*/
Expand All @@ -70,5 +85,4 @@ public FlinkRunMode convert(String value) {
throw new IllegalArgumentException(String.format("Run mode %s not supported", value));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@

public class SparkCommandArgs extends AbstractCommandArgs {

@Parameter(names = {"-c", "--config"},
description = "Config file",
required = true)
private String configFile;

@Parameter(names = {"-e", "--deploy-mode"},
description = "Spark deploy mode",
required = true,
Expand Down Expand Up @@ -59,4 +64,14 @@ public void setMaster(String master) {
this.master = master;
}

@Override
public String getConfigFile() {
return this.configFile;
}

@Override
public void setConfigFile(String configFile) {
this.configFile = configFile;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,22 @@ public class ClientCommandArgs extends AbstractCommandArgs {
converter = ExecutionModeConverter.class)
private ExecutionMode executionMode = ExecutionMode.CLUSTER;

@Parameter(names = {"-c", "--config"},
description = "Config file")
private String configFile;

@Parameter(names = {"-cn", "--cluster"},
description = "The name of cluster")
private String clusterName = "seatunnel_default_cluster";

@Parameter(names = {"-j", "--job-id"},
description = "Get job status by JobId")
private String jobId;

@Parameter(names = {"-l", "--list"},
description = "list job status")
private boolean listJob = false;

public String getClusterName() {
return clusterName;
}
Expand All @@ -65,6 +77,14 @@ public void setExecutionMode(ExecutionMode executionMode) {
this.executionMode = executionMode;
}

public String getJobId() {
return jobId;
}

public boolean isListJob(){
return listJob;
}

@Override
public EngineType getEngineType() {
return EngineType.SEATUNNEL;
Expand All @@ -74,4 +94,14 @@ public EngineType getEngineType() {
public DeployMode getDeployMode() {
return DeployMode.CLIENT;
}

@Override
public String getConfigFile() {
return this.configFile;
}

@Override
public void setConfigFile(String configFile) {
this.configFile = configFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import lombok.extern.slf4j.Slf4j;

import java.nio.file.Path;
import java.util.Random;
Expand All @@ -41,6 +42,7 @@
/**
* This command is used to execute the SeaTunnel engine job by SeaTunnel API.
*/
@Slf4j
public class ClientExecuteCommand implements Command<ClientCommandArgs> {

private final ClientCommandArgs clientCommandArgs;
Expand All @@ -49,12 +51,9 @@ public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) {
this.clientCommandArgs = clientCommandArgs;
}

@SuppressWarnings("checkstyle:RegexpSingleline")
@Override
public void execute() throws CommandExecuteException {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);

JobConfig jobConfig = new JobConfig();
jobConfig.setName(clientCommandArgs.getJobName());
HazelcastInstance instance = null;
SeaTunnelClient engineClient = null;
try {
Expand All @@ -66,10 +65,22 @@ public void execute() throws CommandExecuteException {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(clusterName);
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
if (clientCommandArgs.isListJob()) {
String jobstatus = engineClient.listJobStatus();
System.out.println(jobstatus);
} else if (null != clientCommandArgs.getJobId()) {
String jobState = engineClient.getJobState(Long.parseLong(clientCommandArgs.getJobId()));
System.out.println(jobState);
} else {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(clientCommandArgs.getJobName());
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(configFile.toString(), jobConfig);

ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
clientJobProxy.waitForJobComplete();
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
clientJobProxy.waitForJobComplete();
}
} catch (ExecutionException | InterruptedException e) {
throw new CommandExecuteException("SeaTunnel job executed failed", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

junit.jupiter.execution.parallel.mode.default = same_thread
junit.jupiter.execution.parallel.mode.classes.default = same_thread
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public void testStreamJobRestoreIn3NodeWorkerDown() throws ExecutionException, I
// shutdown on worker node
node2.shutdown();

Awaitility.await().atMost(180000, TimeUnit.MILLISECONDS)
Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
// Wait job write all rows in file
Thread.sleep(2000);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

junit.jupiter.execution.parallel.mode.default = same_thread
junit.jupiter.execution.parallel.mode.classes.default = same_thread
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

junit.jupiter.execution.parallel.mode.default = same_thread
junit.jupiter.execution.parallel.mode.classes.default = same_thread
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;

import com.hazelcast.client.config.ClientConfig;
Expand Down Expand Up @@ -64,4 +66,18 @@ public void shutdown() {
hazelcastClient.shutdown();
}
}

public String getJobState(Long jobId){
return hazelcastClient.requestOnMasterAndDecodeResponse(
SeaTunnelGetJobStateCodec.encodeRequest(jobId),
SeaTunnelGetJobStateCodec::decodeResponse
);
}

public String listJobStatus(){
return hazelcastClient.requestOnMasterAndDecodeResponse(
SeaTunnelListJobStatusCodec.encodeRequest(),
SeaTunnelListJobStatusCodec::decodeResponse
);
}
}
Loading

0 comments on commit 5bfd508

Please sign in to comment.