Upload diff results from BigDiffy to BigQuery #113

Merged
Changes from 6 commits
3 changes: 2 additions & 1 deletion build.sbt
@@ -150,7 +150,8 @@ lazy val ratatoolDiffy = project
),
// In case of scalacheck failures print more info
testOptions in Test += Tests.Argument(TestFrameworks.ScalaCheck, "-verbosity", "3"),
-    parallelExecution in Test := false
+    parallelExecution in Test := false,
+    addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full)
)
.enablePlugins(ProtobufPlugin)
.dependsOn(
21 changes: 11 additions & 10 deletions ratatool-diffy/README.md
@@ -21,16 +21,17 @@ For full details on Statistics and output see [BigDiffy.scala](https://github.co
From the CLI
```
BigDiffy - pair-wise field-level statistical diff
-Usage: BigDiffy [dataflow_options] [options]
-
-  --mode=[avro|bigquery]
-  --key=<key>            '.' separated key field
-  --lhs=<path>           LHS File path or BigQuery table
-  --rhs=<path>           RHS File path or BigQuery table
-  --output=<output>      File path prefix for output
-  --ignore=<keys>        ',' separated field list to ignore
-  --unordered=<keys>     ',' separated field list to treat as unordered
-  [--with-header]        Output all TSVs with header rows
+Usage: ratatool bigDiffy [dataflow_options] [options]
+
+  --input-mode=[avro|bigquery]   Diff-ing Avro or BQ records
+  --output-mode=[gcs|bigquery]   Saves to a text file in GCS or a BigQuery dataset
+  --key=<key>                    '.' separated key field
+  --lhs=<path>                   LHS File path or BigQuery table
+  --rhs=<path>                   RHS File path or BigQuery table
+  --output=<output>              File path prefix for output
+  --ignore=<keys>                ',' separated field list to ignore
+  --unordered=<keys>             ',' separated field list to treat as unordered
+  [--with-header]                Output all TSVs with header rows
```

Or from SBT
@@ -25,6 +25,7 @@ import com.spotify.ratatool.{Command, GcsConfiguration}
import com.spotify.ratatool.samplers.AvroSampler
import com.spotify.scio._
import com.spotify.scio.bigquery.BigQueryClient
import com.spotify.scio.bigquery.types.BigQueryType
import com.spotify.scio.io.Tap
import com.spotify.scio.values.SCollection
import com.twitter.algebird._
@@ -157,6 +158,10 @@ class BigDiffy[T](lhs: SCollection[T], rhs: SCollection[T],

}

sealed trait OutputMode
case object GCS extends OutputMode
case object BQ extends OutputMode

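As a standalone illustration of the dispatch this new ADT enables (`parseOutputMode` is a hypothetical helper name used only for this sketch; the PR inlines the equivalent match on the optional `--output-mode` flag in `run`):

```scala
// Standalone sketch of how the --output-mode flag maps onto the new ADT.
// parseOutputMode is a hypothetical helper; note that an absent flag (None)
// is rejected here, just like any unrecognized value.
sealed trait OutputMode
case object GCS extends OutputMode
case object BQ extends OutputMode

def parseOutputMode(flag: Option[String]): OutputMode = flag match {
  case Some("gcs")      => GCS
  case Some("bigquery") => BQ
  case m                => throw new IllegalArgumentException(s"output mode $m not supported")
}
```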
/** Big diff between two data sets given a primary key. */
object BigDiffy extends Command {
val command: String = "bigDiffy"
@@ -282,19 +287,69 @@ object BigDiffy extends Command {
r.setFields(mergeFields(x.getFields.asScala, y.getFields.asScala).asJava)
}

def saveStats[T](bigDiffy: BigDiffy[T], output: String, withHeader: Boolean = false): Unit = {
if (withHeader) {
bigDiffy.keyStats.map(_.toString).saveAsTextFileWithHeader(s"$output/keys", "key\tdifftype")
bigDiffy.fieldStats.map(_.toString).saveAsTextFileWithHeader(s"$output/fields",
"field\tcount\tfraction\tdeltaType\tmin" +
"\tmax\tcount\tmean\tvariance\tstddev\tskewness\tkurtosis")
bigDiffy.globalStats.map(_.toString).saveAsTextFileWithHeader(s"$output/global",
"numTotal\tnumSame\tnumDiff\tnumMissingLhs\tnumMissingRhs")
}
else {
bigDiffy.keyStats.saveAsTextFile(s"$output/keys")
bigDiffy.fieldStats.saveAsTextFile(s"$output/fields")
bigDiffy.globalStats.saveAsTextFile(s"$output/global")
@BigQueryType.toTable
[Review comment] Contributor: If paradise import fixes the annotation compilation then we can probably reduce these down to a single class again.

[Review comment] Contributor (Author): I was also seeing other errors around the subtypes not being mappable to BigQuery types - in particular, I'm saving enums and Any types as Strings in BigQuery with these new case classes. I would prefer to leave them separate.

case class KeyStatsBigQuery(key: String, diffType: String, delta: Option[DeltaBigQuery])
[Review comment] Contributor: Outside the scope of this PR, but we can potentially add @description annotations here now that it can write out to BQ, to make it clearer for users what fields are.

[Review comment] Contributor (Author): Should I create a follow-up issue for this?

case class DeltaBigQuery(field: String, left: String, right: String, delta: DeltaValueBigQuery)
case class DeltaValueBigQuery(deltaType: String, deltaValue: Option[Double])
@BigQueryType.toTable
case class GlobalStatsBigQuery(numTotal: Long, numSame: Long, numDiff: Long,
numMissingLhs: Long, numMissingRhs: Long)
@BigQueryType.toTable
case class FieldStatsBigQuery(field: String,
count: Long,
fraction: Double,
deltaStats: Option[DeltaStatsBigQuery])
case class DeltaStatsBigQuery(deltaType: String,
min: Double, max: Double, count: Long,
mean: Double, variance: Double, stddev: Double,
skewness: Double, kurtosis: Double)
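The author's note about enums and `Any` types corresponds to a flattening step like the following minimal sketch; `DeltaType` and `TypedDelta` here are simplified stand-ins for the real ratatool types, illustrating why `deltaType` is stored as a plain `String` in the BigQuery case classes:

```scala
// Simplified stand-ins (not the real ratatool types) showing the
// enum-to-String flattening the new BigQuery case classes require.
object DeltaType extends Enumeration { val NUMERIC, STRING, VECTOR = Value }
case class TypedDelta(deltaType: DeltaType.Value, value: Double)
case class DeltaValueBigQuery(deltaType: String, deltaValue: Option[Double])

// Mirrors the match in saveStats: typed deltas keep their numeric value,
// anything else degrades to "UNKNOWN" with no value.
def toDeltaValueBigQuery(delta: Any): DeltaValueBigQuery = delta match {
  case TypedDelta(dt, v) => DeltaValueBigQuery(dt.toString, Option(v))
  case _                 => DeltaValueBigQuery("UNKNOWN", None)
}
```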

/** saves stats to either GCS as text, or BigQuery */
def saveStats[T](bigDiffy: BigDiffy[T], output: String, withHeader: Boolean = false,
outputMode: OutputMode = GCS): Unit = {
outputMode match {
case GCS =>
// Saving to GCS, either with or without header
val keyStatsPath = s"$output/keys"
val fieldStatsPath = s"$output/fields"
val globalStatsPath = s"$output/global"

if (withHeader) {
bigDiffy.keyStats.map(_.toString).saveAsTextFileWithHeader(keyStatsPath,
Seq("key", "difftype").mkString("\t"))
bigDiffy.fieldStats.map(_.toString).saveAsTextFileWithHeader(fieldStatsPath,
Seq("field", "count", "fraction", "deltaType", "min", "max", "count", "mean",
"variance", "stddev", "skewness", "kurtosis").mkString("\t"))
bigDiffy.globalStats.map(_.toString).saveAsTextFileWithHeader(globalStatsPath,
Seq("numTotal", "numSame", "numDiff", "numMissingLhs", "numMissingRhs").mkString("\t"))
} else {
bigDiffy.keyStats.saveAsTextFile(keyStatsPath)
bigDiffy.fieldStats.saveAsTextFile(fieldStatsPath)
bigDiffy.globalStats.saveAsTextFile(globalStatsPath)
}
case BQ =>
// Saving to BQ, header irrelevant
bigDiffy.keyStats.map(stat =>
KeyStatsBigQuery(stat.key, stat.diffType.toString, stat.delta.map(d => {
val dv = d.delta match {
case TypedDelta(dt, v) =>
DeltaValueBigQuery(dt.toString, Option(v))
case _ =>
DeltaValueBigQuery("UNKNOWN", None)
}
DeltaBigQuery(d.field, d.left.toString, d.right.toString, dv)})
))
.saveAsTypedBigQuery(s"${output}_keys")
bigDiffy.fieldStats.map(stat =>
FieldStatsBigQuery(stat.field, stat.count, stat.fraction, stat.deltaStats.map(ds =>
DeltaStatsBigQuery(ds.deltaType.toString, ds.min, ds.max, ds.count, ds.mean,
ds.variance, ds.stddev, ds.skewness, ds.kurtosis)
)))
.saveAsTypedBigQuery(s"${output}_fields")
bigDiffy.globalStats.map( stat =>
GlobalStatsBigQuery(stat.numTotal, stat.numSame, stat.numDiff,
stat.numMissingLhs, stat.numMissingRhs))
.saveAsTypedBigQuery(s"${output}_global")
}
}
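The destination naming differs between the two branches above: GCS output uses slash-separated subpaths under the output prefix, while BigQuery output appends underscore suffixes to it. A small sketch (the suffixes match the PR; `statsDestinations` itself is a hypothetical helper for illustration):

```scala
// Sketch of the destinations saveStats writes to in each mode; the
// keys/fields/global suffixes come from the PR, the helper is hypothetical.
sealed trait Mode
case object GcsMode extends Mode
case object BqMode extends Mode

def statsDestinations(output: String, mode: Mode): Seq[String] = mode match {
  case GcsMode => Seq("keys", "fields", "global").map(s => s"$output/$s")
  case BqMode  => Seq("keys", "fields", "global").map(s => s"${output}_$s")
}
```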

@@ -329,14 +384,15 @@ object BigDiffy extends Command {
s"""BigDiffy - pair-wise field-level statistical diff
|Usage: ratatool $command [dataflow_options] [options]
|
| --mode=[avro|bigquery]
[Review comment] Contributor: Do we want to support mode as well, for backwards compatibility?

[Review comment] Contributor: I think it's fine to break it; will add a more helpful log error if mode is passed in.
| --key=<key> '.' separated key field
| --lhs=<path> LHS File path or BigQuery table
| --rhs=<path> RHS File path or BigQuery table
| --output=<output> File path prefix for output
| --ignore=<keys> ',' separated field list to ignore
| --unordered=<keys> ',' separated field list to treat as unordered
| [--with-header] Output all TSVs with header rows
| --input-mode=(avro|bigquery) Diff-ing Avro or BQ records
| [--output-mode=(gcs|bigquery)] Saves to a text file in GCS or a BigQuery dataset
[Review comment] Contributor: What do the braces connote - optional args?

[Review comment] Contributor (@idreeskhan, Aug 16, 2018): Yea, common convention in bash help/usage guides.

[Review comment] Contributor: But I was making this optional and then reverted the change, so usage needs to be updated.
| --key=<key> '.' separated key field
| --lhs=<path> LHS File path or BigQuery table
| --rhs=<path> RHS File path or BigQuery table
| --output=<output> File path prefix for output
| --ignore=<keys> ',' separated field list to ignore
| --unordered=<keys> ',' separated field list to treat as unordered
| [--with-header] Output all TSVs with header rows
""".stripMargin)
// scalastyle:on regex
sys.exit(1)
@@ -380,23 +436,32 @@ object BigDiffy extends Command {
}
}

/** for easier running via sbt */
def main(cmdlineArgs: Array[String]): Unit = run(cmdlineArgs)

/** Scio pipeline for BigDiffy. */
def run(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)

val (mode, key, lhs, rhs, output, header, ignore, unordered) = {
val (inputMode, key, lhs, rhs, output, header, ignore, unordered, outputMode) = {
try {
(args("mode"), args("key"), args("lhs"), args("rhs"), args("output"),
(args("input-mode"), args("key"), args("lhs"), args("rhs"), args("output"),
args.boolean("with-header", false), args.list("ignore").toSet,
args.list("unordered").toSet)
args.list("unordered").toSet, args.optional("output-mode"))
} catch {
case e: Throwable =>
usage()
throw e
}
}

val result = mode match {
val om: OutputMode = outputMode match {
case Some("gcs") => GCS
case Some("bigquery") => BQ
case m => throw new IllegalArgumentException(s"output mode $m not supported")
}

val result = inputMode match {
case "avro" =>
// TODO: handle schema evolution
val fs = FileSystem.get(new URI(rhs), GcsConfiguration.get())
@@ -413,9 +478,9 @@ object BigDiffy extends Command {
val diffy = new TableRowDiffy(schema, ignore, unordered)
BigDiffy.diffTableRow(sc, lhs, rhs, tableRowKeyFn(key), diffy)
case m =>
throw new IllegalArgumentException(s"mode $m not supported")
throw new IllegalArgumentException(s"input mode $m not supported")
}
saveStats(result, output, header)
saveStats(result, output, header, om)

sc.close().waitUntilDone()
}