-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upload diff results from BigDiffy to BigQuery #113
Changes from 6 commits
4997873
ddabcd9
2bf63f2
e61620d
5786779
088e884
3bc5251
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import com.spotify.ratatool.{Command, GcsConfiguration} | |
import com.spotify.ratatool.samplers.AvroSampler | ||
import com.spotify.scio._ | ||
import com.spotify.scio.bigquery.BigQueryClient | ||
import com.spotify.scio.bigquery.types.BigQueryType | ||
import com.spotify.scio.io.Tap | ||
import com.spotify.scio.values.SCollection | ||
import com.twitter.algebird._ | ||
|
@@ -157,6 +158,10 @@ class BigDiffy[T](lhs: SCollection[T], rhs: SCollection[T], | |
|
||
} | ||
|
||
sealed trait OutputMode | ||
case object GCS extends OutputMode | ||
case object BQ extends OutputMode | ||
|
||
/** Big diff between two data sets given a primary key. */ | ||
object BigDiffy extends Command { | ||
val command: String = "bigDiffy" | ||
|
@@ -282,19 +287,69 @@ object BigDiffy extends Command { | |
r.setFields(mergeFields(x.getFields.asScala, y.getFields.asScala).asJava) | ||
} | ||
|
||
def saveStats[T](bigDiffy: BigDiffy[T], output: String, withHeader: Boolean = false): Unit = { | ||
if (withHeader) { | ||
bigDiffy.keyStats.map(_.toString).saveAsTextFileWithHeader(s"$output/keys", "key\tdifftype") | ||
bigDiffy.fieldStats.map(_.toString).saveAsTextFileWithHeader(s"$output/fields", | ||
"field\tcount\tfraction\tdeltaType\tmin" + | ||
"\tmax\tcount\tmean\tvariance\tstddev\tskewness\tkurtosis") | ||
bigDiffy.globalStats.map(_.toString).saveAsTextFileWithHeader(s"$output/global", | ||
"numTotal\tnumSame\tnumDiff\tnumMissingLhs\tnumMissingRhs") | ||
} | ||
else { | ||
bigDiffy.keyStats.saveAsTextFile(s"$output/keys") | ||
bigDiffy.fieldStats.saveAsTextFile(s"$output/fields") | ||
bigDiffy.globalStats.saveAsTextFile(s"$output/global") | ||
@BigQueryType.toTable | ||
case class KeyStatsBigQuery(key: String, diffType: String, delta: Option[DeltaBigQuery]) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Outside the scope of this PR, but we can potentially add There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should I create a follow-up issue for this? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
case class DeltaBigQuery(field: String, left: String, right: String, delta: DeltaValueBigQuery) | ||
case class DeltaValueBigQuery(deltaType: String, deltaValue: Option[Double]) | ||
@BigQueryType.toTable | ||
case class GlobalStatsBigQuery(numTotal: Long, numSame: Long, numDiff: Long, | ||
numMissingLhs: Long, numMissingRhs: Long) | ||
@BigQueryType.toTable | ||
case class FieldStatsBigQuery(field: String, | ||
count: Long, | ||
fraction: Double, | ||
deltaStats: Option[DeltaStatsBigQuery]) | ||
case class DeltaStatsBigQuery(deltaType: String, | ||
min: Double, max: Double, count: Long, | ||
mean: Double, variance: Double, stddev: Double, | ||
skewness: Double, kurtosis: Double) | ||
|
||
/** saves stats to either GCS as text, or BigQuery */ | ||
def saveStats[T](bigDiffy: BigDiffy[T], output: String, withHeader: Boolean = false, | ||
outputMode: OutputMode = GCS): Unit = { | ||
outputMode match { | ||
case GCS => | ||
// Saving to GCS, either with or without header | ||
val keyStatsPath = s"$output/keys" | ||
val fieldStatsPath = s"$output/fields" | ||
val globalStatsPath = s"$output/global" | ||
|
||
if (withHeader) { | ||
bigDiffy.keyStats.map(_.toString).saveAsTextFileWithHeader(keyStatsPath, | ||
Seq("key", "difftype").mkString("\t")) | ||
bigDiffy.fieldStats.map(_.toString).saveAsTextFileWithHeader(fieldStatsPath, | ||
Seq("field", "count", "fraction", "deltaType", "min", "max", "count", "mean", | ||
"variance", "stddev", "skewness", "kurtosis").mkString("\t")) | ||
bigDiffy.globalStats.map(_.toString).saveAsTextFileWithHeader(globalStatsPath, | ||
Seq("numTotal", "numSame", "numDiff", "numMissingLhs", "numMissingRhs").mkString("\t")) | ||
} else { | ||
bigDiffy.keyStats.saveAsTextFile(keyStatsPath) | ||
bigDiffy.fieldStats.saveAsTextFile(fieldStatsPath) | ||
bigDiffy.globalStats.saveAsTextFile(globalStatsPath) | ||
} | ||
case BQ => | ||
// Saving to BQ, header irrelevant | ||
bigDiffy.keyStats.map(stat => | ||
KeyStatsBigQuery(stat.key, stat.diffType.toString, stat.delta.map(d => { | ||
val dv = d.delta match { | ||
case TypedDelta(dt, v) => | ||
DeltaValueBigQuery(dt.toString, Option(v)) | ||
case _ => | ||
DeltaValueBigQuery("UNKNOWN", None) | ||
} | ||
DeltaBigQuery(d.field, d.left.toString, d.right.toString, dv)}) | ||
)) | ||
.saveAsTypedBigQuery(s"${output}_keys") | ||
bigDiffy.fieldStats.map(stat => | ||
FieldStatsBigQuery(stat.field, stat.count, stat.fraction, stat.deltaStats.map(ds => | ||
DeltaStatsBigQuery(ds.deltaType.toString, ds.min, ds.max, ds.count, ds.mean, | ||
ds.variance, ds.stddev, ds.skewness, ds.kurtosis) | ||
))) | ||
.saveAsTypedBigQuery(s"${output}_fields") | ||
bigDiffy.globalStats.map( stat => | ||
GlobalStatsBigQuery(stat.numTotal, stat.numSame, stat.numDiff, | ||
stat.numMissingLhs, stat.numMissingRhs)) | ||
.saveAsTypedBigQuery(s"${output}_global") | ||
} | ||
} | ||
|
||
|
@@ -329,14 +384,15 @@ object BigDiffy extends Command { | |
s"""BigDiffy - pair-wise field-level statistical diff | ||
|Usage: ratatool $command [dataflow_options] [options] | ||
| | ||
| --mode=[avro|bigquery] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we want to support There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it's fine to break it; will add a more helpful log error if |
||
| --key=<key> '.' separated key field | ||
| --lhs=<path> LHS File path or BigQuery table | ||
| --rhs=<path> RHS File path or BigQuery table | ||
| --output=<output> File path prefix for output | ||
| --ignore=<keys> ',' separated field list to ignore | ||
| --unordered=<keys> ',' separated field list to treat as unordered | ||
| [--with-header] Output all TSVs with header rows | ||
| --input-mode=(avro|bigquery) Diff-ing Avro or BQ records | ||
| [--output-mode=(gcs|bigquery)] Saves to a text file in GCS or a BigQuery dataset | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What do the braces connote — optional args? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, a common convention in bash help/usage guides. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. But I was making this optional and then reverted the change, so the usage text needs to be updated. |
||
| --key=<key> '.' separated key field | ||
| --lhs=<path> LHS File path or BigQuery table | ||
| --rhs=<path> RHS File path or BigQuery table | ||
| --output=<output> File path prefix for output | ||
| --ignore=<keys> ',' separated field list to ignore | ||
| --unordered=<keys> ',' separated field list to treat as unordered | ||
| [--with-header] Output all TSVs with header rows | ||
""".stripMargin) | ||
// scalastyle:on regex | ||
sys.exit(1) | ||
|
@@ -380,23 +436,32 @@ object BigDiffy extends Command { | |
} | ||
} | ||
|
||
/** for easier running via sbt */ | ||
def main(cmdlineArgs: Array[String]): Unit = run(cmdlineArgs) | ||
|
||
/** Scio pipeline for BigDiffy. */ | ||
def run(cmdlineArgs: Array[String]): Unit = { | ||
val (sc, args) = ContextAndArgs(cmdlineArgs) | ||
|
||
val (mode, key, lhs, rhs, output, header, ignore, unordered) = { | ||
val (inputMode, key, lhs, rhs, output, header, ignore, unordered, outputMode) = { | ||
try { | ||
(args("mode"), args("key"), args("lhs"), args("rhs"), args("output"), | ||
(args("input-mode"), args("key"), args("lhs"), args("rhs"), args("output"), | ||
args.boolean("with-header", false), args.list("ignore").toSet, | ||
args.list("unordered").toSet) | ||
args.list("unordered").toSet, args.optional("output-mode")) | ||
} catch { | ||
case e: Throwable => | ||
usage() | ||
throw e | ||
} | ||
} | ||
|
||
val result = mode match { | ||
val om: OutputMode = outputMode match { | ||
case Some("gcs") => GCS | ||
case Some("bigquery") => BQ | ||
case m => throw new IllegalArgumentException(s"output mode $m not supported") | ||
} | ||
|
||
val result = inputMode match { | ||
case "avro" => | ||
// TODO: handle schema evolution | ||
val fs = FileSystem.get(new URI(rhs), GcsConfiguration.get()) | ||
|
@@ -413,9 +478,9 @@ object BigDiffy extends Command { | |
val diffy = new TableRowDiffy(schema, ignore, unordered) | ||
BigDiffy.diffTableRow(sc, lhs, rhs, tableRowKeyFn(key), diffy) | ||
case m => | ||
throw new IllegalArgumentException(s"mode $m not supported") | ||
throw new IllegalArgumentException(s"input mode $m not supported") | ||
} | ||
saveStats(result, output, header) | ||
saveStats(result, output, header, om) | ||
|
||
sc.close().waitUntilDone() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
If paradise import fixes the annotation compilation then we can probably reduce these down to a single class again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was also seeing other errors around the subtypes not being mappable to BigQuery types - in particular, I'm saving enums and Any types as Strings in BigQuery with these new case classes. I would prefer to leave them separate.