Skip to content

Commit

Permalink
Support cloud input/output for IntervalListTools (#1852)
Browse files Browse the repository at this point in the history
  • Loading branch information
takutosato committed Jul 20, 2023
1 parent ac8890a commit 6420484
Show file tree
Hide file tree
Showing 6 changed files with 614 additions and 135 deletions.
68 changes: 68 additions & 0 deletions src/main/java/picard/nio/DeleteRecursive.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package picard.nio;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;

/**
*
* Copied from GATK; to be removed once the original GATK code is ported to htsjdk
*
* Class to hold a set of {@link Path} to be delete on the JVM exit through a shutdown hook.
*
* <p>This class is a modification of {@link htsjdk.samtools.util.nio.DeleteOnExitPathHook}
*
* This class should be considered an implementation detail of {@link IOUtils#deleteOnExit(Path)} and not used directly.
*/
class DeleteRecursivelyOnExitPathHook {
private static final Logger LOG = LogManager.getLogger(DeleteRecursivelyOnExitPathHook.class);
private static LinkedHashSet<Path> paths = new LinkedHashSet<>();
static {
Runtime.getRuntime().addShutdownHook(new Thread(DeleteRecursivelyOnExitPathHook::runHooks));
}

private DeleteRecursivelyOnExitPathHook() {}

/**
* Adds a {@link Path} for deletion on JVM exit.
*
* @param path path to be deleted. This path may be a non-empty directory and the entire directory structure will
* be deleted.
*
* @throws IllegalStateException if the shutdown hook is in progress.
*/
public static synchronized void add(Path path) {
if(paths == null) {
// DeleteOnExitHook is running. Too late to add a file
throw new IllegalStateException("Shutdown in progress");
}

paths.add(path);
}

static void runHooks() {
LinkedHashSet<Path> thePaths;

synchronized (DeleteRecursivelyOnExitPathHook.class) {
thePaths = paths;
paths = null;
}

ArrayList<Path> toBeDeleted = new ArrayList<>(thePaths);

// reverse the list to maintain previous jdk deletion order.
// Last in first deleted.
Collections.reverse(toBeDeleted);
for (Path path : toBeDeleted) {
try {
GATKIOUtils.deleteRecursively(path);
} catch (final Exception e) {
// do nothing if cannot be deleted, because it is a shutdown hook
LOG.debug(() -> "Could not recursively delete " + path.toString() + " during JVM shutdown because we encountered the following exception:", e);
}
}
}
}
160 changes: 160 additions & 0 deletions src/main/java/picard/nio/GATKBucketUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package picard.nio;

import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem;
import htsjdk.samtools.util.FileExtensions;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.UUID;


/**
* Copied from BucketUtils.java in GATK
* To be replaced once the original GATK BucketUtils.java is ported to htsjdk
*/
public class GATKBucketUtils {
// In GATK these are accessed as e.g. GoogleCloudStorageFileSystem.SCHEME
public static final String GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME = "gs";
public static final String HTTP_FILESYSTEM_PROVIDER_SCHEME = "http";
public static final String HTTPS_FILESYSTEM_PROVIDER_SCHEME = "https";


public static final String GCS_PREFIX = "gs://";
public static final String HTTP_PREFIX = "http://";
public static final String HTTPS_PREFIX = "https://";
public static final String HDFS_SCHEME = "hdfs";
public static final String HDFS_PREFIX = HDFS_SCHEME + "://";

// slashes omitted since hdfs paths seem to only have 1 slash which would be weirder to include than no slashes
public static final String FILE_PREFIX = "file:";

private GATKBucketUtils(){} //private so that no one will instantiate this class

/**
* Get a temporary file path based on the prefix and extension provided.
* This file (and possible indexes associated with it) will be scheduled for deletion on shutdown
*
* @param prefix a prefix for the file name
* for remote paths this should be a valid URI to root the temporary file in (e.g. gs://hellbender/staging/)
* there is no guarantee that this will be used as the root of the tmp file name, a local prefix may be placed in the tmp folder for example
* @param extension and extension for the temporary file path, the resulting path will end in this
* @return a path to use as a temporary file, on remote file systems which don't support an atomic tmp file reservation a path is chosen with a long randomized name
*
*/
public static String getTempFilePath(String prefix, String extension){
if (isGcsUrl(prefix) || (isHadoopUrl(prefix))){
final String path = randomRemotePath(prefix, "", extension);
GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path));
// Mark auxiliary files to be deleted
GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path + FileExtensions.TRIBBLE_INDEX));
GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path + FileExtensions.TABIX_INDEX));
GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path + ".bai"));
GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path + ".md5"));
GATKIOUtils.deleteOnExit(GATKIOUtils.getPath(path.replaceAll(extension + "$", ".bai"))); //if path ends with extension, replace it with .bai
return path;
} else {
return GATKIOUtils.createTempFile(prefix, extension).getAbsolutePath();
}
}



