Skip to content

Commit

Permalink
[Fix][Server] Solving the problem of thread not releasing (datavane#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored and winghv committed Nov 8, 2023
1 parent b3a81d4 commit dc9d087
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public String toString() {
"&" + properties;
}

private String getOrEmpty (String keyword) {
return StringUtils.isNotEmpty(keyword)? keyword.trim():"";
private String getOrEmpty(String keyword) {
return StringUtils.isNotEmpty(keyword)? keyword.trim() : "";
}

public String getUniqueKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,14 @@
*/
package io.datavines.common.entity;

import lombok.Data;

import java.util.Map;

@Data
public class ConnectorParameter {

private String type;

private Map<String,Object> parameters;

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public Map<String, Object> getParameters() {
return parameters;
}

public void setParameters(Map<String, Object> parameters) {
this.parameters = parameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package io.datavines.common.entity;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class ExecuteSql {

private String sql;
Expand All @@ -27,9 +29,6 @@ public class ExecuteSql {

private boolean isErrorOutput;

public ExecuteSql() {
}

public ExecuteSql(String sql, String resultTable) {
this.sql = sql;
this.resultTable = resultTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
*/
package io.datavines.common.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class JobExecutionInfo {

private Long id;
Expand All @@ -40,24 +44,4 @@ public class JobExecutionInfo {
private String validateResultDataStorageParameter;

private JobExecutionParameter jobExecutionParameter;

public JobExecutionInfo() {
}

public JobExecutionInfo(Long id, String name,
String engineType, String engineParameter,
String errorDataStorageType, String errorDataStorageParameter, String errorDataFileName,
String validateResultDataStorageType, String validateResultDataStorageParameter,
JobExecutionParameter jobExecutionParameter) {
this.id = id;
this.name = name;
this.engineType = engineType;
this.engineParameter = engineParameter;
this.errorDataStorageType = errorDataStorageType;
this.errorDataStorageParameter = errorDataStorageParameter;
this.errorDataFileName = errorDataFileName;
this.validateResultDataStorageType = validateResultDataStorageType;
this.validateResultDataStorageParameter = validateResultDataStorageParameter;
this.jobExecutionParameter = jobExecutionParameter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
*/
package io.datavines.common.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class LogResult {

private String msg;

private int offsetLine;

public LogResult(String msg, int offsetLine){
this.msg = msg;
this.offsetLine = offsetLine;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,21 @@
*/
package io.datavines.common.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* MappingColumn
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MappingColumn {

private String column;

private String operator;

private String column2;

public MappingColumn() {}

public MappingColumn(String column, String operator, String column2) {
this.column = column;
this.operator = operator;
this.column2 = column2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ public class ProcessResult {
public ProcessResult(){
this.exitStatusCode = ExecutionStatus.FAILURE.getCode();
this.processId = -1;
this.applicationId = "-1";
}

public ProcessResult(Integer exitStatusCode){
this.exitStatusCode = exitStatusCode;
this.processId = -1;
this.applicationId = "-1";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
public class CommonPropertyUtils {

public static final String EXEC_THREADS = "exec.threads";
public static final Integer EXEC_THREADS_DEFAULT = 100;
public static final Integer EXEC_THREADS_DEFAULT = 20;

public static final String METADATA_FETCH_EXEC_THREADS = "metadata.fetch.exec.threads";
public static final Integer METADATA_FETCH_EXEC_THREADS_DEFAULT = 5;

public static final String MAX_CPU_LOAD_AVG = "max.cpu.load.avg";
public static final Double MAX_CPU_LOAD_AVG_DEFAULT = 0.5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.datavines.server.catalog.metadata.CatalogMetaDataFetchTaskManager;
import io.datavines.server.catalog.metadata.CatalogMetaDataFetchTaskScheduler;
import io.datavines.server.catalog.metadata.CatalogMetaDataFetchTaskFailover;
import io.datavines.server.dqc.coordinator.cache.JobExecutionResponseProcessor;
import io.datavines.server.registry.Register;
import io.datavines.server.dqc.coordinator.cache.JobExecuteManager;
import io.datavines.server.dqc.coordinator.failover.JobExecutionFailover;
Expand Down Expand Up @@ -76,6 +77,8 @@ private void initializeAndStart() throws Exception {
jobExecuteManager = new JobExecuteManager();
jobExecuteManager.start();

JobExecutionResponseProcessor.getInstance().setJobExecuteManager(jobExecuteManager);

CatalogMetaDataFetchTaskManager catalogMetaDataFetchTaskManager = new CatalogMetaDataFetchTaskManager();
catalogMetaDataFetchTaskManager.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class CatalogMetaDataFetchTaskManager {

public CatalogMetaDataFetchTaskManager() {
this.taskExecuteService = Executors.newFixedThreadPool(
CommonPropertyUtils.getInt(CommonPropertyUtils.EXEC_THREADS,CommonPropertyUtils.EXEC_THREADS_DEFAULT),
CommonPropertyUtils.getInt(CommonPropertyUtils.METADATA_FETCH_EXEC_THREADS,CommonPropertyUtils.METADATA_FETCH_EXEC_THREADS_DEFAULT),
new NamedThreadFactory("CatalogMetaDataFetchExecutor-Execute-Thread"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private void executeJobExecution(JobExecutionRequest jobExecutionRequest) {
CommonPropertyUtils.getInt(CommonPropertyUtils.SERVER_PORT, CommonPropertyUtils.SERVER_PORT_DEFAULT)));
doAck(jobExecutionRequest);

JobRunner jobRunner = new JobRunner(jobExecutionRequest, this, configurations);
JobRunner jobRunner = new JobRunner(jobExecutionRequest, configurations);
JobExecutionContext jobExecutionContext = new JobExecutionContext();
jobExecutionContext.setJobExecutionRequest(jobExecutionRequest);
jobExecutionContext.setJobRunner(jobRunner);
Expand Down Expand Up @@ -297,7 +297,6 @@ public void run() {
unFinishedJobExecutionMap.put(jobExecutionRequest.getJobExecutionId(), jobExecutionRequest);
if (ExecutionStatus.of(jobExecutionRequest.getStatus()).typeIsSuccess()) {
unFinishedJobExecutionMap.remove(jobExecutionRequest.getJobExecutionId());
jobExecutionCache.remove(jobExecutionRequest.getJobExecutionId());
jobExecution.setApplicationId(jobExecutionRequest.getApplicationId());
jobExecution.setProcessId(jobExecutionRequest.getProcessId());
jobExecution.setExecuteHost(jobExecutionRequest.getExecuteHost());
Expand Down Expand Up @@ -367,7 +366,6 @@ private void sendErrorEmail(JobExecution jobExecution){

private void updateJobExecutionAndRemoveCache(JobExecutionRequest jobExecutionRequest, JobExecution jobExecution) {
unFinishedJobExecutionMap.remove(jobExecutionRequest.getJobExecutionId());
jobExecutionCache.remove(jobExecutionRequest.getJobExecutionId());
jobExternalService.deleteJobExecutionResultByJobExecutionId(jobExecutionRequest.getJobExecutionId());
jobExternalService.deleteActualValuesByJobExecutionId(jobExecutionRequest.getJobExecutionId());
jobExecution.setApplicationId(jobExecutionRequest.getApplicationId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.server.dqc.coordinator.cache;

import io.datavines.server.dqc.command.JobExecuteResponseCommand;

public class JobExecutionResponseProcessor {

private JobExecuteManager jobExecuteManager;

private JobExecutionResponseProcessor(){}

private static class Singleton {
static JobExecutionResponseProcessor instance = new JobExecutionResponseProcessor();
}

public static JobExecutionResponseProcessor getInstance(){
return Singleton.instance;
}

public void setJobExecuteManager(JobExecuteManager jobExecuteManager) {
this.jobExecuteManager = jobExecuteManager;
}

public void processJobExecutionExecuteResponse(JobExecuteResponseCommand jobExecuteResponseCommand) {
jobExecuteManager.processJobExecutionExecuteResponse(jobExecuteResponseCommand);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import io.datavines.common.enums.ExecutionStatus;
import io.datavines.common.utils.LoggerUtils;
import io.datavines.engine.api.engine.EngineExecutor;
import io.datavines.server.dqc.coordinator.cache.JobExecuteManager;
import io.datavines.server.dqc.command.JobExecuteResponseCommand;
import io.datavines.server.dqc.coordinator.cache.JobExecutionResponseProcessor;
import io.datavines.spi.PluginLoader;

import org.slf4j.Logger;
Expand All @@ -36,15 +36,12 @@ public class JobRunner implements Runnable {

private final JobExecutionRequest jobExecutionRequest;

private final JobExecuteManager jobExecuteManager;

private EngineExecutor engineExecutor;

private final Configurations configurations;

public JobRunner(JobExecutionRequest jobExecutionRequest, JobExecuteManager jobExecuteManager, Configurations configurations){
public JobRunner(JobExecutionRequest jobExecutionRequest, Configurations configurations){
this.jobExecutionRequest = jobExecutionRequest;
this.jobExecuteManager = jobExecuteManager;
this.configurations = configurations;
}

Expand Down Expand Up @@ -96,7 +93,7 @@ public void run() {
responseCommand.setApplicationIds(engineExecutor.getProcessResult().getApplicationId());
responseCommand.setProcessId(engineExecutor.getProcessResult().getProcessId());
} finally {
jobExecuteManager.processJobExecutionExecuteResponse(responseCommand);
JobExecutionResponseProcessor.getInstance().processJobExecutionExecuteResponse(responseCommand);
}
}

Expand All @@ -115,8 +112,7 @@ public void kill(){
responseCommand.setApplicationIds(engineExecutor.getProcessResult().getApplicationId());
responseCommand.setProcessId(engineExecutor.getProcessResult().getProcessId());
}
jobExecuteManager.processJobExecutionExecuteResponse(responseCommand);

JobExecutionResponseProcessor.getInstance().processJobExecutionExecuteResponse(responseCommand);
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
Expand Down

0 comments on commit dc9d087

Please sign in to comment.