
Major refactor of plan attribute, add new types of validations, refactor usage of HTTP APIs, remove spray-json
pflooky committed Dec 25, 2024
1 parent f71a43a commit 09b044c
Showing 181 changed files with 5,122 additions and 5,165 deletions.
32 changes: 16 additions & 16 deletions README.md
@@ -137,14 +137,14 @@ postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer") //na

```scala
postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.schema(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
.fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
```

##### [I then want to test my job ingests all the data after generating](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/ValidationPlanRun.scala)

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.schema(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
.fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))

val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
.validation(validation.count.isEqual(1000))
@@ -154,12 +154,12 @@ val parquetValidation = parquet("output_parquet", "/data/parquet/customer")

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.schema(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
.fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))

val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
.validation(
validation.upstreamData(postgresTask)
.joinColumns("account_id")
.joinFields("account_id")
.withValidation(validation.count().isEqual(1000))
)
```
@@ -168,12 +168,12 @@ val parquetValidation = parquet("output_parquet", "/data/parquet/customer")

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.schema(field.name("account_id").regex("ACC[0-9]{10}").unique(true))
.fields(field.name("account_id").regex("ACC[0-9]{10}").unique(true))

val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
.validation(
validation.upstreamData(postgresTask)
.joinColumns("account_id")
.joinFields("account_id")
.withValidation(validation.count().isEqual(1000))
)
.validationWait(waitCondition.file("/data/parquet/customer"))
@@ -186,18 +186,18 @@ val parquetValidation = parquet("output_parquet", "/data/parquet/customer")
```scala
kafka("my_kafka", "localhost:29092")
.topic("account-topic")
.schema(...)
.fields(...)
```

##### [But I want the same `account_id` to show in Postgres and Kafka](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/AdvancedBatchEventPlanRun.scala)

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.schema(field.name("account_id").regex("ACC[0-9]{10}"))
.fields(field.name("account_id").regex("ACC[0-9]{10}"))

val kafkaTask = kafka("my_kafka", "localhost:29092")
.topic("account-topic")
.schema(...)
.fields(...)

plan.addForeignKeyRelationship(
postgresTask, List("account_id"),
@@ -212,23 +212,23 @@ plan.addForeignKeyRelationship(
```scala
postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.table("account", "transactions")
.count(count.recordsPerColumn(5, "account_id"))
.count(count.recordsPerField(5, "account_id"))
```

##### [Randomly generate 1 to 5 transactions per `account_id`](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/MultipleRecordsPerColPlan.scala)

```scala
postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.table("account", "transactions")
.count(count.recordsPerColumnGenerator(generator.min(1).max(5), "account_id"))
.count(count.recordsPerFieldGenerator(generator.min(1).max(5), "account_id"))
```

##### [I want to delete the generated data](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/AdvancedDeletePlanRun.scala)

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.table("account", "transactions")
.count(count.recordsPerColumnGenerator(generator.min(0).max(5), "account_id"))
.count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))

val conf = configuration
.enableDeleteGeneratedRecords(true)
@@ -240,7 +240,7 @@ val conf = configuration
```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
.table("account", "transactions")
.count(count.recordsPerColumnGenerator(generator.min(0).max(5), "account_id"))
.count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))

val cassandraTxns = cassandra("ingested_data", "localhost:9042")
.table("account", "transactions")
@@ -260,7 +260,7 @@ val conf = configuration

```scala
val postgresTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
-  .count(count.recordsPerColumnGenerator(generator.min(0).max(5), "account_id"))
+  .count(count.recordsPerFieldGenerator(generator.min(0).max(5), "account_id"))

val cassandraTxns = cassandra("ingested_data", "localhost:9042")
  .table("account", "transactions")
@@ -282,14 +282,14 @@ val conf = configuration

```scala
parquet("customer_parquet", "/data/parquet/customer")
-  .schema(metadataSource.openDataContractStandard("/data/odcs/full-example.odcs.yaml"))
+  .fields(metadataSource.openDataContractStandard("/data/odcs/full-example.odcs.yaml"))
```

##### [I have an OpenAPI/Swagger doc](https://github.com/data-catering/data-caterer-example/blob/b0f03fb26f185ec8613241205b998aef1d5f5a01/src/main/scala/io/github/datacatering/plan/AdvancedHttpPlanRun.scala)

```scala
http("my_http")
-  .schema(metadataSource.openApi("/data/http/petstore.json"))
+  .fields(metadataSource.openApi("/data/http/petstore.json"))
```

#### Validate data using validations from metadata source
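The section body is folded in this view. One way this could look, assuming a `validations(...)` method on the connection builder and an illustrative Great Expectations suite path (both are assumptions, not taken from this commit):

```scala
// Hypothetical sketch: pull validation rules from a Great Expectations suite.
// The validations(...) method name and the file path are assumptions.
parquet("output_parquet", "/data/parquet/customer")
  .validations(metadataSource.greatExpectations("/data/great-expectations/expectations.json"))
```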
@@ -2,17 +2,16 @@


import io.github.datacatering.datacaterer.api.BasePlanRun;
-import io.github.datacatering.datacaterer.api.ColumnValidationBuilder;
import io.github.datacatering.datacaterer.api.CombinationPreFilterBuilder;
import io.github.datacatering.datacaterer.api.CountBuilder;
import io.github.datacatering.datacaterer.api.DataCatererConfigurationBuilder;
import io.github.datacatering.datacaterer.api.DataSourceValidationBuilder;
import io.github.datacatering.datacaterer.api.FieldBuilder;
+import io.github.datacatering.datacaterer.api.FieldValidationBuilder;
import io.github.datacatering.datacaterer.api.GeneratorBuilder;
import io.github.datacatering.datacaterer.api.MetadataSourceBuilder;
import io.github.datacatering.datacaterer.api.PlanBuilder;
import io.github.datacatering.datacaterer.api.PreFilterBuilder;
-import io.github.datacatering.datacaterer.api.SchemaBuilder;
import io.github.datacatering.datacaterer.api.StepBuilder;
import io.github.datacatering.datacaterer.api.TaskBuilder;
import io.github.datacatering.datacaterer.api.TaskSummaryBuilder;
@@ -98,15 +97,6 @@ public StepBuilder step() {
        return new StepBuilder();
    }

-    /**
-     * Creates a SchemaBuilder instance.
-     *
-     * @return A SchemaBuilder instance.
-     */
-    public SchemaBuilder schema() {
-        return new SchemaBuilder();
-    }

/**
* Creates a FieldBuilder instance.
*
@@ -172,13 +162,13 @@ public CombinationPreFilterBuilder preFilterBuilder(ValidationBuilder validation
    }

    /**
-     * Creates a ColumnValidationBuilder instance for the specified column.
+     * Creates a FieldValidationBuilder instance for the specified field.
      *
-     * @param column The name of the column.
-     * @return A ColumnValidationBuilder instance for the specified column.
+     * @param field The name of the field.
+     * @return A FieldValidationBuilder instance for the specified field.
      */
-    public ColumnValidationBuilder columnPreFilter(String column) {
-        return new ValidationBuilder().col(column);
+    public FieldValidationBuilder fieldPreFilter(String field) {
+        return new ValidationBuilder().field(field);
    }
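For orientation, a minimal sketch of what this renamed entry point enables in the Scala DSL; the `preFilter`, `isEqual`, and `greaterThan` method names beyond `field` are assumptions for illustration, not taken from this diff:

```scala
// Hypothetical sketch: pre-filter to open accounts before validating balance.
// Field names are illustrative; preFilter, isEqual and greaterThan are
// assumed DSL method names.
validation
  .preFilter(validation.field("status").isEqual("open"))
  .field("balance").greaterThan(0)
```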

/**
@@ -209,39 +199,39 @@ public MetadataSourceBuilder metadataSource() {
    }

    /**
-     * Creates a ForeignKeyRelation instance with the provided data source, step, and column.
+     * Creates a ForeignKeyRelation instance with the provided data source, step, and field.
      *
      * @param dataSource The name of the data source.
      * @param step The step associated with the ForeignKeyRelation.
-     * @param column The column for the ForeignKeyRelation.
+     * @param field The field for the ForeignKeyRelation.
      * @return A ForeignKeyRelation instance.
      */
-    public ForeignKeyRelation foreignField(String dataSource, String step, String column) {
-        return new ForeignKeyRelation(dataSource, step, column);
+    public ForeignKeyRelation foreignField(String dataSource, String step, String field) {
+        return new ForeignKeyRelation(dataSource, step, field);
    }

    /**
-     * Creates a ForeignKeyRelation instance with the provided data source, step, and columns.
+     * Creates a ForeignKeyRelation instance with the provided data source, step, and fields.
      *
      * @param dataSource The name of the data source.
      * @param step The step associated with the ForeignKeyRelation.
-     * @param columns The list of columns for the ForeignKeyRelation.
+     * @param fields The list of fields for the ForeignKeyRelation.
      * @return A ForeignKeyRelation instance.
      */
-    public ForeignKeyRelation foreignField(String dataSource, String step, List<String> columns) {
-        return new ForeignKeyRelation(dataSource, step, toScalaList(columns));
+    public ForeignKeyRelation foreignField(String dataSource, String step, List<String> fields) {
+        return new ForeignKeyRelation(dataSource, step, toScalaList(fields));
    }

    /**
-     * Creates a ForeignKeyRelation instance with the provided ConnectionTaskBuilder, step, and columns.
+     * Creates a ForeignKeyRelation instance with the provided ConnectionTaskBuilder, step, and fields.
      *
      * @param connectionTaskBuilder The ConnectionTaskBuilder instance representing the task.
      * @param step The step associated with the ForeignKeyRelation.
-     * @param columns The list of columns for the ForeignKeyRelation.
+     * @param fields The list of fields for the ForeignKeyRelation.
      * @return A ForeignKeyRelation instance.
      */
-    public ForeignKeyRelation foreignField(ConnectionTaskBuilder<?> connectionTaskBuilder, String step, List<String> columns) {
-        return new ForeignKeyRelation(connectionTaskBuilder.connectionConfigWithTaskBuilder().dataSourceName(), step, toScalaList(columns));
+    public ForeignKeyRelation foreignField(ConnectionTaskBuilder<?> connectionTaskBuilder, String step, List<String> fields) {
+        return new ForeignKeyRelation(connectionTaskBuilder.connectionConfigWithTaskBuilder().dataSourceName(), step, toScalaList(fields));
    }
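A rough Scala usage sketch of these overloads, mirroring the foreign-key README example above; the data source name, step names, the `kafkaTask` value, and the `addForeignKeyRelationship` overload taking `ForeignKeyRelation` values are assumptions for illustration:

```scala
// Hypothetical sketch: reuse the same account_id values across data sources.
// "customer_postgres"/"accounts"/kafkaTask are placeholders, and the
// addForeignKeyRelationship overload shown here is assumed.
plan.addForeignKeyRelationship(
  foreignField("customer_postgres", "accounts", List("account_id")),
  List(foreignField(kafkaTask, "account-topic", List("account_id")))
)
```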

/**
@@ -1,9 +1,9 @@
package io.github.datacatering.datacaterer.api

import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
-import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
+import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY_ID, CONFLUENT_SCHEMA_REGISTRY_SUBJECT, CONFLUENT_SCHEMA_REGISTRY_VERSION, DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
import com.softwaremill.quicklens.ModifyPimp
-import io.github.datacatering.datacaterer.api.model.{DataContractCliSource, GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}
+import io.github.datacatering.datacaterer.api.model.{ConfluentSchemaRegistrySource, DataContractCliSource, GreatExpectationsSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}

case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadataSource()) {
def this() = this(MarquezMetadataSource())
@@ -105,4 +105,26 @@ case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadat
      DATA_CONTRACT_SCHEMA -> modelNames.mkString(",")
    )))
  }
+
+  def confluentSchemaRegistry(url: String, schemaId: Int): MetadataSourceBuilder = {
+    this.modify(_.metadataSource).setTo(ConfluentSchemaRegistrySource(Map(
+      METADATA_SOURCE_URL -> url,
+      CONFLUENT_SCHEMA_REGISTRY_ID -> schemaId.toString
+    )))
+  }
+
+  def confluentSchemaRegistry(url: String, schemaSubject: String): MetadataSourceBuilder = {
+    this.modify(_.metadataSource).setTo(ConfluentSchemaRegistrySource(Map(
+      METADATA_SOURCE_URL -> url,
+      CONFLUENT_SCHEMA_REGISTRY_SUBJECT -> schemaSubject
+    )))
+  }
+
+  def confluentSchemaRegistry(url: String, schemaSubject: String, version: Int): MetadataSourceBuilder = {
+    this.modify(_.metadataSource).setTo(ConfluentSchemaRegistrySource(Map(
+      METADATA_SOURCE_URL -> url,
+      CONFLUENT_SCHEMA_REGISTRY_SUBJECT -> schemaSubject,
+      CONFLUENT_SCHEMA_REGISTRY_VERSION -> version.toString,
+    )))
+  }
}
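Following the `.fields(metadataSource...)` pattern from the README changes above, a sketch of wiring the new Confluent Schema Registry source into a Kafka step; the registry URL, subject, schema id, and version values are placeholders:

```scala
// Placeholder URL/subject/id/version values; the three overloads shown are
// the ones added above (by subject, by schema id, by subject + version).
val bySubject = kafka("my_kafka", "localhost:29092")
  .topic("account-topic")
  .fields(metadataSource.confluentSchemaRegistry("http://localhost:8081", "account-value"))

val byId      = metadataSource.confluentSchemaRegistry("http://localhost:8081", 10)
val byVersion = metadataSource.confluentSchemaRegistry("http://localhost:8081", "account-value", 2)
```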