/**
* Picks a random name, by putting some random letters between "prefix" and "suffix".
*
* @param stagingLocation The folder where you want the file to be. Must start with "gs://" or "hdfs://"
* @param prefix The beginning of the file name
* @param suffix The end of the file name, e.g. ".tmp"
*/
private static String randomRemotePath(String stagingLocation, String prefix, String suffix) {
if (isGcsUrl(stagingLocation)) {
// Go through URI because Path.toString isn't guaranteed to include the "gs://" prefix.
return getPathOnGcs(stagingLocation).resolve(prefix + UUID.randomUUID() + suffix).toUri().toString();
// Disable support for Hadoop in Picard
// } else if (isHadoopUrl(stagingLocation)) {
// return new Path(stagingLocation, prefix + UUID.randomUUID() + suffix).toString();
} else {
throw new IllegalArgumentException("Staging location is not remote: " + stagingLocation);
}
}

/**
* String -> Path. This *should* not be necessary (use Paths.get(URI.create(...)) instead) , but it currently is
* on Spark because using the fat, shaded jar breaks the registration of the GCS FilesystemProvider.
* To transform other types of string URLs into Paths, use IOUtils.getPath instead.
*/
public static java.nio.file.Path getPathOnGcs(String gcsUrl) {
// use a split limit of -1 to preserve empty split tokens, especially trailing slashes on directory names
final String[] split = gcsUrl.split("/", -1);
final String BUCKET = split[2];
final String pathWithoutBucket = String.join("/", Arrays.copyOfRange(split, 3, split.length));
return CloudStorageFileSystem.forBucket(BUCKET).getPath(pathWithoutBucket);
}

/**
* @param path path to inspect
* @return true if this path represents a gcs location
*/
public static boolean isGcsUrl(final String path) {
GATKUtils.nonNull(path);
return path.startsWith(GCS_PREFIX);
}

/**
*
* The GATK code modified to use PicardHTSPath rather than GATKPath
* Return true if this {@code PicardHTSPath} represents a gcs URI.
* @param pathSpec specifier to inspect
* @return true if this {@code PicardHTSPath} represents a gcs URI.
*/
public static boolean isGcsUrl(final PicardHtsPath pathSpec) {
GATKUtils.nonNull(pathSpec);
return pathSpec.getScheme().equals(GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME);
}

/**
* @param pathSpec specifier to inspect
* @return true if this {@code GATKPath} represents a remote storage system which may benefit from prefetching (gcs or http(s))
*/
public static boolean isEligibleForPrefetching(final PicardHtsPath pathSpec) {
GATKUtils.nonNull(pathSpec);
return isEligibleForPrefetching(pathSpec.getScheme());
}

/**
* @param path path to inspect
* @return true if this {@code Path} represents a remote storage system which may benefit from prefetching (gcs or http(s))
*/
public static boolean isEligibleForPrefetching(final java.nio.file.Path path) {
GATKUtils.nonNull(path);
return isEligibleForPrefetching(path.toUri().getScheme());
}

private static boolean isEligibleForPrefetching(final String scheme){
return scheme != null
&& (scheme.equals(GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME)
|| scheme.equals(HTTP_FILESYSTEM_PROVIDER_SCHEME)
|| scheme.equals(HTTPS_FILESYSTEM_PROVIDER_SCHEME));
}

/**
* @return true if the given path is an http or https Url.
*/
public static boolean isHttpUrl(String path){
return path.startsWith(HTTP_PREFIX) || path.startsWith(HTTPS_PREFIX);
}

/**
* Returns true if the given path is a HDFS (Hadoop filesystem) URL.
*/
public static boolean isHadoopUrl(String path) {
return path.startsWith(HDFS_PREFIX);
}

/**
* Returns true if the given path is a GCS, HDFS (Hadoop filesystem), or Http(s) URL.
*/
public static boolean isRemoteStorageUrl(String path) {
return isGcsUrl(path) || isHadoopUrl(path) || isHttpUrl(path);
}
}
128 changes: 128 additions & 0 deletions src/main/java/picard/nio/GATKIOUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package picard.nio;

import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem;
import htsjdk.samtools.util.FileExtensions;
import htsjdk.samtools.util.IOUtil;
import picard.PicardException;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.*;
import java.util.HashMap;

