GEOMESA-3291 Fix implementation of parallel table scans
* Fixes `geomesa.partition.scan.parallel`
elahrvivaz committed Aug 29, 2023
1 parent 0994b85 commit 8f7206f
Showing 6 changed files with 41 additions and 34 deletions.
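The core of the change, repeated across the Accumulo, HBase, and Redis query plans below, is to fold per-partition scans together with `concat` instead of `++`, so that closing the combined iterator still closes every underlying scanner. Here is a minimal, self-contained sketch of that pattern; the `CloseableIter` type and its helpers are invented stand-ins for illustration, not GeoMesa's actual API:

```scala
import java.io.Closeable

// Illustrative stand-in for GeoMesa's CloseableIterator: an Iterator that
// also owns resources (e.g. a table scanner) that must be released
trait CloseableIter[+A] extends Iterator[A] with Closeable { self =>
  // Chain two closeable iterators; closing the result closes both sides.
  // This mirrors the role that `concat` plays in the patch.
  def concat[B >: A](that: CloseableIter[B]): CloseableIter[B] = new CloseableIter[B] {
    def hasNext: Boolean = self.hasNext || that.hasNext
    def next(): B = if (self.hasNext) self.next() else that.next()
    def close(): Unit = try self.close() finally that.close()
  }
}

object CloseableIter {
  def apply[A](values: A*)(onClose: => Unit): CloseableIter[A] = new CloseableIter[A] {
    private val underlying = values.iterator
    def hasNext: Boolean = underlying.hasNext
    def next(): A = underlying.next()
    def close(): Unit = onClose
  }
  def empty[A]: CloseableIter[A] = apply[A]() {}
}

object ConcatDemo extends App {
  // Two pretend per-partition scans; in the real code these come from a
  // scanner per partitioned table
  val scans = Seq(
    CloseableIter(1, 2)(println("closed scan 1")),
    CloseableIter(3, 4)(println("closed scan 2"))
  )
  // The pattern from the patch: kick off all scans, then fold them into a
  // single closeable iterator so one close() releases every scanner
  val all = scans.foldLeft(CloseableIter.empty[Int])(_ concat _)
  println(all.toList) // List(1, 2, 3, 4)
  all.close()         // prints both "closed" messages
}
```

Folding with plain `++` would also compile, but under Scala 2.13 it hands back a plain `Iterator` with no `close`, which is the leak this commit addresses (see the `CloseableIterator` diff at the end).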

@@ -131,7 +131,7 @@ object AccumuloQueryPlan extends LazyLogging {
       timeout: Option[Timeout]): CloseableIterator[Entry[Key, Value]] = {
     if (partitionParallelScans) {
       // kick off all the scans at once
-      tables.map(scanner(connector, _, auths, timeout)).foldLeft(CloseableIterator.empty[Entry[Key, Value]])(_ ++ _)
+      tables.map(scanner(connector, _, auths, timeout)).foldLeft(CloseableIterator.empty[Entry[Key, Value]])(_ concat _)
     } else {
       // kick off the scans sequentially as they finish
       SelfClosingIterator(tables.iterator).flatMap(scanner(connector, _, auths, timeout))
@@ -189,7 +189,7 @@ object AccumuloQueryPlan extends LazyLogging {
     if (ds.config.queries.parallelPartitionScans) {
       // kick off all the scans at once
       tables.map(scanner(ds.connector, _, joinTables.next, auths, partitionParallelScans = true, timeout))
-          .foldLeft(CloseableIterator.empty[Entry[Key, Value]])(_ ++ _)
+          .foldLeft(CloseableIterator.empty[Entry[Key, Value]])(_ concat _)
     } else {
       // kick off the scans sequentially as they finish
       SelfClosingIterator(tables.iterator)

@@ -39,6 +39,11 @@ class AccumuloPartitioningTest extends Specification with TestWithFeatureType {
   override val spec: String =
     s"name:String:index=true,attr:String,dtg:Date,*geom:Point:srid=4326;${Configs.TablePartitioning}=${TimePartition.Name}"
 
+  lazy val parallelDs = {
+    val params = dsParams + (AccumuloDataStoreParams.PartitionParallelScansParam.key -> "true")
+    DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore]
+  }
+
   val features = (0 until 10).map { i =>
     val sf = new ScalaSimpleFeature(sft, i.toString)
     sf.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE)
@@ -92,22 +97,25 @@ class AccumuloPartitioningTest extends Specification with TestWithFeatureType {
   }
 
   def testQuery(filter: String, transforms: Array[String], results: Seq[SimpleFeature]): Unit = {
-    val query = new Query(sftName, ECQL.toFilter(filter), transforms: _*)
-    val fr = ds.getFeatureReader(query, Transaction.AUTO_COMMIT)
-    val features = SelfClosingIterator(fr).toList
-    if (features.length != results.length) {
-      ds.getQueryPlan(query, explainer = new ExplainPrintln)
-    }
-    val attributes = Option(transforms).getOrElse(ds.getSchema(sftName).getAttributeDescriptors.asScala.map(_.getLocalName).toArray)
-    features.map(_.getID) must containTheSameElementsAs(results.map(_.getID))
-    forall(features) { feature =>
-      feature.getAttributes must haveLength(attributes.length)
-      forall(attributes.zipWithIndex) { case (attribute, i) =>
-        feature.getAttribute(attribute) mustEqual feature.getAttribute(i)
-        feature.getAttribute(attribute) mustEqual results.find(_.getID == feature.getID).get.getAttribute(attribute)
-      }
-    }
-    query.getHints.put(QueryHints.EXACT_COUNT, java.lang.Boolean.TRUE)
-    ds.getFeatureSource(sftName).getFeatures(query).size() mustEqual results.length
+    foreach(Seq(ds, parallelDs)) { ds =>
+      val query = new Query(sftName, ECQL.toFilter(filter), transforms: _*)
+      val fr = ds.getFeatureReader(query, Transaction.AUTO_COMMIT)
+      val features = SelfClosingIterator(fr).toList
+      if (features.length != results.length) {
+        ds.getQueryPlan(query, explainer = new ExplainPrintln)
+      }
+      val attributes = Option(transforms).getOrElse(ds.getSchema(sftName).getAttributeDescriptors.asScala.map(_
+        .getLocalName).toArray)
+      features.map(_.getID) must containTheSameElementsAs(results.map(_.getID))
+      forall(features) { feature =>
+        feature.getAttributes must haveLength(attributes.length)
+        forall(attributes.zipWithIndex) { case (attribute, i) => feature.getAttribute(attribute) mustEqual
+          feature.getAttribute(i)
+          feature.getAttribute(attribute) mustEqual results.find(_.getID == feature.getID).get.getAttribute(attribute)
+        }
+      }
+      query.getHints.put(QueryHints.EXACT_COUNT, java.lang.Boolean.TRUE)
+      ds.getFeatureSource(sftName).getFeatures(query).size() mustEqual results.length
+    }
   }
 }
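With the new `parallelDs`, the test now drives every query through both the default and the parallel-scan datastore, so both code paths are asserted against the same expected results. A stripped-down sketch of that specs2 pattern, with placeholder result data standing in for real feature readers:

```scala
import org.specs2.mutable.Specification

// Sketch of the pattern above: run identical assertions against multiple
// configurations of the same store. The Seqs are placeholders for the
// results a real datastore query would return.
class BothPathsSpec extends Specification {

  // stand-ins for results from the default and parallel-scan datastores
  val sequentialResults = Seq("0", "1", "2")
  val parallelResults   = Seq("0", "1", "2")

  "queries" should {
    "return the same features regardless of scan strategy" in {
      foreach(Seq(sequentialResults, parallelResults)) { results =>
        results must containTheSameElementsAs(Seq("0", "1", "2"))
      }
    }
  }
}
```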

@@ -46,7 +46,7 @@ sealed trait HBaseQueryPlan extends QueryPlan[HBaseDataStore] {
     val iter = scans.iterator.map(singleTableScan(_, ds.connection, threads(ds), timeout))
     if (ds.config.queries.parallelPartitionScans) {
       // kick off all the scans at once
-      iter.foldLeft(CloseableIterator.empty[Results])(_ ++ _)
+      iter.foldLeft(CloseableIterator.empty[Results])(_ concat _)
     } else {
       // kick off the scans sequentially as they finish
       SelfClosingIterator(iter).flatMap(s => s)

@@ -109,15 +109,19 @@ class HBasePartitioningTest extends Specification with LazyLogging {

       val transformsList = Seq(null, Array("geom"), Array("geom", "dtg"), Array("name"), Array("dtg", "geom", "attr", "name"))
 
-      foreach(transformsList) { transforms =>
-        testQuery(ds, typeName, "IN('0', '2')", transforms, Seq(toAdd(0), toAdd(2)))
-        testQuery(ds, typeName, "bbox(geom,38,48,52,62) and dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, toAdd.dropRight(2))
-        testQuery(ds, typeName, "bbox(geom,42,48,52,62) and dtg DURING 2017-12-15T00:00:00.000Z/2018-01-15T00:00:00.000Z", transforms, toAdd.drop(2))
-        testQuery(ds, typeName, "bbox(geom,42,48,52,62)", transforms, toAdd.drop(2))
-        testQuery(ds, typeName, "dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, toAdd.dropRight(2))
-        testQuery(ds, typeName, "attr = 'name5' and bbox(geom,38,48,52,62) and dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, Seq(toAdd(5)))
-        testQuery(ds, typeName, "name < 'name5'", transforms, toAdd.take(5))
-        testQuery(ds, typeName, "name = 'name5'", transforms, Seq(toAdd(5)))
+      WithClose(DataStoreFinder.getDataStore((params + (HBaseDataStoreParams.PartitionParallelScansParam.key -> "true")).asJava).asInstanceOf[HBaseDataStore]) { parallelDs =>
+        foreach(Seq(ds, parallelDs)) { ds =>
+          foreach(transformsList) { transforms =>
+            testQuery(ds, typeName, "IN('0', '2')", transforms, Seq(toAdd(0), toAdd(2)))
+            testQuery(ds, typeName, "bbox(geom,38,48,52,62) and dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, toAdd.dropRight(2))
+            testQuery(ds, typeName, "bbox(geom,42,48,52,62) and dtg DURING 2017-12-15T00:00:00.000Z/2018-01-15T00:00:00.000Z", transforms, toAdd.drop(2))
+            testQuery(ds, typeName, "bbox(geom,42,48,52,62)", transforms, toAdd.drop(2))
+            testQuery(ds, typeName, "dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, toAdd.dropRight(2))
+            testQuery(ds, typeName, "attr = 'name5' and bbox(geom,38,48,52,62) and dtg DURING 2018-01-01T00:00:00.000Z/2018-01-08T12:00:00.000Z", transforms, Seq(toAdd(5)))
+            testQuery(ds, typeName, "name < 'name5'", transforms, toAdd.take(5))
+            testQuery(ds, typeName, "name = 'name5'", transforms, Seq(toAdd(5)))
+          }
+        }
       }
 
       {

@@ -109,7 +109,7 @@ object RedisQueryPlan {
     val scans = iter.map(singleTableScan(ds, _))
     if (ds.config.queries.parallelPartitionScans) {
       // kick off all the scans at once
-      scans.foldLeft(CloseableIterator.empty[Array[Byte]])(_ ++ _)
+      scans.foldLeft(CloseableIterator.empty[Array[Byte]])(_ concat _)
     } else {
       // kick off the scans sequentially as they finish
       SelfClosingIterator(scans).flatMap(s => s)

@@ -122,11 +122,6 @@ object CloseableIterator {
       queue.foreach(_.apply().close())
       queue.clear()
     }
-
-    override def ++[B >: A](that: => GenTraversableOnce[B]): CloseableIterator[B] = {
-      lazy val applied = CloseableIterator.wrap(that)
-      new ConcatCloseableIterator[B](queue.+:(() => current).:+(() => applied))
-    }
   }
 
   private final class FlatMapCloseableIterator[A, B](source: CloseableIterator[A], f: A => GenTraversableOnce[B])
@@ -177,7 +172,7 @@ trait CloseableIterator[+A] extends Iterator[A] with Closeable {
     new ConcatCloseableIterator[B](queue)
   }
 
-  // in scala 2.13 this metho is final, and can cause resource leaks due to not returning a closeable iterator
+  // in scala 2.13 this method is final, and can cause resource leaks due to not returning a closeable iterator
   override def ++[B >: A](that: => GenTraversableOnce[B]): CloseableIterator[B] =
     throw new NotImplementedError("Not safe for cross-scala usage")
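For context on why the trait keeps an override of `++` that throws: per the comment above, `Iterator.++` is final in Scala 2.13, so `CloseableIterator` cannot intercept it there, and `a ++ b` silently yields a plain `Iterator` whose sources can never be closed. A self-contained sketch of that hazard, using an invented `ScanIter` class for illustration:

```scala
import java.io.Closeable

object PlusPlusLeakDemo extends App {

  // An iterator that owns a resource, standing in for a table scan
  class ScanIter(label: String, values: Int*) extends Iterator[Int] with Closeable {
    private val it = values.iterator
    def hasNext: Boolean = it.hasNext
    def next(): Int = it.next()
    def close(): Unit = println(s"closed $label")
  }

  val a = new ScanIter("a", 1, 2)
  val b = new ScanIter("b", 3, 4)

  // In Scala 2.13, Iterator.++ cannot be overridden and returns a plain
  // Iterator, so the Closeable capability vanishes from the combined value
  val combined: Iterator[Int] = a ++ b
  println(combined.toList) // List(1, 2, 3, 4)

  // combined.close() // does not compile: Iterator has no close()
  // The underlying scanners are never released, which is the leak the
  // NotImplementedError override above guards against.
}
```

Throwing `NotImplementedError` at the call site turns that silent leak into an immediate, visible failure and steers callers toward `concat`, which is exactly the substitution this commit makes in the query plans.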
