Skip to content

Commit

Permalink
[Connector-V2][JDBC] Support database: greenplum (#2429)
Browse files Browse the repository at this point in the history
* [Connector-V2][JDBC] Support database: Greenplum

Support connect greenplum drivers:
* postgresql driver: org.postgresql.Driver
* greenplum driver: com.pivotal.jdbc.GreenplumDriver

Co-authored-by: wanghailin <hailin@fiture.com>
  • Loading branch information
hailin0 and wanghailin authored Aug 24, 2022
1 parent 712b777 commit 3561d38
Show file tree
Hide file tree
Showing 8 changed files with 525 additions and 1 deletion.
27 changes: 27 additions & 0 deletions docs/en/connector-v2/sink/Greenplum.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Greenplum

> Greenplum sink connector
## Description

Write data to Greenplum using [Jdbc connector](Jdbc.md).

:::tip

Not support exactly-once semantics (XA transaction is not yet supported in Greenplum database).

:::

## Options

### driver [string]

Optional jdbc drivers:
- `org.postgresql.Driver`
- `com.pivotal.jdbc.GreenplumDriver`

Warn: for license compliance, if you use `GreenplumDriver` the have to provide Greenplum JDBC driver yourself, e.g. copy greenplum-xxx.jar to $SEATNUNNEL_HOME/lib for Standalone.

### url [string]

The URL of the JDBC connection. if you use postgresql driver the value is `jdbc:postgresql://${yous_host}:${yous_port}/${yous_database}`, or you use greenplum driver the value is `jdbc:pivotal:greenplum://${yous_host}:${yous_port};DatabaseName=${yous_database}`
17 changes: 17 additions & 0 deletions docs/en/connector-v2/source/Greenplum.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Greenplum

> Greenplum source connector
## Description

Read Greenplum data through [Jdbc connector](Jdbc.md).

:::tip

Optional jdbc drivers:
- `org.postgresql.Driver`
- `com.pivotal.jdbc.GreenplumDriver`

Warn: for license compliance, if you use `GreenplumDriver` the have to provide Greenplum JDBC driver yourself, e.g. copy greenplum-xxx.jar to $SEATNUNNEL_HOME/lib for Standalone.

:::
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;

import com.google.auto.service.AutoService;
import lombok.NonNull;

@AutoService(JdbcDialectFactory.class)
public class GreenplumDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(@NonNull String url) {
// Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver
return url.startsWith("jdbc:pivotal:greenplum:");
}

@Override
public JdbcDialect create() {
return new PostgresDialect();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
}

@AfterEach
public void closeClickHouseContainer() {
public void closePostgreSqlContainer() {
if (psl != null) {
psl.stop();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

@Slf4j
public class JdbcGreenplumIT extends FlinkContainer {

private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
private static final String GREENPLUM_CONTAINER_HOST = "flink_e2e_greenplum";
private static final int GREENPLUM_CONTAINER_PORT = 5432;
private static final String GREENPLUM_HOST = "localhost";
private static final int GREENPLUM_PORT = 5435;
private static final String GREENPLUM_USER = "tester";
private static final String GREENPLUM_PASSWORD = "pivotal";
private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
private static final String GREENPLUM_JDBC_URL = String.format(
"jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
private static final List<List> TEST_DATASET = generateTestDataset();

private GenericContainer<?> greenplumServer;
private Connection jdbcConnection;

@BeforeEach
public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
.withNetwork(NETWORK)
.withNetworkAliases(GREENPLUM_CONTAINER_HOST)
.withLogConsumer(new Slf4jLogConsumer(log));
greenplumServer.setPortBindings(Lists.newArrayList(
String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
Startables.deepStart(Stream.of(greenplumServer)).join();
log.info("Greenplum container started");
// wait for Greenplum fully start
Class.forName(GREENPLUM_DRIVER);
given().ignoreExceptions()
.await()
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcConnection());
initializeJdbcTable();
batchInsertData();
}

@Test
public void testJdbcGreenplumSourceAndSink() throws IOException, InterruptedException, SQLException {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_greenplum_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());

// query result
String sql = "select age, name from sink order by age asc";
List<List> result = new ArrayList<>();
try (Statement statement = jdbcConnection.createStatement()) {
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
result.add(Arrays.asList(
resultSet.getInt(1),
resultSet.getString(2)));
}
}
Assertions.assertIterableEquals(TEST_DATASET, result);
}

private void initializeJdbcConnection() throws SQLException {
jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
GREENPLUM_USER, GREENPLUM_PASSWORD);
}

private void initializeJdbcTable() throws SQLException {
try (Statement statement = jdbcConnection.createStatement()) {
String createSource = "CREATE TABLE source (\n" +
"age INT NOT NULL,\n" +
"name VARCHAR(255) NOT NULL\n" +
")";
String createSink = "CREATE TABLE sink (\n" +
"age INT NOT NULL,\n" +
"name VARCHAR(255) NOT NULL\n" +
")";
statement.execute(createSource);
statement.execute(createSink);
}
}

private static List<List> generateTestDataset() {
List<List> rows = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
rows.add(Arrays.asList(i, String.format("test_%s", i)));
}
return rows;
}

private void batchInsertData() throws SQLException {
String sql = "insert into source(age, name) values(?, ?)";

try {
jdbcConnection.setAutoCommit(false);
try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
for (List row : TEST_DATASET) {
preparedStatement.setInt(1, (Integer) row.get(0));
preparedStatement.setString(2, (String) row.get(1));
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
}
jdbcConnection.commit();
} catch (SQLException e) {
jdbcConnection.rollback();
throw e;
}
}

@AfterEach
public void closeGreenplumContainer() throws SQLException {
if (jdbcConnection != null) {
jdbcConnection.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Jdbc {
driver = org.postgresql.Driver
url = "jdbc:postgresql://flink_e2e_greenplum:5432/testdb?loggerLevel=OFF"
user = tester
password = pivotal
query = "select age, name from source"
}

# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}

transform {

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
Jdbc {
driver = org.postgresql.Driver
url = "jdbc:postgresql://flink_e2e_greenplum:5432/testdb?loggerLevel=OFF"
user = tester
password = pivotal
query = "insert into sink(age, name) values(?, ?)"
}

# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}
Loading

0 comments on commit 3561d38

Please sign in to comment.