Skip to content

Commit

Permalink
Merge pull request #48 from zhp8341/bug_fix_optimize
Browse files Browse the repository at this point in the history
新增任务修改历史版本查询
  • Loading branch information
zhp8341 authored May 7, 2021
2 parents 27ed006 + 19e7b21 commit 1d3e228
Show file tree
Hide file tree
Showing 15 changed files with 866 additions and 12 deletions.
26 changes: 26 additions & 0 deletions docs/sql/flink_web.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,31 @@ ALTER TABLE job_config add `custom_args` varchar(128) DEFAULT NULL COMMENT '启
ALTER TABLE job_config add `custom_main_class` varchar(128) DEFAULT NULL COMMENT '程序入口类' AFTER custom_args;
ALTER TABLE job_config add `custom_jar_url` varchar(128) DEFAULT NULL COMMENT'自定义jar的http地址 如:http://ccblog.cn/xx.jar' AFTER custom_main_class;


-- ----------------------------
-- Table structure for job_config_history
-- ----------------------------
CREATE TABLE `job_config_history` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
`job_config_id` bigint(11) NOT NULL COMMENT 'job_config主表Id',
`job_name` varchar(64) NOT NULL COMMENT '任务名称',
`deploy_mode` varchar(64) NOT NULL COMMENT '提交模式: standalone 、yarn 、yarn-session ',
`flink_run_config` varchar(512) NOT NULL COMMENT 'flink运行配置',
`flink_sql` mediumtext NOT NULL COMMENT 'sql语句',
`flink_checkpoint_config` varchar(512) DEFAULT NULL COMMENT 'checkPoint配置',
`ext_jar_path` varchar(2048) DEFAULT NULL COMMENT 'udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar',
`version` int(11) NOT NULL DEFAULT '0' COMMENT '更新版本号',
`is_deleted` tinyint(1) NOT NULL DEFAULT '0',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`edit_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`creator` varchar(32) DEFAULT 'sys',
`editor` varchar(32) DEFAULT 'sys',
PRIMARY KEY (`id`),
KEY `index_job_config_id` (`job_config_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='flink任务配置历史变更表';



