Merge pull request #8773 from TexasDigitalLibrary/TDL/7493-improve_BagGnerator_failure_handling

TDL/7493 improve bag generator failure handling part 2
kcondon authored Jul 29, 2022
2 parents 4605ecf + 57ec341 commit 751a008
Showing 2 changed files with 121 additions and 72 deletions.
169 changes: 104 additions & 65 deletions src/main/java/edu/harvard/iq/dataverse/util/bagit/BagGenerator.java
@@ -47,6 +47,7 @@
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.text.WordUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -58,7 +59,7 @@
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.ssl.SSLContextBuilder;

import org.apache.http.util.EntityUtils;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -90,7 +91,8 @@ public class BagGenerator {

private int timeout = 60;
private RequestConfig config = RequestConfig.custom().setConnectTimeout(timeout * 1000)
.setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build();
.setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000)
.setCookieSpec(CookieSpecs.STANDARD).build();
protected CloseableHttpClient client;
private PoolingHttpClientConnectionManager cm = null;
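
The new setCookieSpec(CookieSpecs.STANDARD) line switches the client to RFC 6265 cookie parsing, which avoids the cookie-rejection warnings the default spec can log against some servers. A minimal standalone sketch of the same client configuration (Apache HttpClient 4.5.x assumed; the class and method names here are hypothetical):

    import org.apache.http.client.config.CookieSpecs;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;

    public class StandardCookieClient {
        public static CloseableHttpClient build(int timeoutSeconds) {
            int ms = timeoutSeconds * 1000;
            RequestConfig config = RequestConfig.custom()
                    .setConnectTimeout(ms)               // TCP connect limit
                    .setConnectionRequestTimeout(ms)     // wait for a pooled connection
                    .setSocketTimeout(ms)                // max quiet time between packets
                    .setCookieSpec(CookieSpecs.STANDARD) // RFC 6265 cookie parsing
                    .build();
            return HttpClients.custom().setDefaultRequestConfig(config).build();
        }
    }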

@@ -281,7 +283,8 @@ public boolean generateBag(OutputStream outputStream) throws Exception {
}
createFileFromString(manifestName, sha1StringBuffer.toString());
} else {
logger.warning("No Hash values sent - Bag File does not meet BagIT specification requirement");
logger.warning("No Hash values (no files?) sending empty manifest to nominally comply with BagIT specification requirement");
createFileFromString("manifest-md5.txt", "");
}
// bagit.txt - Required by spec
createFileFromString("bagit.txt", "BagIt-Version: 1.0\r\nTag-File-Character-Encoding: UTF-8");
@@ -363,6 +366,7 @@ public boolean generateBag(String bagName, boolean temp) {
// Create an output stream backed by the file
bagFileOS = new FileOutputStream(bagFile);
if (generateBag(bagFileOS)) {
//The generateBag call sets this.bagName to the correct value
validateBagFile(bagFile);
if (usetemp) {
logger.fine("Moving tmp zip");
@@ -388,7 +392,8 @@ public void validateBag(String bagId) {
ZipFile zf = null;
InputStream is = null;
try {
zf = new ZipFile(getBagFile(bagId));
File bagFile = getBagFile(bagId);
zf = new ZipFile(bagFile);
ZipArchiveEntry entry = zf.getEntry(getValidName(bagId) + "/manifest-sha1.txt");
if (entry != null) {
logger.info("SHA1 hashes used");
@@ -428,7 +433,7 @@ public void validateBag(String bagId) {
}
IOUtils.closeQuietly(is);
logger.info("HashMap Map contains: " + checksumMap.size() + " entries");
checkFiles(checksumMap, zf);
checkFiles(checksumMap, bagFile);
} catch (IOException io) {
logger.log(Level.SEVERE,"Could not validate Hashes", io);
} catch (Exception e) {
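
The manifest read above fills checksumMap with one entry per payload file before checkFiles recomputes the hashes. Each BagIt manifest line is simply a checksum and a bag-relative path separated by whitespace; a sketch of that parse under those assumptions (hypothetical helper class, not the method used above):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;

    public class ManifestReader {
        // Returns path -> checksum, the same orientation as checksumMap above
        public static HashMap<String, String> read(InputStream is) throws IOException {
            HashMap<String, String> map = new HashMap<>();
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(is, StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    if (line.isEmpty()) continue;
                    String[] parts = line.split("\\s+", 2); // "<checksum> <relative/path>"
                    if (parts.length == 2) {
                        map.put(parts[1], parts[0]);
                    }
                }
            }
            return map;
        }
    }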
@@ -457,14 +462,13 @@ public File getBagFile(String bagID) throws Exception {

private void validateBagFile(File bagFile) throws IOException {
// Run a confirmation test - should verify all files and hashes
ZipFile zf = new ZipFile(bagFile);

// Check files calculates the hashes and file sizes and reports on
// whether hashes are correct
checkFiles(checksumMap, zf);
checkFiles(checksumMap, bagFile);

logger.info("Data Count: " + dataCount);
logger.info("Data Size: " + totalDataSize);
zf.close();
}

public static String getValidName(String bagName) {
@@ -481,7 +485,7 @@ private void processContainer(JsonObject item, String currentPath) throws IOException {
} else if (item.has(JsonLDTerm.schemaOrg("name").getLabel())) {
title = item.get(JsonLDTerm.schemaOrg("name").getLabel()).getAsString();
}

logger.fine("Adding " + title + "/ to path " + currentPath);
currentPath = currentPath + title + "/";
int containerIndex = -1;
try {
@@ -557,6 +561,7 @@ private void processContainer(JsonObject item, String currentPath) throws IOException {
logger.warning("Duplicate/Collision: " + child.get("@id").getAsString() + " has SHA1 Hash: "
+ childHash + " in: " + bagID);
}
logger.fine("Adding " + childPath + " with hash " + childHash + " to checksumMap");
checksumMap.put(childPath, childHash);
}
}
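
The warning above flags two payload files that share a SHA1 value, which BagIt permits but which often signals duplicated content. One way to detect that, sketched with a reverse hash-to-path map (hypothetical; not the class's actual bookkeeping):

    import java.util.HashMap;
    import java.util.Map;

    public class DuplicateHashCheck {
        private final Map<String, String> hashToPath = new HashMap<>();

        /** Returns the previously seen path with the same hash, or null if none. */
        public String recordAndFindDuplicate(String path, String hash) {
            return hashToPath.putIfAbsent(hash, path); // non-null means a duplicate/collision
        }
    }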
@@ -700,29 +705,39 @@ private void createFileFromURL(final String relPath, final String uri)
addEntry(archiveEntry, supp);
}

private void checkFiles(HashMap<String, String> shaMap, ZipFile zf) {
private void checkFiles(HashMap<String, String> shaMap, File bagFile) {
ExecutorService executor = Executors.newFixedThreadPool(numConnections);
BagValidationJob.setZipFile(zf);
BagValidationJob.setBagGenerator(this);
logger.fine("Validating hashes for zipped data files");
int i = 0;
for (Entry<String, String> entry : shaMap.entrySet()) {
BagValidationJob vj = new BagValidationJob(entry.getValue(), entry.getKey());
executor.execute(vj);
i++;
if (i % 1000 == 0) {
logger.info("Queuing Hash Validations: " + i);
}
}
logger.fine("All Hash Validations Queued: " + i);

executor.shutdown();
ZipFile zf = null;
try {
while (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
logger.fine("Awaiting completion of hash calculations.");
zf = new ZipFile(bagFile);

BagValidationJob.setZipFile(zf);
BagValidationJob.setBagGenerator(this);
logger.fine("Validating hashes for zipped data files");
int i = 0;
for (Entry<String, String> entry : shaMap.entrySet()) {
BagValidationJob vj = new BagValidationJob(bagName, entry.getValue(), entry.getKey());
executor.execute(vj);
i++;
if (i % 1000 == 0) {
logger.info("Queuing Hash Validations: " + i);
}
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE,"Hash Calculations interrupted", e);
logger.fine("All Hash Validations Queued: " + i);

executor.shutdown();
try {
while (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
logger.fine("Awaiting completion of hash calculations.");
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "Hash Calculations interrupted", e);
}
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} finally {
IOUtils.closeQuietly(zf);
}
logger.fine("Hash Validations Completed");

@@ -907,8 +922,8 @@ public void incrementTotalDataSize(long inc) {
totalDataSize += inc;
}

public String getHashtype() {
return hashtype.toString();
public ChecksumType getHashtype() {
return hashtype;
}

// Get's all "Has Part" children, standardized to send an array with 0,1, or
@@ -993,46 +1008,70 @@ private HttpGet createNewGetRequest(URI url, String returnType) {
return request;
}

InputStreamSupplier getInputStreamSupplier(final String uri) {
InputStreamSupplier getInputStreamSupplier(final String uriString) {

return new InputStreamSupplier() {
public InputStream get() {
int tries = 0;
while (tries < 5) {
try {
logger.fine("Get # " + tries + " for " + uri);
HttpGet getMap = createNewGetRequest(new URI(uri), null);
logger.finest("Retrieving " + tries + ": " + uri);
CloseableHttpResponse response;
//Note - if we ever need to pass an HttpClientContext, we need a new one per thread.
response = client.execute(getMap);
if (response.getStatusLine().getStatusCode() == 200) {
logger.finest("Retrieved: " + uri);
return response.getEntity().getContent();
}
logger.fine("Status: " + response.getStatusLine().getStatusCode());
tries++;

} catch (ClientProtocolException e) {
tries += 5;
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// Retry if this is a potentially temporary error such
// as a timeout
tries++;
logger.log(Level.WARNING,"Attempt# " + tries + " : Unable to retrieve file: " + uri, e);
if (tries == 5) {
logger.severe("Final attempt failed for " + uri);
try {
URI uri = new URI(uriString);

int tries = 0;
while (tries < 5) {

logger.fine("Get # " + tries + " for " + uriString);
HttpGet getFile = createNewGetRequest(uri, null);
logger.finest("Retrieving " + tries + ": " + uriString);
CloseableHttpResponse response = null;
try {
response = client.execute(getFile);
// Note - if we ever need to pass an HttpClientContext, we need a new one per
// thread.
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200) {
logger.finest("Retrieved: " + uri);
return response.getEntity().getContent();
}
logger.warning("Attempt: " + tries + " - Unexpected Status when retrieving " + uriString
+ " : " + statusCode);
if (statusCode < 500) {
logger.fine("Will not retry for 40x errors");
tries += 5;
} else {
tries++;
}
// Error handling
if (response != null) {
try {
EntityUtils.consumeQuietly(response.getEntity());
response.close();
} catch (IOException io) {
logger.warning(
"Exception closing response after status: " + statusCode + " on " + uri);
}
}
} catch (ClientProtocolException e) {
tries += 5;
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// Retry if this is a potentially temporary error such
// as a timeout
tries++;
logger.log(Level.WARNING, "Attempt# " + tries + " : Unable to retrieve file: " + uriString,
e);
if (tries == 5) {
logger.severe("Final attempt failed for " + uriString);
}
e.printStackTrace();
}
e.printStackTrace();
} catch (URISyntaxException e) {
tries += 5;
// TODO Auto-generated catch block
e.printStackTrace();

}

} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.severe("Could not read: " + uri);
logger.severe("Could not read: " + uriString);
return null;
}
};
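
The rewritten supplier encodes a simple retry policy: up to five attempts, any 4xx treated as permanent (tries += 5 exits the loop), 5xx and I/O errors retried, and every failed response drained and closed so its pooled connection can be reused. A standalone sketch of the same policy (Apache HttpClient 4.x; class and method names hypothetical):

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URI;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.util.EntityUtils;

    public class RetryingFetch {
        private static final int MAX_TRIES = 5;

        /** Returns an open content stream (caller closes it), or null when retries are exhausted. */
        public static InputStream fetch(CloseableHttpClient client, URI uri) {
            int tries = 0;
            while (tries < MAX_TRIES) {
                CloseableHttpResponse response = null;
                try {
                    response = client.execute(new HttpGet(uri));
                    int status = response.getStatusLine().getStatusCode();
                    if (status == 200) {
                        return response.getEntity().getContent();
                    }
                    // A 4xx will not improve on retry; a 5xx might
                    tries += (status < 500) ? MAX_TRIES : 1;
                    EntityUtils.consumeQuietly(response.getEntity()); // free the pooled connection
                    response.close();
                } catch (IOException e) {
                    tries++; // timeouts and resets are worth another attempt
                    if (response != null) {
                        EntityUtils.consumeQuietly(response.getEntity());
                    }
                }
            }
            return null;
        }
    }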
24 changes: 17 additions & 7 deletions src/main/java/edu/harvard/iq/dataverse/util/bagit/BagValidationJob.java
@@ -25,6 +25,7 @@
import org.apache.commons.compress.archivers.zip.ZipFile;

import edu.harvard.iq.dataverse.DataFile;
import edu.harvard.iq.dataverse.DataFile.ChecksumType;

import org.apache.commons.compress.utils.IOUtils;

@@ -41,13 +42,15 @@ public class BagValidationJob implements Runnable {

private String hash;
private String name;
private static String hashtype;
private String basePath;
private static ChecksumType hashtype;

public BagValidationJob(String value, String key) throws IllegalStateException {
public BagValidationJob(String bagName, String value, String key) throws IllegalStateException {
if (zf == null || bagGenerator == null) {
throw new IllegalStateException(
"Static Zipfile and BagGenerator must be set before creating ValidationJobs");
}
basePath=bagName;
hash = value;
name = key;

@@ -60,24 +63,28 @@ public BagValidationJob(String value, String key) throws IllegalStateException {
*/
public void run() {

String realHash = generateFileHash(name, zf);
String realHash = generateFileHash(basePath + "/" + name, zf);
if (hash.equals(realHash)) {
log.fine("Valid hash for " + name);
} else {
log.severe("Invalid " + bagGenerator.getHashtype() + " for " + name);
log.severe("Invalid " + bagGenerator.getHashtype().name() + " for " + name);
log.fine("As sent: " + hash);
log.fine("As calculated: " + realHash);
}
}

private String generateFileHash(String name, ZipFile zf) {

String realHash = null;

ZipArchiveEntry archiveEntry1 = zf.getEntry(name);

if(archiveEntry1 != null) {
// Error check - add file sizes to compare against supplied stats

log.fine("Getting stream for " + name);
long start = System.currentTimeMillis();
InputStream inputStream = null;
String realHash = null;

try {
inputStream = zf.getInputStream(archiveEntry1);
if (hashtype.equals(DataFile.ChecksumType.SHA1)) {
@@ -89,7 +96,7 @@ private String generateFileHash(String name, ZipFile zf) {
} else if (hashtype.equals(DataFile.ChecksumType.MD5)) {
realHash = DigestUtils.md5Hex(inputStream);
} else {
log.warning("Unknown hash type: " + hashtype);
log.warning("Unknown hash type: " + hashtype.name());
}

} catch (ZipException e) {
@@ -104,6 +111,9 @@ private String generateFileHash(String name, ZipFile zf) {
log.fine("Retrieve/compute time = " + (System.currentTimeMillis() - start) + " ms");
// Error check - add file sizes to compare against supplied stats
bagGenerator.incrementTotalDataSize(archiveEntry1.getSize());
} else {
log.warning("Entry " + name + " not found in zipped bag: not validated");
}
return realHash;
}
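
generateFileHash streams a single entry out of the zipped bag and digests it with commons-codec, and the new else branch reports entries missing from the archive instead of failing on a null entry. A pared-down sketch of the lookup-and-digest step (commons-compress plus commons-codec assumed; names hypothetical):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
    import org.apache.commons.compress.archivers.zip.ZipFile;

    public class ZipEntryHash {
        /** Returns the SHA-1 of the named entry, or null if the entry is absent. */
        public static String sha1Of(ZipFile zf, String entryName) throws IOException {
            ZipArchiveEntry entry = zf.getEntry(entryName); // null when not in the archive
            if (entry == null) {
                return null; // mirrors the "not found in zipped bag" branch above
            }
            try (InputStream in = zf.getInputStream(entry)) {
                return DigestUtils.sha1Hex(in); // streams; does not load the file into memory
            }
        }
    }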

