Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base feature #1

Merged
merged 16 commits into from
Dec 13, 2021
Merged
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# top-most EditorConfig file
dk1844 marked this conversation as resolved.
Show resolved Hide resolved
root = true

[*]
charset = utf-8
end_of_line = lf
trim_trailing_whitespace = true

[*.xml]
indent_size = 4
indent_style = space
insert_final_newline = true

[*.{java,scala,js,json,css}]
indent_size = 2
indent_style = space
insert_final_newline = true
max_line_length = 120

[*.md]
trim_trailing_whitespace = false
41 changes: 41 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: Build

on:
push:
branches: [ main, develop, master ]
pull_request:
branches: [ master, develop ]
types: [ assigned, opened, synchronize, reopened, labeled ]

jobs:
test-sbt:
runs-on: ubuntu-latest
strategy:
fail-fast: false
name: SBT Test
steps:
- name: Checkout code
uses: actions/checkout@v2
- uses: coursier/cache-action@v5
- name: Setup Scala
uses: olafurpg/setup-scala@v10
with:
java-version: "adopt@1.8"
- name: Build and run tests
run: sbt test
41 changes: 41 additions & 0 deletions .github/workflows/licence_check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: License Check

on:
push:
branches: [ main, develop, master ]
pull_request:
branches: [ master ]
types: [ assigned, opened, synchronize, reopened, labeled ]

jobs:
license-test:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Setup Scala
uses: olafurpg/setup-scala@v10
with:
java-version: "adopt@1.8"
# note, that task "headerCheck" defaults to just "compile:headerCheck" - see https://github.com/sbt/sbt-header/issues/14
- name: SBT src licence header check
run: sbt Compile/headerCheck
- name: SBT test licence header check
run: sbt Test/headerCheck

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
41 changes: 41 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.00
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


ThisBuild / name := "standardization"
ThisBuild / organization := "za.co.absa"
ThisBuild / version := "0.0.1-SNAPSHOT"
ThisBuild / scalaVersion := "2.11.12"

libraryDependencies ++= List(
"org.apache.spark" %% "spark-core" % "2.4.7" % "provided",
"org.apache.spark" %% "spark-sql" % "2.4.7" % "provided",
"za.co.absa" %% "spark-hats" % "0.2.2",
"za.co.absa" %% "spark-hofs" % "0.4.0",
"org.scalatest" %% "scalatest" % "3.2.2" % Test,
"com.typesafe" % "config" % "1.4.1"
)

Test / parallelExecution := false

// licenceHeader check:

ThisBuild / organizationName := "ABSA Group Limited"
ThisBuild / startYear := Some(2021)
ThisBuild / licenses += "Apache-2.0" -> url("https://www.apache.org/licenses/LICENSE-2.0.txt")

// linting
Global / excludeLintKeys += ThisBuild / name // will be used in publish, todo #3 - confirm if lint ignore is still needed
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.5.5
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")
31 changes: 31 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configuration added here is considered the application default and it will be used
# for keys that are not specified in the provided 'application.conf' or system properties.
# Here is the precedence of configuration (top ones have higher precedence):
# 1. System Properties (e.g. passed as '-Dkey=value')
# 2. application.conf (e.g. provided as '-Dconfig.file=...')
# 3. reference.conf

# 'enceladus_record_id' with an id can be added containing either true UUID, always the same IDs (row-hash-based) or the
# column will not be added at all. Allowed values: "uuid", "stableHashId", "none"
standardization.recordId.generation.strategy="uuid"

# system-wide time zone
timezone="UTC"

standardization.testUtils.sparkTestBaseMaster="local[4]"

standardization.failOnInputNotPerSchema=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.standardization

import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.functions.{callUDF, col, struct}
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
import org.slf4j.LoggerFactory
import za.co.absa.standardization.schema.SchemaUtils

object ArrayTransformations {
private val logger = LoggerFactory.getLogger(this.getClass)
def flattenArrays(df: Dataset[Row], colName: String)(implicit spark: SparkSession): Dataset[Row] = {
val typ = SchemaUtils.getFieldType(colName, df.schema).getOrElse(throw new Error(s"Field $colName does not exist in ${df.schema.printTreeString()}"))
if (!typ.isInstanceOf[ArrayType]) {
logger.info(s"Field $colName is not an ArrayType, returning the original dataset!")
df
} else {
val arrType = typ.asInstanceOf[ArrayType]
if (!arrType.elementType.isInstanceOf[ArrayType]) {
logger.info(s"Field $colName is not a nested array, returning the original dataset!")
df
} else {
val udfName = colName.replace('.', '_') + System.currentTimeMillis()

spark.udf.register(udfName, new UDF1[Seq[Seq[Row]], Seq[Row]] {
def call(t1: Seq[Seq[Row]]): Seq[Row] = if (t1 == null) null.asInstanceOf[Seq[Row]] else t1.filter(_ != null).flatten // scalastyle:ignore null
}, arrType.elementType)

nestedWithColumn(df)(colName, callUDF(udfName, col(colName)))
}
}

}

def nestedWithColumn(ds: Dataset[Row])(columnName: String, column: Column): Dataset[Row] = {
val toks = columnName.split("\\.").toList

def helper(tokens: List[String], pathAcc: Seq[String]): Column = {
val currPath = (pathAcc :+ tokens.head).mkString(".")
val topType = SchemaUtils.getFieldType(currPath, ds.schema)

// got a match
if (currPath == columnName) {
column as tokens.head
} // some other attribute
else if (!columnName.startsWith(currPath)) {
arrCol(currPath)
} // partial match, keep going
else if (topType.isEmpty) {
struct(helper(tokens.tail, pathAcc ++ List(tokens.head))) as tokens.head
} else {
topType.get match {
case s: StructType =>
val cols = s.fields.map(_.name)
val fields = if (tokens.size > 1 && !cols.contains(tokens(1))) {
cols :+ tokens(1)
} else {
cols
}
struct(fields.map(field => helper((List(field) ++ tokens.tail).distinct, pathAcc :+ tokens.head) as field): _*) as tokens.head
case _: ArrayType => throw new IllegalStateException("Cannot reconstruct array columns. Please use this within arrayTransform.")
case _: DataType => arrCol(currPath) as tokens.head
}
}
}

ds.withColumn(toks.head, helper(toks, Seq()))
}

def arrCol(any: String): Column = {
val toks = any.replaceAll("\\[(\\d+)\\]", "\\.$1").split("\\.")
toks.tail.foldLeft(col(toks.head)){
case (acc, tok) =>
if (tok.matches("\\d+")) {
acc(tok.toInt)
} else {
acc(tok)
}
}
}
}
Loading