Merge pull request #84 from data-catering/confluent-schema-registry

Major Refactor
pflooky authored Dec 25, 2024
2 parents 827305d + 09b044c commit 0e642c2
Showing 182 changed files with 5,602 additions and 5,129 deletions.
README.md: 32 changes (16 additions & 16 deletions)
@@ -137,14 +137,14 @@ postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer") //na

```scala
postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
-  .schema(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
+  .fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
```

##### [I then want to test my job ingests all the data after generating](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/ValidationPlanRun.scala)

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
-  .schema(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
+  .fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))

val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
.validation(validation.count.isEqual(1000))
@@ -154,12 +154,12 @@ val parquetValidation = parquet("output_parquet", "/data/parquet/customer")

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
-  .schema(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
+  .fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))

val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
.validation(
validation.upstreamData(postgresTask)
-      .joinColumns("account_id")
+      .joinFields("account_id")
.withValidation(validation.count().isEqual(1000))
)
```
@@ -168,12 +168,12 @@ val parquetValidation = parquet("output_parquet", "/data/parquet/customer")

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
-  .schema(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
+  .fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))

val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
.validation(
validation.upstreamData(postgresTask)
-      .joinColumns("account_id")
+      .joinFields("account_id")
.withValidation(validation.count().isEqual(1000))
)
.validationWait(waitCondition.file("/data/parquet/customer"))
@@ -186,18 +186,18 @@ val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
```scala
kafka("my_kafka", "localhost:29092")
.topic("account-topic")
-  .schema(...)
+  .fields(...)
```
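
For illustration only, here is one way the elided `fields(...)` above could be filled in, reusing the `field` builder pattern from the Postgres examples; the field names and constraints are hypothetical, not part of this commit.

```scala
// Hypothetical sketch: field names and constraints are illustrative only
kafka("my_kafka", "localhost:29092")
  .topic("account-topic")
  .fields(
    field.name("account_id").regex("ACC[0-9]{10}"),
    field.name("status").oneOf("open", "closed")
  )
```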

##### [But I want the same `account_id` to show in Postgres and Kafka](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/AdvancedBatchEventPlanRun.scala)

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
-  .schema(field.name("account_id").regex("ACC[0-9]{10}"))
+  .fields(field.name("account_id").regex("ACC[0-9]{10}"))

val kafkaTask = kafka("my_kafka", "localhost:29092")
.topic("account-topic")
-  .schema(...)
+  .fields(...)

plan.addForeignKeyRelationship(
postgresTask, List("account_id"),
@@ -212,23 +212,23 @@
```scala
postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.table("account", "transactions")
-  .count(count.recordsPerColumn(5, "account_id"))
+  .count(count.recordsPerField(5, "account_id"))
```

##### [Randomly generate 1 to 5 transactions per `account_id`](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/MultipleRecordsPerColPlan.scala)

```scala
postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.table("account", "transactions")
-  .count(count.recordsPerColumnGenerator(generator.min(1).max(5), "account_id"))
+  .count(count.recordsPerFieldGenerator(generator.min(1).max(5), "account_id"))
```

##### [I want to delete the generated data](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/AdvancedDeletePlanRun.scala)

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.table("account", "transactions")
-  .count(count.recordsPerColumnGenerator(generator.min(0).max(5), "account_id"))
+  .count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))

val conf = configuration
.enableDeleteGeneratedRecords(true)
@@ -240,7 +240,7 @@ val conf = configuration
```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.table("account", "transactions")
-  .count(count.recordsPerColumnGenerator(generator.min(0).max(5), "account_id"))
+  .count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))

val cassandraTxns = cassandra("ingested_data", "localhost:9042")
.table("account", "transactions")
@@ -260,7 +260,7 @@

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
-  .count(count.recordsPerColumnGenerator(generator.min(0).max(5), "account_id"))
+  .count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))

val cassandraTxns = cassandra("ingested_data", "localhost:9042")
.table("account", "transactions")
@@ -282,14 +282,14 @@

```scala
parquet("customer_parquet", "/data/parquet/customer")
-  .schema(metadataSource.openDataContractStandard("/data/odcs/full-example.odcs.yaml"))
+  .fields(metadataSource.openDataContractStandard("/data/odcs/full-example.odcs.yaml"))
```

##### [I have an OpenAPI/Swagger doc](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/AdvancedHttpPlanRun.scala)

```scala
http("my_http")
-  .schema(metadataSource.openApi("/data/http/petstore.json"))
+  .fields(metadataSource.openApi("/data/http/petstore.json"))
```

#### Validate data using validations from metadata source
@@ -2,17 +2,16 @@


import io.github.datacatering.datacaterer.api.BasePlanRun;
-import io.github.datacatering.datacaterer.api.ColumnValidationBuilder;
import io.github.datacatering.datacaterer.api.CombinationPreFilterBuilder;
import io.github.datacatering.datacaterer.api.CountBuilder;
import io.github.datacatering.datacaterer.api.DataCatererConfigurationBuilder;
import io.github.datacatering.datacaterer.api.DataSourceValidationBuilder;
import io.github.datacatering.datacaterer.api.FieldBuilder;
+import io.github.datacatering.datacaterer.api.FieldValidationBuilder;
import io.github.datacatering.datacaterer.api.GeneratorBuilder;
import io.github.datacatering.datacaterer.api.MetadataSourceBuilder;
import io.github.datacatering.datacaterer.api.PlanBuilder;
import io.github.datacatering.datacaterer.api.PreFilterBuilder;
-import io.github.datacatering.datacaterer.api.SchemaBuilder;
import io.github.datacatering.datacaterer.api.StepBuilder;
import io.github.datacatering.datacaterer.api.TaskBuilder;
import io.github.datacatering.datacaterer.api.TaskSummaryBuilder;
@@ -98,15 +97,6 @@ public StepBuilder step() {
return new StepBuilder();
}

-/**
- * Creates a SchemaBuilder instance.
- *
- * @return A SchemaBuilder instance.
- */
-public SchemaBuilder schema() {
-    return new SchemaBuilder();
-}
-
/**
* Creates a FieldBuilder instance.
*
@@ -172,13 +162,13 @@ public CombinationPreFilterBuilder preFilterBuilder(ValidationBuilder validation
}

/**
- * Creates a ColumnValidationBuilder instance for the specified column.
+ * Creates a FieldValidationBuilder instance for the specified field.
*
- * @param column The name of the column.
- * @return A ColumnValidationBuilder instance for the specified column.
+ * @param field The name of the field.
+ * @return A FieldValidationBuilder instance for the specified field.
*/
-public ColumnValidationBuilder columnPreFilter(String column) {
-    return new ValidationBuilder().col(column);
+public FieldValidationBuilder fieldPreFilter(String field) {
+    return new ValidationBuilder().field(field);
}

/**
@@ -209,39 +199,39 @@ public MetadataSourceBuilder metadataSource() {
}

/**
- * Creates a ForeignKeyRelation instance with the provided data source, step, and column.
+ * Creates a ForeignKeyRelation instance with the provided data source, step, and field.
*
* @param dataSource The name of the data source.
* @param step The step associated with the ForeignKeyRelation.
- * @param column The column for the ForeignKeyRelation.
+ * @param field The field for the ForeignKeyRelation.
* @return A ForeignKeyRelation instance.
*/
-public ForeignKeyRelation foreignField(String dataSource, String step, String column) {
-    return new ForeignKeyRelation(dataSource, step, column);
+public ForeignKeyRelation foreignField(String dataSource, String step, String field) {
+    return new ForeignKeyRelation(dataSource, step, field);
}

/**
- * Creates a ForeignKeyRelation instance with the provided data source, step, and columns.
+ * Creates a ForeignKeyRelation instance with the provided data source, step, and fields.
*
* @param dataSource The name of the data source.
* @param step The step associated with the ForeignKeyRelation.
- * @param columns The list of columns for the ForeignKeyRelation.
+ * @param fields The list of fields for the ForeignKeyRelation.
* @return A ForeignKeyRelation instance.
*/
-public ForeignKeyRelation foreignField(String dataSource, String step, List<String> columns) {
-    return new ForeignKeyRelation(dataSource, step, toScalaList(columns));
+public ForeignKeyRelation foreignField(String dataSource, String step, List<String> fields) {
+    return new ForeignKeyRelation(dataSource, step, toScalaList(fields));
}

/**
- * Creates a ForeignKeyRelation instance with the provided ConnectionTaskBuilder, step, and columns.
+ * Creates a ForeignKeyRelation instance with the provided ConnectionTaskBuilder, step, and fields.
*
* @param connectionTaskBuilder The ConnectionTaskBuilder instance representing the task.
* @param step The step associated with the ForeignKeyRelation.
- * @param columns The list of columns for the ForeignKeyRelation.
+ * @param fields The list of fields for the ForeignKeyRelation.
* @return A ForeignKeyRelation instance.
*/
-public ForeignKeyRelation foreignField(ConnectionTaskBuilder<?> connectionTaskBuilder, String step, List<String> columns) {
-    return new ForeignKeyRelation(connectionTaskBuilder.connectionConfigWithTaskBuilder().dataSourceName(), step, toScalaList(columns));
+public ForeignKeyRelation foreignField(ConnectionTaskBuilder<?> connectionTaskBuilder, String step, List<String> fields) {
+    return new ForeignKeyRelation(connectionTaskBuilder.connectionConfigWithTaskBuilder().dataSourceName(), step, toScalaList(fields));
}

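
As a hedged illustration of the renamed field-based API: the overload shapes below come straight from the diff above, but the Scala-side `foreignField` helper and the data source and step names are assumptions made for the sketch.

```scala
// Sketch only: assumes the Scala API mirrors the Java foreignField overloads above;
// data source and step names are illustrative
val singleField = foreignField("customer_postgres", "accounts", "account_id")
val multiField  = foreignField("customer_postgres", "accounts", List("account_id", "name"))
```

Relations like these feed into `plan.addForeignKeyRelationship`, as in the README example earlier in this diff.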
@@ -1,9 +1,9 @@
package io.github.datacatering.datacaterer.api

import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
-import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
+import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY_ID, CONFLUENT_SCHEMA_REGISTRY_SUBJECT, CONFLUENT_SCHEMA_REGISTRY_VERSION, DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
import com.softwaremill.quicklens.ModifyPimp
-import io.github.datacatering.datacaterer.api.model.{DataContractCliSource, GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}
+import io.github.datacatering.datacaterer.api.model.{ConfluentSchemaRegistrySource, DataContractCliSource, GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}

case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadataSource()) {
def this() = this(MarquezMetadataSource())
@@ -105,4 +105,26 @@ case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadat
DATA_CONTRACT_SCHEMA -> modelNames.mkString(",")
)))
}

+  def confluentSchemaRegistry(url: String, schemaId: Int): MetadataSourceBuilder = {
+    this.modify(_.metadataSource).setTo(ConfluentSchemaRegistrySource(Map(
+      METADATA_SOURCE_URL -> url,
+      CONFLUENT_SCHEMA_REGISTRY_ID -> schemaId.toString
+    )))
+  }
+
+  def confluentSchemaRegistry(url: String, schemaSubject: String): MetadataSourceBuilder = {
+    this.modify(_.metadataSource).setTo(ConfluentSchemaRegistrySource(Map(
+      METADATA_SOURCE_URL -> url,
+      CONFLUENT_SCHEMA_REGISTRY_SUBJECT -> schemaSubject
+    )))
+  }
+
+  def confluentSchemaRegistry(url: String, schemaSubject: String, version: Int): MetadataSourceBuilder = {
+    this.modify(_.metadataSource).setTo(ConfluentSchemaRegistrySource(Map(
+      METADATA_SOURCE_URL -> url,
+      CONFLUENT_SCHEMA_REGISTRY_SUBJECT -> schemaSubject,
+      CONFLUENT_SCHEMA_REGISTRY_VERSION -> version.toString
+    )))
+  }
}
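
A minimal usage sketch for the new overloads, assuming a metadata source plugs into a connection via `fields(...)` as in the README examples above; the registry URL and subject names are placeholders.

```scala
// Placeholder registry URL and subject; pick whichever overload fits
kafka("my_kafka", "localhost:29092")
  .topic("account-topic")
  .fields(metadataSource.confluentSchemaRegistry("http://localhost:8081", "account-topic-value"))

// or pin an exact schema by id, or by subject and version
metadataSource.confluentSchemaRegistry("http://localhost:8081", 1)
metadataSource.confluentSchemaRegistry("http://localhost:8081", "account-topic-value", 2)
```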