Skip to content

Commit

Permalink
[Fix][Engine] Fix spark local mode execute status error (#306)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Dec 6, 2023
1 parent 6163e44 commit 6a53171
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.api;

import io.datavines.common.datasource.jdbc.BaseJdbcDataSourceInfo;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.plugin;

import io.datavines.common.datasource.jdbc.BaseJdbcDataSourceInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static List<StructField> getSchema(ResultSet resultSet, Dialect dialect,
boolean isNullable = metaData.isNullable(i + 1) != ResultSetMetaData.columnNoNulls;

StructField field = new StructField();
field.setName(columnName);
field.setName(columnName.toLowerCase());
field.setDataType(typeConverter.convert(typeName));
field.setNullable(isNullable);
field.setComment("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,19 @@ public ProcessResult run(String executeCommand){

String appId = YarnUtils.getYarnAppId(jobExecutionRequest.getTenantCode(), jobExecutionRequest.getJobExecutionUniqueId());
result.setApplicationId(appId);

// if yarn job , yarn state is final state
if (exitValue == 0){
exitStatusCode = YarnUtils.isSuccessOfYarnState(appId) ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode();
if (StringUtils.isEmpty(appId)) {
if (exitValue == 0){
exitStatusCode = ExecutionStatus.SUCCESS.getCode();
} else {
exitStatusCode = ExecutionStatus.FAILURE.getCode();
}
} else {
exitStatusCode = ExecutionStatus.FAILURE.getCode();
// if yarn job , yarn state is final state
if (exitValue == 0){
exitStatusCode = YarnUtils.isSuccessOfYarnState(appId) ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode();
} else {
exitStatusCode = ExecutionStatus.FAILURE.getCode();
}
}

result.setExitStatusCode(exitStatusCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ private void sinkErrorData() {

ResultList resultList = SqlUtils.getPageFromResultSet(errorDataResultSet, SqlUtils.getQueryFromsAndJoins("select * from " + sourceTable), start, end);
for (Map<String, Object> row: resultList.getResultList()) {
for (int j=0 ;j<columns.size();j++) {
for (int j=0; j<columns.size(); j++) {
StructField field = columns.get(j);
String value = String.valueOf(row.get(field.getName()));
String value = String.valueOf(row.get(field.getName().toLowerCase()));
String rowContent = "null".equalsIgnoreCase(value) ? null : value;
if (rowContent != null) {
rowContent = rowContent.replaceAll("\"","");
Expand All @@ -198,51 +198,61 @@ private void sinkErrorData() {
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setByte(j+1, Byte.parseByte(rowContent));
} else {
errorDataPreparedStatement.setByte(j+1,Byte.parseByte(""));
errorDataPreparedStatement.setNull(j+1, Types.TINYINT);
}
break;
case SHORT_TYPE:
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setShort(j+1, Short.parseShort(rowContent));
} else {
errorDataPreparedStatement.setShort(j+1, Short.parseShort("0"));
errorDataPreparedStatement.setNull(j+1, Types.SMALLINT);
}
break;
case INT_TYPE :
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setInt(j+1, Integer.parseInt(rowContent));
} else {
errorDataPreparedStatement.setInt(j+1, 0);
errorDataPreparedStatement.setNull(j+1, Types.INTEGER);
}
break;
case LONG_TYPE:
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setLong(j+1, Long.parseLong(rowContent));
} else {
errorDataPreparedStatement.setLong(j+1, 0);
errorDataPreparedStatement.setNull(j+1, Types.BIGINT);
}
break;
case FLOAT_TYPE:
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setFloat(j+1, Float.parseFloat(rowContent));
} else {
errorDataPreparedStatement.setFloat(j+1, 0);
errorDataPreparedStatement.setNull(j+1, Types.FLOAT);
}
break;
case DOUBLE_TYPE:
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setDouble(j+1, Double.parseDouble(rowContent));
} else {
errorDataPreparedStatement.setDouble(j+1, 0);
errorDataPreparedStatement.setNull(j+1, Types.DOUBLE);
}
break;
case TIME_TYPE:
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setString(j+1, rowContent);
} else {
errorDataPreparedStatement.setNull(j+1, Types.TIME);
}
case DATE_TYPE:
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setString(j+1, rowContent);
} else {
errorDataPreparedStatement.setNull(j+1, Types.DATE);
}
case TIMESTAMP_TYPE:
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setString(j+1,rowContent);
errorDataPreparedStatement.setString(j+1, rowContent);
} else {
errorDataPreparedStatement.setString(j+1,null);
errorDataPreparedStatement.setNull(j+1, Types.TIMESTAMP);
}
break;
case STRING_TYPE :
Expand All @@ -255,7 +265,7 @@ private void sinkErrorData() {
if (StringUtils.isNotEmpty(rowContent)) {
errorDataPreparedStatement.setBigDecimal(j+1, new BigDecimal(rowContent));
} else {
errorDataPreparedStatement.setBigDecimal(j+1, null);
errorDataPreparedStatement.setNull(j+1, Types.DECIMAL);
}
break;
case OBJECT:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.notification.plugin.wecombot.entity;

import io.datavines.common.utils.JSONUtils;
Expand Down

0 comments on commit 6a53171

Please sign in to comment.