Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a lock & a timeout when checking whether an upload target file is open. #1852

Merged
merged 4 commits into from
Sep 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.slf4j.Logger;

Expand Down Expand Up @@ -42,12 +45,20 @@ public List<String> runCommand(final List<String> command, final Redirect redire
return runCommand(command, redirectOutput, Sets.newHashSet(0));
}

public int getExitCode(final List<String> command) {
public int getExitCode(final List<String> command, long timeoutMillis) {
final ProcessBuilder processBuilder = new ProcessBuilder(command);
Optional<Integer> exitCode = Optional.absent();
try {
final Process process = startProcess(processBuilder);
return process.waitFor();
boolean exited = process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);

if (exited) {
exitCode = Optional.of(process.exitValue());
return process.exitValue();
} else {
throw new TimeoutException(String.format("Waited %d ms for an exit code from `%s`, but it didn't terminate in time.", timeoutMillis, command.stream().collect(Collectors.joining(" "))));
}

} catch (Throwable t) {
signalKillToProcessIfActive();
throw new RuntimeException(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
Expand All @@ -35,8 +36,8 @@ public class SingularityGCSUploader extends SingularityUploader {

public SingularityGCSUploader(S3UploadMetadata uploadMetadata, FileSystem fileSystem, SingularityS3UploaderMetrics metrics, Path metadataPath,
SingularityS3UploaderConfiguration configuration, String hostname, SingularityRunnerExceptionNotifier exceptionNotifier,
JsonObjectFileHelper jsonHelper) {
super(uploadMetadata, fileSystem, metrics, metadataPath, configuration, hostname, exceptionNotifier);
Lock checkFileOpenLock, JsonObjectFileHelper jsonHelper) {
super(uploadMetadata, fileSystem, metrics, metadataPath, configuration, hostname, exceptionNotifier, checkFileOpenLock);
this.storage = StorageOptions.newBuilder()
.setCredentials(loadCredentials(uploadMetadata, jsonHelper))
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
Expand Down Expand Up @@ -41,8 +42,8 @@ public class SingularityS3Uploader extends SingularityUploader {
private final AmazonS3 s3Client;

SingularityS3Uploader(BasicAWSCredentials defaultCredentials, S3UploadMetadata uploadMetadata, FileSystem fileSystem, SingularityS3UploaderMetrics metrics, Path metadataPath,
SingularityS3UploaderConfiguration configuration, String hostname, SingularityRunnerExceptionNotifier exceptionNotifier) {
super(uploadMetadata, fileSystem, metrics, metadataPath, configuration, hostname, exceptionNotifier);
SingularityS3UploaderConfiguration configuration, String hostname, SingularityRunnerExceptionNotifier exceptionNotifier, Lock checkFileOpenLock) {
super(uploadMetadata, fileSystem, metrics, metadataPath, configuration, hostname, exceptionNotifier, checkFileOpenLock);
BasicAWSCredentials credentials = defaultCredentials;

if (uploadMetadata.getS3SecretKey().isPresent() && uploadMetadata.getS3AccessKey().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class SingularityS3UploaderDriver extends WatchServiceHelper implements S
private final Map<S3UploadMetadata, SingularityUploader> metadataToUploader;
private final Map<SingularityUploader, Long> uploaderLastHadFilesAt;
private final Lock runLock;
private final Lock checkFileOpenLock;
private final ExecutorService executorService;
private final FileSystem fileSystem;
private final Set<SingularityUploader> expiring;
Expand Down Expand Up @@ -100,6 +101,7 @@ public SingularityS3UploaderDriver(SingularityRunnerBaseConfiguration baseConfig
this.metrics.setExpiringCollection(expiring);

this.runLock = new ReentrantLock();
this.checkFileOpenLock = new ReentrantLock();

this.processUtils = new ProcessUtils(LOG);

Expand Down Expand Up @@ -473,9 +475,9 @@ private boolean handleNewOrModifiedS3Metadata(Path filename) throws IOException
final SingularityUploader uploader;

if (metadata.getUploaderType() == SingularityUploaderType.S3) {
uploader = new SingularityS3Uploader(bucketCreds.or(defaultCredentials), metadata, fileSystem, metrics, filename, configuration, hostname, exceptionNotifier);
uploader = new SingularityS3Uploader(bucketCreds.or(defaultCredentials), metadata, fileSystem, metrics, filename, configuration, hostname, exceptionNotifier, checkFileOpenLock);
} else {
uploader = new SingularityGCSUploader(metadata, fileSystem, metrics, filename, configuration, hostname, exceptionNotifier, jsonObjectFileHelper);
uploader = new SingularityGCSUploader(metadata, fileSystem, metrics, filename, configuration, hostname, exceptionNotifier, checkFileOpenLock, jsonObjectFileHelper);
}

if (metadata.isFinished()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,6 +37,7 @@ public abstract class SingularityUploader {
static final Logger LOG = LoggerFactory.getLogger(SingularityUploader.class);
private static final String LOG_START_TIME_ATTR = "logstart";
private static final String LOG_END_TIME_ATTR = "logend";
private static final long CHECK_FILE_OPEN_TIMEOUT_MILLIS = 1500;

final S3UploadMetadata uploadMetadata;
private final PathMatcher pathMatcher;
Expand All @@ -48,9 +50,16 @@ public abstract class SingularityUploader {
final String hostname;
final SingularityS3UploaderConfiguration configuration;
private final SingularityRunnerExceptionNotifier exceptionNotifier;

SingularityUploader(S3UploadMetadata uploadMetadata, FileSystem fileSystem, SingularityS3UploaderMetrics metrics, Path metadataPath,
SingularityS3UploaderConfiguration configuration, String hostname, SingularityRunnerExceptionNotifier exceptionNotifier) {
private final Lock checkFileOpenLock;

SingularityUploader(S3UploadMetadata uploadMetadata,
FileSystem fileSystem,
SingularityS3UploaderMetrics metrics,
Path metadataPath,
SingularityS3UploaderConfiguration configuration,
String hostname,
SingularityRunnerExceptionNotifier exceptionNotifier,
Lock checkFileOpenLock) {
this.metrics = metrics;
this.uploadMetadata = uploadMetadata;
this.fileDirectory = uploadMetadata.getDirectory();
Expand All @@ -68,6 +77,8 @@ public abstract class SingularityUploader {
this.logIdentifier = String.format("[%s]", metadataPath.getFileName());
this.configuration = configuration;
this.exceptionNotifier = exceptionNotifier;

this.checkFileOpenLock = checkFileOpenLock;
}

protected abstract void uploadSingle(int sequence, Path file) throws Exception;
Expand Down Expand Up @@ -171,12 +182,13 @@ private int handleFile(Path path, boolean isFinished, List<Path> toUpload) throw
return found;
}

static boolean isFileOpen(Path path, boolean useFuser) {
private boolean isFileOpen(Path path, boolean useFuser) {
try {
checkFileOpenLock.lock();
if (useFuser) {
SimpleProcessManager fuser = new SimpleProcessManager(LOG);
List<String> cmd = ImmutableList.of("fuser", path.toAbsolutePath().toString());
int exitCode = fuser.getExitCode(cmd);
int exitCode = fuser.getExitCode(cmd, CHECK_FILE_OPEN_TIMEOUT_MILLIS);
return exitCode == 0;
} else {
SimpleProcessManager lsof = new SimpleProcessManager(LOG);
Expand All @@ -191,6 +203,8 @@ static boolean isFileOpen(Path path, boolean useFuser) {
} catch (Exception e) {
LOG.error("Could not determine if file {} was in use, skipping", path, e);
return true;
} finally {
checkFileOpenLock.unlock();
}
return false;
}
Expand Down