Merge pull request #29 from data-catering/iceberg-support
Iceberg support
pflooky authored May 30, 2024
2 parents b609199 + 4d521e3 commit 9694868
Showing 34 changed files with 715 additions and 331 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -11,6 +11,7 @@ build
tmp
*.dmg
*.exe
*.jfr

app/out
app/src/test/resources/sample/parquet
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
ARG SPARK_VERSION=3.5.1
FROM cloudnativek8s/spark:3.5.1-b1.0.18
FROM cloudnativek8s/spark:3.5.1-b1.0.19

USER root
RUN mkdir -p /opt/app /opt/DataCaterer/connection /opt/DataCaterer/plan /opt/DataCaterer/execution /opt/DataCaterer/report
64 changes: 33 additions & 31 deletions README.md
@@ -47,7 +47,7 @@ and deep dive into issues [from the generated report](https://data.catering/samp
3. [Linux download](https://nightly.link/data-catering/data-caterer/workflows/build/main/data-caterer-linux.zip)
4. Docker
```shell
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer:0.10.5
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer-basic:0.10.6
```
[Open localhost:9898](http://localhost:9898).

@@ -66,36 +66,38 @@ cd data-caterer-example && ./run.sh
Data Caterer supports the below data sources. Additional data sources can be added on a demand basis. [Check here for
the full roadmap](#roadmap).

| Data Source Type | Data Source | Support | Free |
|------------------|---------------------|---------|------|
| Cloud Storage | AWS S3 |||
| Cloud Storage | GCP Cloud Storage |||
| Cloud Storage | Azure Blob Storage |||
| Database | Postgres |||
| Database | MySQL |||
| Database | Cassandra |||
| Database | MongoDB |||
| Database | Elasticsearch |||
| File | CSV |||
| File | JSON |||
| File | ORC |||
| File | Parquet |||
| File | Hudi |||
| File | Iceberg |||
| File | Delta Lake |||
| HTTP | REST API |||
| Messaging | Kafka |||
| Messaging | Solace |||
| Messaging | Pulsar |||
| Messaging | RabbitMQ |||
| Messaging | ActiveMQ |||
| Metadata | Marquez |||
| Metadata | OpenMetadata |||
| Metadata | OpenAPI/Swagger |||
| Metadata | Great Expectations |||
| Metadata | Amundsen |||
| Metadata | Datahub |||
| Metadata | Solace Event Portal |||
| Data Source Type | Data Source | Support | Free |
|------------------|------------------------------------|---------|------|
| Cloud Storage | AWS S3 |||
| Cloud Storage | Azure Blob Storage |||
| Cloud Storage | GCP Cloud Storage |||
| Database | Cassandra |||
| Database | MySQL |||
| Database | Postgres |||
| Database | Elasticsearch |||
| Database | MongoDB |||
| File | CSV |||
| File | JSON |||
| File | Iceberg |||
| File | ORC |||
| File | Parquet |||
| File | Delta Lake |||
| File | Hudi |||
| HTTP | REST API |||
| Messaging | Kafka |||
| Messaging | Solace |||
| Messaging | ActiveMQ |||
| Messaging | Pulsar |||
| Messaging | RabbitMQ |||
| Metadata | Great Expectations |||
| Metadata | Marquez |||
| Metadata | OpenAPI/Swagger |||
| Metadata | OpenMetadata |||
| Metadata | Amundsen |||
| Metadata | Datahub |||
| Metadata | Data Contract CLI |||
| Metadata | ODCS (Open Data Contract Standard) |||
| Metadata | Solace Event Portal |||


## Supported use cases
@@ -161,6 +161,21 @@ public FileBuilder parquet(String name, String path) {
return parquet(name, path, Collections.emptyMap());
}

public FileBuilder iceberg(String name, String path, String tableName) {
return basePlanRun.icebergJava(name, path, tableName);
}

public FileBuilder iceberg(
String name,
String path,
String tableName,
String catalogType,
String catalogUri,
Map<String, String> options
) {
return basePlanRun.iceberg(name, path, tableName, catalogType, catalogUri, toScalaMap(options));
}

public PostgresBuilder postgres(
String name,
String url,
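
The two overloads above give the Java API a table-aware Iceberg entry point: the three-argument form routes through `icebergJava` on the underlying plan run, and the six-argument form forwards the catalog type and URI as well. A minimal sketch of the corresponding Scala call, with illustrative class, connection, table, and path names (not taken from this PR):

```scala
import io.github.datacatering.datacaterer.api.PlanRun

// Sketch only: all names and paths below are made up for illustration.
class MyIcebergPlan extends PlanRun {
  val accounts = iceberg(
    name = "customer_accounts",
    tableName = "account.transactions",  // Iceberg table to generate data into
    path = "/opt/app/data/iceberg"       // warehouse path; hadoop catalog by default
  )
}
```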
@@ -86,11 +86,11 @@ case class DataCatererConfigurationBuilder(build: DataCatererConfiguration = Dat
def delta(name: String, path: String, options: java.util.Map[String, String]): DataCatererConfigurationBuilder =
delta(name, path, toScalaMap(options))

def iceberg(name: String, path: String = "", options: Map[String, String] = Map()): DataCatererConfigurationBuilder =
addConnectionConfig(name, ICEBERG, path, options)
def iceberg(name: String, path: String, tableName: String, options: Map[String, String] = Map()): DataCatererConfigurationBuilder =
addConnectionConfig(name, ICEBERG, path, options ++ Map(TABLE -> tableName))

def iceberg(name: String, path: String, options: java.util.Map[String, String]): DataCatererConfigurationBuilder =
iceberg(name, path, toScalaMap(options))
def iceberg(name: String, path: String, tableName: String, options: java.util.Map[String, String]): DataCatererConfigurationBuilder =
iceberg(name, path, tableName, toScalaMap(options))

def postgres(
name: String,
@@ -382,11 +382,15 @@ final case class ConnectionConfigWithTaskBuilder(
case HUDI =>
options.get(HUDI_TABLE_NAME) match {
case Some(value) => configBuilder.hudi(name, path, value, options)
case None => throw new IllegalArgumentException(s"Missing $HUDI_TABLE_NAME from options: $options")
case None => throw new IllegalArgumentException(s"Missing $HUDI_TABLE_NAME from options: $options, connection-name=$name")
}
case DELTA => configBuilder.delta(name, path, options)
case ICEBERG => configBuilder.iceberg(name, path, options)
case _ => throw new IllegalArgumentException(s"Unsupported file format: $format")
case ICEBERG =>
options.get(TABLE) match {
case Some(value) => configBuilder.iceberg(name, path, value, options)
case None => throw new IllegalArgumentException(s"Missing $TABLE from options: $options, connection-name=$name")
}
case _ => throw new IllegalArgumentException(s"Unsupported file format: $format, connection-name=$name")
}
setConnectionConfig(name, fileConnectionConfig, FileBuilder())
}
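
The generic file-connection path now treats Iceberg like HUDI: it fails fast with an `IllegalArgumentException` when the table name option is missing. Roughly, the options an Iceberg connection carries after `PlanRun.iceberg` assembles them look like the sketch below (values are illustrative; the constant names are those defined in `Constants`):

```scala
import io.github.datacatering.datacaterer.api.model.Constants._

// Illustrative values only; mirrors the map PlanRun.iceberg builds before
// calling ConnectionConfigWithTaskBuilder().file(name, ICEBERG, path, options).
val icebergOptions: Map[String, String] = Map(
  TABLE -> "account.transactions",                            // required, otherwise IllegalArgumentException
  SPARK_ICEBERG_CATALOG_WAREHOUSE -> "/opt/app/data/iceberg", // warehouse path
  SPARK_ICEBERG_CATALOG_TYPE -> DEFAULT_ICEBERG_CATALOG_TYPE, // "hadoop"
  SPARK_ICEBERG_CATALOG_URI -> ""                             // only needed for catalogs such as hive or rest
)
```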
@@ -1,7 +1,7 @@
package io.github.datacatering.datacaterer.api

import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.model.{GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}

@@ -73,7 +73,7 @@ case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadat
}

def greatExpectations(expectationsFile: String): MetadataSourceBuilder = {
this.modify(_.metadataSource).setTo(GreatExpectationsSource(Map(EXPECTATIONS_FILE -> expectationsFile)))
this.modify(_.metadataSource).setTo(GreatExpectationsSource(Map(GREAT_EXPECTATIONS_FILE -> expectationsFile)))
}

def openDataContractStandard(dataContractFile: String): MetadataSourceBuilder = {
@@ -104,10 +104,10 @@ trait PlanRun {
/**
* Create new HUDI generation step with configurations
*
* @param name Data source name
* @param path File path to generated HUDI
* @param name Data source name
* @param path File path to generated HUDI
* @param tableName Table name to be used for HUDI generation
* @param options Additional options for HUDI generation
* @param options Additional options for HUDI generation
* @return FileBuilder
*/
def hudi(name: String, path: String, tableName: String, options: Map[String, String] = Map()): FileBuilder =
@@ -127,13 +127,40 @@ trait PlanRun {
/**
* Create new ICEBERG generation step with configurations
*
* @param name Data source name
* @param path File path to generated ICEBERG
* @param options Additional options for ICEBERG generation
* @param name Data source name
* @param tableName Table name for generated ICEBERG
* @param path Warehouse path to generated ICEBERG
* @param catalogType Type of catalog for generated ICEBERG
* @param catalogUri Uri of catalog for generated ICEBERG
* @param options Additional options for ICEBERG generation
* @return FileBuilder
*/
def iceberg(
name: String,
tableName: String,
path: String = "",
catalogType: String = DEFAULT_ICEBERG_CATALOG_TYPE,
catalogUri: String = "",
options: Map[String, String] = Map()
): FileBuilder = {
ConnectionConfigWithTaskBuilder().file(name, ICEBERG, path, options ++ Map(
TABLE -> tableName,
SPARK_ICEBERG_CATALOG_WAREHOUSE -> path,
SPARK_ICEBERG_CATALOG_TYPE -> catalogType,
SPARK_ICEBERG_CATALOG_URI -> catalogUri,
))
}

/**
* Create new ICEBERG generation step with only warehouse path and table name. Uses hadoop as the catalog type.
*
* @param name Data source name
* @param path Warehouse path to generated ICEBERG
* @param tableName Table name for generated ICEBERG
* @return FileBuilder
*/
def iceberg(name: String, path: String, options: Map[String, String] = Map()): FileBuilder =
ConnectionConfigWithTaskBuilder().file(name, ICEBERG, path, options)
def icebergJava(name: String, path: String, tableName: String): FileBuilder =
iceberg(name, path, tableName)

/**
* Create new POSTGRES generation step with connection configuration
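
The reworked Scala `iceberg` method makes the table name a required argument and folds the warehouse path, catalog type, and catalog URI into the connection options, while `icebergJava` keeps a fixed three-argument signature for the Java builder. A hedged sketch of the full form against a Hive metastore (all names, paths, and URIs are illustrative):

```scala
import io.github.datacatering.datacaterer.api.PlanRun

// Sketch only: connection name, table, warehouse path and metastore URI are made up.
class HiveCatalogIcebergPlan extends PlanRun {
  val hiveIceberg = iceberg(
    name = "hive_iceberg",
    tableName = "account.transactions",
    path = "/opt/app/data/iceberg",          // warehouse path
    catalogType = "hive",                    // ICEBERG_CATALOG_HIVE; other types are defined in Constants below
    catalogUri = "thrift://localhost:9083"   // catalog URI, e.g. the Hive metastore
  )
}
```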
@@ -47,16 +47,19 @@ object Constants {
lazy val PARTITIONS = "partitions"
lazy val PARTITION_BY = "partitionBy"
lazy val BODY_FIELD = "bodyField"
lazy val JMS_DESTINATION_NAME = "destinationName"
lazy val KAFKA_TOPIC = "topic"
lazy val JMS_DESTINATION_NAME = "destinationName"
lazy val JMS_INITIAL_CONTEXT_FACTORY = "initialContextFactory"
lazy val JMS_CONNECTION_FACTORY = "connectionFactory"
lazy val JMS_VPN_NAME = "vpnName"
lazy val SCHEMA_LOCATION = "schemaLocation"
lazy val EXPECTATIONS_FILE = "expectationsFile"
lazy val GREAT_EXPECTATIONS_FILE = "expectationsFile"
lazy val DATA_CONTRACT_FILE = "dataContractFile"
lazy val ROWS_PER_SECOND = "rowsPerSecond"
lazy val HUDI_TABLE_NAME = "hoodie.table.name"
lazy val ICEBERG_CATALOG_TYPE = "catalogType"
lazy val ICEBERG_CATALOG_URI = "catalogUri"
lazy val ICEBERG_CATALOG_DEFAULT_NAMESPACE = "catalogDefaultNamespace"

//field metadata
lazy val FIELD_DATA_TYPE = "type"
@@ -207,6 +210,8 @@ object Constants {
"spark.sql.shuffle.partitions" -> "10",
"spark.sql.catalog.postgres" -> "",
"spark.sql.catalog.cassandra" -> "com.datastax.spark.connector.datasource.CassandraCatalog",
"spark.sql.catalog.iceberg" -> "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.iceberg.type" -> "hadoop",
// "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
// "spark.sql.catalog.hudi" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
// "spark.kryo.registrator" -> "org.apache.spark.HoodieSparkKryoRegistrar",
@@ -215,6 +220,7 @@ object Constants {
"spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" -> "true",
"spark.hadoop.fs.hdfs.impl" -> "org.apache.hadoop.hdfs.DistributedFileSystem",
"spark.hadoop.fs.file.impl" -> "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
"spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
)

//jdbc defaults
@@ -248,6 +254,9 @@ object Constants {
lazy val HTTP_QUERY_PARAMETER = "query"
lazy val HTTP_HEADER_PARAMETER = "header"

//iceberg defaults
lazy val DEFAULT_ICEBERG_CATALOG_TYPE = ICEBERG_CATALOG_HADOOP

//foreign key defaults
lazy val DEFAULT_FOREIGN_KEY_COLUMN = "default_column"
lazy val FOREIGN_KEY_DELIMITER = "||"
@@ -358,6 +367,18 @@ object Constants {
lazy val OPEN_METADATA_TABLE_FQN = "tableFqn"
lazy val OPEN_METADATA_SERVICE = "service"

//iceberg
lazy val SPARK_ICEBERG_CATALOG_TYPE = "spark.sql.catalog.iceberg.type"
lazy val SPARK_ICEBERG_CATALOG_WAREHOUSE = "spark.sql.catalog.iceberg.warehouse"
lazy val SPARK_ICEBERG_CATALOG_URI = "spark.sql.catalog.iceberg.uri"
lazy val SPARK_ICEBERG_CATALOG_DEFAULT_NAMESPACE = "spark.sql.catalog.iceberg.default-namespace"
lazy val ICEBERG_CATALOG_HIVE = "hive"
lazy val ICEBERG_CATALOG_HADOOP = "hadoop"
lazy val ICEBERG_CATALOG_REST = "rest"
lazy val ICEBERG_CATALOG_GLUE = "glue"
lazy val ICEBERG_CATALOG_JDBC = "jdbc"
lazy val ICEBERG_CATALOG_NESSIE = "nessie"

//aggregation types
lazy val AGGREGATION_SUM = "sum"
lazy val AGGREGATION_COUNT = "count"
@@ -368,7 +389,9 @@ object Constants {

//validation types
lazy val VALIDATION_COLUMN = "column"
lazy val VALIDATION_FIELD = "field"
lazy val VALIDATION_COLUMN_NAMES = "columnNames"
lazy val VALIDATION_FIELD_NAMES = "fieldNames"
lazy val VALIDATION_UPSTREAM = "upstream"
lazy val VALIDATION_GROUP_BY = "groupBy"
//validation support
@@ -427,7 +450,7 @@ object Constants {
lazy val VALIDATION_COLUMN_NAMES_MATCH_SET = "matchSet"

lazy val VALIDATION_OPTION_DELIMITER = ","
lazy val VALIDATION_SUPPORTING_OPTIONS = List(VALIDATION_COLUMN, VALIDATION_MIN, VALIDATION_MAX, VALIDATION_GROUP_BY_COLUMNS, VALIDATION_DESCRIPTION, VALIDATION_ERROR_THRESHOLD)
lazy val VALIDATION_SUPPORTING_OPTIONS = List(VALIDATION_COLUMN, VALIDATION_FIELD, VALIDATION_MIN, VALIDATION_MAX, VALIDATION_GROUP_BY_COLUMNS, VALIDATION_DESCRIPTION, VALIDATION_ERROR_THRESHOLD)

lazy val VALIDATION_PREFIX_JOIN_EXPRESSION = "expr:"
lazy val VALIDATION_COLUMN_NAME_COUNT_EQUAL = "column_count_equal"
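
The new runtime defaults register an `iceberg` Spark catalog backed by the `hadoop` catalog type and enable the Iceberg SQL extensions, which is also why the default runtime config count in the test below moves from 13 to 16. A rough standalone equivalent of those defaults, plus the per-connection warehouse setting that `PlanRun.iceberg` supplies (the warehouse path is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Rough equivalent of the Iceberg-related defaults above; warehouse path is illustrative.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg.type", "hadoop")
  .config("spark.sql.catalog.iceberg.warehouse", "/opt/app/data/iceberg")
  .getOrCreate()

// Tables then resolve through the "iceberg" catalog, e.g. iceberg.account.transactions.
```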
@@ -17,7 +17,7 @@ class DataCatererConfigurationBuilderTest extends AnyFunSuite {
assert(result.metadataConfig == MetadataConfig())
assert(result.generationConfig == GenerationConfig())
assert(result.connectionConfigByName.isEmpty)
assert(result.runtimeConfig.size == 13)
assert(result.runtimeConfig.size == 16)
assert(result.master == "local[*]")
}

@@ -1,6 +1,6 @@
package io.github.datacatering.datacaterer.api

import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_BASIC, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_BASIC_AUTH_PASSWORD, OPEN_METADATA_BASIC_AUTH_USERNAME, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_BASIC, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_BASIC_AUTH_PASSWORD, OPEN_METADATA_BASIC_AUTH_USERNAME, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
import io.github.datacatering.datacaterer.api.model.{GreatExpectationsSource, MarquezMetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}
import org.junit.runner.RunWith
import org.scalatest.funsuite.AnyFunSuite
@@ -56,7 +56,7 @@ class MetadataSourceBuilderTest extends AnyFunSuite {
val result = MetadataSourceBuilder().greatExpectations("/tmp/expectations").metadataSource

assert(result.isInstanceOf[GreatExpectationsSource])
assert(result.asInstanceOf[GreatExpectationsSource].connectionOptions == Map(EXPECTATIONS_FILE -> "/tmp/expectations"))
assert(result.asInstanceOf[GreatExpectationsSource].connectionOptions == Map(GREAT_EXPECTATIONS_FILE -> "/tmp/expectations"))
}

test("Can create Open Data Contract Standard metadata source") {
2 changes: 1 addition & 1 deletion app/build.gradle.kts
@@ -120,7 +120,7 @@ dependencies {
// exclude(group = "org.scala-lang")
// }
// iceberg
basicImpl("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:1.4.3") {
basicImpl("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:1.5.2") {
exclude(group = "org.scala-lang")
}
// delta lake
3 changes: 3 additions & 0 deletions app/src/main/resources/log4j2.properties
@@ -64,3 +64,6 @@ logger.kafka.level=warn
# Hudi logging
logger.hudi.name=org.apache.hudi
logger.hudi.level=error
# Iceberg logging
logger.iceberg.name=org.apache.iceberg
logger.iceberg.level=error