Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix ldap org hierarchy bug and add sub flow reference id #10

Merged
merged 2 commits into from
Dec 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ subprojects {
"akka" : "com.typesafe.akka:akka-actor_2.10:2.2.0",
"jgit" : "org.eclipse.jgit:org.eclipse.jgit:4.1.1.201511131810-r",
"jsoup" : "org.jsoup:jsoup:1.8.3",
"commons_io" : "commons-io:commons-io:2.4",


"jackson_databind" : "com.fasterxml.jackson.core:jackson-databind:2.6.1",
"jackson_core" : "com.fasterxml.jackson.core:jackson-core:2.6.1",
Expand Down
5 changes: 5 additions & 0 deletions data-model/DDL/ETL_DDL/executor_metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ CREATE TABLE flow_job (
job_path VARCHAR(1024) COMMENT 'job path from top level',
job_type_id SMALLINT COMMENT 'type id of the job',
job_type VARCHAR(63) COMMENT 'type of the job',
ref_flow_id INT UNSIGNED DEFAULT NULL COMMENT 'the reference flow id of the job if the job is a subflow',
pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job',
is_current CHAR(1) COMMENT 'determine if it is a current job',
Expand All @@ -104,6 +105,7 @@ CREATE TABLE flow_job (
wh_etl_exec_id BIGINT COMMENT 'wherehows etl execution id that create this record',
PRIMARY KEY (app_id, job_id, dag_version),
INDEX flow_id_idx (app_id, flow_id),
INDEX ref_flow_id_idx (app_id, ref_flow_id),
INDEX job_path_idx (app_id, job_path(255))
)
ENGINE = InnoDB
Expand All @@ -122,6 +124,8 @@ CREATE TABLE stg_flow_job (
job_path VARCHAR(1024) COMMENT 'job path from top level',
job_type_id SMALLINT COMMENT 'type id of the job',
job_type VARCHAR(63) COMMENT 'type of the job',
ref_flow_id INT UNSIGNED DEFAULT NULL COMMENT 'the reference flow id of the job if the job is a subflow',
ref_flow_path VARCHAR(1024) COMMENT 'the reference flow path of the job if the job is a subflow',
pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job',
post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job',
is_current CHAR(1) COMMENT 'determine if it is a current job',
Expand All @@ -131,6 +135,7 @@ CREATE TABLE stg_flow_job (
INDEX (app_id, job_id, dag_version),
INDEX flow_id_idx (app_id, flow_id),
INDEX flow_path_idx (app_id, flow_path(255)),
INDEX ref_flow_path_idx (app_id, ref_flow_path(255)),
INDEX job_path_idx (app_id, job_path(255)),
INDEX job_type_idx (job_type)
)
Expand Down
14 changes: 14 additions & 0 deletions data-model/DDL/ETL_DDL/git_metadata.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
--
-- Copyright 2015 LinkedIn Corp. All rights reserved.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--

CREATE TABLE `source_code_commit_info` (
`app_id` SMALLINT(5) UNSIGNED DEFAULT NULL,
`repository_urn` VARCHAR(300) CHAR SET latin1 NOT NULL COMMENT 'the git repo urn',
Expand Down
2 changes: 2 additions & 0 deletions metadata-etl/src/main/resources/jython/AzkabanExtract.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ def collect_flow_jobs(self, flow_file, job_file, dag_file):
node['jobType'],
'Y',
self.wh_exec_id)
if node['jobType'] == 'flow':
job_record.setRefFlowPath(row['project_name'] + ":" + node['embeddedFlowId'])
job_writer.append(job_record)

# job dag
Expand Down
2 changes: 1 addition & 1 deletion metadata-etl/src/main/resources/jython/AzkabanTransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

class AzkabanTransform(SchedulerTransform):
SchedulerTransform._tables["flows"]["columns"] = "app_id, flow_name, flow_group, flow_path, flow_level, source_modified_time, source_version, is_active, wh_etl_exec_id"
SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, is_current, wh_etl_exec_id"
SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, ref_flow_path, is_current, wh_etl_exec_id"
SchedulerTransform._tables["owners"]["columns"] = "app_id, flow_path, owner_id, permissions, owner_type, wh_etl_exec_id"
SchedulerTransform._tables["flow_execs"]["columns"] = "app_id, flow_name, flow_path, source_version, flow_exec_id, flow_exec_status, attempt_id, executed_by, start_time, end_time, wh_etl_exec_id"
SchedulerTransform._tables["job_execs"]["columns"] = "app_id, flow_path, source_version, flow_exec_id, job_name, job_path, job_exec_id, job_exec_status, attempt_id, start_time, end_time, wh_etl_exec_id"
Expand Down
5 changes: 5 additions & 0 deletions metadata-etl/src/main/resources/jython/LdapTransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def update_hierarchy_info(self):
user_ids = []
org_hierarchy_long_string = ""
org_hierarchy_depth_long_string = ""

query = self._update_hierarchy_info.format(table=t.get("table"), app_id=self.app_id, user_ids=",".join(user_ids), org_hierarchy_long_string=org_hierarchy_long_string,
org_hierarchy_depth_long_string=org_hierarchy_depth_long_string)
# print query
self.wh_cursor.executemany(query)
self.wh_con.commit()

def find_path_for_user(self, start, pair, hierarchy):
Expand Down
5 changes: 3 additions & 2 deletions metadata-etl/src/main/resources/jython/SchedulerLoad.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ def load_jobs(self):
self.wh_con.commit()

cmd = """
INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, pre_jobs, post_jobs,
INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, ref_flow_id, pre_jobs, post_jobs,
is_current, is_first, is_last, created_time, modified_time, wh_etl_exec_id)
SELECT app_id, flow_id, source_version first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, pre_jobs, post_jobs,
SELECT app_id, flow_id, source_version first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, ref_flow_id, pre_jobs, post_jobs,
'Y', is_first, is_last, unix_timestamp(NOW()) created_time, NULL, wh_etl_exec_id
FROM stg_flow_job s
WHERE s.app_id = {app_id}
Expand All @@ -94,6 +94,7 @@ def load_jobs(self):
job_path = s.job_path,
job_type_id = s.job_type_id,
job_type = s.job_type,
ref_flow_id = s.ref_flow_id,
pre_jobs = s.pre_jobs,
post_jobs = s.post_jobs,
is_current = 'Y',
Expand Down
21 changes: 21 additions & 0 deletions metadata-etl/src/main/resources/jython/SchedulerTransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,27 @@ def read_job_file_to_stg(self):
self.wh_cursor.execute(query)
self.wh_con.commit()

# ad hoc fix for null values, need better solution by changing the load script
query = """
UPDATE {table} stg
SET stg.ref_flow_path = null
WHERE stg.ref_flow_path = 'null' and stg.app_id = {app_id}
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.wh_cursor.execute(query)
self.wh_con.commit()

# Update sub flow id from mapping table
query = """
UPDATE {table} stg
JOIN flow_source_id_map fm
ON stg.app_id = fm.app_id AND stg.ref_flow_path = fm.source_id_string
SET stg.ref_flow_id = fm.flow_id WHERE stg.app_id = {app_id}
""".format(table=t.get("table"), app_id=self.app_id)
print query
self.wh_cursor.execute(query)
self.wh_con.commit()

# Insert new job into job map to generate job id
query = """
INSERT INTO job_source_id_map (app_id, source_id_string)
Expand Down
1 change: 1 addition & 0 deletions wherehows-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
compile externalDependency.spring_jdbc
compile externalDependency.jgit
compile externalDependency.jsoup
compile externalDependency.commons_io
testCompile externalDependency.testng
testCompile project(":metadata-etl")
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class AzkabanJobRecord extends AbstractRecord {
String jobName;
String jobPath;
String jobType;
String refFlowPath;
Character isCurrent;
Long whExecId;

Expand All @@ -51,9 +52,81 @@ public List<Object> fillAllFields() {
allFields.add(jobName);
allFields.add(jobPath);
allFields.add(jobType);
allFields.add(refFlowPath);
allFields.add(isCurrent);
allFields.add(whExecId);
return allFields;
}

public Integer getAppId() {
return appId;
}

public void setAppId(Integer appId) {
this.appId = appId;
}

public String getFlowPath() {
return flowPath;
}

public void setFlowPath(String flowPath) {
this.flowPath = flowPath;
}

public Integer getSourceVersion() {
return sourceVersion;
}

public void setSourceVersion(Integer sourceVersion) {
this.sourceVersion = sourceVersion;
}

public String getJobName() {
return jobName;
}

public void setJobName(String jobName) {
this.jobName = jobName;
}

public String getJobPath() {
return jobPath;
}

public void setJobPath(String jobPath) {
this.jobPath = jobPath;
}

public String getJobType() {
return jobType;
}

public void setJobType(String jobType) {
this.jobType = jobType;
}

public String getRefFlowPath() {
return refFlowPath;
}

public void setRefFlowPath(String refFlowPath) {
this.refFlowPath = refFlowPath;
}

public Character getIsCurrent() {
return isCurrent;
}

public void setIsCurrent(Character isCurrent) {
this.isCurrent = isCurrent;
}

public Long getWhExecId() {
return whExecId;
}

public void setWhExecId(Long whExecId) {
this.whExecId = whExecId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public GitCommitRecord(GitUtil.CommitMetadata commitMetadata, String gitRepoUrn)
this.gitRepoUrn = gitRepoUrn;
this.commitId = commitMetadata.getCommitId();
this.filePath = commitMetadata.getFilePath();
this.fileName = FilenameUtils.getName(this.filePath);
this.fileName = commitMetadata.getFileName();
this.commitTime = commitMetadata.getCommitTime().getTime() / 1000;
this.committerName = commitMetadata.getCommitter();
this.committerEmail = commitMetadata.getCommitterEmail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.lib.Constants;
Expand Down Expand Up @@ -134,6 +135,7 @@ public static List<CommitMetadata> getRepoMetadata(String repoDir) throws IOExce
for (RevCommit r : commitLog) {
CommitMetadata metadata = new CommitMetadata(r.getName());
metadata.setFilePath(filePath);
metadata.setFileName(FilenameUtils.getName(filePath));
metadata.setMessage(r.getShortMessage().trim());
// Difference between committer and author
// refer to: http://git-scm.com/book/ch2-3.html
Expand Down Expand Up @@ -172,6 +174,7 @@ public static class CommitMetadata {
String committerEmail;
String authorEmail;
String filePath;
String fileName;

public CommitMetadata() {
}
Expand All @@ -181,7 +184,7 @@ public CommitMetadata(String commitId) {
}

public CommitMetadata(String commitId, String author, String committer, Date commitTime, String message,
String committerEmail, String authorEmail, String filePath) {
String committerEmail, String authorEmail, String filePath, String fileName) {
this.commitId = commitId;
this.author = author;
this.committer = committer;
Expand All @@ -190,6 +193,7 @@ public CommitMetadata(String commitId, String author, String committer, Date com
this.committerEmail = committerEmail;
this.authorEmail = authorEmail;
this.filePath = filePath;
this.fileName = fileName;
}

public String getCommitId() {
Expand Down Expand Up @@ -255,6 +259,14 @@ public String getFilePath() {
public void setFilePath(String filePath) {
this.filePath = filePath;
}

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@ public class StringUtil {

public static String toDbString(Object object) {
if (object != null) {
return "'" + object.toString().replace("\'", "\\\'").replace("\"", "\\\"") + "'";
return "'" + object.toString().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"") + "'";
} else {
return "null";
}
}

public static String toCsvString(Object object) {
if (object != null) {
return "\"" + object.toString().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"") + "\"";
} else {
return "\\N";
}
}

public static String replace(String s, String target, Object replacement) {
if (replacement != null) {
return s.replace(target, "'" + replacement.toString().replace("\'", "\\\'").replace("\"", "\\\"") + "'");
Expand Down