diff --git a/preston-cli/src/main/java/bio/guoda/preston/cmd/CmdGrep.java b/preston-cli/src/main/java/bio/guoda/preston/cmd/CmdGrep.java index e8078974f..9ee188de4 100644 --- a/preston-cli/src/main/java/bio/guoda/preston/cmd/CmdGrep.java +++ b/preston-cli/src/main/java/bio/guoda/preston/cmd/CmdGrep.java @@ -45,7 +45,7 @@ public class CmdGrep extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-cli/src/main/java/bio/guoda/preston/cmd/CmdList.java b/preston-cli/src/main/java/bio/guoda/preston/cmd/CmdList.java index b42c9097c..84b6f1a85 100644 --- a/preston-cli/src/main/java/bio/guoda/preston/cmd/CmdList.java +++ b/preston-cli/src/main/java/bio/guoda/preston/cmd/CmdList.java @@ -1,16 +1,11 @@ package bio.guoda.preston.cmd; -import bio.guoda.preston.StatementLogFactory; import bio.guoda.preston.process.LogErrorHandler; -import bio.guoda.preston.process.StatementsListener; -import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; @CommandLine.Command( @@ -43,7 +38,7 @@ public void run(LogErrorHandler handler) { foundHistory.set(true); try { ContentQueryUtil.copyMostRecentContent( - resolvingBlobStore(ReplayUtil.getBlobStore(this)), + BlobStoreUtil.createResolvingBlobStoreFor(ReplayUtil.getBlobStore(this), this), statement, this, copyShop); diff --git a/preston-cmd/src/main/java/bio/guoda/preston/cmd/BlobStoreUtil.java b/preston-cmd/src/main/java/bio/guoda/preston/cmd/BlobStoreUtil.java new file mode 100644 index 000000000..b81893222 --- /dev/null +++ b/preston-cmd/src/main/java/bio/guoda/preston/cmd/BlobStoreUtil.java @@ -0,0 +1,132 @@ +package bio.guoda.preston.cmd; + +import bio.guoda.preston.RefNodeConstants; +import bio.guoda.preston.RefNodeFactory; +import bio.guoda.preston.process.EmittingStreamFactory; +import bio.guoda.preston.process.EmittingStreamOfAnyQuad; +import bio.guoda.preston.process.ParsingEmitter; +import bio.guoda.preston.process.ProcessorState; +import bio.guoda.preston.process.StatementEmitter; +import bio.guoda.preston.process.StatementsListener; +import bio.guoda.preston.process.StatementsListenerAdapter; +import bio.guoda.preston.store.AliasDereferencer; +import bio.guoda.preston.store.BlobStoreReadOnly; +import bio.guoda.preston.store.ContentHashDereferencer; +import bio.guoda.preston.store.Dereferencer; +import bio.guoda.preston.store.HashKeyUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; +import org.apache.commons.rdf.api.IRI; +import org.apache.commons.rdf.api.Quad; +import org.mapdb.DBMaker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOError; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + public static BlobStoreReadOnly createIndexedBlobStoreFor(BlobStoreReadOnly blobStoreReadOnly, Persisting persisting) { + Map treeMap = buildIndexedBlobStore(persisting); + + return new BlobStoreReadOnly() { + + @Override + public InputStream get(IRI uri) throws IOException { + IRI iriForLookup = null; + if (HashKeyUtil.isValidHashKey(uri)) { + iriForLookup = uri; + } else { + String indexedVersion = treeMap.get(uri.getIRIString()); + iriForLookup = StringUtils.isBlank(indexedVersion) ? uri : RefNodeFactory.toIRI(indexedVersion); + } + + if (iriForLookup == null) { + throw new IOException("failed to find content associated to [" + uri + "] in index."); + } + + return blobStoreReadOnly.get(iriForLookup); + } + }; + } + + public static BlobStoreReadOnly createResolvingBlobStoreFor(Dereferencer blobStore, Persisting persisting) { + return new AliasDereferencer( + new ContentHashDereferencer(blobStore), + persisting, + persisting.getProvenanceTracer() + ); + } + + + private static Map buildIndexedBlobStore(Persisting persisting) { + + File tmpDir = persisting.getTmpDir(); + IRI provenanceAnchor = persisting.getProvenanceAnchor(); + if (CmdWithProvenance.PROVENANCE_ANCHOR_DEFAULT.equals(provenanceAnchor)) { + throw new IllegalArgumentException("--anchor provenance anchor not set; please set provenance anchor"); + } + // indexing + DBMaker maker = newTmpFileDB(tmpDir); + Map treeMap = maker + .deleteFilesAfterClose() + .closeOnJvmShutdown() + .transactionDisable() + .make() + .createTreeMap("zotero-stream") + .make(); + + + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + AtomicLong index = new AtomicLong(0); + LOG.info("version index for [" + provenanceAnchor + "] building..."); + + StatementsListener listener = new StatementsListenerAdapter() { + @Override + public void on(Quad statement) { + if (RefNodeConstants.HAS_VERSION.equals(statement.getPredicate()) + && !RefNodeFactory.isBlankOrSkolemizedBlank(statement.getObject())) { + if (statement.getSubject() instanceof IRI && statement.getObject() instanceof IRI) { + IRI version = (IRI) statement.getObject(); + if (HashKeyUtil.isValidHashKey(version)) { + index.incrementAndGet(); + String uri = ((IRI) statement.getSubject()).getIRIString(); + String indexedVersion = version.getIRIString(); + treeMap.putIfAbsent(uri, indexedVersion); + } + } + } + + } + }; + ReplayUtil.replay(listener, persisting, new EmittingStreamFactory() { + @Override + public ParsingEmitter createEmitter(StatementEmitter emitter, ProcessorState context) { + return new EmittingStreamOfAnyQuad(emitter, context); + } + }); + stopWatch.stop(); + LOG.info("version index for [" + provenanceAnchor + "] with [" + index.get() + "] versions built in [" + stopWatch.getTime(TimeUnit.SECONDS) + "] s"); + + return treeMap; + } + + private static DBMaker newTmpFileDB(File tmpDir) { + try { + File db = File.createTempFile("mapdb-temp", "db", tmpDir); + return DBMaker.newFileDB(db); + } catch (IOException e) { + throw new IOError(new IOException("failed to create tmpFile in [" + tmpDir.getAbsolutePath() + "]", e)); + } + + } + +} diff --git a/preston-cmd/src/main/java/bio/guoda/preston/cmd/ContentQueryUtil.java b/preston-cmd/src/main/java/bio/guoda/preston/cmd/ContentQueryUtil.java index 3d3221650..e2aea9ade 100644 --- a/preston-cmd/src/main/java/bio/guoda/preston/cmd/ContentQueryUtil.java +++ b/preston-cmd/src/main/java/bio/guoda/preston/cmd/ContentQueryUtil.java @@ -48,7 +48,7 @@ public static InputStream getContent( @Override public BlobStoreReadOnly create() { - return Persisting.resolvingBlobStore(blobStore, persisting); + return BlobStoreUtil.createResolvingBlobStoreFor(blobStore, persisting); } }; diff --git a/preston-cmd/src/main/java/bio/guoda/preston/cmd/Persisting.java b/preston-cmd/src/main/java/bio/guoda/preston/cmd/Persisting.java index f2b2f3cb8..9d25c9174 100644 --- a/preston-cmd/src/main/java/bio/guoda/preston/cmd/Persisting.java +++ b/preston-cmd/src/main/java/bio/guoda/preston/cmd/Persisting.java @@ -2,10 +2,8 @@ import bio.guoda.preston.DerefProgressListener; import bio.guoda.preston.ResourcesHTTP; -import bio.guoda.preston.store.AliasDereferencer; import bio.guoda.preston.store.BlobStoreAppendOnly; import bio.guoda.preston.store.BlobStoreReadOnly; -import bio.guoda.preston.store.ContentHashDereferencer; import bio.guoda.preston.store.DerefProgressLogger; import bio.guoda.preston.store.Dereferencer; import bio.guoda.preston.store.DereferencerContentAddressedTarGZ; @@ -274,18 +272,6 @@ private KeyValueStoreReadOnly remoteWithTarGzCacheAll( return withStoreAt(keyToPath, dereferencer); } - protected BlobStoreReadOnly resolvingBlobStore(Dereferencer blobStore) { - return resolvingBlobStore(blobStore, this); - } - - public static BlobStoreReadOnly resolvingBlobStore(Dereferencer blobStore, Persisting persisting) { - return new AliasDereferencer( - new ContentHashDereferencer(blobStore), - persisting, - persisting.getProvenanceTracer() - ); - } - public void setRemotes(List remotes) { this.remotes = remotes; diff --git a/preston-cmd/src/test/java/bio/guoda/preston/cmd/BlobStoreUtilTest.java b/preston-cmd/src/test/java/bio/guoda/preston/cmd/BlobStoreUtilTest.java new file mode 100644 index 000000000..d3fe3cde5 --- /dev/null +++ b/preston-cmd/src/test/java/bio/guoda/preston/cmd/BlobStoreUtilTest.java @@ -0,0 +1,56 @@ +package bio.guoda.preston.cmd; + +import bio.guoda.preston.HashType; +import bio.guoda.preston.Hasher; +import bio.guoda.preston.RefNodeFactory; +import bio.guoda.preston.store.BlobStoreReadOnly; +import org.apache.commons.io.IOUtils; +import org.apache.commons.rdf.api.IRI; +import org.hamcrest.core.Is; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; + +import static org.hamcrest.MatcherAssert.assertThat; + +public class BlobStoreUtilTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void blobStore() throws IOException, URISyntaxException { + URL resource = getClass().getResource("index/data/27/f5/27f552c25bc733d05a5cc67e9ba63850"); + File root = new File(resource.toURI()); + File dataDir = root.getParentFile().getParentFile().getParentFile(); + + Persisting persisting = new Persisting(); + persisting.setHashType(HashType.md5); + + persisting.setLocalDataDir(dataDir.getAbsolutePath()); + persisting.setLocalTmpDir(folder.newFolder("tmp").getAbsolutePath()); + persisting.setProvenanceArchor(RefNodeFactory.toIRI("hash://md5/ec998a9c63a64ac7bfef04c91ee84f16")); + BlobStoreReadOnly blobStoreIndexed = BlobStoreUtil.createIndexedBlobStoreFor(new BlobStoreReadOnly() { + @Override + public InputStream get(IRI uri) throws IOException { + IRI iri = Hasher.calcHashIRI("foo\n", HashType.md5); + if (!iri.equals(uri)) { + throw new IOException("kaboom!"); + } + return IOUtils.toInputStream("foo", StandardCharsets.UTF_8); + } + }, persisting); + + InputStream inputStream = blobStoreIndexed.get(RefNodeFactory.toIRI("https://example.org")); + + assertThat(IOUtils.toString(inputStream, StandardCharsets.UTF_8), Is.is("foo")); + } + +} \ No newline at end of file diff --git a/preston-dbase/src/main/java/bio/guoda/preston/dbase/CmdDBaseRecordStream.java b/preston-dbase/src/main/java/bio/guoda/preston/dbase/CmdDBaseRecordStream.java index 0a1413bcb..93a79b22d 100644 --- a/preston-dbase/src/main/java/bio/guoda/preston/dbase/CmdDBaseRecordStream.java +++ b/preston-dbase/src/main/java/bio/guoda/preston/dbase/CmdDBaseRecordStream.java @@ -1,6 +1,7 @@ package bio.guoda.preston.dbase; import bio.guoda.preston.RefNodeFactory; +import bio.guoda.preston.cmd.BlobStoreUtil; import bio.guoda.preston.cmd.LoggingPersisting; import bio.guoda.preston.process.EmittingStreamOfAnyVersions; import bio.guoda.preston.process.StatementsEmitterAdapter; @@ -25,7 +26,7 @@ public class CmdDBaseRecordStream extends LoggingPersisting implements Runnable public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-dwc/src/main/java/bio/guoda/preston/cmd/CmdCite.java b/preston-dwc/src/main/java/bio/guoda/preston/cmd/CmdCite.java index 5b06e9fe4..3b69ea648 100644 --- a/preston-dwc/src/main/java/bio/guoda/preston/cmd/CmdCite.java +++ b/preston-dwc/src/main/java/bio/guoda/preston/cmd/CmdCite.java @@ -21,7 +21,7 @@ public class CmdCite extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-dwc/src/main/java/bio/guoda/preston/cmd/CmdDwcRecordStream.java b/preston-dwc/src/main/java/bio/guoda/preston/cmd/CmdDwcRecordStream.java index 56ac2368e..7bfbf5d3e 100644 --- a/preston-dwc/src/main/java/bio/guoda/preston/cmd/CmdDwcRecordStream.java +++ b/preston-dwc/src/main/java/bio/guoda/preston/cmd/CmdDwcRecordStream.java @@ -24,7 +24,7 @@ public class CmdDwcRecordStream extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-excel/src/main/java/bio/guoda/preston/excel/CmdExcelRecordStream.java b/preston-excel/src/main/java/bio/guoda/preston/excel/CmdExcelRecordStream.java index 665b18b0b..00ce70abb 100644 --- a/preston-excel/src/main/java/bio/guoda/preston/excel/CmdExcelRecordStream.java +++ b/preston-excel/src/main/java/bio/guoda/preston/excel/CmdExcelRecordStream.java @@ -1,21 +1,19 @@ package bio.guoda.preston.excel; import bio.guoda.preston.RefNodeFactory; +import bio.guoda.preston.cmd.BlobStoreUtil; import bio.guoda.preston.cmd.LoggingPersisting; import bio.guoda.preston.process.EmittingStreamOfAnyVersions; import bio.guoda.preston.process.StatementsEmitterAdapter; -import bio.guoda.preston.process.StatementsListenerAdapter; import bio.guoda.preston.store.BlobStoreAppendOnly; import bio.guoda.preston.store.BlobStoreReadOnly; import bio.guoda.preston.store.ValidatingKeyValueStreamContentAddressedFactory; -import org.apache.commons.io.IOUtils; import org.apache.commons.rdf.api.BlankNodeOrIRI; import org.apache.commons.rdf.api.IRI; import org.apache.commons.rdf.api.Quad; import picocli.CommandLine; import java.io.IOException; -import java.nio.charset.StandardCharsets; @CommandLine.Command( name = "excel-stream", @@ -41,7 +39,7 @@ public class CmdExcelRecordStream extends LoggingPersisting implements Runnable public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-genbank/src/main/java/bio/guoda/preston/cmd/CmdGenBankStream.java b/preston-genbank/src/main/java/bio/guoda/preston/cmd/CmdGenBankStream.java index fdae39f24..9bc03ab4e 100644 --- a/preston-genbank/src/main/java/bio/guoda/preston/cmd/CmdGenBankStream.java +++ b/preston-genbank/src/main/java/bio/guoda/preston/cmd/CmdGenBankStream.java @@ -22,7 +22,7 @@ public class CmdGenBankStream extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-github/src/main/java/bio/guoda/preston/cmd/CmdGitHubStream.java b/preston-github/src/main/java/bio/guoda/preston/cmd/CmdGitHubStream.java index 9b212504b..76cb9b748 100644 --- a/preston-github/src/main/java/bio/guoda/preston/cmd/CmdGitHubStream.java +++ b/preston-github/src/main/java/bio/guoda/preston/cmd/CmdGitHubStream.java @@ -22,7 +22,7 @@ public class CmdGitHubStream extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-mbd/src/main/java/bio/guoda/preston/cmd/CmdMBDStream.java b/preston-mbd/src/main/java/bio/guoda/preston/cmd/CmdMBDStream.java index 7745defd2..ffb30576c 100644 --- a/preston-mbd/src/main/java/bio/guoda/preston/cmd/CmdMBDStream.java +++ b/preston-mbd/src/main/java/bio/guoda/preston/cmd/CmdMBDStream.java @@ -21,7 +21,7 @@ public class CmdMBDStream extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-paradox/src/main/java/bio/guoda/preston/paradox/CmdParadoxRecordStream.java b/preston-paradox/src/main/java/bio/guoda/preston/paradox/CmdParadoxRecordStream.java index 60ed7bd31..89c9089fd 100644 --- a/preston-paradox/src/main/java/bio/guoda/preston/paradox/CmdParadoxRecordStream.java +++ b/preston-paradox/src/main/java/bio/guoda/preston/paradox/CmdParadoxRecordStream.java @@ -1,6 +1,7 @@ package bio.guoda.preston.paradox; import bio.guoda.preston.RefNodeFactory; +import bio.guoda.preston.cmd.BlobStoreUtil; import bio.guoda.preston.cmd.LoggingPersisting; import bio.guoda.preston.process.EmittingStreamOfAnyQuad; import bio.guoda.preston.process.StatementsEmitterAdapter; @@ -28,7 +29,7 @@ public class CmdParadoxRecordStream extends LoggingPersisting implements Runnabl public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-plazi/src/main/java/bio/guoda/preston/cmd/CmdPlazi.java b/preston-plazi/src/main/java/bio/guoda/preston/cmd/CmdPlazi.java index 68d2d7587..f440d2993 100644 --- a/preston-plazi/src/main/java/bio/guoda/preston/cmd/CmdPlazi.java +++ b/preston-plazi/src/main/java/bio/guoda/preston/cmd/CmdPlazi.java @@ -21,7 +21,7 @@ public class CmdPlazi extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdDarkTaxonStream.java b/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdDarkTaxonStream.java index 914fda014..4df0a9dea 100644 --- a/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdDarkTaxonStream.java +++ b/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdDarkTaxonStream.java @@ -34,7 +34,7 @@ public class CmdDarkTaxonStream extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdRISStream.java b/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdRISStream.java index 2ebb5b484..b0d28487d 100644 --- a/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdRISStream.java +++ b/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdRISStream.java @@ -34,7 +34,7 @@ public class CmdRISStream extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdTaxoDrosStream.java b/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdTaxoDrosStream.java index 2dd6e25fc..b2f08f2eb 100644 --- a/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdTaxoDrosStream.java +++ b/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdTaxoDrosStream.java @@ -12,7 +12,6 @@ import picocli.CommandLine; import java.util.Arrays; -import java.util.Collections; import java.util.List; @CommandLine.Command( @@ -31,7 +30,7 @@ public class CmdTaxoDrosStream extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } diff --git a/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdZoteroStream.java b/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdZoteroStream.java index 3fd17272a..8ceaa1a8e 100644 --- a/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdZoteroStream.java +++ b/preston-taxodros/src/main/java/bio/guoda/preston/cmd/CmdZoteroStream.java @@ -1,41 +1,22 @@ package bio.guoda.preston.cmd; -import bio.guoda.preston.RefNodeConstants; -import bio.guoda.preston.RefNodeFactory; import bio.guoda.preston.StatementLogFactory; -import bio.guoda.preston.process.EmittingStreamFactory; -import bio.guoda.preston.process.EmittingStreamOfAnyQuad; import bio.guoda.preston.process.EmittingStreamOfAnyVersions; -import bio.guoda.preston.process.ParsingEmitter; -import bio.guoda.preston.process.ProcessorState; -import bio.guoda.preston.process.StatementEmitter; import bio.guoda.preston.process.StatementsEmitterAdapter; import bio.guoda.preston.process.StatementsListener; -import bio.guoda.preston.process.StatementsListenerAdapter; import bio.guoda.preston.store.BlobStoreAppendOnly; import bio.guoda.preston.store.BlobStoreReadOnly; import bio.guoda.preston.store.ContentHashDereferencer; -import bio.guoda.preston.store.HashKeyUtil; import bio.guoda.preston.store.ValidatingKeyValueStreamContentAddressedFactory; import org.apache.commons.io.output.NullPrintStream; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.rdf.api.IRI; import org.apache.commons.rdf.api.Quad; -import org.mapdb.DBMaker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import picocli.CommandLine; -import java.io.File; -import java.io.IOError; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; @CommandLine.Command( name = "zotero-stream", @@ -43,8 +24,6 @@ ) public class CmdZoteroStream extends LoggingPersisting implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(CmdZoteroStream.class); - @CommandLine.Option( names = {"--community", "--communities"}, @@ -72,39 +51,10 @@ public InputStream get(IRI uri) throws IOException { } - private static DBMaker newTmpFileDB(File tmpDir) { - try { - File db = File.createTempFile("mapdb-temp", "db", tmpDir); - return DBMaker.newFileDB(db); - } catch (IOException e) { - throw new IOError(new IOException("failed to create tmpFile in [" + tmpDir.getAbsolutePath() + "]", e)); - } - - } - public void run(BlobStoreReadOnly blobStoreReadOnly) { - Map treeMap = buildIndexedBlobStore(blobStoreReadOnly, this); - - BlobStoreReadOnly blobStoreWithIndexedVersions = new BlobStoreReadOnly() { - @Override - public InputStream get(IRI uri) throws IOException { - IRI iriForLookup = null; - if (HashKeyUtil.isValidHashKey(uri)) { - iriForLookup = uri; - } else { - String indexedVersion = treeMap.get(uri.getIRIString()); - iriForLookup = StringUtils.isBlank(indexedVersion) ? uri : RefNodeFactory.toIRI(indexedVersion); - } - - if (iriForLookup == null) { - throw new IOException("failed to find content associated to [" + uri + "] in index."); - } - - return blobStoreReadOnly.get(iriForLookup); - } - }; + BlobStoreReadOnly blobStoreWithIndexedVersions = BlobStoreUtil.createIndexedBlobStoreFor(blobStoreReadOnly, this); StatementsListener listener = StatementLogFactory.createPrintingLogger( @@ -133,59 +83,5 @@ public void emit(Quad statement) { } - public static Map buildIndexedBlobStore(BlobStoreReadOnly blobStoreReadOnly, - Persisting persisting) { - - File tmpDir = persisting.getTmpDir(); - IRI provenanceAnchor = persisting.getProvenanceAnchor(); - if (PROVENANCE_ANCHOR_DEFAULT.equals(provenanceAnchor)) { - throw new IllegalArgumentException("--anchor provenance anchor not set; please set provenance anchor"); - } - // indexing - DBMaker maker = newTmpFileDB(tmpDir); - Map treeMap = maker - .deleteFilesAfterClose() - .closeOnJvmShutdown() - .transactionDisable() - .make() - .createTreeMap("zotero-stream") - .make(); - - - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - AtomicLong index = new AtomicLong(0); - LOG.info("version index for [" + provenanceAnchor + "] building..."); - - StatementsListener listener = new StatementsListenerAdapter() { - @Override - public void on(Quad statement) { - if (RefNodeConstants.HAS_VERSION.equals(statement.getPredicate()) - && !RefNodeFactory.isBlankOrSkolemizedBlank(statement.getObject())) { - if (statement.getSubject() instanceof IRI && statement.getObject() instanceof IRI) { - IRI version = (IRI) statement.getObject(); - if (HashKeyUtil.isValidHashKey(version)) { - index.incrementAndGet(); - String uri = ((IRI) statement.getSubject()).getIRIString(); - String indexedVersion = version.getIRIString(); - treeMap.putIfAbsent(uri, indexedVersion); - } - } - } - - } - }; - ReplayUtil.replay(listener, persisting, new EmittingStreamFactory() { - @Override - public ParsingEmitter createEmitter(StatementEmitter emitter, ProcessorState context) { - return new EmittingStreamOfAnyQuad(emitter, context); - } - }); - stopWatch.stop(); - LOG.info("version index for [" + provenanceAnchor + "] with [" + index.get() + "] versions built in [" + stopWatch.getTime(TimeUnit.SECONDS) + "] s"); - - return treeMap; - } - } diff --git a/preston-taxonworks/src/main/java/bio/guoda/preston/cmd/CmdTaxonWorksStream.java b/preston-taxonworks/src/main/java/bio/guoda/preston/cmd/CmdTaxonWorksStream.java index 13edd6c5f..1bb809d6f 100644 --- a/preston-taxonworks/src/main/java/bio/guoda/preston/cmd/CmdTaxonWorksStream.java +++ b/preston-taxonworks/src/main/java/bio/guoda/preston/cmd/CmdTaxonWorksStream.java @@ -7,7 +7,6 @@ import bio.guoda.preston.store.BlobStoreAppendOnly; import bio.guoda.preston.store.BlobStoreReadOnly; import bio.guoda.preston.store.ValidatingKeyValueStreamContentAddressedFactory; -import org.apache.commons.collections4.map.LRUMap; import org.apache.commons.io.output.NullPrintStream; import org.apache.commons.rdf.api.Quad; import picocli.CommandLine; @@ -25,7 +24,7 @@ public class CmdTaxonWorksStream extends LoggingPersisting implements Runnable { public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); } public void run(BlobStoreReadOnly blobStoreReadOnly) { diff --git a/preston-track/src/main/java/bio/guoda/preston/cmd/CmdActivity.java b/preston-track/src/main/java/bio/guoda/preston/cmd/CmdActivity.java index 1d2158116..da97c3b80 100644 --- a/preston-track/src/main/java/bio/guoda/preston/cmd/CmdActivity.java +++ b/preston-track/src/main/java/bio/guoda/preston/cmd/CmdActivity.java @@ -104,7 +104,7 @@ protected StatementsListener[] initListeners(BlobStoreReadOnly blobStore, Queue> statementQueue) { return createStatementListeners( - resolvingBlobStore(blobStore), + BlobStoreUtil.createResolvingBlobStoreFor(blobStore, this), archivingLogger, statementQueue ); diff --git a/preston-zenodo/src/main/java/bio/guoda/preston/zenodo/CmdZenodo.java b/preston-zenodo/src/main/java/bio/guoda/preston/zenodo/CmdZenodo.java index 8e0bdbb93..5a5d8a960 100644 --- a/preston-zenodo/src/main/java/bio/guoda/preston/zenodo/CmdZenodo.java +++ b/preston-zenodo/src/main/java/bio/guoda/preston/zenodo/CmdZenodo.java @@ -2,6 +2,7 @@ import bio.guoda.preston.EnvUtil; import bio.guoda.preston.StatementLogFactory; +import bio.guoda.preston.cmd.BlobStoreUtil; import bio.guoda.preston.cmd.LogErrorHandlerExitOnError; import bio.guoda.preston.cmd.LoggingPersisting; import bio.guoda.preston.process.EmittingStreamOfAnyVersions; @@ -45,7 +46,7 @@ public void run() { BlobStoreReadOnly blobStoreAppendOnly = new BlobStoreAppendOnly(getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()), true, getHashType()); - run(resolvingBlobStore(blobStoreAppendOnly)); + run(BlobStoreUtil.createResolvingBlobStoreFor(blobStoreAppendOnly, this)); }