#156 Add sum of truncated values as measure #157

Merged · 4 commits · Nov 4, 2024
2 changes: 1 addition & 1 deletion .github/workflows/jacoco_check.yml
@@ -48,7 +48,7 @@ jobs:
run: sbt ++${{matrix.scala}} jacoco
- name: Add coverage to PR
id: jacoco
- uses: madrapps/jacoco-report@v1.3
+ uses: madrapps/jacoco-report@v1.7.1
with:
paths: >
${{ github.workspace }}/atum/target/scala-${{ matrix.scala_short }}/jacoco/report/jacoco.xml,
16 changes: 9 additions & 7 deletions README.md
@@ -419,13 +419,15 @@ The summary of common control framework routines you can use as Spark and Datafr
The control measurement of a column is a hash sum. It can be calculated differently depending on the column's data type and
on business requirements. This table represents all currently supported measurement types:

- | Type                           | Description                                            |
- | ------------------------------ |:------------------------------------------------------ |
- | controlType.Count              | Calculates the number of rows in the dataset           |
- | controlType.distinctCount      | Calculates DISTINCT(COUNT(()) of the specified column  |
- | controlType.aggregatedTotal    | Calculates SUM() of the specified column               |
- | controlType.absAggregatedTotal | Calculates SUM(ABS()) of the specified column          |
- | controlType.HashCrc32          | Calculates SUM(CRC32()) of the specified column        |
+ | Type                                 | Description                                            |
+ | ------------------------------------ |:------------------------------------------------------ |
+ | controlType.Count                    | Calculates the number of rows in the dataset           |
+ | controlType.distinctCount            | Calculates DISTINCT(COUNT(()) of the specified column  |
+ | controlType.aggregatedTotal          | Calculates SUM() of the specified column               |
+ | controlType.absAggregatedTotal       | Calculates SUM(ABS()) of the specified column          |
+ | controlType.HashCrc32                | Calculates SUM(CRC32()) of the specified column        |
+ | controlType.aggregatedTruncTotal     | Calculates SUM(TRUNC()) of the specified column        |
+ | controlType.absAggregatedTruncTotal  | Calculates SUM(TRUNC(ABS())) of the specified column   |
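
Not part of the diff itself, but as orientation for the two new rows above: a minimal sketch of how the truncated measures could be computed on a plain Spark DataFrame, assuming a numeric column named `price` (the column name and the helper `truncTotals` are only illustrative; the actual implementation appears in the MeasurementProcessor changes further down in this diff):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{abs, col, sum}
import org.apache.spark.sql.types.LongType

// Illustrative sketch only: the two new measures expressed as plain DataFrame aggregations.
// TRUNC is realised as a cast to LongType, which drops the fractional part (truncation towards zero).
def truncTotals(df: DataFrame): (String, String) = {
  val truncated = col("price").cast(LongType)
  val truncTotal    = df.agg(sum(truncated)).collect()(0)(0)       // SUM(TRUNC(price))
  val absTruncTotal = df.agg(sum(abs(truncated))).collect()(0)(0)  // SUM(ABS(TRUNC(price))), same as SUM(TRUNC(ABS(price)))
  def render(v: Any): String = if (v == null) "" else v.toString
  (render(truncTotal), render(absTruncTotal))
}
```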

## How to generate Code coverage report
```sbt
5 changes: 4 additions & 1 deletion atum/src/main/scala/za/co/absa/atum/core/ControlType.scala
@@ -22,9 +22,12 @@ object ControlType {
case object DistinctCount extends ControlType("distinctCount", false)
case object AggregatedTotal extends ControlType("aggregatedTotal", true)
case object AbsAggregatedTotal extends ControlType("absAggregatedTotal", true)
case object AggregatedTruncTotal extends ControlType("aggregatedTruncTotal", true)
case object AbsAggregatedTruncTotal extends ControlType("absAggregatedTruncTotal", true)
case object HashCrc32 extends ControlType("hashCrc32", false)

- val values: Seq[ControlType] = Seq(Count, DistinctCount, AggregatedTotal, AbsAggregatedTotal, HashCrc32)
+ val values: Seq[ControlType] = Seq(Count, DistinctCount, AggregatedTotal, AbsAggregatedTotal,
+   AggregatedTruncTotal, AbsAggregatedTruncTotal, HashCrc32)
val valueNames: Seq[String] = values.map(_.value)

def getNormalizedValueName(input: String): String = {
@@ -61,6 +61,16 @@ object MeasurementProcessor
.agg(sum(col(aggColName))).collect()(0)(0)
if (v == null) "" else v.toString
}
case AggregatedTruncTotal =>
(ds: Dataset[Row]) => {
val aggCol = sum(col(valueColumnName).cast(LongType))
aggregateColumn(ds, controlCol, aggCol)
}
case AbsAggregatedTruncTotal =>
(ds: Dataset[Row]) => {
val aggCol = sum(abs(col(valueColumnName).cast(LongType)))
aggregateColumn(ds, controlCol, aggCol)
}
}
}
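
One note on the two new cases above (an observation, not part of the diff): the `cast(LongType)` truncates towards zero, matching standard Scala/Java double-to-long conversion, rather than flooring; the distinction only matters for negative values:

```scala
// Truncation towards zero, as performed by the LongType cast in the aggregations above
val a = (-1000.000001).toLong  // -1000 (flooring would give -1001)
val b = (1000.9).toLong        //  1000
val c = (999.999999).toLong    //   999
```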

@@ -22,4 +22,5 @@ trait SparkLocalMaster {
// in order to run SampleMeasurements as tests, otherwise
// java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200... is thrown
System.getProperties.setProperty("spark.testing.memory", (1024*1024*1024).toString) // 1g
System.getProperties.setProperty("spark.app.name", "unit-test")
}
36 changes: 33 additions & 3 deletions atum/src/test/scala/za/co/absa/atum/ControlMeasurementsSpec.scala
@@ -37,7 +37,7 @@ class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBa
)
))

- val measurementsIntOverflow = List(
+ val measurementsIntOverflow: Seq[Measurement] = List(
Measurement(
controlName = "RecordCount",
controlType = ControlType.Count.value,
@@ -112,7 +112,7 @@ class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBa
assert(newMeasurements == measurementsIntOverflow)
}

- val measurementsAggregation = List(
+ val measurementsAggregation: Seq[Measurement] = List(
Measurement(
controlName = "RecordCount",
controlType = ControlType.Count.value,
@@ -304,7 +304,7 @@ class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBa
assert(newMeasurements == measurements3)
}

- val measurementsWithHash = List(
+ val measurementsWithHash: Seq[Measurement] = List(
Measurement(
controlName = "RecordCount",
controlType = ControlType.Count.value,
@@ -394,4 +394,34 @@ class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBa
assert(newMeasurements == measurementsAggregationShort)
}

val measurementsAggregatedTruncTotal: Seq[Measurement] = List(
Measurement(
controlName = "aggregatedTruncTotal",
controlType = "aggregatedTruncTotal",
controlCol = "price",
controlValue = "999"
),
Measurement(
controlName = "absAggregatedTruncTotal",
controlType = "absAggregatedTruncTotal",
controlCol = "price",
controlValue = "2999"
)
)

"aggregatedTruncTotal types" should "return truncated sum of values" in {
val inputDataJson = spark.sparkContext.parallelize(
s"""{"id": ${Long.MaxValue}, "price": -1000.000001, "order": { "orderid": 1, "items": 1 } } """ ::
s"""{"id": ${Long.MinValue}, "price": 1000.9, "order": { "orderid": -1, "items": -1 } } """ ::
s"""{"id": ${Long.MinValue}, "price": 999.999999, "order": { "orderid": -1, "items": -1 } } """ ::Nil)
val df = spark.read
.schema(schema)
.json(inputDataJson.toDS)

val processor = new MeasurementProcessor(measurementsAggregatedTruncTotal)
val newMeasurements = processor.measureDataset(df)

assert(newMeasurements == measurementsAggregatedTruncTotal)
}
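
For readers verifying the expected values in `measurementsAggregatedTruncTotal` above: with truncation towards zero the three `price` values contribute -1000, 1000 and 999, so the plain truncated sum is 999 and the absolute truncated sum is 2999, which is exactly what the fixture's `controlValue`s encode. A Spark-free sketch of the same arithmetic:

```scala
// The same arithmetic without Spark: truncate each price towards zero, then sum
val prices = Seq(-1000.000001, 1000.9, 999.999999)
val aggregatedTruncTotal    = prices.map(_.toLong).sum                 // -1000 + 1000 + 999 = 999
val absAggregatedTruncTotal = prices.map(p => math.abs(p.toLong)).sum  //  1000 + 1000 + 999 = 2999
```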

}