diff --git a/.gitignore b/.gitignore index 249e7cd1..2c7572e1 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ build tmp *.dmg *.exe +*.jfr app/out app/src/test/resources/sample/parquet diff --git a/Dockerfile b/Dockerfile index 0291394f..a6ac722e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ ARG SPARK_VERSION=3.5.1 -FROM cloudnativek8s/spark:3.5.1-b1.0.18 +FROM cloudnativek8s/spark:3.5.1-b1.0.19 USER root RUN mkdir -p /opt/app /opt/DataCaterer/connection /opt/DataCaterer/plan /opt/DataCaterer/execution /opt/DataCaterer/report diff --git a/README.md b/README.md index 48c34a27..9e99e408 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ and deep dive into issues [from the generated report](https://data.catering/samp 3. [Linux download](https://nightly.link/data-catering/data-caterer/workflows/build/main/data-caterer-linux.zip) 4. Docker ```shell - docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer:0.10.5 + docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer-basic:0.10.6 ``` [Open localhost:9898](http://localhost:9898). @@ -66,36 +66,38 @@ cd data-caterer-example && ./run.sh Data Caterer supports the below data sources. Additional data sources can be added on a demand basis. [Check here for the full roadmap](#roadmap). 
-| Data Source Type | Data Source | Support | Free | -|------------------|---------------------|---------|------| -| Cloud Storage | AWS S3 | ✅ | ✅ | -| Cloud Storage | GCP Cloud Storage | ✅ | ✅ | -| Cloud Storage | Azure Blob Storage | ✅ | ✅ | -| Database | Postgres | ✅ | ✅ | -| Database | MySQL | ✅ | ✅ | -| Database | Cassandra | ✅ | ✅ | -| Database | MongoDB | ❌ | ✅ | -| Database | Elasticsearch | ❌ | ✅ | -| File | CSV | ✅ | ✅ | -| File | JSON | ✅ | ✅ | -| File | ORC | ✅ | ✅ | -| File | Parquet | ✅ | ✅ | -| File | Hudi | ❌ | ✅ | -| File | Iceberg | ❌ | ✅ | -| File | Delta Lake | ❌ | ✅ | -| HTTP | REST API | ✅ | ❌ | -| Messaging | Kafka | ✅ | ❌ | -| Messaging | Solace | ✅ | ❌ | -| Messaging | Pulsar | ❌ | ❌ | -| Messaging | RabbitMQ | ❌ | ❌ | -| Messaging | ActiveMQ | ❌ | ❌ | -| Metadata | Marquez | ✅ | ❌ | -| Metadata | OpenMetadata | ✅ | ❌ | -| Metadata | OpenAPI/Swagger | ✅ | ❌ | -| Metadata | Great Expectations | ✅ | ❌ | -| Metadata | Amundsen | ❌ | ❌ | -| Metadata | Datahub | ❌ | ❌ | -| Metadata | Solace Event Portal | ❌ | ❌ | +| Data Source Type | Data Source | Support | Free | +|------------------|------------------------------------|---------|------| +| Cloud Storage | AWS S3 | ✅ | ✅ | +| Cloud Storage | Azure Blob Storage | ✅ | ✅ | +| Cloud Storage | GCP Cloud Storage | ✅ | ✅ | +| Database | Cassandra | ✅ | ✅ | +| Database | MySQL | ✅ | ✅ | +| Database | Postgres | ✅ | ✅ | +| Database | Elasticsearch | ❌ | ✅ | +| Database | MongoDB | ❌ | ✅ | +| File | CSV | ✅ | ✅ | +| File | JSON | ✅ | ✅ | +| File | Iceberg | ✅ | ✅ | +| File | ORC | ✅ | ✅ | +| File | Parquet | ✅ | ✅ | +| File | Delta Lake | ❌ | ✅ | +| File | Hudi | ❌ | ✅ | +| HTTP | REST API | ✅ | ❌ | +| Messaging | Kafka | ✅ | ❌ | +| Messaging | Solace | ✅ | ❌ | +| Messaging | ActiveMQ | ❌ | ❌ | +| Messaging | Pulsar | ❌ | ❌ | +| Messaging | RabbitMQ | ❌ | ❌ | +| Metadata | Great Expectations | ✅ | ❌ | +| Metadata | Marquez | ✅ | ❌ | +| Metadata | OpenAPI/Swagger | ✅ | ❌ | +| Metadata | OpenMetadata | ✅ 
| ❌ | +| Metadata | Amundsen | ❌ | ❌ | +| Metadata | Datahub | ❌ | ❌ | +| Metadata | Data Contract CLI | ❌ | ❌ | +| Metadata | ODCS (Open Data Contract Standard) | ❌ | ❌ | +| Metadata | Solace Event Portal | ❌ | ❌ | ## Supported use cases diff --git a/api/src/main/java/io/github/datacatering/datacaterer/javaapi/api/PlanRun.java b/api/src/main/java/io/github/datacatering/datacaterer/javaapi/api/PlanRun.java index 71df86b3..5dc4cb49 100644 --- a/api/src/main/java/io/github/datacatering/datacaterer/javaapi/api/PlanRun.java +++ b/api/src/main/java/io/github/datacatering/datacaterer/javaapi/api/PlanRun.java @@ -161,6 +161,21 @@ public FileBuilder parquet(String name, String path) { return parquet(name, path, Collections.emptyMap()); } + public FileBuilder iceberg(String name, String path, String tableName) { + return basePlanRun.icebergJava(name, path, tableName); + } + + public FileBuilder iceberg( + String name, + String path, + String tableName, + String catalogType, + String catalogUri, + Map options + ) { + return basePlanRun.iceberg(name, tableName, path, catalogType, catalogUri, toScalaMap(options)); + } + public PostgresBuilder postgres( String name, String url, diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala index bb254eca..6f5bdeaf 100644 --- a/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala +++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilder.scala @@ -86,11 +86,11 @@ case class DataCatererConfigurationBuilder(build: DataCatererConfiguration = Dat def delta(name: String, path: String, options: java.util.Map[String, String]): DataCatererConfigurationBuilder = delta(name, path, toScalaMap(options)) - def iceberg(name: String, path: String = "", options: Map[String, String] = Map()): DataCatererConfigurationBuilder =
- addConnectionConfig(name, ICEBERG, path, options) + def iceberg(name: String, path: String, tableName: String, options: Map[String, String] = Map()): DataCatererConfigurationBuilder = + addConnectionConfig(name, ICEBERG, path, options ++ Map(TABLE -> tableName)) - def iceberg(name: String, path: String, options: java.util.Map[String, String]): DataCatererConfigurationBuilder = - iceberg(name, path, toScalaMap(options)) + def iceberg(name: String, path: String, tableName: String, options: java.util.Map[String, String]): DataCatererConfigurationBuilder = + iceberg(name, path, tableName, toScalaMap(options)) def postgres( name: String, @@ -382,11 +382,15 @@ final case class ConnectionConfigWithTaskBuilder( case HUDI => options.get(HUDI_TABLE_NAME) match { case Some(value) => configBuilder.hudi(name, path, value, options) - case None => throw new IllegalArgumentException(s"Missing $HUDI_TABLE_NAME from options: $options") + case None => throw new IllegalArgumentException(s"Missing $HUDI_TABLE_NAME from options: $options, connection-name=$name") } case DELTA => configBuilder.delta(name, path, options) - case ICEBERG => configBuilder.iceberg(name, path, options) - case _ => throw new IllegalArgumentException(s"Unsupported file format: $format") + case ICEBERG => + options.get(TABLE) match { + case Some(value) => configBuilder.iceberg(name, path, value, options) + case None => throw new IllegalArgumentException(s"Missing $TABLE from options: $options, connection-name=$name") + } + case _ => throw new IllegalArgumentException(s"Unsupported file format: $format, connection-name=$name") } setConnectionConfig(name, fileConnectionConfig, FileBuilder()) } diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala index 2ba27946..5b27b8fa 100644 --- a/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala +++ 
b/api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala @@ -1,7 +1,7 @@ package io.github.datacatering.datacaterer.api import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap -import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION} +import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION} import com.softwaremill.quicklens.ModifyPimp import io.github.datacatering.datacaterer.api.model.{GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource} @@ -73,7 +73,7 @@ case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadat } def greatExpectations(expectationsFile: String): MetadataSourceBuilder = { - this.modify(_.metadataSource).setTo(GreatExpectationsSource(Map(EXPECTATIONS_FILE -> expectationsFile))) + this.modify(_.metadataSource).setTo(GreatExpectationsSource(Map(GREAT_EXPECTATIONS_FILE -> expectationsFile))) } def openDataContractStandard(dataContractFile: String): MetadataSourceBuilder = { diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanRun.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanRun.scala index 4bfdfc6c..d149f0bf 100644 --- a/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanRun.scala +++ 
b/api/src/main/scala/io/github/datacatering/datacaterer/api/PlanRun.scala @@ -104,10 +104,10 @@ trait PlanRun { /** * Create new HUDI generation step with configurations * - * @param name Data source name - * @param path File path to generated HUDI + * @param name Data source name + * @param path File path to generated HUDI * @param tableName Table name to be used for HUDI generation - * @param options Additional options for HUDI generation + * @param options Additional options for HUDI generation * @return FileBuilder */ def hudi(name: String, path: String, tableName: String, options: Map[String, String] = Map()): FileBuilder = @@ -127,13 +127,40 @@ trait PlanRun { /** * Create new ICEBERG generation step with configurations * - * @param name Data source name - * @param path File path to generated ICEBERG - * @param options Additional options for ICEBERG generation + * @param name Data source name + * @param tableName Table name for generated ICEBERG + * @param path Warehouse path to generated ICEBERG + * @param catalogType Type of catalog for generated ICEBERG + * @param catalogUri Uri of catalog for generated ICEBERG + * @param options Additional options for ICEBERG generation + * @return FileBuilder + */ + def iceberg( + name: String, + tableName: String, + path: String = "", + catalogType: String = DEFAULT_ICEBERG_CATALOG_TYPE, + catalogUri: String = "", + options: Map[String, String] = Map() + ): FileBuilder = { + ConnectionConfigWithTaskBuilder().file(name, ICEBERG, path, options ++ Map( + TABLE -> tableName, + SPARK_ICEBERG_CATALOG_WAREHOUSE -> path, + SPARK_ICEBERG_CATALOG_TYPE -> catalogType, + SPARK_ICEBERG_CATALOG_URI -> catalogUri, + )) + } + + /** + * Create new ICEBERG generation step with only warehouse path and table name. Uses hadoop as the catalog type. 
+ * + * @param name Data source name + * @param path Warehouse path to generated ICEBERG + * @param tableName Table name for generated ICEBERG * @return FileBuilder */ - def iceberg(name: String, path: String, options: Map[String, String] = Map()): FileBuilder = - ConnectionConfigWithTaskBuilder().file(name, ICEBERG, path, options) + def icebergJava(name: String, path: String, tableName: String): FileBuilder = + iceberg(name, tableName, path) /** * Create new POSTGRES generation step with connection configuration diff --git a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala index e70be4f4..30341b44 100644 --- a/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala +++ b/api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala @@ -47,16 +47,19 @@ object Constants { lazy val PARTITIONS = "partitions" lazy val PARTITION_BY = "partitionBy" lazy val BODY_FIELD = "bodyField" - lazy val JMS_DESTINATION_NAME = "destinationName" lazy val KAFKA_TOPIC = "topic" + lazy val JMS_DESTINATION_NAME = "destinationName" lazy val JMS_INITIAL_CONTEXT_FACTORY = "initialContextFactory" lazy val JMS_CONNECTION_FACTORY = "connectionFactory" lazy val JMS_VPN_NAME = "vpnName" lazy val SCHEMA_LOCATION = "schemaLocation" - lazy val EXPECTATIONS_FILE = "expectationsFile" + lazy val GREAT_EXPECTATIONS_FILE = "expectationsFile" lazy val DATA_CONTRACT_FILE = "dataContractFile" lazy val ROWS_PER_SECOND = "rowsPerSecond" lazy val HUDI_TABLE_NAME = "hoodie.table.name" + lazy val ICEBERG_CATALOG_TYPE = "catalogType" + lazy val ICEBERG_CATALOG_URI = "catalogUri" + lazy val ICEBERG_CATALOG_DEFAULT_NAMESPACE = "catalogDefaultNamespace" //field metadata lazy val FIELD_DATA_TYPE = "type" @@ -207,6 +210,8 @@ object Constants { "spark.sql.shuffle.partitions" -> "10", "spark.sql.catalog.postgres" -> "", "spark.sql.catalog.cassandra" ->
"com.datastax.spark.connector.datasource.CassandraCatalog", + "spark.sql.catalog.iceberg" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg.type" -> "hadoop", // "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer", // "spark.sql.catalog.hudi" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog", // "spark.kryo.registrator" -> "org.apache.spark.HoodieSparkKryoRegistrar", @@ -215,6 +220,7 @@ object Constants { "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" -> "true", "spark.hadoop.fs.hdfs.impl" -> "org.apache.hadoop.hdfs.DistributedFileSystem", "spark.hadoop.fs.file.impl" -> "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem", + "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" ) //jdbc defaults @@ -248,6 +254,9 @@ object Constants { lazy val HTTP_QUERY_PARAMETER = "query" lazy val HTTP_HEADER_PARAMETER = "header" + //iceberg defaults + lazy val DEFAULT_ICEBERG_CATALOG_TYPE = ICEBERG_CATALOG_HADOOP + //foreign key defaults lazy val DEFAULT_FOREIGN_KEY_COLUMN = "default_column" lazy val FOREIGN_KEY_DELIMITER = "||" @@ -358,6 +367,18 @@ object Constants { lazy val OPEN_METADATA_TABLE_FQN = "tableFqn" lazy val OPEN_METADATA_SERVICE = "service" + //iceberg + lazy val SPARK_ICEBERG_CATALOG_TYPE = "spark.sql.catalog.iceberg.type" + lazy val SPARK_ICEBERG_CATALOG_WAREHOUSE = "spark.sql.catalog.iceberg.warehouse" + lazy val SPARK_ICEBERG_CATALOG_URI = "spark.sql.catalog.iceberg.uri" + lazy val SPARK_ICEBERG_CATALOG_DEFAULT_NAMESPACE = "spark.sql.catalog.iceberg.default-namespace" + lazy val ICEBERG_CATALOG_HIVE = "hive" + lazy val ICEBERG_CATALOG_HADOOP = "hadoop" + lazy val ICEBERG_CATALOG_REST = "rest" + lazy val ICEBERG_CATALOG_GLUE = "glue" + lazy val ICEBERG_CATALOG_JDBC = "jdbc" + lazy val ICEBERG_CATALOG_NESSIE = "nessie" + //aggregation types lazy val AGGREGATION_SUM = "sum" lazy val AGGREGATION_COUNT = "count" @@ -368,7 +389,9 @@ object Constants { //validation types lazy 
val VALIDATION_COLUMN = "column" + lazy val VALIDATION_FIELD = "field" lazy val VALIDATION_COLUMN_NAMES = "columnNames" + lazy val VALIDATION_FIELD_NAMES = "fieldNames" lazy val VALIDATION_UPSTREAM = "upstream" lazy val VALIDATION_GROUP_BY = "groupBy" //validation support @@ -427,7 +450,7 @@ object Constants { lazy val VALIDATION_COLUMN_NAMES_MATCH_SET = "matchSet" lazy val VALIDATION_OPTION_DELIMITER = "," - lazy val VALIDATION_SUPPORTING_OPTIONS = List(VALIDATION_COLUMN, VALIDATION_MIN, VALIDATION_MAX, VALIDATION_GROUP_BY_COLUMNS, VALIDATION_DESCRIPTION, VALIDATION_ERROR_THRESHOLD) + lazy val VALIDATION_SUPPORTING_OPTIONS = List(VALIDATION_COLUMN, VALIDATION_FIELD, VALIDATION_MIN, VALIDATION_MAX, VALIDATION_GROUP_BY_COLUMNS, VALIDATION_DESCRIPTION, VALIDATION_ERROR_THRESHOLD) lazy val VALIDATION_PREFIX_JOIN_EXPRESSION = "expr:" lazy val VALIDATION_COLUMN_NAME_COUNT_EQUAL = "column_count_equal" diff --git a/api/src/test/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilderTest.scala b/api/src/test/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilderTest.scala index 5266dba5..856aaff1 100644 --- a/api/src/test/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilderTest.scala +++ b/api/src/test/scala/io/github/datacatering/datacaterer/api/DataCatererConfigurationBuilderTest.scala @@ -17,7 +17,7 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite { assert(result.metadataConfig == MetadataConfig()) assert(result.generationConfig == GenerationConfig()) assert(result.connectionConfigByName.isEmpty) - assert(result.runtimeConfig.size == 13) + assert(result.runtimeConfig.size == 16) assert(result.master == "local[*]") } diff --git a/api/src/test/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilderTest.scala b/api/src/test/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilderTest.scala index 062f6364..da15745c 100644 --- 
a/api/src/test/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilderTest.scala +++ b/api/src/test/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilderTest.scala @@ -1,6 +1,6 @@ package io.github.datacatering.datacaterer.api -import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_BASIC, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_BASIC_AUTH_PASSWORD, OPEN_METADATA_BASIC_AUTH_USERNAME, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION} +import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_BASIC, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_BASIC_AUTH_PASSWORD, OPEN_METADATA_BASIC_AUTH_USERNAME, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION} import io.github.datacatering.datacaterer.api.model.{GreatExpectationsSource, MarquezMetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource} import org.junit.runner.RunWith import org.scalatest.funsuite.AnyFunSuite @@ -56,7 +56,7 @@ class MetadataSourceBuilderTest extends AnyFunSuite { val result = MetadataSourceBuilder().greatExpectations("/tmp/expectations").metadataSource assert(result.isInstanceOf[GreatExpectationsSource]) - assert(result.asInstanceOf[GreatExpectationsSource].connectionOptions == Map(EXPECTATIONS_FILE -> "/tmp/expectations")) + assert(result.asInstanceOf[GreatExpectationsSource].connectionOptions == Map(GREAT_EXPECTATIONS_FILE -> "/tmp/expectations")) } test("Can create Open Data Contract Standard metadata source") { diff --git a/app/build.gradle.kts 
b/app/build.gradle.kts index 4e183de7..14c79633 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -120,7 +120,7 @@ dependencies { // exclude(group = "org.scala-lang") // } // iceberg - basicImpl("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:1.4.3") { + basicImpl("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:1.5.2") { exclude(group = "org.scala-lang") } // delta lake diff --git a/app/src/main/resources/log4j2.properties b/app/src/main/resources/log4j2.properties index f5b4faa6..2a5a074e 100644 --- a/app/src/main/resources/log4j2.properties +++ b/app/src/main/resources/log4j2.properties @@ -64,3 +64,6 @@ logger.kafka.level=warn # Hudi logging logger.hudi.name=org.apache.hudi logger.hudi.level=error +# Iceberg logging +logger.iceberg.name=org.apache.iceberg +logger.iceberg.level=error diff --git a/app/src/main/resources/ui/configuration-data.js b/app/src/main/resources/ui/configuration-data.js index b7b98b83..a6562804 100644 --- a/app/src/main/resources/ui/configuration-data.js +++ b/app/src/main/resources/ui/configuration-data.js @@ -889,6 +889,53 @@ dataSourcePropertiesMap.set("http", { }, } }); +dataSourcePropertiesMap.set("iceberg", { + optGroupLabel: "Data Source", + Name: "Iceberg", + properties: { + catalogType: { + displayName: "Catalog Type", + default: "hadoop", + type: "text", + choice: ["hadoop", "glue", "hive", "jdbc", "nessie", "rest"], + help: "Catalog Type for Iceberg metadata.", + required: "" + }, + path: { + displayName: "Warehouse Path", + default: "/tmp/generated-data/iceberg", + type: "text", + help: "File pathway to Iceberg warehouse.", + }, + catalogUri: { + displayName: "Catalog URI", + default: "", + type: "text", + help: "Catalog URI for Iceberg metadata." 
+ }, + table: { + displayName: "Table Name", + default: "", + type: "text", + help: "Table name for Iceberg dataset (format: [database name].[table name]).", + override: "true" + }, + partitions: { + displayName: "Num Partitions", + default: "1", + type: "number", + help: "Number of file partitions.", + override: "true" + }, + partitionBy: { + displayName: "Partition By", + default: "", + type: "text", + help: "Column name(s) to partition by (comma separated).", + override: "true" + } + } +}); dataSourcePropertiesMap.set("json", { optGroupLabel: "Data Source", Name: "JSON", diff --git a/app/src/main/resources/ui/helper-generation.js b/app/src/main/resources/ui/helper-generation.js index b0f005f4..46506927 100644 --- a/app/src/main/resources/ui/helper-generation.js +++ b/app/src/main/resources/ui/helper-generation.js @@ -1,17 +1,14 @@ import { addNewDataTypeAttribute, camelize, - createIconWithConnectionTooltip, + createAutoFromMetadata, + createAutoFromMetadataSourceContainer, createManualContainer, createNewField, - createSelect, - createToast, dispatchEvent, - findNextLevelNodesByClass, - getDataConnectionsAndAddToSelect, - wait + findNextLevelNodesByClass } from "./shared.js"; -import {dataSourcePropertiesMap, dataTypeOptionsMap} from "./configuration-data.js"; +import {dataTypeOptionsMap} from "./configuration-data.js"; export let numFields = 0; @@ -19,71 +16,6 @@ export function incFields() { numFields++; } -function addMetadataConnectionOverrideOptions(metadataConnectionSelect, overrideOptionsContainer, index) { - metadataConnectionSelect.addEventListener("change", (event) => { - let metadataSourceName = event.target.value; - fetch(`http://localhost:9898/connection/${metadataSourceName}`, {method: "GET"}) - .then(r => { - if (r.ok) { - return r.json(); - } else { - r.text().then(text => { - createToast(`Get connection ${metadataSourceName}`, `Failed to get connection ${metadataSourceName}! 
Error: ${err}`, "fail"); - throw new Error(text); - }); - } - }) - .then(respJson => { - if (respJson) { - //remove previous properties - overrideOptionsContainer.replaceChildren(); - let metadataSourceType = respJson.type; - let metadataSourceProperties = dataSourcePropertiesMap.get(metadataSourceType).properties; - - for (const [key, value] of Object.entries(metadataSourceProperties)) { - if (value["override"] && value["override"] === "true") { - //add properties that can be overridden - addNewDataTypeAttribute(key, value, `connection-config-${index}-${key}`, "metadata-source-property", overrideOptionsContainer); - } - } - } - }); - }); -} - -// allow users to get schema information from a metadata source such as openmetadata or marquez -export async function createAutoSchema(index) { - let baseContainer = document.createElement("div"); - baseContainer.setAttribute("id", `data-source-auto-schema-container-${index}`); - baseContainer.setAttribute("class", "card card-body data-source-auto-schema-container"); - baseContainer.style.display = "inherit"; - let autoSchemaHeader = document.createElement("h5"); - autoSchemaHeader.innerText = "Auto Schema"; - let baseTaskDiv = document.createElement("div"); - baseTaskDiv.setAttribute("class", "row m-2 g-2 align-items-center"); - - let metadataConnectionSelect = createSelect(`metadata-connection-${index}`, "Metadata source", "selectpicker form-control input-field metadata-connection-name", "Select metadata source..."); - let dataConnectionCol = document.createElement("div"); - dataConnectionCol.setAttribute("class", "col"); - dataConnectionCol.append(metadataConnectionSelect); - - let iconDiv = createIconWithConnectionTooltip(metadataConnectionSelect); - let iconCol = document.createElement("div"); - iconCol.setAttribute("class", "col-md-auto"); - iconCol.append(iconDiv); - baseTaskDiv.append(dataConnectionCol, iconCol); - - // get connection list, filter only metadata sources - let metadataConnection = await 
getDataConnectionsAndAddToSelect(metadataConnectionSelect, baseTaskDiv, "metadata"); - // provide opportunity to override non-connection options for metadata source (i.e. namespace, dataset) - let overrideOptionsContainer = document.createElement("div"); - // when the metadata source connection is selected, populate override options container with options with "override: true" - addMetadataConnectionOverrideOptions(metadataConnectionSelect, overrideOptionsContainer, index); - - baseContainer.append(autoSchemaHeader, metadataConnection, overrideOptionsContainer); - return baseContainer; -} - async function createGenerationFields(dataSourceFields, manualSchema) { let allCollapsedAccordionButton = $(document).find(".accordion-button.collapsed"); allCollapsedAccordionButton.click(); @@ -117,15 +49,6 @@ async function createGenerationFields(dataSourceFields, manualSchema) { collapseShow.click(); } -async function createAutoGenerationSchema(autoFromMetadataSchema, dataSource) { - let updatedMetadataSource = $(autoFromMetadataSchema).find(".metadata-connection-name").selectpicker("val", dataSource.fields.optMetadataSource.name); - dispatchEvent(updatedMetadataSource, "change"); - // takes some time to get the override options - await wait(100); - for (let [key, value] of Object.entries(dataSource.fields.optMetadataSource.overrideOptions)) { - $(autoFromMetadataSchema).find(`input[aria-label="${key}"]`).val(value); - } -} export async function createGenerationElements(dataSource, newDataSource, numDataSources) { let dataSourceGenContainer = $(newDataSource).find("#data-source-generation-config-container"); @@ -133,10 +56,10 @@ export async function createGenerationElements(dataSource, newDataSource, numDat // check if there is auto schema from metadata source defined if (dataSource.fields && dataSource.fields.optMetadataSource) { $(dataSourceGenContainer).find("[id^=auto-from-metadata-source-generation-checkbox]").prop("checked", true); - let autoFromMetadataSchema = await 
createAutoSchema(numDataSources); + let autoFromMetadataSchema = await createAutoFromMetadataSourceContainer(numDataSources); $(dataSourceGenContainer).find(".manual").after(autoFromMetadataSchema); - await createAutoGenerationSchema(autoFromMetadataSchema, dataSource); + await createAutoFromMetadata(autoFromMetadataSchema, dataSource); } // check if there is manual schema defined if (dataSource.fields && dataSource.fields.optFields && dataSource.fields.optFields.length > 0) { @@ -190,7 +113,7 @@ export function getGeneration(dataSource, currentDataSource) { } if (isAutoFromMetadataChecked) { - let dataSourceAutoSchemaContainer = $(dataSource).find("[class~=data-source-auto-schema-container]")[0]; + let dataSourceAutoSchemaContainer = $(dataSource).find("[class~=data-source-auto-from-metadata-container]")[0]; let metadataConnectionName = $(dataSourceAutoSchemaContainer).find("select[class~=metadata-connection-name]").val(); let metadataConnectionOptions = $(dataSourceAutoSchemaContainer).find("input[class~=metadata-source-property]").toArray() .reduce(function (map, option) { diff --git a/app/src/main/resources/ui/helper-validation.js b/app/src/main/resources/ui/helper-validation.js index 4077da98..ebf3e045 100644 --- a/app/src/main/resources/ui/helper-validation.js +++ b/app/src/main/resources/ui/helper-validation.js @@ -10,6 +10,8 @@ import { addItemsToAttributeMenu, addNewDataTypeAttribute, camelize, + createAutoFromMetadata, + createAutoFromMetadataSourceContainer, createButtonWithMenu, createCloseButton, createManualContainer, @@ -58,16 +60,22 @@ function createNewValidateAttribute(optKey, validationType, optVal, mainContaine } else if (optKey.startsWith("equalOr")) { baseKey = optKey.charAt(7).toLowerCase() + optKey.slice(8); } - let baseOptions = Object.create(validationTypeOptionsMap.get(validationType)[baseKey]); + // if it is 'notEqual' or `equalOrLessThan`, need to ensure checkbox is checked - if (baseKey !== optKey) baseOptions.group.checked = "true"; - 
baseOptions["default"] = optVal; - addNewDataTypeAttribute(baseKey, baseOptions, `data-validation-container-${numValidations}-${optKey}`, "data-validation-field", mainContainer); + if (optKey === "notNull") { + let baseOptions = Object.create(validationTypeOptionsMap.get(validationType)[optKey]); + addNewDataTypeAttribute(optKey, baseOptions, `data-validation-container-${numValidations}-${optKey}`, "data-validation-field", mainContainer); + } else { + let baseOptions = Object.create(validationTypeOptionsMap.get(validationType)[baseKey]); + if (baseKey !== optKey) baseOptions.group.checked = "true"; + baseOptions["default"] = optVal; + addNewDataTypeAttribute(baseKey, baseOptions, `data-validation-container-${numValidations}-${optKey}`, "data-validation-field", mainContainer); + } document.getElementById(`data-validation-container-${numValidations}-${optKey}`).dispatchEvent(new Event("input")); } async function createValidationsFromDataSource(dataSource, manualValidation) { - for (const validation of dataSource.validations) { + for (const validation of dataSource.optValidations) { numValidations += 1; let newValidation = await createNewField(numValidations, "validation"); $(manualValidation).children(".accordion").append(newValidation); @@ -76,13 +84,13 @@ async function createValidationsFromDataSource(dataSource, manualValidation) { let validationOpts = validation.options; let mainContainer = $(newValidation).find("[id^=data-validation-container]")[0]; - if (validation.type === "column" && validationOpts.column) { - let updatedValidationCol = $(newValidation).find("[aria-label=Field]").val(validationOpts.column) + if (validation.type === "column" && validationOpts.field) { + let updatedValidationCol = $(newValidation).find("[aria-label=Field]").val(validationOpts.field); dispatchEvent(updatedValidationCol, "input"); } else if (validation.type === "groupBy" && validationOpts.groupByColumns) { createGroupByValidationFromPlan(newValidation, validationOpts, 
validation); } else if (validation.type === "upstream" && validationOpts.upstreamTaskName) { - let updatedUpstreamTaskName = $(newValidation).find("[aria-label=UpstreamTaskName]").val(validationOpts.upstreamTaskName) + let updatedUpstreamTaskName = $(newValidation).find("[aria-label=UpstreamTaskName]").val(validationOpts.upstreamTaskName); dispatchEvent(updatedUpstreamTaskName, "input"); // can be nested validations @@ -94,21 +102,31 @@ async function createValidationsFromDataSource(dataSource, manualValidation) { //otherwise it is column name validation which doesn't have any default options for (const [optKey, optVal] of Object.entries(validationOpts)) { - if (optKey !== "groupByColumns" && optKey !== "column" && optKey !== "upstreamTaskName") { + if (optKey !== "groupByColumns" && optKey !== "column" && optKey !== "field" && optKey !== "upstreamTaskName") { createNewValidateAttribute(optKey, validation.type, optVal, mainContainer); } } } } -export async function createValidationFromPlan(dataSource, newDataSource) { - if (dataSource.validations && dataSource.validations.length > 0) { +export async function createValidationFromPlan(dataSource, newDataSource, numDataSources) { + let dataSourceValidationContainer = $(newDataSource).find("#data-source-validation-config-container"); + + if (dataSource.validations && dataSource.validations.optMetadataSource) { + $(dataSourceValidationContainer).find("[id^=auto-from-metadata-source-validation-checkbox]").prop("checked", true); + let autoFromMetadataSchema = await createAutoFromMetadataSourceContainer(numDataSources); + $(dataSourceValidationContainer).find(".manual").after(autoFromMetadataSchema); + + await createAutoFromMetadata(autoFromMetadataSchema, dataSource); + } + + if (dataSource.validations && dataSource.validations.optValidations && dataSource.validations.optValidations.length > 0) { let manualValidation = createManualContainer(numValidations, "validation"); let dataSourceGenContainer = 
$(newDataSource).find("#data-source-validation-config-container"); dataSourceGenContainer.append(manualValidation); $(dataSourceGenContainer).find("[id^=manual-validation-checkbox]").prop("checked", true); - await createValidationsFromDataSource(dataSource, manualValidation); + await createValidationsFromDataSource(dataSource.validations, manualValidation); } } @@ -189,11 +207,37 @@ function getValidationsFromContainer(dataSourceValidations, visitedValidations) } export function getValidations(dataSource, currentDataSource) { - // get top level validation container - let dataSourceValidations = dataSource.querySelector(".data-source-validation-container"); - let visitedValidations = new Set(); - let dataValidationsWithAttributes = getValidationsFromContainer(dataSourceValidations, visitedValidations); - currentDataSource["validations"] = Object.values(dataValidationsWithAttributes); + let dataValidationInfo = {}; + // check which checkboxes are enabled: auto, auto with external, manual + let isAutoChecked = $(dataSource).find("[id^=auto-validation-checkbox]").is(":checked"); + let isAutoFromMetadataChecked = $(dataSource).find("[id^=auto-from-metadata-source-validation-checkbox]").is(":checked"); + let isManualChecked = $(dataSource).find("[id^=manual-validation-checkbox]").is(":checked"); + + if (isAutoChecked) { + // need to enable data generation within data source options + currentDataSource["options"] = {enableDataValidation: "true"}; + } else if (isAutoFromMetadataChecked) { + let dataSourceAutoSchemaContainer = $(dataSource).find("[class~=data-source-auto-from-metadata-container]")[0]; + let metadataConnectionName = $(dataSourceAutoSchemaContainer).find("select[class~=metadata-connection-name]").val(); + let metadataConnectionOptions = $(dataSourceAutoSchemaContainer).find("input[class~=metadata-source-property]").toArray() + .reduce(function (map, option) { + if (option.value !== "") { + map[option.getAttribute("aria-label")] = option.value; + } + return 
map; + }, {}); + dataValidationInfo["optMetadataSource"] = { + name: metadataConnectionName, + overrideOptions: metadataConnectionOptions + }; + } else if (isManualChecked) { + // get top level validation container + let dataSourceValidations = $(dataSource).find("[id^=data-source-validation-container]")[0]; + let visitedValidations = new Set(); + let dataValidationsWithAttributes = getValidationsFromContainer(dataSourceValidations, visitedValidations); + dataValidationInfo["optValidations"] = Object.values(dataValidationsWithAttributes); + } + currentDataSource["validations"] = dataValidationInfo; } export function addColumnValidationBlock(newAttributeRow, mainContainer, attributeContainerId, inputClass) { diff --git a/app/src/main/resources/ui/index.js b/app/src/main/resources/ui/index.js index eb1e4601..851e6b0c 100644 --- a/app/src/main/resources/ui/index.js +++ b/app/src/main/resources/ui/index.js @@ -14,6 +14,7 @@ import { createAccordionItem, + createAutoFromMetadataSourceContainer, createCloseButton, createFieldValidationCheck, createFormFloating, @@ -36,7 +37,7 @@ import { createNewConfigRow, getConfiguration } from "./helper-configuration.js"; -import {createAutoSchema, createGenerationElements, getGeneration} from "./helper-generation.js"; +import {createGenerationElements, getGeneration} from "./helper-generation.js"; import {createValidationFromPlan, getValidations} from "./helper-validation.js"; import {createCountElementsFromPlan, createRecordCount, getRecordCount} from "./helper-record-count.js"; import {configurationOptionsMap, reportConfigKeys} from "./configuration-data.js"; @@ -167,8 +168,8 @@ async function checkboxListenerDisplay(index, event, configContainer, name, auto querySelector = `#${details["containerName"]}-${index}`; newElement = createManualContainer(index, name); } else if (autoOrManualValue === "auto-from-metadata-source") { - querySelector = `#data-source-auto-schema-container-${index}`; - newElement = await 
createAutoSchema(index); + querySelector = `#data-source-auto-from-metadata-container-${index}`; + newElement = await createAutoFromMetadataSourceContainer(index); } else { querySelector = "unknown"; setEnableAutoGeneratePlanAndTasks(); @@ -404,10 +405,14 @@ if (currUrlParams.includes("plan-name=")) { $(newDataSource).find(".task-name-field").val(dataSource.taskName); let updatedConnectionName = $(newDataSource).find(".data-connection-name").selectpicker("val", dataSource.name); dispatchEvent(updatedConnectionName, "change"); + await wait(100); + for (let [key, value] of Object.entries(dataSource.options)) { + $(newDataSource).find(`[class~=data-source-property][aria-label=${key}]`).val(value); + } await createGenerationElements(dataSource, newDataSource, numDataSources); createCountElementsFromPlan(dataSource, newDataSource); - await createValidationFromPlan(dataSource, newDataSource); + await createValidationFromPlan(dataSource, newDataSource, numDataSources); } await createForeignKeysFromPlan(respJson); createConfigurationFromPlan(respJson); diff --git a/app/src/main/resources/ui/shared.js b/app/src/main/resources/ui/shared.js index 37779426..f898dba2 100644 --- a/app/src/main/resources/ui/shared.js +++ b/app/src/main/resources/ui/shared.js @@ -479,6 +479,8 @@ export function createAttributeFormFloating(attrMetadata, attributeContainerId, createFieldValidationCheck(inputAttr); inputAttr.setAttribute("required", ""); } + } else { + inputAttr.setAttribute("disabled", ""); } inputAttr.dispatchEvent(new Event("input")); } @@ -886,7 +888,7 @@ newFieldDetails.set("validation", { }, accordion: { name: "validation", - classes: "accordion-data-validation-container", + classes: "data-validation-container", header: { updateOn: "inputAndSelect" } @@ -947,6 +949,81 @@ export async function createNewField(index, type) { return accordionItem; } +export async function createAutoFromMetadata(autoFromMetadataContainer, dataSource) { + let updatedMetadataSource = 
$(autoFromMetadataContainer).find(".metadata-connection-name").selectpicker("val", dataSource.fields.optMetadataSource.name); + dispatchEvent(updatedMetadataSource, "change"); + // takes some time to get the override options + await wait(100); + for (let [key, value] of Object.entries(dataSource.fields.optMetadataSource.overrideOptions)) { + $(autoFromMetadataContainer).find(`input[aria-label="${key}"]`).val(value); + } +} + +function addMetadataConnectionOverrideOptions(metadataConnectionSelect, overrideOptionsContainer, index) { + metadataConnectionSelect.addEventListener("change", (event) => { + let metadataSourceName = event.target.value; + fetch(`http://localhost:9898/connection/${metadataSourceName}`, {method: "GET"}) + .then(r => { + if (r.ok) { + return r.json(); + } else { + r.text().then(text => { + createToast(`Get connection ${metadataSourceName}`, `Failed to get connection ${metadataSourceName}! Error: ${text}`, "fail"); + throw new Error(text); + }); + } + }) + .then(respJson => { + if (respJson) { + //remove previous properties + overrideOptionsContainer.replaceChildren(); + let metadataSourceType = respJson.type; + let metadataSourceProperties = dataSourcePropertiesMap.get(metadataSourceType).properties; + + for (const [key, value] of Object.entries(metadataSourceProperties)) { + if (value["override"] && value["override"] === "true") { + //add properties that can be overridden + addNewDataTypeAttribute(key, value, `connection-config-${index}-${key}`, "metadata-source-property", overrideOptionsContainer); + } + } + } + }); + }); +} + +// allow users to get schema or validation information from a metadata source such as openmetadata or marquez +export async function createAutoFromMetadataSourceContainer(index) { + let baseContainer = document.createElement("div"); + baseContainer.setAttribute("id", `data-source-auto-from-metadata-container-${index}`); + baseContainer.setAttribute("class", "card card-body data-source-auto-from-metadata-container"); + 
baseContainer.style.display = "inherit"; + let autoSchemaHeader = document.createElement("h5"); + autoSchemaHeader.innerText = "Auto from Metadata"; + let baseTaskDiv = document.createElement("div"); + baseTaskDiv.setAttribute("class", "row m-2 g-2 align-items-center"); + + let metadataConnectionSelect = createSelect(`metadata-connection-${index}`, "Metadata source", "selectpicker form-control input-field metadata-connection-name", "Select metadata source..."); + let dataConnectionCol = document.createElement("div"); + dataConnectionCol.setAttribute("class", "col"); + dataConnectionCol.append(metadataConnectionSelect); + + let iconDiv = createIconWithConnectionTooltip(metadataConnectionSelect); + let iconCol = document.createElement("div"); + iconCol.setAttribute("class", "col-md-auto"); + iconCol.append(iconDiv); + baseTaskDiv.append(dataConnectionCol, iconCol); + + // get connection list, filter only metadata sources + let metadataConnection = await getDataConnectionsAndAddToSelect(metadataConnectionSelect, baseTaskDiv, "metadata"); + // provide opportunity to override non-connection options for metadata source (i.e. 
namespace, dataset) + let overrideOptionsContainer = document.createElement("div"); + // when the metadata source connection is selected, populate override options container with options with "override: true" + addMetadataConnectionOverrideOptions(metadataConnectionSelect, overrideOptionsContainer, index); + + baseContainer.append(autoSchemaHeader, metadataConnection, overrideOptionsContainer); + return baseContainer; +} + export function getOverrideConnectionOptionsAsMap(dataSource) { return $(dataSource).find("[id^=connection-config-]").toArray() .reduce(function (map, option) { diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala index c19f2a9c..3a4216ec 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala @@ -82,7 +82,7 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String val (step, task) = stepAndTaskByDataSourceName(df._1) val dataSourceConfig = connectionConfigsByName.getOrElse(dataSourceName, Map()) val stepWithDataSourceConfig = step.copy(options = dataSourceConfig ++ step.options) - val sinkResult = sinkFactory.pushToSink(df._2, dataSourceName, stepWithDataSourceConfig, flagsConfig, startTime) + val sinkResult = sinkFactory.pushToSink(df._2, dataSourceName, stepWithDataSourceConfig, startTime) DataSourceResult(dataSourceName, task, stepWithDataSourceConfig, sinkResult, batchNum) }) } diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala index 857cc6fc..fb2fad15 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala 
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/DataGeneratorProcessor.scala @@ -54,7 +54,7 @@ class DataGeneratorProcessor(dataCatererConfiguration: DataCatererConfiguration) PlanRunResults(List(), List()) } else { val generationResult = if (flagsConfig.enableGenerateData) { - LOGGER.info(s"Following tasks are enabled and will be executed: num-tasks=${summaryWithTask.size}, tasks=$stepNames") + LOGGER.debug(s"Following tasks are enabled and will be executed: num-tasks=${summaryWithTask.size}, tasks=$stepNames") batchDataProcessor.splitAndProcess(plan, summaryWithTask) } else List() diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala index d43e919f..664a1f09 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala @@ -1,13 +1,15 @@ package io.github.datacatering.datacaterer.core.sink -import io.github.datacatering.datacaterer.api.model.Constants.{DRIVER, FORMAT, JDBC, OMIT, PARTITIONS, PARTITION_BY, POSTGRES_DRIVER, SAVE_MODE} +import io.github.datacatering.datacaterer.api.model.Constants.{DRIVER, FORMAT, ICEBERG, JDBC, OMIT, PARTITIONS, PARTITION_BY, PATH, POSTGRES_DRIVER, SAVE_MODE, SPARK_ICEBERG_CATALOG_TYPE, SPARK_ICEBERG_CATALOG_WAREHOUSE, TABLE} import io.github.datacatering.datacaterer.api.model.{FlagsConfig, MetadataConfig, Step} import io.github.datacatering.datacaterer.core.model.Constants.{FAILED, FINISHED, STARTED} import io.github.datacatering.datacaterer.core.model.SinkResult import io.github.datacatering.datacaterer.core.util.ConfigUtil import io.github.datacatering.datacaterer.core.util.MetadataUtil.getFieldMetadata import org.apache.log4j.Logger -import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SaveMode, SparkSession} +import 
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{CreateTableWriter, DataFrame, DataFrameWriter, DataFrameWriterV2, Dataset, Row, SaveMode, SparkSession} import java.time.LocalDateTime import scala.util.{Failure, Success, Try} @@ -17,7 +19,7 @@ class SinkFactory(val flagsConfig: FlagsConfig, val metadataConfig: MetadataConf private val LOGGER = Logger.getLogger(getClass.getName) private var HAS_LOGGED_COUNT_DISABLE_WARNING = false - def pushToSink(df: DataFrame, dataSourceName: String, step: Step, flagsConfig: FlagsConfig, startTime: LocalDateTime): SinkResult = { + def pushToSink(df: DataFrame, dataSourceName: String, step: Step, startTime: LocalDateTime): SinkResult = { val dfWithoutOmitFields = removeOmitFields(df) val saveMode = step.options.get(SAVE_MODE).map(_.toLowerCase.capitalize).map(SaveMode.valueOf).getOrElse(SaveMode.Append) val format = step.options(FORMAT) @@ -65,12 +67,21 @@ class SinkFactory(val flagsConfig: FlagsConfig, val metadataConfig: MetadataConf private def saveBatchData(dataSourceName: String, df: DataFrame, saveMode: SaveMode, connectionConfig: Map[String, String], count: String, startTime: LocalDateTime): SinkResult = { val format = connectionConfig(FORMAT) - val partitionedDf = partitionDf(df, connectionConfig) - val trySaveData = Try(partitionedDf - .format(format) - .mode(saveMode) - .options(connectionConfig) - .save()) + + // if format is iceberg, need to use dataframev2 api for partition and writing + val trySaveData = if (format == ICEBERG) { + connectionConfig.filter(_._1.startsWith("spark.sql.catalog")) + .foreach(conf => df.sqlContext.setConf(conf._1, conf._2)) + Try(tryPartitionAndSaveDfV2(df, saveMode, connectionConfig)) + } else { + val partitionedDf = partitionDf(df, connectionConfig) + Try(partitionedDf + .format(format) + .mode(saveMode) + .options(connectionConfig) + .save()) + } + val optException = trySaveData match { case 
Failure(exception) => Some(exception) case Success(_) => None @@ -86,6 +97,49 @@ class SinkFactory(val flagsConfig: FlagsConfig, val metadataConfig: MetadataConf .getOrElse(partitionDf.write) } + private def tryPartitionAndSaveDfV2(df: DataFrame, saveMode: SaveMode, stepOptions: Map[String, String]): Unit = { + val tableName = s"$ICEBERG.${stepOptions(TABLE)}" + val repartitionDf = stepOptions.get(PARTITIONS) + .map(partitionNum => df.repartition(partitionNum.toInt)).getOrElse(df) + val baseTable = repartitionDf.writeTo(tableName).options(stepOptions) + + stepOptions.get(PARTITION_BY) + .map(partitionCols => { + val spt = partitionCols.split(",").map(c => col(c.trim)) + + val partitionedDf = if (spt.length > 1) { + baseTable.partitionedBy(spt.head, spt.tail: _*) + } else if (spt.length == 1) { + baseTable.partitionedBy(spt.head) + } else { + baseTable + } + + saveDataframeV2(saveMode, tableName, baseTable, partitionedDf) + }) + .getOrElse(saveDataframeV2(saveMode, tableName, baseTable, baseTable)) + } + + private def saveDataframeV2(saveMode: SaveMode, tableName: String, baseDf: DataFrameWriterV2[Row], partitionedDf: CreateTableWriter[Row]): Unit = { + saveMode match { + case SaveMode.Append | SaveMode.Ignore => + val tryCreate = Try(partitionedDf.create()) + tryCreate match { + case Failure(exception) => + if (exception.isInstanceOf[TableAlreadyExistsException]) { + LOGGER.debug(s"Table already exists, appending to existing table, table-name=$tableName") + baseDf.append() + } else { + throw new RuntimeException(exception) + } + case Success(_) => + LOGGER.debug(s"Successfully created partitioned table, table-name=$tableName") + } + case SaveMode.Overwrite => baseDf.overwritePartitions() + case SaveMode.ErrorIfExists => partitionedDf.create() + } + } + private def additionalConnectionConfig(format: String, connectionConfig: Map[String, String]): Map[String, String] = { format match { case JDBC => if (connectionConfig(DRIVER).equalsIgnoreCase(POSTGRES_DRIVER) && 
!connectionConfig.contains("stringtype")) { diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/mapper/UiMapper.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/mapper/UiMapper.scala index deaa4322..5ae20032 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/mapper/UiMapper.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/mapper/UiMapper.scala @@ -193,7 +193,7 @@ object UiMapper { def validationMapping(dataSourceRequest: DataSourceRequest): List[ValidationBuilder] = { dataSourceRequest.validations - .map(validations => validations.flatMap(validationItemRequestToValidationBuilders)) + .map(validations => validations.optValidations.map(v => v.flatMap(validationItemRequestToValidationBuilders)).getOrElse(List())) .getOrElse(List()) } @@ -202,8 +202,8 @@ object UiMapper { case VALIDATION_COLUMN => //map type of column validation to builder method //each option is a new validation - val mappedValids = validateItem.options.map(opts => { - val colName = opts(VALIDATION_COLUMN) + validateItem.options.map(opts => { + val colName = opts(VALIDATION_FIELD) opts .filter(o => !VALIDATION_SUPPORTING_OPTIONS.contains(o._1)) .map(opt => { @@ -212,7 +212,6 @@ object UiMapper { }) .toList }).getOrElse(List()) - mappedValids case VALIDATION_COLUMN_NAMES => validateItem.options.map(opts => { opts @@ -270,7 +269,9 @@ object UiMapper { } def fieldMapping(dataSourceRequest: DataSourceRequest): List[FieldBuilder] = { - dataSourceRequest.fields.map(fields => fieldMapping(dataSourceRequest.name, fields.optFields.getOrElse(List()))).getOrElse(List()) + dataSourceRequest.fields + .map(fields => fieldMapping(dataSourceRequest.name, fields.optFields.getOrElse(List()))) + .getOrElse(List()) } private def fieldMapping(dataSourceName: String, fields: List[FieldRequest]): List[FieldBuilder] = { @@ -295,6 +296,7 @@ object UiMapper { case Some(JSON) => createFileConnection(dataSourceRequest, JSON) case 
Some(PARQUET) => createFileConnection(dataSourceRequest, PARQUET) case Some(ORC) => createFileConnection(dataSourceRequest, ORC) + case Some(ICEBERG) => createIcebergConnection(dataSourceRequest) case Some(SOLACE) => val opt = dataSourceRequest.options.getOrElse(Map()) checkOptions(dataSourceRequest.name, List(URL, USERNAME, PASSWORD, JMS_DESTINATION_NAME, JMS_VPN_NAME, JMS_CONNECTION_FACTORY, JMS_INITIAL_CONTEXT_FACTORY), opt) @@ -318,8 +320,12 @@ object UiMapper { def connectionsWithUpstreamValidationMapping(connections: List[ConnectionTaskBuilder[_]], dataSources: List[DataSourceRequest]): List[ConnectionTaskBuilder[_]] = { val dataSourcesWithUpstreamValidation = dataSources - .filter(ds => ds.validations.getOrElse(List()).exists(_.`type` == VALIDATION_UPSTREAM)) - .map(ds => (ds.taskName, ds.validations.getOrElse(List()))) + .filter(ds => { + ds.validations + .map(_.optValidations.getOrElse(List())).getOrElse(List()) + .exists(_.`type` == VALIDATION_UPSTREAM) + }) + .map(ds => (ds.taskName, ds.validations.map(_.optValidations.get).getOrElse(List()))) .toMap connections.map(connection => { @@ -461,6 +467,26 @@ object UiMapper { ConnectionConfigWithTaskBuilder().file(dataSourceRequest.name, format, opt(PATH), opt) } + private def createIcebergConnection(dataSourceRequest: DataSourceRequest): FileBuilder = { + val opt = dataSourceRequest.options.getOrElse(Map()) + val name = dataSourceRequest.name + checkOptions(name, List(ICEBERG_CATALOG_TYPE, TABLE), opt) + val baseSparkOpts = Map( + SPARK_ICEBERG_CATALOG_TYPE -> opt(ICEBERG_CATALOG_TYPE), + TABLE -> opt(TABLE) + ) + val sparkOpts = opt(ICEBERG_CATALOG_TYPE) match { + case ICEBERG_CATALOG_HADOOP | ICEBERG_CATALOG_GLUE => + checkOptions(name, List(PATH), opt) + Map(SPARK_ICEBERG_CATALOG_WAREHOUSE -> opt(PATH)) + case ICEBERG_CATALOG_HIVE | ICEBERG_CATALOG_REST => + checkOptions(name, List(ICEBERG_CATALOG_URI), opt) + Map(SPARK_ICEBERG_CATALOG_URI -> opt(ICEBERG_CATALOG_URI)) + case _ => Map() + } + 
ConnectionConfigWithTaskBuilder().file(name, ICEBERG, opt.getOrElse(PATH, ""), baseSparkOpts ++ sparkOpts) + } + private def createJdbcConnection(dataSourceRequest: DataSourceRequest, format: String): JdbcBuilder[_] = { val opt = dataSourceRequest.options.getOrElse(Map()) checkOptions(dataSourceRequest.name, List(URL, USERNAME, PASSWORD), opt) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/model/JsonSupport.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/model/JsonSupport.scala index dc2ee6c9..b8bd78d7 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/model/JsonSupport.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/model/JsonSupport.scala @@ -31,6 +31,7 @@ trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { implicit val waitRequestFormat: RootJsonFormat[WaitRequest] = jsonFormat1(WaitRequest.apply) implicit val validationItemRequestsFormat: RootJsonFormat[ValidationItemRequests] = rootFormat(lazyFormat(jsonFormat1(ValidationItemRequests.apply))) implicit val validationItemRequestFormat: RootJsonFormat[ValidationItemRequest] = rootFormat(lazyFormat(jsonFormat4(ValidationItemRequest.apply))) + implicit val validationRequestFormat: RootJsonFormat[ValidationRequest] = rootFormat(lazyFormat(jsonFormat2(ValidationRequest.apply))) implicit val foreignKeyItemRequestFormat: RootJsonFormat[ForeignKeyRequestItem] = jsonFormat3(ForeignKeyRequestItem.apply) implicit val foreignKeyRequestFormat: RootJsonFormat[ForeignKeyRequest] = jsonFormat3(ForeignKeyRequest.apply) implicit val dataSourceRequestFormat: RootJsonFormat[DataSourceRequest] = jsonFormat7(DataSourceRequest.apply) diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/model/models.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/model/models.scala index ae155109..1a01a344 100644 --- 
a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/model/models.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/model/models.scala @@ -21,7 +21,7 @@ case class DataSourceRequest( options: Option[Map[String, String]] = None, fields: Option[FieldRequests] = None, count: Option[RecordCountRequest] = None, - validations: Option[List[ValidationItemRequest]] = None, + validations: Option[ValidationRequest] = None, ) case class FieldRequest( @@ -50,6 +50,11 @@ case class RecordCountRequest( perColumnRecordsDistributionRateParam: Option[String] = None, ) +case class ValidationRequest( + optValidations: Option[List[ValidationItemRequest]] = None, + optMetadataSource: Option[MetadataSourceRequest] = None + ) + case class ValidationItemRequest( `type`: String, options: Option[Map[String, String]] = None, diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala index 2a6178cf..fe09406e 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala @@ -55,6 +55,8 @@ object PlanRepository extends JsonSupport { final case class RemovePlan(name: String) extends PlanCommand + final case class StartupSpark() extends PlanCommand + private val executionSaveFolder = s"$INSTALL_DIRECTORY/execution" private val planSaveFolder = s"$INSTALL_DIRECTORY/plan" implicit val ec: ExecutionContextExecutor = ExecutionContext.global @@ -83,6 +85,8 @@ object PlanRepository extends JsonSupport { case GetPlanRuns(replyTo) => replyTo ! 
getAllPlanExecutions Behaviors.same + case StartupSpark() => + Behaviors.same } }.onFailure(SupervisorStrategy.restart) } diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala index 5698951b..32aa2adf 100644 --- a/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala +++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessor.scala @@ -1,7 +1,7 @@ package io.github.datacatering.datacaterer.core.validator import io.github.datacatering.datacaterer.api.ValidationBuilder -import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_ENABLE_VALIDATION, ENABLE_DATA_VALIDATION, FORMAT, HTTP, JMS} +import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_ENABLE_VALIDATION, ENABLE_DATA_VALIDATION, FORMAT, HTTP, ICEBERG, JMS, TABLE} import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, ExpressionValidation, FoldersConfig, GroupByValidation, UpstreamDataSourceValidation, ValidationConfig, ValidationConfiguration} import io.github.datacatering.datacaterer.core.model.{DataSourceValidationResult, ValidationConfigResult, ValidationResult} import io.github.datacatering.datacaterer.core.parser.ValidationParser @@ -46,7 +46,7 @@ class ValidationProcessor( val dataSourceValidations = dataSource._2 val numValidations = dataSourceValidations.flatMap(_.validations).size - LOGGER.info(s"Executing data validations for data source, name=${vc.name}, " + + LOGGER.debug(s"Executing data validations for data source, name=${vc.name}, " + s"data-source-name=$dataSourceName, num-validations=$numValidations") dataSourceValidations.map(dataSourceValidation => executeDataValidations(vc, dataSourceName, dataSourceValidation)) }).toList @@ -65,21 +65,26 @@ class ValidationProcessor( ): DataSourceValidationResult = { val 
isValidationsEnabled = dataSourceValidation.options.get(ENABLE_DATA_VALIDATION).map(_.toBoolean).getOrElse(DEFAULT_ENABLE_VALIDATION) if (isValidationsEnabled) { - LOGGER.debug(s"Waiting for validation condition to be successful before running validations, name=${vc.name}," + - s"data-source-name=$dataSourceName, details=${dataSourceValidation.options}, num-validations=${dataSourceValidation.validations.size}") - dataSourceValidation.waitCondition.waitForCondition(connectionConfigsByName) - - val df = getDataFrame(dataSourceName, dataSourceValidation.options) - if (df.isEmpty) { - LOGGER.info("No data found to run validations") + if (dataSourceValidation.validations.isEmpty) { + LOGGER.debug(s"No validations defined, skipping data validations, name=${vc.name}, data-source-name=$dataSourceName") DataSourceValidationResult(dataSourceName, dataSourceValidation.options, List()) } else { - if (!df.storageLevel.useMemory) df.cache() - val results = dataSourceValidation.validations.map(validBuilder => tryValidate(df, validBuilder)) - df.unpersist() - LOGGER.debug(s"Finished data validations, name=${vc.name}," + + LOGGER.debug(s"Waiting for validation condition to be successful before running validations, name=${vc.name}," + s"data-source-name=$dataSourceName, details=${dataSourceValidation.options}, num-validations=${dataSourceValidation.validations.size}") - DataSourceValidationResult(dataSourceName, dataSourceValidation.options, results) + dataSourceValidation.waitCondition.waitForCondition(connectionConfigsByName) + + val df = getDataFrame(dataSourceName, dataSourceValidation.options) + if (df.isEmpty) { + LOGGER.info("No data found to run validations") + DataSourceValidationResult(dataSourceName, dataSourceValidation.options, List()) + } else { + if (!df.storageLevel.useMemory) df.cache() + val results = dataSourceValidation.validations.map(validBuilder => tryValidate(df, validBuilder)) + df.unpersist() + LOGGER.debug(s"Finished data validations, name=${vc.name}," + + 
s"data-source-name=$dataSourceName, details=${dataSourceValidation.options}, num-validations=${dataSourceValidation.validations.size}") + DataSourceValidationResult(dataSourceName, dataSourceValidation.options, results) + } } } else { LOGGER.debug(s"Data validations are disabled, data-source-name=$dataSourceName, details=${dataSourceValidation.options}") @@ -110,27 +115,41 @@ class ValidationProcessor( private def getDataFrame(dataSourceName: String, options: Map[String, String]): DataFrame = { val connectionConfig = connectionConfigsByName(dataSourceName) val format = connectionConfig(FORMAT) - if (format == HTTP || format == JMS) { + val df = if (format == HTTP || format == JMS) { LOGGER.warn("No support for HTTP or JMS data validations, will skip validations") sparkSession.emptyDataFrame + } else if (format == ICEBERG) { + val allOpts = connectionConfig ++ options + val tableName = allOpts(TABLE) + val tableNameWithCatalog = if (tableName.split("\\.").length == 2) { + s"iceberg.$tableName" + } else tableName + sparkSession.read + .options(allOpts) + .table(tableNameWithCatalog) } else { - val df = sparkSession.read + sparkSession.read .format(format) .options(connectionConfig ++ options) .load() - if (!df.storageLevel.useMemory) df.cache() - df } + + if (!df.storageLevel.useMemory) df.cache() + df } private def logValidationErrors(validationResults: List[ValidationConfigResult]): Unit = { validationResults.foreach(vcr => vcr.dataSourceValidationResults.map(dsr => { val failedValidations = dsr.validationResults.filter(r => !r.isSuccess) - if (failedValidations.isEmpty) { + if (dsr.validationResults.isEmpty) { + LOGGER.debug(s"Validation results are empty, no validations run, name=${vcr.name}, description=${vcr.description}, data-source-name=${dsr.dataSourceName}") + } else if (failedValidations.isEmpty) { LOGGER.info(s"Data validations successful for validation, name=${vcr.name}, description=${vcr.description}, data-source-name=${dsr.dataSourceName}, " + - 
s"data-source-options=${dsr.options}, is-success=true") + s"data-source-options=${dsr.options}, num-validations=${dsr.validationResults.size}, is-success=true") } else { + LOGGER.error(s"Data validations failed, name=${vcr.name}, description=${vcr.description}, data-source-name=${dsr.dataSourceName}, " + + s"data-source-options=${dsr.options}, num-validations=${dsr.validationResults.size}, num-failed=${failedValidations.size}, is-success=false") failedValidations.foreach(validationRes => { val (validationType, validationCheck) = validationRes.validation match { case ExpressionValidation(expr, selectExpr) => ("expression", expr) diff --git a/app/src/test/resources/log4j2.properties b/app/src/test/resources/log4j2.properties index bf543d28..a66ec128 100644 --- a/app/src/test/resources/log4j2.properties +++ b/app/src/test/resources/log4j2.properties @@ -52,4 +52,7 @@ logger.kafka.name = org.apache.kafka logger.kafka.level = warn # Hudi logging logger.hudi.name=org.apache.hudi -logger.hudi.level=error \ No newline at end of file +logger.hudi.level=error +# Iceberg logging +logger.iceberg.name=org.apache.iceberg +logger.iceberg.level=error diff --git a/app/src/test/resources/sample/task/file/json-account-task.yaml b/app/src/test/resources/sample/task/file/json-account-task.yaml index 24285558..1fb5c355 100644 --- a/app/src/test/resources/sample/task/file/json-account-task.yaml +++ b/app/src/test/resources/sample/task/file/json-account-task.yaml @@ -3,7 +3,7 @@ steps: - name: "file_account" type: "json" count: - records: 1000 + records: 10000 options: path: "app/src/test/resources/sample/json/account-gen" schema: @@ -44,7 +44,7 @@ steps: false END - name: "updated_time" - type: "string" + type: "timestamp" generator: type: "sql" options: @@ -96,53 +96,53 @@ steps: type: "array" generator: type: "random" - - name: "transactions" - type: "json" - options: - path: "app/src/test/resources/sample/json/txn-gen" - count: - records: 100 - perColumn: - columnNames: - - 
"account_id" - - "name" - generator: - type: "random" - options: - max: 10 - min: 1 - schema: - fields: - - name: "account_id" - type: "string" - generator: - type: "regex" - options: - regex: "ACC1[0-9]{9}" - - name: "name" - type: "string" - generator: - type: "random" - options: - expression: "#{Name.name}" - - name: "year" - type: "int" - generator: - type: "random" - options: - min: 2021 - max: 2022 - - name: "amount" - type: "double" - generator: - type: "random" - options: - min: 10.0 - max: 100.0 - - name: "txn_date" - type: "date" - generator: - type: "random" - options: - min: "2021-01-01" - max: "2021-12-31" \ No newline at end of file +# - name: "transactions" +# type: "json" +# options: +# path: "app/src/test/resources/sample/json/txn-gen" +# count: +# records: 100 +# perColumn: +# columnNames: +# - "account_id" +# - "name" +# generator: +# type: "random" +# options: +# max: 10 +# min: 1 +# schema: +# fields: +# - name: "account_id" +# type: "string" +# generator: +# type: "regex" +# options: +# regex: "ACC1[0-9]{9}" +# - name: "name" +# type: "string" +# generator: +# type: "random" +# options: +# expression: "#{Name.name}" +# - name: "year" +# type: "int" +# generator: +# type: "random" +# options: +# min: 2021 +# max: 2022 +# - name: "amount" +# type: "double" +# generator: +# type: "random" +# options: +# min: 10.0 +# max: 100.0 +# - name: "txn_date" +# type: "date" +# generator: +# type: "random" +# options: +# min: "2021-01-01" +# max: "2021-12-31" \ No newline at end of file diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessorTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessorTest.scala index 2580f1b7..e67bbadd 100644 --- a/app/src/test/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessorTest.scala +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessorTest.scala @@ -120,7 +120,7 @@ class PlanProcessorTest extends SparkSuite { } 
ignore("Can run Postgres plan run") { - PlanProcessor.determineAndExecutePlan(Some(new TestValidation)) + PlanProcessor.determineAndExecutePlan(Some(new TestOtherFileFormats)) } class TestPostgres extends PlanRun { @@ -299,7 +299,7 @@ class PlanProcessorTest extends SparkSuite { // val deltaTask = delta("my_delta", "/tmp/data/delta", Map("saveMode" -> "overwrite")) // .schema(basicSchema: _*) // - val icebergTask = iceberg("my_iceberg", "/tmp/data/iceberg", Map("saveMode" -> "overwrite")) + val icebergTask = iceberg("my_iceberg", "/tmp/data/iceberg", "account.accounts", options = Map("saveMode" -> "overwrite")) .schema(basicSchema: _*) execute(icebergTask) diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/sink/SinkFactoryTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/sink/SinkFactoryTest.scala new file mode 100644 index 00000000..0ce2aa74 --- /dev/null +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/sink/SinkFactoryTest.scala @@ -0,0 +1,56 @@ +package io.github.datacatering.datacaterer.core.sink + +import io.github.datacatering.datacaterer.api.model.Constants.{FORMAT, ICEBERG, PATH, SAVE_MODE, TABLE} +import io.github.datacatering.datacaterer.api.model.{FlagsConfig, MetadataConfig, Step} +import io.github.datacatering.datacaterer.core.util.{SparkSuite, Transaction} +import org.junit.runner.RunWith +import org.scalatestplus.junit.JUnitRunner + +import java.sql.Date +import java.time.LocalDateTime + +@RunWith(classOf[JUnitRunner]) +class SinkFactoryTest extends SparkSuite { + + private val sampleData = Seq( + Transaction("acc123", "peter", "txn1", Date.valueOf("2020-01-01"), 10.0), + Transaction("acc123", "peter", "txn2", Date.valueOf("2020-01-01"), 50.0), + Transaction("acc123", "peter", "txn3", Date.valueOf("2020-01-01"), 200.0), + Transaction("acc123", "peter", "txn4", Date.valueOf("2020-01-01"), 500.0) + ) + private val df = sparkSession.createDataFrame(sampleData) + + test("Can save data in Iceberg 
format") { + val sinkFactory = new SinkFactory(FlagsConfig(), MetadataConfig()) + val step = Step(options = Map(FORMAT -> ICEBERG, TABLE -> "account.transactions", PATH -> "/tmp/iceberg-test")) + val res = sinkFactory.pushToSink(df, "iceberg-data-source", step, LocalDateTime.now()) + + assert(res.isSuccess) + assertResult(4)(res.count) + assertResult(ICEBERG)(res.format) + assert(res.exception.isEmpty) + } + + test("Can overwrite existing Iceberg data") { + sparkSession.sql("DELETE FROM iceberg.account.transactions_overwrite").count() + val sinkFactory = new SinkFactory(FlagsConfig(), MetadataConfig()) + val options = Map(FORMAT -> ICEBERG, TABLE -> "account.transactions_overwrite", PATH -> "/tmp/iceberg-test") + val step = Step(options = options) + val existingDataRes = sinkFactory.pushToSink(df, "iceberg-data-source", step, LocalDateTime.now()) + + assert(existingDataRes.isSuccess) + assertResult(4)(existingDataRes.count) + assertResult(ICEBERG)(existingDataRes.format) + assert(existingDataRes.exception.isEmpty) + assertResult(4)(sparkSession.table("iceberg.account.transactions_overwrite").count()) + + val newStep = Step(options = options ++ Map(SAVE_MODE -> "overwrite")) + val res = sinkFactory.pushToSink(df, "iceberg-data-source", newStep, LocalDateTime.now()) + + assert(res.isSuccess) + assertResult(4)(res.count) + assertResult(ICEBERG)(res.format) + assert(res.exception.isEmpty) + assertResult(4)(sparkSession.table("iceberg.account.transactions_overwrite").count()) + } +} diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/ui/mapper/UiMapperTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/ui/mapper/UiMapperTest.scala index f93fb62f..3729c9b8 100644 --- a/app/src/test/scala/io/github/datacatering/datacaterer/core/ui/mapper/UiMapperTest.scala +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/ui/mapper/UiMapperTest.scala @@ -4,7 +4,7 @@ import io.github.datacatering.datacaterer.api.connection.FileBuilder 
import io.github.datacatering.datacaterer.api.model.Constants._ import io.github.datacatering.datacaterer.api.model.{ColumnNamesValidation, ExpressionValidation, GroupByValidation, UpstreamDataSourceValidation} import io.github.datacatering.datacaterer.api.{DataCatererConfigurationBuilder, FieldBuilder, PlanBuilder} -import io.github.datacatering.datacaterer.core.ui.model.{ConfigurationRequest, DataSourceRequest, FieldRequest, FieldRequests, ForeignKeyRequest, ForeignKeyRequestItem, PlanRunRequest, RecordCountRequest, ValidationItemRequest, ValidationItemRequests} +import io.github.datacatering.datacaterer.core.ui.model.{ConfigurationRequest, DataSourceRequest, FieldRequest, FieldRequests, ForeignKeyRequest, ForeignKeyRequestItem, PlanRunRequest, RecordCountRequest, ValidationItemRequest, ValidationItemRequests, ValidationRequest} import org.junit.runner.RunWith import org.scalatest.funsuite.AnyFunSuite import org.scalatestplus.junit.JUnitRunner @@ -428,7 +428,9 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping for basic column validation") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_COLUMN -> "account_id", VALIDATION_EQUAL -> "abc123", "description" -> "valid desc", "errorThreshold" -> "2")))))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some( + List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_FIELD -> "account_id", VALIDATION_EQUAL -> "abc123", "description" -> "valid desc", "errorThreshold" -> "2")))) + )))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val exprValid = res.head.validation.asInstanceOf[ExpressionValidation] @@ -440,7 +442,9 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping for column name validation") { - val dataSourceRequest = DataSourceRequest("csv-name", 
"task-1", validations = Some(List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map(VALIDATION_COLUMN_NAMES_COUNT_EQUAL -> "5")))))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some( + List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map(VALIDATION_COLUMN_NAMES_COUNT_EQUAL -> "5")))) + )))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[ColumnNamesValidation] @@ -449,7 +453,9 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping for column name validation count between") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map(VALIDATION_COLUMN_NAMES_COUNT_BETWEEN -> "blah", VALIDATION_MIN -> "1", VALIDATION_MAX -> "2")))))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some( + List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map(VALIDATION_COLUMN_NAMES_COUNT_BETWEEN -> "blah", VALIDATION_MIN -> "1", VALIDATION_MAX -> "2")))) + )))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[ColumnNamesValidation] @@ -459,7 +465,9 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping for column name validation match order") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map(VALIDATION_COLUMN_NAMES_MATCH_ORDER -> "account_id,year")))))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some( + List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map(VALIDATION_COLUMN_NAMES_MATCH_ORDER -> "account_id,year")))) + )))) val res = UiMapper.validationMapping(dataSourceRequest) 
assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[ColumnNamesValidation] @@ -468,7 +476,9 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping for column name validation match set") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map(VALIDATION_COLUMN_NAMES_MATCH_SET -> "account_id,year")))))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some( + List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map(VALIDATION_COLUMN_NAMES_MATCH_SET -> "account_id,year")))) + )))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[ColumnNamesValidation] @@ -477,7 +487,9 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping, when unknown option, default to column name count equals 1") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map("unknown" -> "hello")))))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some( + List(ValidationItemRequest(VALIDATION_COLUMN_NAMES, Some(Map("unknown" -> "hello")))) + )))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[ColumnNamesValidation] @@ -486,11 +498,12 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping with min group by validation") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_MIN, "aggCol" -> "amount", 
VALIDATION_EQUAL -> "10")) - )))) - )))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_MIN, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) + )))) + )))))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[GroupByValidation] @@ -501,11 +514,12 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping with max group by validation") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_MAX, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) - )))) - )))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_MAX, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) + )))) + )))))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[GroupByValidation] @@ -516,11 +530,12 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping with count group by validation") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - 
Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_COUNT, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) - )))) - )))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some( + List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_COUNT, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) + )))) + )))))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[GroupByValidation] @@ -531,11 +546,12 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping with sum group by validation") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_SUM, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) - )))) - )))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_SUM, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) + )))) + )))))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[GroupByValidation] @@ -546,11 +562,12 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping with average group by validation") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = 
Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_AVERAGE, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) - )))) - )))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_AVERAGE, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) + )))) + )))))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[GroupByValidation] @@ -561,11 +578,12 @@ class UiMapperTest extends AnyFunSuite { } test("Can convert UI validation mapping with standard deviation group by validation") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_STANDARD_DEVIATION, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) - )))) - )))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggType" -> VALIDATION_STANDARD_DEVIATION, "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) + )))) + )))))) val res = UiMapper.validationMapping(dataSourceRequest) assertResult(1)(res.size) val valid = res.head.validation.asInstanceOf[GroupByValidation] @@ -576,30 +594,34 @@ class UiMapperTest extends AnyFunSuite { } 
test("Throw error when given unknown aggregation type") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggType" -> "unknown", "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) - )))) - )))) + val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggType" -> "unknown", "aggCol" -> "amount", VALIDATION_EQUAL -> "10")) + )))) + )))))) assertThrows[IllegalArgumentException](UiMapper.validationMapping(dataSourceRequest)) } test("Throw error when no aggType or aggCol is given") { - val dataSourceRequest = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggCol" -> "amount", VALIDATION_EQUAL -> "10")) - )))) - )))) - val dataSourceRequest1 = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map("aggType" -> "max", VALIDATION_EQUAL -> "10")) - )))) - )))) - val dataSourceRequest2 = DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), - Some(ValidationItemRequests(List(ValidationItemRequest( - VALIDATION_COLUMN, Some(Map(VALIDATION_EQUAL -> "10")) - )))) - )))) + val dataSourceRequest = 
DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggCol" -> "amount", VALIDATION_EQUAL -> "10")) + )))) + )))))) + val dataSourceRequest1 = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map("aggType" -> "max", VALIDATION_EQUAL -> "10")) + )))) + )))))) + val dataSourceRequest2 = DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest( + Some(List(ValidationItemRequest(VALIDATION_GROUP_BY, Some(Map(VALIDATION_GROUP_BY_COLUMNS -> "account_id")), + Some(ValidationItemRequests(List(ValidationItemRequest( + VALIDATION_COLUMN, Some(Map(VALIDATION_EQUAL -> "10")) + )))) + )))))) assertThrows[RuntimeException](UiMapper.validationMapping(dataSourceRequest)) assertThrows[RuntimeException](UiMapper.validationMapping(dataSourceRequest1)) assertThrows[RuntimeException](UiMapper.validationMapping(dataSourceRequest2)) @@ -611,13 +633,13 @@ class UiMapperTest extends AnyFunSuite { FileBuilder().name("task-2").schema(FieldBuilder().name("account_id"), FieldBuilder().name("date")) ) val dataSourceRequest = List( - DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_UPSTREAM, Some(Map( + DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some(List(ValidationItemRequest(VALIDATION_UPSTREAM, Some(Map( VALIDATION_UPSTREAM_TASK_NAME -> "task-2", VALIDATION_UPSTREAM_JOIN_TYPE -> "outer", VALIDATION_UPSTREAM_JOIN_COLUMNS -> "account_id", )), - Some(ValidationItemRequests(List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_COLUMN -> 
"year", VALIDATION_EQUAL -> "2020")))))) - )))) + Some(ValidationItemRequests(List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_FIELD -> "year", VALIDATION_EQUAL -> "2020")))))) + )))))) ) val res = UiMapper.connectionsWithUpstreamValidationMapping(connections, dataSourceRequest) assertResult(2)(res.size) @@ -639,13 +661,13 @@ class UiMapperTest extends AnyFunSuite { FileBuilder().name("task-2").schema(FieldBuilder().name("account_id"), FieldBuilder().name("date")) ) val dataSourceRequest = List( - DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_UPSTREAM, Some(Map( + DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some(List(ValidationItemRequest(VALIDATION_UPSTREAM, Some(Map( VALIDATION_UPSTREAM_TASK_NAME -> "task-2", VALIDATION_UPSTREAM_JOIN_TYPE -> "outer", VALIDATION_UPSTREAM_JOIN_EXPR -> "account_id == task-2_account_id", )), - Some(ValidationItemRequests(List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_COLUMN -> "year", VALIDATION_EQUAL -> "2020")))))) - )))) + Some(ValidationItemRequests(List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_FIELD -> "year", VALIDATION_EQUAL -> "2020")))))) + )))))) ) val res = UiMapper.connectionsWithUpstreamValidationMapping(connections, dataSourceRequest) assertResult(2)(res.size) @@ -667,12 +689,12 @@ class UiMapperTest extends AnyFunSuite { FileBuilder().name("task-2").schema(FieldBuilder().name("account_id"), FieldBuilder().name("date")) ) val dataSourceRequest = List( - DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_UPSTREAM, Some(Map( + DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some(List(ValidationItemRequest(VALIDATION_UPSTREAM, Some(Map( VALIDATION_UPSTREAM_TASK_NAME -> "task-2", VALIDATION_UPSTREAM_JOIN_COLUMNS -> "account_id", )), - Some(ValidationItemRequests(List(ValidationItemRequest(VALIDATION_COLUMN, 
Some(Map(VALIDATION_COLUMN -> "year", VALIDATION_EQUAL -> "2020")))))) - )))) + Some(ValidationItemRequests(List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_FIELD -> "year", VALIDATION_EQUAL -> "2020")))))) + )))))) ) val res = UiMapper.connectionsWithUpstreamValidationMapping(connections, dataSourceRequest) assertResult(2)(res.size) @@ -694,12 +716,12 @@ class UiMapperTest extends AnyFunSuite { FileBuilder().name("task-2").schema(FieldBuilder().name("account_id"), FieldBuilder().name("date")) ) val dataSourceRequest = List( - DataSourceRequest("csv-name", "task-1", validations = Some(List(ValidationItemRequest(VALIDATION_UPSTREAM, Some(Map( + DataSourceRequest("csv-name", "task-1", validations = Some(ValidationRequest(Some(List(ValidationItemRequest(VALIDATION_UPSTREAM, Some(Map( VALIDATION_UPSTREAM_TASK_NAME -> "task-2", VALIDATION_UPSTREAM_JOIN_EXPR -> "account_id == task-2_account_id", )), - Some(ValidationItemRequests(List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_COLUMN -> "year", VALIDATION_EQUAL -> "2020")))))) - )))) + Some(ValidationItemRequests(List(ValidationItemRequest(VALIDATION_COLUMN, Some(Map(VALIDATION_FIELD -> "year", VALIDATION_EQUAL -> "2020")))))) + )))))) ) val res = UiMapper.connectionsWithUpstreamValidationMapping(connections, dataSourceRequest) assertResult(2)(res.size) diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/util/SparkSuite.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/util/SparkSuite.scala index a6c002e4..c1bfd732 100644 --- a/app/src/test/scala/io/github/datacatering/datacaterer/core/util/SparkSuite.scala +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/util/SparkSuite.scala @@ -13,7 +13,7 @@ trait SparkSuite extends AnyFunSuite with BeforeAndAfterAll with BeforeAndAfterE .config("spark.sql.legacy.allowUntypedScalaUDF", "true") .config("spark.sql.shuffle.partitions", "2") .config("spark.ui.enabled", "false") -// 
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //used for hudi + // .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //used for hudi .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.local.type", "hadoop") diff --git a/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala b/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala index 0f468d2c..c6fec206 100644 --- a/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala +++ b/app/src/test/scala/io/github/datacatering/datacaterer/core/validator/ValidationProcessorTest.scala @@ -1,6 +1,7 @@ package io.github.datacatering.datacaterer.core.validator -import io.github.datacatering.datacaterer.api.model.{FoldersConfig, ValidationConfig} +import io.github.datacatering.datacaterer.api.model.Constants.{FORMAT, ICEBERG, TABLE} +import io.github.datacatering.datacaterer.api.model.{DataSourceValidation, FoldersConfig, ValidationConfig, ValidationConfiguration} import io.github.datacatering.datacaterer.api.{PreFilterBuilder, ValidationBuilder} import io.github.datacatering.datacaterer.core.util.{SparkSuite, Transaction} import org.junit.runner.RunWith @@ -46,4 +47,27 @@ class ValidationProcessorTest extends SparkSuite { assertResult(2)(result.total) assert(result.sampleErrorValues.isEmpty) } + + test("Can read Iceberg data for validation") { + df.writeTo("tmp.transactions").using("iceberg").createOrReplace() + val connectionConfig = Map("test_iceberg" -> Map(FORMAT -> ICEBERG, TABLE -> "local.tmp.transactions")) + val validationConfig = ValidationConfiguration(dataSources = + Map("test_iceberg" -> + List(DataSourceValidation( + options = connectionConfig.head._2, + validations = 
List(ValidationBuilder().col("transaction_id").startsWith("txn")) + )) + ) + ) + val validationProcessor = new ValidationProcessor(connectionConfig, Some(List(validationConfig)), ValidationConfig(), FoldersConfig()) + val result = validationProcessor.executeValidations + + assertResult(1)(result.size) + assertResult(1)(result.head.dataSourceValidationResults.size) + assertResult(1)(result.head.dataSourceValidationResults.head.validationResults.size) + val resultValidation = result.head.dataSourceValidationResults.head.validationResults.head + assert(resultValidation.isSuccess) + assert(resultValidation.sampleErrorValues.isEmpty) + assertResult(4)(resultValidation.total) + } } diff --git a/gradle.properties b/gradle.properties index 35beb407..db552721 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ groupId=io.github.data-catering -version=0.10.5 +version=0.10.6 scalaVersion=2.12 scalaSpecificVersion=2.12.15 diff --git a/script/run-data-caterer.sh b/script/run-data-caterer.sh index 15907280..72b5d0f8 100644 --- a/script/run-data-caterer.sh +++ b/script/run-data-caterer.sh @@ -11,8 +11,7 @@ if [[ "$DEPLOY_MODE" == "standalone" ]] ; then echo "Running Data Caterer as a standalone application" CMD=( java - "$JAVA_OPTS" - "$JAVA_17_OPTS" + "$JAVA_OPTS $JAVA_17_OPTS" -cp "/opt/spark/jars/*:/opt/app/job.jar" io.github.datacatering.datacaterer.core.ui.DataCatererUI ) @@ -26,7 +25,7 @@ else --driver-memory "$DRIVER_MEMORY" --executor-memory "$EXECUTOR_MEMORY" "$ALL_OPTS" - file:///opt/app/job.jar + "file:///opt/app/job.jar" ) fi