From 3561d3878f229555802abeaf4e7ed9ba8a49ba43 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Wed, 24 Aug 2022 09:57:28 +0800 Subject: [PATCH] [Connector-V2][JDBC] Support database: greenplum (#2429) * [Connector-V2][JDBC] Support database: Greenplum Support connect greenplum drivers: * postgresql driver: org.postgresql.Driver * greenplum driver: com.pivotal.jdbc.GreenplumDriver Co-authored-by: wanghailin --- docs/en/connector-v2/sink/Greenplum.md | 27 +++ docs/en/connector-v2/source/Greenplum.md | 17 ++ .../greenplum/GreenplumDialectFactory.java | 40 +++++ .../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 2 +- .../e2e/flink/v2/jdbc/JdbcGreenplumIT.java | 159 ++++++++++++++++++ .../jdbc/jdbc_greenplum_source_and_sink.conf | 60 +++++++ .../e2e/spark/v2/jdbc/JdbcGreenplumIT.java | 159 ++++++++++++++++++ .../jdbc/jdbc_greenplum_source_and_sink.conf | 62 +++++++ 8 files changed, 525 insertions(+), 1 deletion(-) create mode 100644 docs/en/connector-v2/sink/Greenplum.md create mode 100644 docs/en/connector-v2/source/Greenplum.md create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf diff --git a/docs/en/connector-v2/sink/Greenplum.md b/docs/en/connector-v2/sink/Greenplum.md new file mode 100644 index 00000000000..9317e5c625f --- /dev/null +++ b/docs/en/connector-v2/sink/Greenplum.md @@ -0,0 +1,27 @@ +# Greenplum + +> Greenplum sink connector + +## Description + +Write data to Greenplum using [Jdbc connector](Jdbc.md). + +:::tip + +Not support exactly-once semantics (XA transaction is not yet supported in Greenplum database). + +::: + +## Options + +### driver [string] + +Optional jdbc drivers: +- `org.postgresql.Driver` +- `com.pivotal.jdbc.GreenplumDriver` + +Warn: for license compliance, if you use `GreenplumDriver` the have to provide Greenplum JDBC driver yourself, e.g. copy greenplum-xxx.jar to $SEATNUNNEL_HOME/lib for Standalone. + +### url [string] + +The URL of the JDBC connection. if you use postgresql driver the value is `jdbc:postgresql://${yous_host}:${yous_port}/${yous_database}`, or you use greenplum driver the value is `jdbc:pivotal:greenplum://${yous_host}:${yous_port};DatabaseName=${yous_database}` \ No newline at end of file diff --git a/docs/en/connector-v2/source/Greenplum.md b/docs/en/connector-v2/source/Greenplum.md new file mode 100644 index 00000000000..cd140549b78 --- /dev/null +++ b/docs/en/connector-v2/source/Greenplum.md @@ -0,0 +1,17 @@ +# Greenplum + +> Greenplum source connector + +## Description + +Read Greenplum data through [Jdbc connector](Jdbc.md). + +:::tip + +Optional jdbc drivers: +- `org.postgresql.Driver` +- `com.pivotal.jdbc.GreenplumDriver` + +Warn: for license compliance, if you use `GreenplumDriver` the have to provide Greenplum JDBC driver yourself, e.g. copy greenplum-xxx.jar to $SEATNUNNEL_HOME/lib for Standalone. + +::: \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java new file mode 100644 index 00000000000..fb4ca3865f8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect; + +import com.google.auto.service.AutoService; +import lombok.NonNull; + +@AutoService(JdbcDialectFactory.class) +public class GreenplumDialectFactory implements JdbcDialectFactory { + + @Override + public boolean acceptsURL(@NonNull String url) { + // Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver + return url.startsWith("jdbc:pivotal:greenplum:"); + } + + @Override + public JdbcDialect create() { + return new PostgresDialect(); + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java index 784bd9a060c..9bc241bf477 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java @@ -89,7 +89,7 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru } @AfterEach - public void closeClickHouseContainer() { + public void closePostgreSqlContainer() { if (psl != null) { psl.stop(); } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java new file mode 100644 index 00000000000..715441032e1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.flink.v2.jdbc; + +import static org.testcontainers.shaded.org.awaitility.Awaitility.given; + +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class JdbcGreenplumIT extends FlinkContainer { + + private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8"; + private static final String GREENPLUM_CONTAINER_HOST = "flink_e2e_greenplum"; + private static final int GREENPLUM_CONTAINER_PORT = 5432; + private static final String GREENPLUM_HOST = "localhost"; + private static final int GREENPLUM_PORT = 5435; + private static final String GREENPLUM_USER = "tester"; + private static final String GREENPLUM_PASSWORD = "pivotal"; + private static final String GREENPLUM_DRIVER = "org.postgresql.Driver"; + private static final String GREENPLUM_JDBC_URL = String.format( + "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT); + private static final List TEST_DATASET = generateTestDataset(); + + private GenericContainer greenplumServer; + private Connection jdbcConnection; + + @BeforeEach + public void startGreenplumContainer() throws ClassNotFoundException, SQLException { + greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(GREENPLUM_CONTAINER_HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + greenplumServer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT))); + Startables.deepStart(Stream.of(greenplumServer)).join(); + log.info("Greenplum container started"); + // wait for Greenplum fully start + Class.forName(GREENPLUM_DRIVER); + given().ignoreExceptions() + .await() + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initializeJdbcConnection()); + initializeJdbcTable(); + batchInsertData(); + } + + @Test + public void testJdbcGreenplumSourceAndSink() throws IOException, InterruptedException, SQLException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_greenplum_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + // query result + String sql = "select age, name from sink order by age asc"; + List result = new ArrayList<>(); + try (Statement statement = jdbcConnection.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()) { + result.add(Arrays.asList( + resultSet.getInt(1), + resultSet.getString(2))); + } + } + Assertions.assertIterableEquals(TEST_DATASET, result); + } + + private void initializeJdbcConnection() throws SQLException { + jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL, + GREENPLUM_USER, GREENPLUM_PASSWORD); + } + + private void initializeJdbcTable() throws SQLException { + try (Statement statement = jdbcConnection.createStatement()) { + String createSource = "CREATE TABLE source (\n" + + "age INT NOT NULL,\n" + + "name VARCHAR(255) NOT NULL\n" + + ")"; + String createSink = "CREATE TABLE sink (\n" + + "age INT NOT NULL,\n" + + "name VARCHAR(255) NOT NULL\n" + + ")"; + statement.execute(createSource); + statement.execute(createSink); + } + } + + private static List generateTestDataset() { + List rows = new ArrayList<>(); + for (int i = 1; i <= 100; i++) { + rows.add(Arrays.asList(i, String.format("test_%s", i))); + } + return rows; + } + + private void batchInsertData() throws SQLException { + String sql = "insert into source(age, name) values(?, ?)"; + + try { + jdbcConnection.setAutoCommit(false); + try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) { + for (List row : TEST_DATASET) { + preparedStatement.setInt(1, (Integer) row.get(0)); + preparedStatement.setString(2, (String) row.get(1)); + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + } + jdbcConnection.commit(); + } catch (SQLException e) { + jdbcConnection.rollback(); + throw e; + } + } + + @AfterEach + public void closeGreenplumContainer() throws SQLException { + if (jdbcConnection != null) { + jdbcConnection.close(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf new file mode 100644 index 00000000000..e6ee34bf325 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://flink_e2e_greenplum:5432/testdb?loggerLevel=OFF" + user = tester + password = pivotal + query = "select age, name from source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://flink_e2e_greenplum:5432/testdb?loggerLevel=OFF" + user = tester + password = pivotal + query = "insert into sink(age, name) values(?, ?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java new file mode 100644 index 00000000000..7910a0dde5d --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.jdbc; + +import static org.testcontainers.shaded.org.awaitility.Awaitility.given; + +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class JdbcGreenplumIT extends SparkContainer { + + private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8"; + private static final String GREENPLUM_CONTAINER_HOST = "spark_e2e_greenplum"; + private static final int GREENPLUM_CONTAINER_PORT = 5432; + private static final String GREENPLUM_HOST = "localhost"; + private static final int GREENPLUM_PORT = 5436; + private static final String GREENPLUM_USER = "tester"; + private static final String GREENPLUM_PASSWORD = "pivotal"; + private static final String GREENPLUM_DRIVER = "org.postgresql.Driver"; + private static final String GREENPLUM_JDBC_URL = String.format( + "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT); + private static final List TEST_DATASET = generateTestDataset(); + + private GenericContainer greenplumServer; + private Connection jdbcConnection; + + @BeforeEach + public void startGreenplumContainer() throws ClassNotFoundException, SQLException { + greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(GREENPLUM_CONTAINER_HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + greenplumServer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT))); + Startables.deepStart(Stream.of(greenplumServer)).join(); + log.info("Greenplum container started"); + // wait for Greenplum fully start + Class.forName(GREENPLUM_DRIVER); + given().ignoreExceptions() + .await() + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initializeJdbcConnection()); + initializeJdbcTable(); + batchInsertData(); + } + + @Test + public void testJdbcGreenplumSourceAndSink() throws IOException, InterruptedException, SQLException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_greenplum_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + // query result + String sql = "select age, name from sink order by age asc"; + List result = new ArrayList<>(); + try (Statement statement = jdbcConnection.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()) { + result.add(Arrays.asList( + resultSet.getInt(1), + resultSet.getString(2))); + } + } + Assertions.assertIterableEquals(TEST_DATASET, result); + } + + private void initializeJdbcConnection() throws SQLException { + jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL, + GREENPLUM_USER, GREENPLUM_PASSWORD); + } + + private void initializeJdbcTable() throws SQLException { + try (Statement statement = jdbcConnection.createStatement()) { + String createSource = "CREATE TABLE source (\n" + + "age INT NOT NULL,\n" + + "name VARCHAR(255) NOT NULL\n" + + ")"; + String createSink = "CREATE TABLE sink (\n" + + "age INT NOT NULL,\n" + + "name VARCHAR(255) NOT NULL\n" + + ")"; + statement.execute(createSource); + statement.execute(createSink); + } + } + + private static List generateTestDataset() { + List rows = new ArrayList<>(); + for (int i = 1; i <= 100; i++) { + rows.add(Arrays.asList(i, String.format("test_%s", i))); + } + return rows; + } + + private void batchInsertData() throws SQLException { + String sql = "insert into source(age, name) values(?, ?)"; + + try { + jdbcConnection.setAutoCommit(false); + try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) { + for (List row : TEST_DATASET) { + preparedStatement.setInt(1, (Integer) row.get(0)); + preparedStatement.setString(2, (String) row.get(1)); + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + } + jdbcConnection.commit(); + } catch (SQLException e) { + jdbcConnection.rollback(); + throw e; + } + } + + @AfterEach + public void closeGreenplumContainer() throws SQLException { + if (jdbcConnection != null) { + jdbcConnection.close(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf new file mode 100644 index 00000000000..6c06feb2381 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://spark_e2e_greenplum:5432/testdb?loggerLevel=OFF" + user = tester + password = pivotal + query = "select age, name from source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://spark_e2e_greenplum:5432/testdb?loggerLevel=OFF" + user = tester + password = pivotal + query = "insert into sink(age, name) values(?, ?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} \ No newline at end of file