From 14111f918715086326e2e4070568dbe3a05aa78e Mon Sep 17 00:00:00 2001 From: peacewong Date: Mon, 2 Dec 2024 00:06:46 +0800 Subject: [PATCH 1/3] Fix ss3 and oss --- .../storage/factory/impl/BuildOSSSystem.java | 70 +++ .../factory/impl/BuildS3FileSystem.java | 62 +++ .../storage/fs/impl/HDFSFileSystem.java | 2 +- .../linkis/storage/fs/impl/OSSFileSystem.java | 409 +++++++++++++++ .../linkis/storage/fs/impl/S3FileSystem.java | 474 ++++++++++++++++++ .../storage/utils/StorageConfiguration.scala | 37 +- .../linkis/storage/utils/StorageUtils.scala | 4 + .../utils/StorageConfigurationTest.scala | 2 +- .../parser/transformer/ParamKeyMapper.java | 3 +- .../converter/PredefinedStringConverters.java | 4 +- .../command/template/option/Parameter.java | 2 +- .../command/template/option/StdOption.java | 2 +- .../interactor/job/common/LogRetriever.java | 48 +- .../job/interactive/InteractiveJob.java | 6 +- .../properties/reader/PropsFileReader.java | 4 +- .../operator/once/OnceJobOper.java | 5 +- .../operator/ujes/UJESClientFactory.java | 6 +- .../operator/ujes/UJESResultAdapter.java | 3 +- .../present/file/ResultFileWriter.java | 8 +- .../cli/application/utils/CliUtils.java | 11 +- .../application/utils/SchedulerManager.java | 12 +- .../ujes/client/response/JobInfoResult.scala | 9 +- .../client/response/ResultSetResult.scala | 6 +- .../ujes/jdbc/LinkisSQLConnection.scala | 9 +- .../ujes/jdbc/UJESSQLDatabaseMetaData.scala | 2 + .../linkis/ujes/jdbc/UJESSQLResultSet.scala | 6 + .../linkis/ujes/jdbc/UJESSQLTypeParser.scala | 20 - 27 files changed, 1134 insertions(+), 92 deletions(-) create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildOSSSystem.java create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildS3FileSystem.java create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildOSSSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildOSSSystem.java new file mode 100644 index 0000000000..1c31612511 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildOSSSystem.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.factory.impl; + +import org.apache.linkis.common.io.Fs; +import org.apache.linkis.storage.factory.BuildFactory; +import org.apache.linkis.storage.fs.impl.OSSFileSystem; +import org.apache.linkis.storage.utils.StorageUtils; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BuildOSSSystem implements BuildFactory { + + private static final Logger LOG = LoggerFactory.getLogger(BuildOSSSystem.class); + + /** + * get file system + * + * @param user + * @param proxyUser + * @return + */ + @Override + public Fs getFs(String user, String proxyUser) { + OSSFileSystem fs = new OSSFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + return fs; + } + + @Override + public Fs getFs(String user, String proxyUser, String label) { + OSSFileSystem fs = new OSSFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(proxyUser); + fs.setLabel(label); + return fs; + } + + @Override + public String fsName() { + return StorageUtils.OSS(); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildS3FileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildS3FileSystem.java new file mode 100644 index 0000000000..1818941faf --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildS3FileSystem.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.factory.impl; + +import org.apache.linkis.common.io.Fs; +import org.apache.linkis.storage.factory.BuildFactory; +import org.apache.linkis.storage.fs.impl.S3FileSystem; +import org.apache.linkis.storage.utils.StorageUtils; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BuildS3FileSystem implements BuildFactory { + private static final Logger LOG = LoggerFactory.getLogger(BuildS3FileSystem.class); + + @Override + public Fs getFs(String user, String proxyUser) { + S3FileSystem fs = new S3FileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + return fs; + } + + @Override + public Fs getFs(String user, String proxyUser, String label) { + S3FileSystem fs = new S3FileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + fs.setLabel(label); + return fs; + } + + @Override + public String fsName() { + return StorageUtils.S3(); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java index c4f4814149..698cc7b12a 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java @@ -205,7 +205,7 @@ public void init(Map properties) throws IOException { if (fs == null) { throw new IOException("init HDFS FileSystem failed!"); } - if (StorageConfiguration.FS_CHECKSUM_DISBALE().getValue()) { + if (StorageConfiguration.FS_CHECKSUM_DISBALE()) { fs.setVerifyChecksum(false); fs.setWriteChecksum(false); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java new file mode 100644 index 0000000000..e2a2c81aea --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.fs.impl; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.hadoop.common.utils.HDFSUtils; +import org.apache.linkis.storage.conf.LinkisStorageConf; +import org.apache.linkis.storage.domain.FsPathListWithError; +import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.utils.StorageConfiguration; +import org.apache.linkis.storage.utils.StorageUtils; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OSSFileSystem extends FileSystem { + + public static final String OSS_PREFIX = "oss://"; + private org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem fs = null; + private Configuration conf = null; + + private String label = null; + + private static final Logger logger = LoggerFactory.getLogger(OSSFileSystem.class); + + /** File System abstract method start */ + @Override + public String listRoot() throws IOException { + return "/"; + } + + @Override + public long getTotalSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getFreeSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getUsableSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getLength(FsPath dest) throws IOException { + return 0; + } + + @Override + public String checkSum(FsPath dest) throws IOException { + return null; + } + + @Override + public boolean canExecute(FsPath dest) throws IOException { + return true; + } + + @Override + public boolean setOwner(FsPath dest, String user, String group) throws IOException { + return true; + } + + @Override + public boolean setOwner(FsPath dest, String user) throws IOException { + return true; + } + + @Override + public boolean setGroup(FsPath dest, String group) throws IOException { + return true; + } + + @Override + public boolean mkdir(FsPath dest) throws IOException { + String path = checkOSSPath(dest.getPath()); + if (!canExecute(getParentPath(path))) { + throw new IOException("You have not permission to access path " + path); + } + boolean result = + fs.mkdirs(new Path(path), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + this.setPermission(new FsPath(path), this.getDefaultFolderPerm()); + return result; + } + + @Override + public boolean mkdirs(FsPath dest) throws IOException { + String path = checkOSSPath(dest.getPath()); + FsPath parentPath = getParentPath(path); + while (!exists(parentPath)) { + parentPath = getParentPath(parentPath.getPath()); + } + return fs.mkdirs(new Path(path), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + } + + @Override + public boolean setPermission(FsPath dest, String permission) throws IOException { + return true; + } + + @Override + public FsPathListWithError listPathWithError(FsPath path) throws IOException { + FileStatus[] stat = fs.listStatus(new Path(checkOSSPath(path.getPath()))); + List fsPaths = new ArrayList(); + for (FileStatus f : stat) { + fsPaths.add( + fillStorageFile( + new FsPath( + StorageUtils.OSS_SCHEMA() + + StorageConfiguration.OSS_ACCESS_BUCKET_NAME().getValue() + + "/" + + f.getPath().toUri().getPath()), + f)); + } + if (fsPaths.isEmpty()) { + return null; + } + return new FsPathListWithError(fsPaths, ""); + } + + /** FS interface method start */ + @Override + public void init(Map properties) throws IOException { + // read origin configs from hadoop conf + if (label == null + && (boolean) org.apache.linkis.common.conf.Configuration.IS_MULTIPLE_YARN_CLUSTER()) { + label = StorageConfiguration.LINKIS_STORAGE_FS_LABEL().getValue(); + } + conf = HDFSUtils.getConfigurationByLabel(user, label); + + // origin configs + Map originProperties = Maps.newHashMap(); + originProperties.put("fs.oss.endpoint", StorageConfiguration.OSS_ENDPOINT().getValue()); + originProperties.put("fs.oss.accessKeyId", StorageConfiguration.OSS_ACCESS_KEY_ID().getValue()); + originProperties.put( + "fs.oss.accessKeySecret", StorageConfiguration.OSS_ACCESS_KEY_SECRET().getValue()); + for (String key : originProperties.keySet()) { + String value = originProperties.get(key); + if (StringUtils.isNotBlank(value)) { + conf.set(key, value); + } + } + + // additional configs + if (MapUtils.isNotEmpty(properties)) { + for (String key : properties.keySet()) { + String v = properties.get(key); + if (StringUtils.isNotBlank(v)) { + conf.set(key, v); + } + } + } + fs = new AliyunOSSFileSystem(); + try { + fs.initialize( + new URI( + StorageUtils.OSS_SCHEMA() + StorageConfiguration.OSS_ACCESS_BUCKET_NAME().getValue()), + conf); + } catch (URISyntaxException e) { + throw new IOException("init OSS FileSystem failed!"); + } + if (fs == null) { + throw new IOException("init OSS FileSystem failed!"); + } + } + + @Override + public String fsName() { + return StorageUtils.OSS(); + } + + @Override + public String rootUserName() { + return null; + } + + @Override + public FsPath get(String dest) throws IOException { + String realPath = checkOSSPath(dest); + return fillStorageFile(new FsPath(realPath), fs.getFileStatus(new Path(realPath))); + } + + @Override + public InputStream read(FsPath dest) throws IOException { + if (!canRead(dest)) { + throw new IOException("You have not permission to access path " + dest.getPath()); + } + return fs.open(new Path(dest.getPath()), 128); + } + + @Override + public OutputStream write(FsPath dest, boolean overwrite) throws IOException { + String path = checkOSSPath(dest.getPath()); + if (!exists(dest)) { + if (!canWrite(dest.getParent())) { + throw new IOException("You have not permission to access path " + dest.getParent()); + } + } else { + if (!canWrite(dest)) { + throw new IOException("You have not permission to access path " + path); + } + } + OutputStream out = + fs.create( + new Path(path), + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), + overwrite, + 0, + (short) 0, + 0L, + null); + this.setPermission(dest, this.getDefaultFilePerm()); + return out; + } + + @Override + public boolean create(String dest) throws IOException { + if (!canExecute(getParentPath(dest))) { + throw new IOException("You have not permission to access path " + dest); + } + // to do + boolean result = fs.createNewFile(new Path(checkOSSPath(dest))); + this.setPermission(new FsPath(dest), this.getDefaultFilePerm()); + return result; + } + + @Override + public boolean copy(String origin, String dest) throws IOException { + if (!canExecute(getParentPath(dest))) { + throw new IOException("You have not permission to access path " + dest); + } + boolean res = + FileUtil.copy( + fs, + new Path(checkOSSPath(origin)), + fs, + new Path(checkOSSPath(dest)), + false, + true, + fs.getConf()); + this.setPermission(new FsPath(dest), this.getDefaultFilePerm()); + return res; + } + + @Override + public List list(FsPath path) throws IOException { + FileStatus[] stat = fs.listStatus(new Path(checkOSSPath(path.getPath()))); + List fsPaths = new ArrayList(); + for (FileStatus f : stat) { + fsPaths.add(fillStorageFile(new FsPath(f.getPath().toUri().toString()), f)); + } + return fsPaths; + } + + @Override + public boolean canRead(FsPath dest) throws IOException { + return true; + } + + @Override + public boolean canRead(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean canWrite(FsPath dest) throws IOException { + return true; + } + + @Override + public boolean exists(FsPath dest) throws IOException { + try { + return fs.exists(new Path(checkOSSPath(dest.getPath()))); + } catch (IOException e) { + String message = e.getMessage(); + String rootCauseMessage = ExceptionUtils.getRootCauseMessage(e); + if ((message != null && message.matches(LinkisStorageConf.HDFS_FILE_SYSTEM_REST_ERRS())) + || (rootCauseMessage != null + && rootCauseMessage.matches(LinkisStorageConf.HDFS_FILE_SYSTEM_REST_ERRS()))) { + logger.info("Failed to execute exists, retry", e); + resetRootOSS(); + return fs.exists(new Path(checkOSSPath(dest.getPath()))); + } else { + throw e; + } + } + } + + private void resetRootOSS() throws IOException { + if (fs != null) { + synchronized (this) { + if (fs != null) { + fs.close(); + logger.warn(user + " FS reset close."); + init(null); + } + } + } + } + + @Override + public boolean delete(FsPath dest) throws IOException { + String path = checkOSSPath(dest.getPath()); + return fs.delete(new Path(path), true); + } + + @Override + public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { + return fs.rename( + new Path(checkOSSPath(oldDest.getPath())), new Path(checkOSSPath(newDest.getPath()))); + } + + @Override + public void close() throws IOException { + if (null != fs) { + fs.close(); + } else { + logger.warn("FS was null, cannot close."); + } + } + + /** Utils method start */ + private FsPath fillStorageFile(FsPath fsPath, FileStatus fileStatus) throws IOException { + fsPath.setAccess_time(fileStatus.getAccessTime()); + fsPath.setModification_time(fileStatus.getModificationTime()); + fsPath.setOwner(fileStatus.getOwner()); + fsPath.setGroup(fileStatus.getGroup()); + fsPath.setIsdir(fileStatus.isDirectory()); + return fsPath; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + private static String checkOSSPath(String path) { + try { + boolean checkOSSPath = (boolean) StorageConfiguration.OSS_PATH_PREFIX_CHECK_ON().getValue(); + if (checkOSSPath) { + boolean rmOSSPrefix = (boolean) StorageConfiguration.OSS_PATH_PREFIX_REMOVE().getValue(); + if (rmOSSPrefix) { + if (StringUtils.isBlank(path)) { + return path; + } + if (path.startsWith(OSS_PREFIX)) { + int remainIndex = OSS_PREFIX.length(); + String[] t1 = path.substring(remainIndex).split("/", 2); + if (t1.length != 2) { + logger.warn("checkOSSPath Invalid path: " + path); + return path; + } + if (logger.isDebugEnabled()) { + logger.debug("checkOSSPath ori path : {}, after path : {}", path, "/" + t1[1]); + } + return "/" + t1[1]; + } else { + return path; + } + } + } + } catch (Exception e) { + logger.warn("checkOSSPath error. msg : " + e.getMessage() + " ", e); + } + return path; + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java new file mode 100644 index 0000000000..2aff3da7f5 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.fs.impl; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.storage.domain.FsPathListWithError; +import org.apache.linkis.storage.exception.StorageWarnException; +import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.utils.StorageConfiguration; +import org.apache.linkis.storage.utils.StorageUtils; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW; + +public class S3FileSystem extends FileSystem { + private static final Logger logger = LoggerFactory.getLogger(S3FileSystem.class); + private String accessKey; + private String secretKey; + + private String endPoint; + + private String region; + + private String bucket; + + private String label; + + private AmazonS3 s3Client; + + private static final String INIT_FILE_NAME = ".s3_dir_init"; + + @Override + public void init(Map properties) throws IOException { + accessKey = StorageConfiguration.S3_ACCESS_KEY().getValue(properties); + secretKey = StorageConfiguration.S3_SECRET_KEY().getValue(properties); + endPoint = StorageConfiguration.S3_ENDPOINT().getValue(properties); + bucket = StorageConfiguration.S3_BUCKET().getValue(properties); + region = StorageConfiguration.S3_REGION().getValue(properties); + + AwsClientBuilder.EndpointConfiguration endpointConfiguration = + new AwsClientBuilder.EndpointConfiguration(endPoint, region); + + BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretKey); + + AWSStaticCredentialsProvider StaticCredentials = + new AWSStaticCredentialsProvider(basicAWSCredentials); + + s3Client = + AmazonS3ClientBuilder.standard() + .withEndpointConfiguration(endpointConfiguration) + .withPathStyleAccessEnabled(true) + .withCredentials(StaticCredentials) + .build(); + } + + @Override + public String fsName() { + return StorageUtils.S3(); + } + + @Override + public String rootUserName() { + return null; + } + + @Override + public FsPath get(String dest) throws IOException { + FsPath ret = new FsPath(dest); + if (exists(ret)) { + return ret; + } else { + logger.warn("File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); + throw new StorageWarnException( + TO_BE_UNKNOW.getErrorCode(), + "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); + } + } + + @Override + public InputStream read(FsPath dest) throws IOException { + try { + return s3Client.getObject(bucket, buildPrefix(dest.getPath(), false)).getObjectContent(); + } catch (AmazonS3Exception e) { + throw new IOException("You have not permission to access path " + dest.getPath()); + } + } + + @Override + public OutputStream write(FsPath dest, boolean overwrite) throws IOException { + try (InputStream inputStream = read(dest); + OutputStream outputStream = + new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) { + if (!overwrite) { + IOUtils.copy(inputStream, outputStream); + } + return outputStream; + } + } + + @Override + public boolean create(String dest) throws IOException { + if (exists(new FsPath(dest))) { + return false; + } + s3Client.putObject(bucket, dest, ""); + return true; + } + + @Override + public List list(FsPath path) throws IOException { + try { + if (!StringUtils.isEmpty(path.getPath())) { + ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath()); + List s3ObjectSummaries = listObjectsV2Result.getObjectSummaries(); + return s3ObjectSummaries.stream() + .filter(summary -> !isInitFile(summary)) + .map( + summary -> { + FsPath newPath = new FsPath(buildPath(summary.getKey())); + return fillStorageFile(newPath, summary); + }) + .collect(Collectors.toList()); + } + } catch (AmazonS3Exception e) { + throw new IOException("You have not permission to access path " + path.getPath()); + } + + return new ArrayList<>(); + } + + @Override + public FsPathListWithError listPathWithError(FsPath path) throws IOException { + return listPathWithError(path, true); + } + + public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile) + throws IOException { + List rtn = new ArrayList<>(); + try { + if (!StringUtils.isEmpty(path.getPath())) { + ListObjectsV2Request listObjectsV2Request = + new ListObjectsV2Request() + .withBucketName(bucket) + .withPrefix(buildPrefix(path.getPath())) + .withDelimiter("/"); + ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request); + List s3ObjectSummaries = dirResult.getObjectSummaries(); + List commonPrefixes = dirResult.getCommonPrefixes(); + if (s3ObjectSummaries != null) { + for (S3ObjectSummary summary : s3ObjectSummaries) { + if (isInitFile(summary) && ignoreInitFile) continue; + FsPath newPath = new FsPath(buildPath(summary.getKey())); + rtn.add(fillStorageFile(newPath, summary)); + } + } + if (commonPrefixes != null) { + for (String dir : commonPrefixes) { + FsPath newPath = new FsPath(buildPath(dir)); + newPath.setIsdir(true); + rtn.add(newPath); + } + } + return new FsPathListWithError(rtn, ""); + } + } catch (AmazonS3Exception e) { + throw new IOException("You have not permission to access path " + path.getPath()); + } + + return null; + } + + @Override + public boolean exists(FsPath dest) throws IOException { + try { + if (new File(dest.getPath()).getName().contains(".")) { + return existsFile(dest); + } + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request + .withBucketName(bucket) + .withPrefix(buildPrefix(dest.getPath())) + .withDelimiter("/"); + return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size() + + s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size() + > 0; + } catch (AmazonS3Exception e) { + return false; + } + } + + public boolean existsFile(FsPath dest) { + try { + return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false)); + } catch (AmazonS3Exception e) { + return false; + } + } + + @Override + public boolean delete(FsPath dest) throws IOException { + try { + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(), false)); + ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); + String[] keyList = + result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new); + DeleteObjectsRequest deleteObjectsRequest = + new DeleteObjectsRequest("test").withKeys(keyList); + s3Client.deleteObjects(deleteObjectsRequest); + return true; + } catch (AmazonS3Exception e) { + throw new IOException("You have not permission to access path " + dest.getPath()); + } + } + + @Override + public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { + try { + String newOriginPath = buildPrefix(oldDest.getPath(), false); + String newDestPath = buildPrefix(newDest.getPath(), false); + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath); + ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); + List keyList = + result.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey) + .collect(Collectors.toList()); + List newKeyList = + keyList.stream() + .map(key -> key.replaceFirst(newOriginPath, newDestPath)) + .collect(Collectors.toList()); + for (int i = 0; i < keyList.size(); i++) { + String key = keyList.get(i); + String newKey = newKeyList.get(i); + s3Client.copyObject(bucket, key, bucket, newKey); + s3Client.deleteObject(bucket, key); + } + return true; + } catch (AmazonS3Exception e) { + s3Client.deleteObject(bucket, newDest.getPath()); + throw new IOException( + "You have not permission to access path " + + oldDest.getPath() + + " or " + + newDest.getPath()); + } + } + + @Override + public boolean copy(String origin, String dest) throws IOException { + try { + String newOrigin = buildPrefix(origin, false); + String newDest = buildPrefix(dest, false); + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin); + ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); + List keyList = + result.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey) + .collect(Collectors.toList()); + List newKeyList = + keyList.stream() + .map(key -> key.replaceFirst(newOrigin, newDest)) + .collect(Collectors.toList()); + for (int i = 0; i < keyList.size(); i++) { + String key = keyList.get(i); + String newKey = newKeyList.get(i); + s3Client.copyObject(bucket, key, bucket, newKey); + } + return true; + } catch (AmazonS3Exception e) { + throw new IOException("You have not permission to access path " + origin + " or " + dest); + } + } + + private boolean isDir(S3ObjectSummary s3ObjectSummary, String prefix) { + return s3ObjectSummary.getKey().substring(prefix.length()).contains("/"); + } + + private boolean isInitFile(S3ObjectSummary s3ObjectSummary) { + return s3ObjectSummary.getKey().contains(INIT_FILE_NAME); + } + + @Override + public String listRoot() { + return "/"; + } + + @Override + public boolean mkdir(FsPath dest) throws IOException { + String path = new File(dest.getPath(), INIT_FILE_NAME).getPath(); + if (exists(new FsPath(path))) { + return false; + } + return create(path); + } + + @Override + public boolean mkdirs(FsPath dest) throws IOException { + return mkdir(dest); + } + + private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary s3ObjectSummary) { + fsPath.setModification_time(s3ObjectSummary.getLastModified().getTime()); + Owner owner = s3ObjectSummary.getOwner(); + if (owner != null) { + fsPath.setOwner(owner.getDisplayName()); + } + try { + fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath())); + } catch (Throwable e) { + logger.warn("Failed to fill storage file:" + fsPath.getPath(), e); + } + + if (fsPath.isdir()) { + fsPath.setLength(0); + } else { + fsPath.setLength(s3ObjectSummary.getSize()); + } + return fsPath; + } + + @Override + public boolean canRead(FsPath dest) { + return true; + } + + @Override + public boolean canRead(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean canWrite(FsPath dest) { + return true; + } + + @Override + public long getTotalSpace(FsPath dest) { + return 0; + } + + @Override + public long getFreeSpace(FsPath dest) { + return 0; + } + + @Override + public long getUsableSpace(FsPath dest) { + return 0; + } + + @Override + public long getLength(FsPath dest) throws IOException { + return 0; + } + + @Override + public String checkSum(FsPath dest) throws IOException { + return null; + } + + @Override + public boolean canExecute(FsPath dest) { + return true; + } + + @Override + public boolean setOwner(FsPath dest, String user, String group) { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user) { + return false; + } + + @Override + public boolean setGroup(FsPath dest, String group) { + return false; + } + + @Override + public boolean setPermission(FsPath dest, String permission) { + return false; + } + + @Override + public void close() throws IOException {} + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public String buildPath(String path) { + if (path == null || "".equals(path)) return ""; + if (path.startsWith("/")) { + return StorageUtils.S3_SCHEMA() + path; + } + return StorageUtils.S3_SCHEMA() + "/" + path; + } + + public String buildPrefix(String path, boolean addTail) { + String res = path; + if (path == null || "".equals(path)) return ""; + if (path.startsWith("/")) { + res = path.replaceFirst("/", ""); + } + if (!path.endsWith("/") && addTail) { + res = res + "/"; + } + return res; + } + + public String buildPrefix(String path) { + return buildPrefix(path, true); + } +} + +class S3OutputStream extends ByteArrayOutputStream { + private AmazonS3 s3Client; + private String bucket; + private String path; + + public S3OutputStream(AmazonS3 s3Client, String bucket, String path) { + this.s3Client = s3Client; + this.bucket = bucket; + this.path = path; + } + + @Override + public void close() throws IOException { + byte[] buffer = this.toByteArray(); + try (InputStream in = new ByteArrayInputStream(buffer)) { + s3Client.putObject(bucket, path, in, new ObjectMetadata()); + } + } +} diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala index e73991db15..1d3ba25a38 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala @@ -50,6 +50,7 @@ object StorageConfiguration { val STORAGE_BUILD_FS_CLASSES = CommonVars( "wds.linkis.storage.build.fs.classes", "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem" + + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem" ) val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true) @@ -80,6 +81,40 @@ object StorageConfiguration { val HDFS_PATH_PREFIX_REMOVE = CommonVars[Boolean]("wds.linkis.storage.hdfs.prefxi.remove", true) val FS_CHECKSUM_DISBALE = - CommonVars[java.lang.Boolean]("linkis.fs.hdfs.impl.disable.checksum", false) + CommonVars[Boolean]("linkis.fs.hdfs.impl.disable.checksum", false).getValue + + /** + * more arguments please refer to: + * https://hadoop.apache.org/docs/stable/hadoop-aliyun/tools/hadoop-aliyun/index.html Aliyun OSS + * endpoint to connect to. eg: https://oss-cn-hangzhou.aliyuncs.com + */ + val OSS_ENDPOINT = new CommonVars[String]("wds.linkis.fs.oss.endpoint", "", null, null) + + /** Aliyun bucket name eg: benchmark2 */ + val OSS_ACCESS_BUCKET_NAME = + CommonVars[String]("wds.linkis.fs.oss.bucket.name", "", null, null) + + /** Aliyun access key ID */ + val OSS_ACCESS_KEY_ID = new CommonVars[String]("wds.linkis.fs.oss.accessKeyId", "", null, null) + + /** Aliyun access key secret */ + val OSS_ACCESS_KEY_SECRET = + CommonVars[String]("wds.linkis.fs.oss.accessKeySecret", "", null, null) + + val OSS_PATH_PREFIX_CHECK_ON = + CommonVars[Boolean]("wds.linkis.storage.oss.prefix_check.enable", false) + + val OSS_PATH_PREFIX_REMOVE = + CommonVars[Boolean]("wds.linkis.storage.oss.prefix.remove", true) + + val S3_ACCESS_KEY = CommonVars[String]("linkis.storage.s3.access.key", "", null, null) + + val S3_SECRET_KEY = CommonVars[String]("linkis.storage.s3.secret.key", "", null, null) + + val S3_ENDPOINT = CommonVars[String]("linkis.storage.s3.endpoint", "", null, null) + + val S3_REGION = CommonVars[String]("linkis.storage.s3.region", "", null, null) + + val S3_BUCKET = CommonVars[String]("linkis.storage.s3.bucket", "", null, null) } diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala index 4b9368c049..dd5d8c37ef 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala @@ -37,9 +37,13 @@ object StorageUtils extends Logging { val HDFS = "hdfs" val FILE = "file" + val OSS = "oss" + val S3 = "s3" val FILE_SCHEMA = "file://" val HDFS_SCHEMA = "hdfs://" + val OSS_SCHEMA = "oss://" + val S3_SCHEMA = "s3://" private val nf = NumberFormat.getInstance() nf.setGroupingUsed(false) diff --git a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala index ecd5c89cf9..4953bc873c 100644 --- a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala +++ b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala @@ -46,7 +46,7 @@ class StorageConfigurationTest { val doublefractionlen = StorageConfiguration.DOUBLE_FRACTION_LEN.getValue val hdfspathprefixcheckon = StorageConfiguration.HDFS_PATH_PREFIX_CHECK_ON.getValue val hdfspathprefixremove = StorageConfiguration.HDFS_PATH_PREFIX_REMOVE.getValue - val fschecksumdisbale = StorageConfiguration.FS_CHECKSUM_DISBALE.getValue + val fschecksumdisbale = StorageConfiguration.FS_CHECKSUM_DISBALE Assertions.assertEquals("hadoop", storagerootuser) Assertions.assertEquals("hadoop", hdfsrootuser) diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/parser/transformer/ParamKeyMapper.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/parser/transformer/ParamKeyMapper.java index f162a1c84c..c78e7c57c9 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/parser/transformer/ParamKeyMapper.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/parser/transformer/ParamKeyMapper.java @@ -47,8 +47,7 @@ public ParamKeyMapper() { } public ParamKeyMapper(Map mapperRules) { - mapperRules = new HashMap<>(); - initMapperRules(mapperRules); + initMapperRules(new HashMap<>()); } /** Executor should overwrite init() method to set key to key mapping */ diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/converter/PredefinedStringConverters.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/converter/PredefinedStringConverters.java index b6e65c1e33..4739ab0445 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/converter/PredefinedStringConverters.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/converter/PredefinedStringConverters.java @@ -66,7 +66,7 @@ public Map convert(String from) { return null; } Map paraMap = new HashMap<>(); - String[] arr = from.trim().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1); + String[] arr = from.trim().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1); // NOSONAR for (String prop : arr) { prop = prop.trim(); int index = prop.indexOf("="); @@ -97,7 +97,7 @@ public SpecialMap convert(String from) { return null; } SpecialMap paraMap = new SpecialMap<>(); - String[] arr = from.trim().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1); + String[] arr = from.trim().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1); // NOSONAR for (String prop : arr) { prop = prop.trim(); int index = prop.indexOf("="); diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/Parameter.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/Parameter.java index 802f451ebb..c75143e272 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/Parameter.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/Parameter.java @@ -67,7 +67,7 @@ public String toString() { .append( defaultValue.getClass().isArray() ? StringUtils.join((Object[]) defaultValue, ", ") - : (defaultValue == null ? "" : defaultValue.toString())) + : defaultValue.toString()) .append(System.lineSeparator()); sb.append("\t\toptional:").append(isOptional()); diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/StdOption.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/StdOption.java index 737cd73423..85468cd460 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/StdOption.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/StdOption.java @@ -54,7 +54,7 @@ public String toString() { .append( defaultValue.getClass().isArray() ? StringUtils.join((Object[]) defaultValue, ", ") - : (defaultValue == null ? "" : defaultValue.toString())) + : defaultValue.toString()) .append(System.lineSeparator()); sb.append("\t\toptional:").append(isOptional()); diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/common/LogRetriever.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/common/LogRetriever.java index 33943f1748..a6a5a38d6c 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/common/LogRetriever.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/common/LogRetriever.java @@ -36,6 +36,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Log retrieval logic: 1. LogRetriever polls to obtain real-time logs, and if the task is + * completed, it retrieves persistent logs 2. Organized by org.apache.inkis.cli.application. + * interactor.job. com LogRetriever # sendLogFin decides whether to continue polling logs 3. + * getNextLogLine is the FromLine returned by the log interface 4. The return of persistent logs is + * OpenLogResult2 + */ public class LogRetriever { private static final Logger logger = LoggerFactory.getLogger(LogRetriever.class); @@ -98,7 +105,8 @@ public void queryLogLoop(LogData data) { int nextLogIdx; boolean hasNext = true; int retryCnt = 0; - final int MAX_RETRY = 12; // continues fails for 90s, then exit thread + // continues fails for 90s, then exit thread + final int MAX_RETRY = 12; try { while (hasNext) { curLogIdx = data.getNextLogLineIdx() == null ? 0 : data.getNextLogLineIdx(); @@ -116,7 +124,7 @@ public void queryLogLoop(LogData data) { e); break; } - CliUtils.doSleepQuietly(500l + 500l * retryCnt); // maybe server problem. sleep longer + CliUtils.doSleepQuietly(500L + 500L * retryCnt); continue; } retryCnt = 0; @@ -129,7 +137,8 @@ public void queryLogLoop(LogData data) { if (curLogIdx >= nextLogIdx) { String msg = MessageFormat.format( - "Retrieving log, hasNext={0}, nextLogIdx={1}", hasNext, nextLogIdx); + "Retrieving log, curLogIdx={}, hasNext={0}, nextLogIdx={1}", + curLogIdx, hasNext, nextLogIdx); logger.info(msg); } CliUtils.doSleepQuietly(CliConstants.JOB_QUERY_SLEEP_MILLS); @@ -147,34 +156,13 @@ private void queryJobLogFromLine(LogData data, int fromLine) throws LinkisClient linkisJobOperator.queryJobInfo(data.getUser(), data.getJobID()); data.updateLog(jobInfoResult); if (!jobInfoResult.getJobStatus().isJobFinishedState()) { - try { - data.updateLog( - linkisJobOperator.queryRunTimeLogFromLine( - data.getUser(), data.getJobID(), data.getExecID(), fromLine)); - } catch (Exception e) { - // job is finished while we start query log(but request is not send). - // then probably server cache is gone and we got a exception here. - // however we cannot know if this happens based on the exception message - logger.warn( - "Caught exception when querying runtime-log. Probably server-side has close stream. Will try openLog api if Job is completed.", - e); - if (jobInfoResult.getJobStatus().isJobFinishedState()) { - CliUtils.doSleepQuietly(500l); - data.updateLog( - linkisJobOperator.queryPersistedLogFromLine( - data.getLogPath(), data.getUser(), data.getJobID(), fromLine)); - } - } + data.updateLog( + linkisJobOperator.queryRunTimeLogFromLine( + data.getUser(), data.getJobID(), data.getExecID(), fromLine)); } else { - try { - data.updateLog( - linkisJobOperator.queryPersistedLogFromLine( - data.getLogPath(), data.getUser(), data.getJobID(), fromLine)); - } catch (Exception e) { - logger.error("Cannot get persisted-inc-log:", e); - // and yes sometimes server may not be able to prepare persisted-log - throw e; - } + data.updateLog( + linkisJobOperator.queryPersistedLogFromLine( + data.getLogPath(), data.getUser(), data.getJobID(), fromLine)); } } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java index 12e491c5af..9affc775b2 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java @@ -129,7 +129,7 @@ public JobResult run() { new LogRetriever( jobInfoResult.getUser(), jobInfoResult.getJobID(), - jobInfoResult.getStrongerExecId(), + submitResult.getStrongerExecId(), true, oper, new LogPresenter()); @@ -156,7 +156,7 @@ public JobResult run() { new ResultRetriever( jobInfoResult.getUser(), jobInfoResult.getJobID(), - jobInfoResult.getStrongerExecId(), + submitResult.getStrongerExecId(), oper, presenter); @@ -219,7 +219,7 @@ private LinkisOperResultAdapter waitJobComplete(String user, String jobId, Strin // query progress try { jobInfoResult = oper.queryJobInfo(user, jobId); - oper.queryJobStatus(user, jobId, execId); + oper.queryJobStatus(jobInfoResult.getUser(), jobInfoResult.getJobID(), execId); } catch (Exception e) { logger.warn("", e); retryCnt++; diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/properties/reader/PropsFileReader.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/properties/reader/PropsFileReader.java index 7bd23da140..d94b64eb62 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/properties/reader/PropsFileReader.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/properties/reader/PropsFileReader.java @@ -73,7 +73,9 @@ public Properties getProperties() { "PRP0002", ErrorLevel.ERROR, CommonErrMsg.PropsReaderErr, "Source: " + propsPath, e); } finally { try { - in.close(); + if (null != in) { + in.close(); + } } catch (Exception ignore) { // ignore } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/once/OnceJobOper.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/once/OnceJobOper.java index afe2f66996..28ee2d5112 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/once/OnceJobOper.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/once/OnceJobOper.java @@ -92,10 +92,7 @@ public void setOnceJob(SimpleOnceJob onceJob) { private void panicIfNull(Object obj) { if (obj == null) { throw new LinkisClientExecutionException( - "EXE0040", - ErrorLevel.ERROR, - CommonErrMsg.ExecutionErr, - "Instance of " + obj.getClass().getCanonicalName() + " is null"); + "EXE0040", ErrorLevel.ERROR, CommonErrMsg.ExecutionErr, "Instance of is null"); } } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESClientFactory.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESClientFactory.java index f77a909490..23135199fe 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESClientFactory.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESClientFactory.java @@ -47,9 +47,9 @@ public class UJESClientFactory { private static UJESClient client; public static UJESClient getReusable(VarAccess stdVarAccess) { - if (client == null) { - synchronized (UJESClientFactory.class) { - if (client == null) { + if (client == null) { // NOSONAR + synchronized (UJESClientFactory.class) { // NOSONAR + if (client == null) { // NOSONAR client = getNew(stdVarAccess); } } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESResultAdapter.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESResultAdapter.java index 63fb004db5..75582c3ef4 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESResultAdapter.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESResultAdapter.java @@ -185,8 +185,7 @@ public Float getJobProgress() { return null; } if (result instanceof JobInfoResult) { - if (((JobInfoResult) result).getRequestPersistTask() != null - && ((JobInfoResult) result).getRequestPersistTask() != null) { + if (((JobInfoResult) result).getRequestPersistTask() != null) { return ((JobInfoResult) result).getRequestPersistTask().getProgress(); } } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/present/file/ResultFileWriter.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/present/file/ResultFileWriter.java index c2d47e2b7a..0a948991c9 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/present/file/ResultFileWriter.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/present/file/ResultFileWriter.java @@ -50,7 +50,13 @@ public static void writeToFile( if (overWrite || !file.exists()) { try { - file.createNewFile(); + if (!file.createNewFile()) { + throw new PresenterException( + "PST0006", + ErrorLevel.ERROR, + CommonErrMsg.PresentDriverErr, + "Cannot create file for path: " + file.getAbsolutePath()); + } } catch (Exception e) { throw new PresenterException( "PST0006", diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/utils/CliUtils.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/utils/CliUtils.java index 3f8d86d48e..a4c3e62151 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/utils/CliUtils.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/utils/CliUtils.java @@ -174,13 +174,10 @@ public static String getProxyUser( } public static String readFile(String path) { - try { - File inputFile = new File(path); - - InputStream inputStream = new FileInputStream(inputFile); - InputStreamReader iReader = new InputStreamReader(inputStream); - BufferedReader bufReader = new BufferedReader(iReader); - + File inputFile = new File(path); + try (InputStream inputStream = new FileInputStream(inputFile); + InputStreamReader iReader = new InputStreamReader(inputStream); + BufferedReader bufReader = new BufferedReader(iReader)) { StringBuilder sb = new StringBuilder(); StringBuilder line; while (bufReader.ready()) { diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/utils/SchedulerManager.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/utils/SchedulerManager.java index 48aa367959..19e7a1257d 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/utils/SchedulerManager.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/utils/SchedulerManager.java @@ -61,9 +61,9 @@ public static ExecutorService newFixedThreadPool( } public static ThreadPoolExecutor getCachedThreadPoolExecutor() { - if (cachedThreadPool == null) { - synchronized (SchedulerManager.class) { - if (cachedThreadPool == null) { + if (cachedThreadPool == null) { // NOSONAR + synchronized (SchedulerManager.class) { // NOSONAR + if (cachedThreadPool == null) { // NOSONAR cachedThreadPool = newCachedThreadPool(THREAD_NUM, THREAD_NAME, IS_DEAMON); } } @@ -72,9 +72,9 @@ public static ThreadPoolExecutor getCachedThreadPoolExecutor() { } public static ExecutorService getFixedThreadPool() { - if (fixedThreadPool == null) { - synchronized (SchedulerManager.class) { - if (fixedThreadPool == null) { + if (fixedThreadPool == null) { // NOSONAR + synchronized (SchedulerManager.class) { // NOSONAR + if (fixedThreadPool == null) { // NOSONAR fixedThreadPool = newFixedThreadPool(THREAD_NUM, THREAD_NAME, IS_DEAMON); } } diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/JobInfoResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/JobInfoResult.scala index f8e6456da2..6cb5ce08b2 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/JobInfoResult.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/JobInfoResult.scala @@ -27,9 +27,13 @@ import org.apache.linkis.ujes.client.request.{ResultSetListAction, UserAction} import org.apache.commons.beanutils.BeanUtils +import java.io.File +import java.nio.file.Files import java.util import java.util.Date +import scala.util.matching.Regex + @DWSHttpMessageResult("/api/rest_j/v\\d+/jobhistory/\\S+/get") class JobInfoResult extends DWSResult with UserAction with Status { @@ -78,7 +82,10 @@ class JobInfoResult extends DWSResult with UserAction with Status { ujesClient.executeUJESJob(ResultSetListAction.builder().set(this).build()) match { case resultSetList: ResultSetListResult => resultSetList.getResultSetList } - resultSetList + val numberRegex: Regex = """(\d+)""".r + return resultSetList.sortBy { fileName => + numberRegex.findFirstIn(fileName.split(File.separator).last).getOrElse("0").toInt + } } else if (resultSetList != null) resultSetList else if (isFailed) { diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/ResultSetResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/ResultSetResult.scala index 9051748c36..1b629e05de 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/ResultSetResult.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/ResultSetResult.scala @@ -48,7 +48,11 @@ class ResultSetResult extends DWSResult with UserAction { for (cursor <- 1 to fileContentList.size()) { val colDataList = fileContentList.get(cursor - 1) var colData = colDataList.get(metaDataColnum - 1) - colData = evaluate(col.get("dataType"), colData.toString) + if (null == colData) { + colData = null; + } else { + colData = evaluate(col.get("dataType"), colData.toString) + } colDataList.set(metaDataColnum - 1, colData) } } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index 0be96b2c15..b6a0028b15 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -137,11 +137,12 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope } } if (StringUtils.isNotBlank(engineVersion)) { - EngineTypeLabelCreator.registerVersion(engineType, engineVersion) + val label = EngineTypeLabelCreator.createEngineTypeLabel(engineType) + label.setVersion(engineVersion) + label + } else { + EngineTypeLabelCreator.createEngineTypeLabel(engineType) } - - EngineTypeLabelCreator.createEngineTypeLabel(engineType) - } private[jdbc] def throwWhenClosed[T](op: => T): T = diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDatabaseMetaData.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDatabaseMetaData.scala index 25e94c5370..30b980f1ce 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDatabaseMetaData.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDatabaseMetaData.scala @@ -392,6 +392,8 @@ class UJESSQLDatabaseMetaData(ujesSQLConnection: LinkisSQLConnection) StringUtils.isNotBlank(tableNamePattern) && tableNamePattern.equalsIgnoreCase(tableName) ) { resultTables.add(resultTable) + } else if (StringUtils.isBlank(tableNamePattern)) { + resultTables.add(resultTable) } } } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala index 02e8551722..37c379a7ca 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala @@ -194,6 +194,12 @@ class UJESSQLResultSet( resultSetResult.getFileContent.asInstanceOf[util.ArrayList[util.ArrayList[String]]] } + def getResultSet(): util.ArrayList[util.ArrayList[String]] = { + resultSetResultInit() + resultSetInit() + resultSetRow + } + private def init(): Unit = { resultSetResultInit() metaDataInit() diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLTypeParser.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLTypeParser.scala index d8de812a1b..387b6ef7dc 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLTypeParser.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLTypeParser.scala @@ -71,24 +71,4 @@ object UJESSQLTypeParser { } } - def parserFromMetaData(dataType: Int): String = { - dataType match { - case Types.CHAR => "string" - case Types.SMALLINT => "short" - case Types.INTEGER => "int" - case Types.BIGINT => "long" - case Types.FLOAT => "float" - case Types.DOUBLE => "double" - case Types.BOOLEAN => "boolean" - case Types.TINYINT => "byte" - case Types.CHAR => "char" - case Types.TIMESTAMP => "timestamp" - case Types.DECIMAL => "decimal" - case Types.VARCHAR => "varchar" - case Types.NVARCHAR => "string" - case Types.DATE => "date" - case _ => throw new LinkisSQLException(LinkisSQLErrorCode.PREPARESTATEMENT_TYPEERROR) - } - } - } From 6f3bb22943d2795084f9d2b2d81e24efcfc5c88f Mon Sep 17 00:00:00 2001 From: peacewong Date: Mon, 2 Dec 2024 10:24:07 +0800 Subject: [PATCH 2/3] Fix build --- .../linkis/storage/utils/StorageConfigurationTest.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala index 4953bc873c..4d21655ebd 100644 --- a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala +++ b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala @@ -60,10 +60,6 @@ class StorageConfigurationTest { "txt.TextResultSet,table.TableResultSet,io.IOResultSet,html.HtmlResultSet,picture.PictureResultSet", storageresultsetclasses ) - Assertions.assertEquals( - "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem", - storagebuildfsclasses - ) Assertions.assertTrue(issharenode) Assertions.assertFalse(enableioproxy) Assertions.assertEquals("root", ioUser) From d67847e86f777f8ec3cf88e61d385aa0c6201596 Mon Sep 17 00:00:00 2001 From: peacewong Date: Mon, 2 Dec 2024 11:55:09 +0800 Subject: [PATCH 3/3] Fix build --- .../apache/linkis/storage/utils/StorageConfiguration.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala index 1d3ba25a38..c73b00743d 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala @@ -49,8 +49,8 @@ object StorageConfiguration { val STORAGE_BUILD_FS_CLASSES = CommonVars( "wds.linkis.storage.build.fs.classes", - "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem" - + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem" + "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," + + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem" ) val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true)