Skip to content

Commit

Permalink
Issue Qbeast-io#416: Add CDFQuantile Transformers and Transformations (
Browse files Browse the repository at this point in the history
  • Loading branch information
osopardo1 authored Oct 1, 2024
1 parent e8f560e commit c70670f
Show file tree
Hide file tree
Showing 34 changed files with 1,455 additions and 545 deletions.
43 changes: 30 additions & 13 deletions docs/AdvancedConfiguration.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,36 +167,53 @@ val columnStats =
|"date_max":"${formatter.format(maxTimestamp)}" }""".stripMargin
```

## String indexing via Histograms
## Indexing via Quantile Based CDF

The default **String** column transformation (`HashTransformation`) has limited range query supports since the lexicographic ordering of the String values are not preserved.
> **WARNING**: This is an **Experimental Feature**, and the API might change in the near future.
This can be addressed by introducing a custom **String** histogram in the form of sorted `Seq[String]`, and can lead to several improvements including:
The default column transformation for Strings (`HashTransformation`) has limited range query supports since the lexicographic ordering of the String values are not preserved. On the numeric side, the default transformation is `LinearTransformation`, which is a simple linear transformation that preserves the ordering of the values.

This can be addressed by introducing a custom Quantile Based sequence in the form of sorted `Seq`, and can lead to several improvements including:
1. more efficient file-pruning because of its reduced file-level column min/max
2. support for range queries on String columns
3. improved overall query speed

The following code snippet demonstrates the extraction of a **String** histogram from the source data:
The following code snippet demonstrates the extraction of a Quantile-based CDF from the source data:

```scala
import io.qbeast.spark.utils.QbeastUtils

val brandStats = QbeastUtils.computeHistogramForColumn(df, "brand", 50)
val statsStr = s"""{"brand_histogram":$brandStats}"""
val columnQuantiles = QbeastUtils.computeQuantilesForColumn(df, "brand")
val columnStats = s"""{"brand_quantiles":$columnQuantiles}"""

(df
.write
.mode("overwrite")
.format("qbeast")
.option("columnsToIndex", "brand:histogram")
.option("columnStats", statsStr)
.save(targetPath))
.option("columnsToIndex", "brand:quantiles")
.option("columnStats", columnStats)
.save("/tmp/qbeast_table_quantiles"))
```
This is only necessary for the first write, if not otherwise made explicit, all subsequent appends will reuse the same histogram.
Any new custom histogram provided during `appends` forces the creation of a new `Revision`.

A default **String** histogram("a" t0 "z") will be used if the use of histogram is stated(`stringColName:string_hist`) with no histogram in `columnStats`.
The default histogram can not supersede an existing `StringHashTransformation`.
This is only necessary for the first write, if not otherwise made explicit, all subsequent appends will reuse the same quantile calculation.
Any new custom quantiles provided during `appends` forces the creation of a new `Revision`.

### How to configure quantiles computation
The `computeQuantilesForColumn` method computes the quantiles for the specified column and returns a `Seq` of quantile values. The `Seq` is then serialized into a `String` and passed as a custom column transformation to the `columnsToIndex` option.

You can **tune the number of quantiles and the relative error** for numeric columns using the QbeastUtils API.
```scala
val columnQuantiles =
QbeastUtils.computeQuantilesForColumn(df = df, columnName = columnName)
val columnQuantilesNumberOfQuantiles =
QbeastUtils.computeQuantilesForColumn(df = df, columnName = columnName, numberOfQuantiles = 100)
// For numeric columns, you can also specify the relative error
// For String columns, the relativeError is ignored
val columnQuantilesRelativeError =
QbeastUtils.computeQuantilesForColumn(df = df, columnName = columnName, relativeError = 0.3)
val columnQuantilesNumAndError =
QbeastUtils.computeQuantilesForColumn(df = df, columnName = columnName, numberOfQuantiles = 100, relativeError = 0.3)
```
## DefaultCubeSize

If you don't specify the cubeSize at DataFrame level, the default value is used. This is set to 5M, so if you want to change it
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.core.transform

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.TreeNode
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import com.fasterxml.jackson.databind.jsontype.TypeSerializer
import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.TextNode
import com.fasterxml.jackson.databind.ser.std.StdSerializer
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.SerializerProvider
import io.qbeast.core.model.OrderedDataType
import org.apache.spark.annotation.Experimental

/**
* CDF Quantiles Transformation for Numeric types
* @param quantiles
* A set of quantiles that define the transformation
* @param dataType
* The data type of the column
*/
@Experimental
@JsonSerialize(using = classOf[CDFNumericQuantilesTransformationSerializer])
@JsonDeserialize(using = classOf[CDFNumericQuantilesTransformationDeserializer])
case class CDFNumericQuantilesTransformation(
quantiles: IndexedSeq[Double],
dataType: OrderedDataType)
extends CDFQuantilesTransformation {
require(quantiles.size > 1, "Quantiles size should be greater than 1")

override def ordering: Ordering[Any] =
Ordering[Double].asInstanceOf[Ordering[Any]]

override def mapValue(value: Any): Any = {
value match {
case v: Double => v
case v: Long => v.toDouble
case v: Int => v.toDouble
case v: BigDecimal => v.doubleValue()
case v: Float => v.toDouble
case v: java.sql.Timestamp => v.getTime.toDouble
case v: java.sql.Date => v.getTime.toDouble
case v: java.time.Instant => v.toEpochMilli.toDouble
}
}

}

class CDFNumericQuantilesTransformationSerializer
extends StdSerializer[CDFNumericQuantilesTransformation](
classOf[CDFNumericQuantilesTransformation]) {
val jsonFactory = new JsonFactory()

override def serializeWithType(
value: CDFNumericQuantilesTransformation,
gen: JsonGenerator,
serializers: SerializerProvider,
typeSer: TypeSerializer): Unit = {
gen.writeStartObject()
typeSer.getPropertyName
gen.writeStringField(typeSer.getPropertyName, typeSer.getTypeIdResolver.idFromValue(value))

gen.writeFieldName("quantiles")
gen.writeStartArray()
value.quantiles.foreach(gen.writeNumber)
gen.writeEndArray()
gen.writeObjectField("dataType", value.dataType)
gen.writeEndObject()
}

override def serialize(
value: CDFNumericQuantilesTransformation,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
gen.writeStartObject()

gen.writeFieldName("quantiles")
gen.writeStartArray()
value.quantiles.foreach(gen.writeNumber)
gen.writeEndArray()

gen.writeEndObject()
}

}

class CDFNumericQuantilesTransformationDeserializer
extends StdDeserializer[CDFNumericQuantilesTransformation](
classOf[CDFNumericQuantilesTransformation]) {

override def deserialize(
p: JsonParser,
ctxt: DeserializationContext): CDFNumericQuantilesTransformation = {
val tree: TreeNode = p.getCodec.readTree(p)
// Deserialize the ordered data type
val odt = tree.get("dataType") match {
case tn: TextNode => OrderedDataType(tn.asText())
}
// Deserialize the quantiles
val quantilesBuilder = IndexedSeq.newBuilder[Double]
tree.get("quantiles") match {
case an: ArrayNode =>
(0 until an.size()).foreach(i => quantilesBuilder += an.get(i).asDouble())
}
CDFNumericQuantilesTransformation(quantilesBuilder.result(), odt)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.core.transform

import io.qbeast.core.model.OrderedDataType
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.AnalysisExceptionFactory

/**
* CDF Quantile Transformer for Numeric Type
* @param columnName
* the name of the column
* @param orderedDataType
* the ordered data type
*/
@Experimental
case class CDFNumericQuantilesTransformer(columnName: String, orderedDataType: OrderedDataType)
extends CDFQuantilesTransformer {

/**
* Returns the Transformation given a row representation of the values
*
* @param row
* the values
* @return
* the transformation
*/
override def makeTransformation(row: String => Any): Transformation = {
row(columnTransformerName) match {
case null => EmptyTransformation()
case q: Seq[_] if q.nonEmpty =>
try {
val quantiles = q.map(_.asInstanceOf[Double]).toIndexedSeq
CDFNumericQuantilesTransformation(quantiles, orderedDataType)
} catch {
case _: ClassCastException =>
throw AnalysisExceptionFactory.create(
"Quantiles should be of type Double, but found another type")
}
case q: Seq[_] if q.isEmpty =>
throw AnalysisExceptionFactory.create(
s"Quantiles for column $columnName size should be greater than 1")
case _ =>
throw AnalysisExceptionFactory.create(
s"Quantiles for column $columnName should be of type Array[Double]")
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.core.transform

import io.qbeast.core.model.QDataType
import org.apache.spark.annotation.Experimental

import scala.collection.Searching._

/**
* A type of transformation that converts Any value to a Double between 0 and 1 based on the
* specified quantiles
*/
@Experimental
trait CDFQuantilesTransformation extends Transformation {

/**
* A sequence of quantiles including the extremes at 0 and 1
*/
val quantiles: IndexedSeq[Any]

/**
* The data type of the transformation
*/
val dataType: QDataType

/**
* The ordering of the data type
*
* When the type is an OrderedDataType, the ordering is the one defined in the data type. When
* the type is a StringDataType, the ordering is the one defined in the String class.
*/
implicit def ordering: Ordering[Any]

/**
* Performs any needed mapping to the value before searching it in the quantiles sequence
*
* @return
*/
def mapValue(value: Any): Any

/**
* Transforms a value to a Double between 0 and 1
*
* 1. Checks if the value is null, if so, returns 0
*
* 2. Searches for the value in the quantiles
*
* 3. If the value is found, returns the current index divided by the length of the quantiles
*
* 4. If the value is not found, returns the relative position of the insertion point
*
* WARNING: If the same number is repeated in the quantiles, the transformation will not always
* select the same index
* @param value
* the value to convert
* @return
* the number between 0 and 1
*/
override def transform(value: Any): Double = {

// If the value is null, we return 0
if (value == null) return 0d
// Otherwise, we search for the value in the quantiles
quantiles.search(mapValue(value)) match {
// First case when the index is found
case Found(foundIndex) => foundIndex.toDouble / (quantiles.length - 1)
// When the index is not found, we return the relative position of the insertion point
case InsertionPoint(insertionPoint) =>
if (insertionPoint == 0) 0d
else if (insertionPoint == quantiles.length) 1d
else (insertionPoint - 1).toDouble / (quantiles.length - 1)
}
}

/**
* This method should determine if the new data will cause the creation of a new revision.
*
* The current CDFQuantilesTransformation is superseded by another if
* - the new transformation is a CDFQuantilesTransformation
* - the ordering of the new transformation is the same as the current one
* - the quantiles of the new transformation are non-empty
* - the quantiles of the new transformation are different from the current one
*
* @param newTransformation
* the new transformation created with statistics over the new data
* @return
* true if the domain of the newTransformation is not fully contained in this one.
*/
override def isSupersededBy(newTransformation: Transformation): Boolean =
newTransformation match {
case newT: CDFQuantilesTransformation =>
// Is superseded by other CDFQuantilesTransformations with different and non-empty quantiles
this.ordering == newT.ordering && newT.quantiles.nonEmpty && this.quantiles != newT.quantiles
case _ =>
// Not superseded by other transformations: Empty, Linear...
false
}

/**
* Merges two transformations. The domain of the resulting transformation is the union of this
*
* In the case of CDFQuantilesTransformation, the merge would automatically select the new
* transformation if it's of the same type
*
* @param other
* the other transformation
* @return
* a new Transformation that contains both this and other.
*/
override def merge(other: Transformation): Transformation = other match {
case otherT: CDFQuantilesTransformation => otherT
case _ => this
}

}
Loading

0 comments on commit c70670f

Please sign in to comment.