Completed TriplesSource tests for reporting partitioning
EnricoMi committed Mar 23, 2022
1 parent fc4c801 commit 5f2b057
Showing 1 changed file with 37 additions and 78 deletions.
@@ -853,48 +853,12 @@ class TestTriplesSource extends AnyFunSpec
)
}

it("should report single partitioning") {
val target = dgraph.target
val df =
reader
.option(PartitionerOption, SingletonPartitionerOption)
.dgraph.triples(target)
.repartition(1)
df.queryExecution.optimizedPlan
print()
}

def containsShuffleExchangeExec(plan: SparkPlan): Boolean = plan match {
case _: ShuffleExchangeExec => true
case p => p.children.exists(containsShuffleExchangeExec)
}
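// A minimal usage sketch (illustrative, not part of this diff), assuming the
// harness asserts on the physical plan of each transformed DataFrame:
//   val transformed = df.groupBy($"predicate").count()
//   val plan = transformed.queryExecution.executedPlan
//   assert(containsShuffleExchangeExec(plan) === shuffleExpected)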

val predicatePartitioningTests = Seq(
("distinct", (df: DataFrame) => df.select($"predicate").distinct(), Seq(
Row("dgraph.type"), Row("director"), Row("name"), Row("release_date"),
Row("revenue"), Row("running_time"), Row("starring"), Row("title")
)),
("groupBy", (df: DataFrame) => df.groupBy($"predicate").count(), Seq(
Row("dgraph.type", 10), Row("director", 3), Row("name", 6), Row("release_date", 4),
Row("revenue", 4), Row("running_time", 4), Row("starring", 9), Row("title", 3)
)),
("Window.partitionBy", (df: DataFrame) => df.select($"predicate", count(lit(1)) over Window.partitionBy($"predicate")), Seq(
Row("dgraph.type", 10), Row("director", 3), Row("name", 6), Row("release_date", 4),
Row("revenue", 4), Row("running_time", 4), Row("starring", 9), Row("title", 3)
).flatMap(row => row * row.getInt(1))), // all rows occur with cardinality of their count
("Window.partitionBy.orderBy", (df: DataFrame) => df.select($"predicate", row_number() over Window.partitionBy($"predicate").orderBy($"subject")), Seq(
Row("dgraph.type", 10), Row("director", 3), Row("name", 6), Row("release_date", 4),
Row("revenue", 4), Row("running_time", 4), Row("starring", 9), Row("title", 3)
).flatMap(row => row ++ row.getInt(1))), // each row occurs with row_number up to their cardinality
)

def testPartitioning(df: () => DataFrame,
tests: Seq[(String, DataFrame => DataFrame, Seq[Row])],
shuffleExpected: Boolean): Unit = {
testPartitioning2(df, tests.map(test => (test._1, test._2, () => test._3)), shuffleExpected = shuffleExpected)
}

def testPartitioning2(df: () => DataFrame,
tests: Seq[(String, DataFrame => DataFrame, () => Seq[Row])],
shuffleExpected: Boolean): Unit = {
val label = if (shuffleExpected) "shuffle" else "reuse partitioning"
@@ -909,6 +873,19 @@ class TestTriplesSource extends AnyFunSpec
}
}
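// Plausible shape of the collapsed harness body (an assumption; the middle of
// this method is folded in the diff view): one test case per transformation,
// asserting both the result rows and the presence or absence of a shuffle:
//   tests.foreach { case (name, transform, expected) =>
//     it(s"should $label with $name") {
//       val result = transform(df())
//       assert(containsShuffleExchangeExec(result.queryExecution.executedPlan) === shuffleExpected)
//       assert(result.collect().toSeq.sortBy(_.toString) === expected().sortBy(_.toString))
//     }
//   }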

lazy val expectedPredicateCounts = expectedTypedTriples.toSeq.groupBy(_.predicate)
.mapValues(_.length).toSeq.sortBy(_._1).map(e => Row(e._1, e._2))
val predicatePartitioningTests = Seq(
("distinct", (df: DataFrame) => df.select($"predicate").distinct(), () => expectedPredicateCounts.map(row => Row(row.getString(0)))),
("groupBy", (df: DataFrame) => df.groupBy($"predicate").count(), () => expectedPredicateCounts),
("Window.partitionBy", (df: DataFrame) => df.select($"predicate", count(lit(1)) over Window.partitionBy($"predicate")),
() => expectedPredicateCounts.flatMap(row => row * row.getInt(1)) // all rows occur with cardinality of their count
),
("Window.partitionBy.orderBy", (df: DataFrame) => df.select($"predicate", row_number() over Window.partitionBy($"predicate").orderBy($"subject")),
() => expectedPredicateCounts.flatMap(row => row ++ row.getInt(1)) // each row occurs with row_number up to their cardinality
)
)
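// Worked example (illustrative): for Row("director", 3) in expectedPredicateCounts,
// - distinct keeps only Row("director"),
// - groupBy keeps Row("director", 3),
// - Window.partitionBy expands it to three copies of Row("director", 3),
// - Window.partitionBy.orderBy expands it to Row("director", 1),
//   Row("director", 2) and Row("director", 3).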

describe("without predicate partitioning") {
val withoutPartitioning = () =>
reader
@@ -918,7 +895,6 @@ class TestTriplesSource extends AnyFunSpec
MaxLeaseIdEstimatorIdOption -> dgraph.highestUid.toString
))
.dgraph.triples(dgraph.target)
.where(!$"predicate".contains("@"))

testPartitioning(withoutPartitioning, predicatePartitioningTests, shuffleExpected = true)
}
@@ -929,28 +905,21 @@ class TestTriplesSource extends AnyFunSpec
.option(PartitionerOption, PredicatePartitionerOption)
.option(PredicatePartitionerPredicatesOption, "2")
.dgraph.triples(dgraph.target)
.where(!$"predicate".contains("@"))

testPartitioning(withPartitioning, predicatePartitioningTests, shuffleExpected = false)
}

lazy val expectedSubjectCounts = expectedTypedTriples.toSeq.groupBy(_.subject)
.mapValues(_.length).toSeq.sortBy(_._1).map(e => Row(e._1, e._2))
val subjectPartitioningTests = Seq(
("distinct", (df: DataFrame) => df.select($"subject").distinct(), () => dgraph.allUids.sorted.map(Row(_))),
("groupBy", (df: DataFrame) => df.groupBy($"subject").count(), () => Seq(
Row(dgraph.han, 2), Row(dgraph.irvin, 2), Row(dgraph.leia, 2), Row(dgraph.luke, 2),
Row(dgraph.lucas, 2), Row(dgraph.richard, 2),
Row(dgraph.st1, 4), Row(dgraph.sw1, 9), Row(dgraph.sw2, 9), Row(dgraph.sw3, 9)
).sortBy(_.getLong(0))),
("Window.partitionBy", (df: DataFrame) => df.select($"subject", count(lit(1)) over Window.partitionBy($"subject")), () => Seq(
Row(dgraph.han, 2), Row(dgraph.irvin, 2), Row(dgraph.leia, 2), Row(dgraph.luke, 2),
Row(dgraph.lucas, 2), Row(dgraph.richard, 2),
Row(dgraph.st1, 4), Row(dgraph.sw1, 9), Row(dgraph.sw2, 9), Row(dgraph.sw3, 9)
).sortBy(_.getLong(0)).flatMap(row => row * row.getInt(1))), // all rows occur with cardinality of their count
("Window.partitionBy.orderBy", (df: DataFrame) => df.select($"subject", row_number() over Window.partitionBy($"subject").orderBy($"predicate")), () => Seq(
Row(dgraph.han, 2), Row(dgraph.irvin, 2), Row(dgraph.leia, 2), Row(dgraph.luke, 2),
Row(dgraph.lucas, 2), Row(dgraph.richard, 2),
Row(dgraph.st1, 4), Row(dgraph.sw1, 9), Row(dgraph.sw2, 9), Row(dgraph.sw3, 9)
).sortBy(_.getLong(0)).flatMap(row => row ++ row.getInt(1))), // each row occurs with row_number up to their cardinality
("distinct", (df: DataFrame) => df.select($"subject").distinct(), () => expectedSubjectCounts.map(row => Row(row.getLong(0)))),
("groupBy", (df: DataFrame) => df.groupBy($"subject").count(), () => expectedSubjectCounts),
("Window.partitionBy", (df: DataFrame) => df.select($"subject", count(lit(1)) over Window.partitionBy($"subject")),
() => expectedSubjectCounts.flatMap(row => row * row.getInt(1)) // all rows occur with cardinality of their count
),
("Window.partitionBy.orderBy", (df: DataFrame) => df.select($"subject", row_number() over Window.partitionBy($"subject").orderBy($"predicate")),
() => expectedSubjectCounts.flatMap(row => row ++ row.getInt(1)) // each row occurs with row_number up to their cardinality
)
)

describe("without subject partitioning") {
@@ -959,9 +928,8 @@ class TestTriplesSource extends AnyFunSpec
.option(PartitionerOption, PredicatePartitionerOption)
.option(PredicatePartitionerPredicatesOption, "2")
.dgraph.triples(dgraph.target)
.where(!$"predicate".contains("@"))

testPartitioning2(withoutPartitioning, subjectPartitioningTests, shuffleExpected = true)
testPartitioning(withoutPartitioning, subjectPartitioningTests, shuffleExpected = true)
}

describe("with subject partitioning") {
@@ -973,30 +941,21 @@ class TestTriplesSource extends AnyFunSpec
MaxLeaseIdEstimatorIdOption -> dgraph.highestUid.toString
))
.dgraph.triples(dgraph.target)
.where(!$"predicate".contains("@"))

testPartitioning2(withPartitioning, subjectPartitioningTests, shuffleExpected = false)
testPartitioning(withPartitioning, subjectPartitioningTests, shuffleExpected = false)
}

// Array([3,dgraph.type], [3,release_date], [3,revenue], [3,running_time], [4,dgraph.type], [4,name], [5,dgraph.type], [5,name], [6,dgraph.type], [6,director], [6,release_date], [6,revenue], [6,running_time], [6,starring], [6,title], [7,dgraph.type], [7,name], [8,dgraph.type], [8,director], [8,release_date], [8,revenue], [8,running_time], [8,starring], [8,title], [9,dgraph.type], [9,director], [9,release_date], [9,revenue], [9,running_time], [9,starring], [9,title], [10,dgraph.type], [10,name], [11,dgraph.type], [11,name], [12,dgraph.type], [12,name])

lazy val expectedSubjectAndPredicateCounts = expectedTypedTriples.toSeq.groupBy(t => (t.subject, t.predicate))
.mapValues(_.length).toSeq.sortBy(_._1).map(e => Row(e._1._1, e._1._2, e._2))
val subjectAndPredicatePartitioningTests = Seq(
("distinct", (df: DataFrame) => df.select($"subject", $"predicate").distinct(),
() => TriplesSourceExpecteds(dgraph).getExpectedTypedTriples.map(t => Row(t.subject, t.predicate))
.toSeq.sortBy(row => (row.getLong(0), row.getString(1)))
("distinct", (df: DataFrame) => df.select($"subject", $"predicate").distinct(), () => expectedSubjectAndPredicateCounts.map(row => Row(row.getLong(0), row.getString(1)))),
("groupBy", (df: DataFrame) => df.groupBy($"subject", $"predicate").count(), () => expectedSubjectAndPredicateCounts),
("Window.partitionBy", (df: DataFrame) => df.select($"subject", $"predicate", count(lit(1)) over Window.partitionBy($"subject", $"predicate")),
() => expectedSubjectAndPredicateCounts.flatMap(row => row * row.getInt(2)) // all rows occur with cardinality of their count
),
("groupBy", (df: DataFrame) => df.groupBy($"subject", $"predicate").count(), () => Seq(
Row("dgraph.type", 11), Row("director", 3), Row("name", 6), Row("release_date", 4),
Row("revenue", 4), Row("running_time", 4), Row("starring", 9), Row("title", 3)
)),
("Window.partitionBy", (df: DataFrame) => df.select($"subject", $"predicate", count(lit(1)) over Window.partitionBy($"subject", $"predicate")), () => Seq(
Row("dgraph.type", 11), Row("director", 3), Row("name", 6), Row("release_date", 4),
Row("revenue", 4), Row("running_time", 4), Row("starring", 9), Row("title", 3)
).flatMap(row => row * row.getInt(1))), // all rows occur with cardinality of their count
("Window.partitionBy.orderBy", (df: DataFrame) => df.select($"subject", $"predicate", row_number() over Window.partitionBy($"subject", $"predicate").orderBy($"objectType")), () => Seq(
Row("dgraph.type", 11), Row("director", 3), Row("name", 6), Row("release_date", 4),
Row("revenue", 4), Row("running_time", 4), Row("starring", 9), Row("title", 3)
).flatMap(row => row ++ row.getInt(1))), // each row occurs with row_number up to their cardinality
("Window.partitionBy.orderBy", (df: DataFrame) => df.select($"subject", $"predicate", row_number() over Window.partitionBy($"subject", $"predicate").orderBy($"objectType")),
() => expectedSubjectAndPredicateCounts.flatMap(row => row ++ row.getInt(2)) // each row occurs with row_number up to their cardinality
)
)

describe("without subject and predicate partitioning") {
@@ -1006,7 +965,7 @@ class TestTriplesSource extends AnyFunSpec
.option(PredicatePartitionerPredicatesOption, "2")
.dgraph.triples(dgraph.target)

testPartitioning2(withoutPartitioning, subjectAndPredicatePartitioningTests, shuffleExpected = true)
testPartitioning(withoutPartitioning, subjectAndPredicatePartitioningTests, shuffleExpected = true)
}

describe("with subject and predicate partitioning") {
@@ -1020,7 +979,7 @@ class TestTriplesSource extends AnyFunSpec
))
.dgraph.triples(dgraph.target)

testPartitioning2(withPartitioning, subjectAndPredicatePartitioningTests, shuffleExpected = false)
testPartitioning(withPartitioning, subjectAndPredicatePartitioningTests, shuffleExpected = false)
}

}
@@ -1180,7 +1139,7 @@ object TestTriplesSource {

implicit class ExtendedRow(row: Row) {
def *(n: Int): Seq[Row] = Seq.fill(n)(row)
def ++(n: Int): Seq[Row] = Seq.fill(n)(row).zipWithIndex.map { case (row, idx) => Row(row.get(0), idx+1) }
def ++(n: Int): Seq[Row] = Seq.fill(n)(row).zipWithIndex.map { case (row, idx) => Row(row.toSeq.init :+ (idx+1): _*) }
}
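// Illustration (not from this commit) of the two helpers above:
//   Row("director", 3) * 3  == Seq(Row("director", 3), Row("director", 3), Row("director", 3))
//   Row("director", 3) ++ 3 == Seq(Row("director", 1), Row("director", 2), Row("director", 3))
// The fixed `++` keeps all leading columns, so three-column rows such as
// Row(subject, "director", 3) expand correctly; the previous version kept
// only the first column.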

}