-- ----------------------------
-- Table structure for job_run_log
-- ----------------------------
Expand All @@ -98,6 +123,7 @@ CREATE TABLE `job_run_log` (

ALTER TABLE job_run_log add `run_ip` varchar(64) DEFAULT NULL COMMENT '任务运行所在的机器' AFTER local_log ;


-- ----------------------------
-- Table structure for savepoint_backup
-- ----------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package com.flink.streaming.web.model.dto;

import cn.hutool.core.collection.CollectionUtil;
import com.flink.streaming.web.model.entity.JobConfig;
import com.flink.streaming.web.model.entity.JobConfigHistory;
import lombok.Data;
import org.apache.commons.compress.utils.Lists;

import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
import java.util.List;

/**
* @author zhuhuipei
* @date 2021/5/5
* @time 19:49
*/
@Data
public class JobConfigHistoryDTO implements Serializable {

private static final long serialVersionUID = 1L;

private Long id;

/**
* job_config主表Id
*/
private Long jobConfigId;

/**
* 任务名称
*/
private String jobName;

/**
* 提交模式: standalone 、yarn 、yarn-session
*/
private String deployMode;

/**
* flink运行配置
*/
private String flinkRunConfig;

/**
* checkPoint配置
*/
private String flinkCheckpointConfig;

/**
* udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar
*/
private String extJarPath;

/**
* 更新版本号
*/
private Integer version;

/**
* 创建时间
*/
private Date createTime;

/**
* 修改时间
*/
private Date editTime;

private String creator;

private String editor;

/**
* sql语句
*/
private String flinkSql;


public static JobConfigHistory toEntity(JobConfigHistoryDTO jobConfigHistoryDTO) {
if (jobConfigHistoryDTO == null) {
return null;
}
JobConfigHistory jobConfigHistory = new JobConfigHistory();
jobConfigHistory.setId(jobConfigHistoryDTO.getId());
jobConfigHistory.setJobConfigId(jobConfigHistoryDTO.getJobConfigId());
jobConfigHistory.setJobName(jobConfigHistoryDTO.getJobName());
jobConfigHistory.setDeployMode(jobConfigHistoryDTO.getDeployMode());
jobConfigHistory.setFlinkRunConfig(jobConfigHistoryDTO.getFlinkRunConfig());
jobConfigHistory.setFlinkCheckpointConfig(jobConfigHistoryDTO.getFlinkCheckpointConfig());
jobConfigHistory.setExtJarPath(jobConfigHistoryDTO.getExtJarPath());
jobConfigHistory.setVersion(jobConfigHistoryDTO.getVersion());
jobConfigHistory.setCreateTime(jobConfigHistoryDTO.getCreateTime());
jobConfigHistory.setEditTime(jobConfigHistoryDTO.getEditTime());
jobConfigHistory.setCreator(jobConfigHistoryDTO.getCreator());
jobConfigHistory.setEditor(jobConfigHistoryDTO.getEditor());
jobConfigHistory.setFlinkSql(jobConfigHistoryDTO.getFlinkSql());
return jobConfigHistory;
}


public static JobConfigHistoryDTO toDTO(JobConfigHistory jobConfigHistory) {
if (jobConfigHistory == null) {
return null;
}
JobConfigHistoryDTO jobConfigHistoryDTO = new JobConfigHistoryDTO();
jobConfigHistoryDTO.setId(jobConfigHistory.getId());
jobConfigHistoryDTO.setJobConfigId(jobConfigHistory.getJobConfigId());
jobConfigHistoryDTO.setJobName(jobConfigHistory.getJobName());
jobConfigHistoryDTO.setDeployMode(jobConfigHistory.getDeployMode());
jobConfigHistoryDTO.setFlinkRunConfig(jobConfigHistory.getFlinkRunConfig());
jobConfigHistoryDTO.setFlinkCheckpointConfig(jobConfigHistory.getFlinkCheckpointConfig());
jobConfigHistoryDTO.setExtJarPath(jobConfigHistory.getExtJarPath());
jobConfigHistoryDTO.setVersion(jobConfigHistory.getVersion());
jobConfigHistoryDTO.setCreateTime(jobConfigHistory.getCreateTime());
jobConfigHistoryDTO.setEditTime(jobConfigHistory.getEditTime());
jobConfigHistoryDTO.setCreator(jobConfigHistory.getCreator());
jobConfigHistoryDTO.setEditor(jobConfigHistory.getEditor());
jobConfigHistoryDTO.setFlinkSql(jobConfigHistory.getFlinkSql());
return jobConfigHistoryDTO;
}

public static List<JobConfigHistoryDTO> toListDTO(List<JobConfigHistory> jobConfigHistoryList) {
if (CollectionUtil.isEmpty(jobConfigHistoryList)) {
return Collections.EMPTY_LIST;
}

List<JobConfigHistoryDTO> list = Lists.newArrayList();

for (JobConfigHistory jobConfigHistory : jobConfigHistoryList) {

JobConfigHistoryDTO jobConfigHistoryDTO = JobConfigHistoryDTO.toDTO(jobConfigHistory);
if (jobConfigHistoryDTO != null) {
list.add(jobConfigHistoryDTO);
}
}

return list;
}


public static JobConfigHistoryDTO to(JobConfig jobConfig) {
if (jobConfig == null) {
return null;
}
JobConfigHistoryDTO jobConfigHistoryDTO = new JobConfigHistoryDTO();
jobConfigHistoryDTO.setJobConfigId (jobConfig.getId());
jobConfigHistoryDTO.setJobName(jobConfig.getJobName());
jobConfigHistoryDTO.setDeployMode(jobConfig.getDeployMode());
jobConfigHistoryDTO.setFlinkRunConfig(jobConfig.getFlinkRunConfig());
jobConfigHistoryDTO.setFlinkCheckpointConfig(jobConfig.getFlinkCheckpointConfig());
jobConfigHistoryDTO.setExtJarPath(jobConfig.getExtJarPath());
jobConfigHistoryDTO.setVersion(jobConfig.getVersion());
jobConfigHistoryDTO.setCreateTime(jobConfig.getCreateTime());
jobConfigHistoryDTO.setEditTime(jobConfig.getEditTime());
jobConfigHistoryDTO.setCreator(jobConfig.getCreator());
jobConfigHistoryDTO.setEditor(jobConfig.getEditor());
jobConfigHistoryDTO.setFlinkSql(jobConfig.getFlinkSql());
return jobConfigHistoryDTO;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.flink.streaming.web.model.entity;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
*
* @author zhuhuipei
* @date 2021/5/5
* @time 19:49
*/
@Data
public class JobConfigHistory implements Serializable {

private static final long serialVersionUID = 1L;

private Long id;

/**
* job_config主表Id
*/
private Long jobConfigId;

/**
* 任务名称
*/
private String jobName;

/**
* 提交模式: standalone 、yarn 、yarn-session
*/
private String deployMode;

/**
* flink运行配置
*/
private String flinkRunConfig;

/**
* checkPoint配置
*/
private String flinkCheckpointConfig;

/**
* udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar
*/
private String extJarPath;

/**
* 更新版本号
*/
private Integer version;

private Boolean isDeleted;

/**
* 创建时间
*/
private Date createTime;

/**
* 修改时间
*/
private Date editTime;

private String creator;

private String editor;

/**
* sql语句
*/
private String flinkSql;



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.flink.streaming.web.model.vo;

import cn.hutool.core.collection.CollectionUtil;
import com.flink.streaming.web.common.util.DateFormatUtils;
import com.flink.streaming.web.model.dto.JobConfigHistoryDTO;
import lombok.Data;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* @author zhuhuipei
* @date 2021/5/5
* @time 19:49
*/
@Data
public class JobConfigHistoryVO implements Serializable {

private static final long serialVersionUID = 1L;

private Long id;

/**
* job_config主表Id
*/
private Long jobConfigId;

/**
* 任务名称
*/
private String jobName;

/**
* 提交模式: standalone 、yarn 、yarn-session
*/
private String deployMode;

/**
* flink运行配置
*/
private String flinkRunConfig;

/**
* checkPoint配置
*/
private String flinkCheckpointConfig;

/**
* udf地址及连接器jar 如http://xxx.xxx.com/flink-streaming-udf.jar
*/
private String extJarPath;

/**
* 更新版本号
*/
private Integer version;

/**
* 创建时间
*/
private String createTime;

/**
* 修改时间
*/
private String editTime;

private String creator;

private String editor;

/**
* sql语句
*/
private String flinkSql;


public static JobConfigHistoryVO toVO(JobConfigHistoryDTO jobConfigHistoryDTO, boolean isFlinkSql) {
if (jobConfigHistoryDTO == null) {
return null;
}
JobConfigHistoryVO jobConfigHistoryVO = new JobConfigHistoryVO();
jobConfigHistoryVO.setId(jobConfigHistoryDTO.getId());
jobConfigHistoryVO.setJobConfigId(jobConfigHistoryDTO.getJobConfigId());
jobConfigHistoryVO.setJobName(jobConfigHistoryDTO.getJobName());
jobConfigHistoryVO.setDeployMode(jobConfigHistoryDTO.getDeployMode());
jobConfigHistoryVO.setFlinkRunConfig(jobConfigHistoryDTO.getFlinkRunConfig());
jobConfigHistoryVO.setFlinkCheckpointConfig(jobConfigHistoryDTO.getFlinkCheckpointConfig());
jobConfigHistoryVO.setExtJarPath(jobConfigHistoryDTO.getExtJarPath());
jobConfigHistoryVO.setVersion(jobConfigHistoryDTO.getVersion());
jobConfigHistoryVO.setCreateTime(DateFormatUtils.toFormatString(jobConfigHistoryDTO.getCreateTime()));
jobConfigHistoryVO.setEditTime(DateFormatUtils.toFormatString(jobConfigHistoryDTO.getEditTime()));
jobConfigHistoryVO.setCreator(jobConfigHistoryDTO.getCreator());
jobConfigHistoryVO.setEditor(jobConfigHistoryDTO.getEditor());
if (isFlinkSql) {
jobConfigHistoryVO.setFlinkSql(jobConfigHistoryDTO.getFlinkSql());
}
return jobConfigHistoryVO;
}

public static List<JobConfigHistoryVO> toListVO(List<JobConfigHistoryDTO> jobConfigHistoryDTOList) {
if (CollectionUtil.isEmpty(jobConfigHistoryDTOList)) {
return Collections.EMPTY_LIST;
}
List<JobConfigHistoryVO> list = new ArrayList<>();

for (JobConfigHistoryDTO jobConfigHistoryDTO : jobConfigHistoryDTOList) {
list.add(toVO(jobConfigHistoryDTO, Boolean.FALSE));
}

return list;
}
}
Loading

0 comments on commit 1d3e228

Please sign in to comment.