public class GATKIOUtils {
/**
* Schedule a file or directory to be deleted on JVM shutdown.
*
* This calls {@link GATKIOUtils#deleteRecursively(Path)} on {@code fileToDelete }as a shutdown hook.
* @param fileToDelete file or directory to be deleted recursively at JVM shutdown.
*/
public static void deleteOnExit(final Path fileToDelete){
DeleteRecursivelyOnExitPathHook.add(fileToDelete);
}

/**
* Converts the given URI to a {@link Path} object. If the filesystem cannot be found in the usual way, then attempt
* to load the filesystem provider using the thread context classloader. This is needed when the filesystem
* provider is loaded using a URL classloader (e.g. in spark-submit).
*
* Also makes an attempt to interpret the argument as a file name if it's not a URI.
*
* @param uriString the URI to convert.
* @return the resulting {@code Path}
* @throws UserException if an I/O error occurs when creating the file system
*/
public static Path getPath(String uriString) {
GATKUtils.nonNull(uriString);
URI uri;
try {
uri = URI.create(uriString);
} catch (IllegalArgumentException x) {
// not a valid URI. Caller probably just gave us a file name.
return Paths.get(uriString);
}
try {
// special case GCS, in case the filesystem provider wasn't installed properly but is available.
if (CloudStorageFileSystem.URI_SCHEME.equals(uri.getScheme())) {
return GATKBucketUtils.getPathOnGcs(uriString);
}
// Paths.get(String) assumes the default file system
// Paths.get(URI) uses the scheme
return uri.getScheme() == null ? Paths.get(uriString) : Paths.get(uri);
} catch (FileSystemNotFoundException e) {
try {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if ( cl == null ) {
throw e;
}
return FileSystems.newFileSystem(uri, new HashMap<>(), cl).provider().getPath(uri);
}
catch (ProviderNotFoundException x) {
// TODO: this creates bogus Path on the current file system for schemes such as gendb, nonexistent, gcs
// TODO: we depend on this code path to allow IntervalUtils to all getPath on a string that may be either
// a literal interval or a feature file containing intervals
// not a valid URI. Caller probably just gave us a file name or "chr1:1-2".
return Paths.get(uriString);
}
catch ( IOException io ) {
// UserException in GATK but Picard does not differentiate between e.g. PicardException vs UserException
// Might be useful to add the UserException class in the long term
throw new PicardException(uriString + " is not a supported path", io);
}
}
}

/**
* Creates a temp file that will be deleted on exit
*
* This will also mark the corresponding Tribble/Tabix/BAM indices matching the temp file for deletion.
* @param name Prefix of the file; {@link File#createTempFile(String, String, File)} requires that this be >= 3 characters
* @param extension Extension to concat to the end of the file.
* @return A file in the temporary directory starting with name, ending with extension, which will be deleted after the program exits.
*/
public static File createTempFile(String name, String extension) {
return createTempFileInDirectory(name, extension, null);
}

/**
* Creates a temp file in a target directory that will be deleted on exit
*
* This will also mark the corresponding Tribble/Tabix/BAM indices matching the temp file for deletion.
* @param name Prefix of the file; {@link File#createTempFile(String, String, File)} requires that this be >= 3 characters
* @param extension Extension to concat to the end of the file name.
* @param targetDir Directory in which to create the temp file. If null, the default temp directory is used.
* @return A file in the temporary directory starting with name, ending with extension, which will be deleted after the program exits.
*/
public static File createTempFileInDirectory(final String name, String extension, final File targetDir) {
try {

if (!extension.startsWith(".")) {
extension = "." + extension;
}

final File file = File.createTempFile(name, extension, targetDir);
file.deleteOnExit();

// Mark corresponding indices for deletion on exit as well just in case an index is created for the temp file:
new File(file.getAbsolutePath() + FileExtensions.TRIBBLE_INDEX).deleteOnExit();
new File(file.getAbsolutePath() + FileExtensions.TABIX_INDEX).deleteOnExit();
new File(file.getAbsolutePath() + ".bai").deleteOnExit();
new File(file.getAbsolutePath() + ".md5").deleteOnExit();
new File(file.getAbsolutePath().replaceAll(extension + "$", ".bai")).deleteOnExit();

return file;
} catch (IOException ex) {
throw new PicardException("Cannot create temp file: " + ex.getMessage(), ex);
}
}

/**
* Delete rootPath recursively
* @param rootPath is the file/directory to be deleted
*/
public static void deleteRecursively(final Path rootPath) {
IOUtil.recursiveDelete(rootPath);
}

}
Loading

0 comments on commit 6420484

Please sign in to comment.