Skip to content

Commit

Permalink
add test for indexed blob store; related to #199
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorrit Poelen committed Aug 22, 2024
1 parent ca22e7c commit c54a767
Show file tree
Hide file tree
Showing 22 changed files with 211 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class CmdGrep extends LoggingPersisting implements Runnable {
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
7 changes: 1 addition & 6 deletions preston-cli/src/main/java/bio/guoda/preston/cmd/CmdList.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package bio.guoda.preston.cmd;

import bio.guoda.preston.StatementLogFactory;
import bio.guoda.preston.process.LogErrorHandler;
import bio.guoda.preston.process.StatementsListener;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

@CommandLine.Command(
Expand Down Expand Up @@ -43,7 +38,7 @@ public void run(LogErrorHandler handler) {
foundHistory.set(true);
try {
ContentQueryUtil.copyMostRecentContent(
resolvingBlobStore(ReplayUtil.getBlobStore(this)),
BlobStoreUtil.createResolvingBlobStoreFor(ReplayUtil.getBlobStore(this), this),
statement,
this,
copyShop);
Expand Down
132 changes: 132 additions & 0 deletions preston-cmd/src/main/java/bio/guoda/preston/cmd/BlobStoreUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package bio.guoda.preston.cmd;

import bio.guoda.preston.RefNodeConstants;
import bio.guoda.preston.RefNodeFactory;
import bio.guoda.preston.process.EmittingStreamFactory;
import bio.guoda.preston.process.EmittingStreamOfAnyQuad;
import bio.guoda.preston.process.ParsingEmitter;
import bio.guoda.preston.process.ProcessorState;
import bio.guoda.preston.process.StatementEmitter;
import bio.guoda.preston.process.StatementsListener;
import bio.guoda.preston.process.StatementsListenerAdapter;
import bio.guoda.preston.store.AliasDereferencer;
import bio.guoda.preston.store.BlobStoreReadOnly;
import bio.guoda.preston.store.ContentHashDereferencer;
import bio.guoda.preston.store.Dereferencer;
import bio.guoda.preston.store.HashKeyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.rdf.api.IRI;
import org.apache.commons.rdf.api.Quad;
import org.mapdb.DBMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Static helpers for decorating blob stores: one that resolves aliases and content hashes,
 * and one that first translates non-hash IRIs into content ids using a version index built
 * from replayed provenance statements.
 */
public class BlobStoreUtil {
    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class);

    /**
     * Creates a read-only blob store that maps requested IRIs onto indexed content ids
     * before delegating to the provided blob store.
     * <p>
     * The version index is built eagerly, before this method returns (see
     * {@link #buildIndexedBlobStore(Persisting)}). On lookup:
     * <ul>
     *     <li>an IRI that is already a valid hash key is passed to the delegate unchanged;</li>
     *     <li>any other IRI is replaced by its indexed version when the index has one;</li>
     *     <li>an IRI without an indexed version is passed to the delegate unchanged.</li>
     * </ul>
     *
     * @param blobStoreReadOnly delegate that serves the content
     * @param persisting        supplies the tmp dir, the provenance anchor and the replay context
     * @return blob store that consults the version index before delegating
     * @throws IllegalArgumentException if the provenance anchor is still
     *                                  {@code CmdWithProvenance.PROVENANCE_ANCHOR_DEFAULT}
     */
    public static BlobStoreReadOnly createIndexedBlobStoreFor(BlobStoreReadOnly blobStoreReadOnly, Persisting persisting) {
        Map<String, String> treeMap = buildIndexedBlobStore(persisting);

        return new BlobStoreReadOnly() {

            @Override
            public InputStream get(IRI uri) throws IOException {
                return blobStoreReadOnly.get(resolveIndexedVersion(uri, treeMap));
            }
        };
    }

    /**
     * Wraps the provided blob store in a {@link ContentHashDereferencer}, which in turn is
     * wrapped in an {@link AliasDereferencer} configured with the provided persisting
     * context and its provenance tracer.
     *
     * @param blobStore  source of content streams
     * @param persisting passed to the {@link AliasDereferencer}; also supplies the provenance tracer
     * @return the composed blob store
     */
    public static BlobStoreReadOnly createResolvingBlobStoreFor(Dereferencer<InputStream> blobStore, Persisting persisting) {
        return new AliasDereferencer(
                new ContentHashDereferencer(blobStore),
                persisting,
                persisting.getProvenanceTracer()
        );
    }

    /**
     * Picks the IRI to hand to the delegate blob store: hash keys are used as-is, other IRIs
     * are swapped for their indexed version when one exists, and fall through unchanged
     * otherwise. The result is never null for a non-null request.
     */
    private static IRI resolveIndexedVersion(IRI uri, Map<String, String> versionIndex) {
        if (HashKeyUtil.isValidHashKey(uri)) {
            return uri;
        }
        String indexedVersion = versionIndex.get(uri.getIRIString());
        return StringUtils.isBlank(indexedVersion)
                ? uri
                : RefNodeFactory.toIRI(indexedVersion);
    }

    /**
     * Builds a map from subject IRI string to version IRI string by replaying statements via
     * {@link ReplayUtil} and recording each {@code HAS_VERSION} statement whose subject and
     * object are IRIs and whose object is a valid hash key.
     * <p>
     * The map lives in a temporary file-backed MapDB database created in the tmp dir of the
     * provided persisting context.
     *
     * @throws IllegalArgumentException if the provenance anchor was left at its default
     */
    private static Map<String, String> buildIndexedBlobStore(Persisting persisting) {

        File tmpDir = persisting.getTmpDir();
        IRI provenanceAnchor = persisting.getProvenanceAnchor();
        if (CmdWithProvenance.PROVENANCE_ANCHOR_DEFAULT.equals(provenanceAnchor)) {
            throw new IllegalArgumentException("--anchor provenance anchor not set; please set provenance anchor");
        }

        // indexing
        DBMaker maker = newTmpFileDB(tmpDir);
        // NOTE(review): the map name is only visible inside this temporary database;
        // it looks carried over from other code - confirm before renaming.
        Map<String, String> treeMap = maker
                .deleteFilesAfterClose()
                .closeOnJvmShutdown()
                .transactionDisable()
                .make()
                .createTreeMap("zotero-stream")
                .make();

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        AtomicLong index = new AtomicLong(0);
        LOG.info("version index for [{}] building...", provenanceAnchor);

        StatementsListener listener = new StatementsListenerAdapter() {
            @Override
            public void on(Quad statement) {
                if (!RefNodeConstants.HAS_VERSION.equals(statement.getPredicate())
                        || RefNodeFactory.isBlankOrSkolemizedBlank(statement.getObject())) {
                    return;
                }
                if (!(statement.getSubject() instanceof IRI) || !(statement.getObject() instanceof IRI)) {
                    return;
                }
                IRI version = (IRI) statement.getObject();
                if (!HashKeyUtil.isValidHashKey(version)) {
                    return;
                }
                // counts every qualifying statement, including ones that do not change the map
                index.incrementAndGet();
                String uri = ((IRI) statement.getSubject()).getIRIString();
                String indexedVersion = version.getIRIString();
                // the first version encountered for a subject is kept
                treeMap.putIfAbsent(uri, indexedVersion);
            }
        };
        ReplayUtil.replay(listener, persisting, new EmittingStreamFactory() {
            @Override
            public ParsingEmitter createEmitter(StatementEmitter emitter, ProcessorState context) {
                return new EmittingStreamOfAnyQuad(emitter, context);
            }
        });
        stopWatch.stop();
        LOG.info("version index for [{}] with [{}] versions built in [{}] s",
                provenanceAnchor, index.get(), stopWatch.getTime(TimeUnit.SECONDS));

        return treeMap;
    }

    /**
     * Creates a MapDB maker backed by a fresh temp file in the provided directory.
     *
     * @param tmpDir directory to create the temp file in; {@link File#createTempFile(String, String, File)}
     *               falls back to the default temporary-file directory when this is null
     * @throws IOError wrapping the {@link IOException} when the temp file cannot be created
     */
    private static DBMaker newTmpFileDB(File tmpDir) {
        try {
            File db = File.createTempFile("mapdb-temp", "db", tmpDir);
            return DBMaker.newFileDB(db);
        } catch (IOException e) {
            // build the location null-safely so a missing tmpDir cannot mask the real failure
            String location = tmpDir == null
                    ? System.getProperty("java.io.tmpdir")
                    : tmpDir.getAbsolutePath();
            throw new IOError(new IOException("failed to create tmpFile in [" + location + "]", e));
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static InputStream getContent(

@Override
public BlobStoreReadOnly create() {
return Persisting.resolvingBlobStore(blobStore, persisting);
return BlobStoreUtil.createResolvingBlobStoreFor(blobStore, persisting);
}
};

Expand Down
14 changes: 0 additions & 14 deletions preston-cmd/src/main/java/bio/guoda/preston/cmd/Persisting.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import bio.guoda.preston.DerefProgressListener;
import bio.guoda.preston.ResourcesHTTP;
import bio.guoda.preston.store.AliasDereferencer;
import bio.guoda.preston.store.BlobStoreAppendOnly;
import bio.guoda.preston.store.BlobStoreReadOnly;
import bio.guoda.preston.store.ContentHashDereferencer;
import bio.guoda.preston.store.DerefProgressLogger;
import bio.guoda.preston.store.Dereferencer;
import bio.guoda.preston.store.DereferencerContentAddressedTarGZ;
Expand Down Expand Up @@ -274,18 +272,6 @@ private KeyValueStoreReadOnly remoteWithTarGzCacheAll(
return withStoreAt(keyToPath, dereferencer);
}

protected BlobStoreReadOnly resolvingBlobStore(Dereferencer<InputStream> blobStore) {
return resolvingBlobStore(blobStore, this);
}

public static BlobStoreReadOnly resolvingBlobStore(Dereferencer<InputStream> blobStore, Persisting persisting) {
return new AliasDereferencer(
new ContentHashDereferencer(blobStore),
persisting,
persisting.getProvenanceTracer()
);
}


public void setRemotes(List<URI> remotes) {
this.remotes = remotes;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package bio.guoda.preston.cmd;

import bio.guoda.preston.HashType;
import bio.guoda.preston.Hasher;
import bio.guoda.preston.RefNodeFactory;
import bio.guoda.preston.store.BlobStoreReadOnly;
import org.apache.commons.io.IOUtils;
import org.apache.commons.rdf.api.IRI;
import org.hamcrest.core.Is;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import static org.hamcrest.MatcherAssert.assertThat;

/**
 * Checks that the indexed blob store created by
 * {@link BlobStoreUtil#createIndexedBlobStoreFor(BlobStoreReadOnly, Persisting)}
 * turns a request for a plain URL into a request for a content id.
 */
public class BlobStoreUtilTest {

    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    @Test
    public void blobStore() throws IOException, URISyntaxException {
        // the known fixture sits at <dataDir>/27/f5/<hash>, so the data dir is three levels up
        URL knownFixture = getClass().getResource("index/data/27/f5/27f552c25bc733d05a5cc67e9ba63850");
        File dataDir = new File(knownFixture.toURI())
                .getParentFile()
                .getParentFile()
                .getParentFile();

        Persisting context = new Persisting();
        context.setHashType(HashType.md5);

        context.setLocalDataDir(dataDir.getAbsolutePath());
        context.setLocalTmpDir(folder.newFolder("tmp").getAbsolutePath());
        context.setProvenanceArchor(RefNodeFactory.toIRI("hash://md5/ec998a9c63a64ac7bfef04c91ee84f16"));

        // stub store: answers only when asked for the md5 content id of "foo\n"
        BlobStoreReadOnly fooOnlyStore = new BlobStoreReadOnly() {
            @Override
            public InputStream get(IRI uri) throws IOException {
                IRI expectedContentId = Hasher.calcHashIRI("foo\n", HashType.md5);
                if (expectedContentId.equals(uri)) {
                    return IOUtils.toInputStream("foo", StandardCharsets.UTF_8);
                }
                throw new IOException("kaboom!");
            }
        };

        BlobStoreReadOnly blobStoreIndexed = BlobStoreUtil.createIndexedBlobStoreFor(fooOnlyStore, context);

        // presumably the fixture provenance links https://example.org to that content id - verify against fixture
        InputStream content = blobStoreIndexed.get(RefNodeFactory.toIRI("https://example.org"));
        String actual = IOUtils.toString(content, StandardCharsets.UTF_8);

        assertThat(actual, Is.is("foo"));
    }

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bio.guoda.preston.dbase;

import bio.guoda.preston.RefNodeFactory;
import bio.guoda.preston.cmd.BlobStoreUtil;
import bio.guoda.preston.cmd.LoggingPersisting;
import bio.guoda.preston.process.EmittingStreamOfAnyVersions;
import bio.guoda.preston.process.StatementsEmitterAdapter;
Expand All @@ -25,7 +26,7 @@ public class CmdDBaseRecordStream extends LoggingPersisting implements Runnable
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class CmdCite extends LoggingPersisting implements Runnable {
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class CmdDwcRecordStream extends LoggingPersisting implements Runnable {
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package bio.guoda.preston.excel;

import bio.guoda.preston.RefNodeFactory;
import bio.guoda.preston.cmd.BlobStoreUtil;
import bio.guoda.preston.cmd.LoggingPersisting;
import bio.guoda.preston.process.EmittingStreamOfAnyVersions;
import bio.guoda.preston.process.StatementsEmitterAdapter;
import bio.guoda.preston.process.StatementsListenerAdapter;
import bio.guoda.preston.store.BlobStoreAppendOnly;
import bio.guoda.preston.store.BlobStoreReadOnly;
import bio.guoda.preston.store.ValidatingKeyValueStreamContentAddressedFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.rdf.api.BlankNodeOrIRI;
import org.apache.commons.rdf.api.IRI;
import org.apache.commons.rdf.api.Quad;
import picocli.CommandLine;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@CommandLine.Command(
name = "excel-stream",
Expand All @@ -41,7 +39,7 @@ public class CmdExcelRecordStream extends LoggingPersisting implements Runnable
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class CmdGenBankStream extends LoggingPersisting implements Runnable {
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class CmdGitHubStream extends LoggingPersisting implements Runnable {
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class CmdMBDStream extends LoggingPersisting implements Runnable {
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bio.guoda.preston.paradox;

import bio.guoda.preston.RefNodeFactory;
import bio.guoda.preston.cmd.BlobStoreUtil;
import bio.guoda.preston.cmd.LoggingPersisting;
import bio.guoda.preston.process.EmittingStreamOfAnyQuad;
import bio.guoda.preston.process.StatementsEmitterAdapter;
Expand Down Expand Up @@ -28,7 +29,7 @@ public class CmdParadoxRecordStream extends LoggingPersisting implements Runnabl
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class CmdPlazi extends LoggingPersisting implements Runnable {
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class CmdDarkTaxonStream extends LoggingPersisting implements Runnable {
public void run() {
BlobStoreReadOnly blobStoreAppendOnly
= new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType());
run(resolvingBlobStore(blobStoreAppendOnly));
run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this));

}

Expand Down
Loading

0 comments on commit c54a767

Please sign in to comment.