
Commit: Include JDBC to DataSourceRegister

pflooky committed Oct 17, 2024
1 parent c6e75b0 commit 78b1d4c

Showing 2 changed files with 16 additions and 2 deletions.

org.apache.spark.sql.sources.DataSourceRegister

@@ -3,5 +3,6 @@ org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
 org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
+org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.kafka010.KafkaSourceProvider
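
Spark discovers data sources by short name through Java's ServiceLoader: each class listed in this service file is instantiated and its shortName() compared against the string passed to format(). Registering JdbcRelationProvider means "jdbc" now resolves on this classpath without spelling out the fully qualified class name. A minimal sketch of the lookup this enables (the URL and table mirror the TestPostgres plan below; the credentials are placeholder assumptions, not part of this commit):

  import org.apache.spark.sql.SparkSession

  object JdbcShortNameCheck extends App {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc-register-check")
      .getOrCreate()

    // format("jdbc") resolves to JdbcRelationProvider through the service file above.
    val accounts = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/customer") // URL from the TestPostgres plan below
      .option("dbtable", "account.accounts")                      // schema.table from .table("account", "accounts")
      .option("user", "postgres")                                 // placeholder credential (assumption)
      .option("password", "postgres")                             // placeholder credential (assumption)
      .load()

    accounts.show()
    spark.stop()
  }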

PlanProcessorTest.scala

@@ -9,6 +9,7 @@ import org.junit.runner.RunWith
 import org.scalatestplus.junit.JUnitRunner
 
 import java.sql.{Date, Timestamp}
+import java.time.LocalDate
 
 @RunWith(classOf[JUnitRunner])
 class PlanProcessorTest extends SparkSuite {

@@ -112,10 +113,22 @@ class PlanProcessorTest extends SparkSuite {
   }
 
   ignore("Can run Postgres plan run") {
-    PlanProcessor.determineAndExecutePlan(Some(new AdvancedMySqlPlanRun))
+    PlanProcessor.determineAndExecutePlan(Some(new TestPostgres))
   }
 
+  class TestPostgres extends PlanRun {
+    val accountTask = postgres("customer_postgres", "jdbc:postgresql://localhost:5432/customer")
+      .table("account", "accounts")
+      .schema(
+        field.name("account_number").regex("[0-9]{10}").unique(true),
+        field.name("customer_id_int").`type`(IntegerType).min(1).max(1000),
+        field.name("created_by").expression("#{Name.name}"),
+        field.name("created_by_fixed_length").sql("CASE WHEN account_status IN ('open', 'closed') THEN 'eod' ELSE 'event' END"),
+        field.name("open_timestamp").`type`(TimestampType).min(Date.valueOf(LocalDate.now())),
+        field.name("account_status").oneOf("open", "closed", "suspended", "pending")
+      )
+      .count(count.records(100))
 
     val jsonTask = json("my_json", "/tmp/data/json", Map("saveMode" -> "overwrite"))
       .schema(
         field.name("account_id").regex("ACC[0-9]{8}"),
@@ -142,7 +155,7 @@
       .generatedReportsFolderPath("/tmp/report")
       .enableSinkMetadata(true)
 
-    execute(conf, jsonTask, csvTask)
+    execute(conf, accountTask)
   }
 
   class TestCsvPostgres extends PlanRun {
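
The new java.time.LocalDate import supports the open_timestamp lower bound in TestPostgres: Date.valueOf turns today's LocalDate into the java.sql.Date handed to min(). A minimal sketch of just that conversion (the value name is illustrative only):

  import java.sql.Date
  import java.time.LocalDate

  // java.sql.Date.valueOf(LocalDate) is standard since Java 8; here it pins the
  // lower bound of generated open_timestamp values to the start of today.
  val minOpenTimestamp: Date = Date.valueOf(LocalDate.now())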
