Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Connector-V2][JDBC] Support database: greenplum #2429

Merged
merged 9 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;

import com.google.auto.service.AutoService;

@AutoService(JdbcDialectFactory.class)
public class GreenplumDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(String url) {
// Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver
return url.startsWith("jdbc:pivotal:greenplum:");
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public JdbcDialect create() {
return new PostgresDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * End-to-end test that runs the {@code /jdbc/fakesource_to_jdbc_greenplum.conf} SeaTunnel Flink job
 * against a Greenplum container and verifies that rows were written to the {@code test} table.
 */
@Slf4j
public class FakeSourceToJdbcGreenplumIT extends FlinkContainer {

    private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
    /** Network alias of the Greenplum container; the job config connects to this host name. */
    private static final String GREENPLUM_CONTAINER_HOST = "flink_e2e_greenplum_sink";
    private static final int GREENPLUM_CONTAINER_PORT = 5432;
    private static final String GREENPLUM_HOST = "localhost";
    /** Fixed host port bound to the container port; used by the test's own JDBC connection. */
    private static final int GREENPLUM_PORT = 5436;
    private static final String GREENPLUM_USER = "tester";
    private static final String GREENPLUM_PASSWORD = "pivotal";
    private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
    private static final String GREENPLUM_JDBC_URL = String.format(
        "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);

    private GenericContainer<?> greenplumServer;
    private Connection jdbcConnection;

    /**
     * Starts the Greenplum container, waits until a JDBC connection can be opened and creates the
     * target table.
     *
     * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
     * @throws SQLException if the target table cannot be created
     */
    @BeforeEach
    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
        greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
            .withNetwork(NETWORK)
            .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
            .withLogConsumer(new Slf4jLogConsumer(log));
        greenplumServer.setPortBindings(Lists.newArrayList(
            String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
        Startables.deepStart(Stream.of(greenplumServer)).join();
        log.info("Greenplum container started");
        // The container being up does not mean the database accepts connections yet,
        // so keep retrying the connection (ignoring failures) for up to 180 seconds.
        Class.forName(GREENPLUM_DRIVER);
        given().ignoreExceptions()
            .await()
            .atMost(180, TimeUnit.SECONDS)
            .untilAsserted(this::initializeJdbcConnection);
        initializeJdbcTable();
    }

    /**
     * Runs the job and asserts that it exits with code 0 and that the {@code test} table is not
     * empty afterwards.
     */
    @Test
    public void testFakeSourceToJdbcGreenplumSink() throws SQLException, IOException, InterruptedException {
        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/fakesource_to_jdbc_greenplum.conf");
        Assertions.assertEquals(0, execResult.getExitCode());

        // query result
        String sql = "select age, name from test";
        List<Object> result = new ArrayList<>();
        // Close the ResultSet together with the Statement instead of leaving it open.
        try (Statement statement = jdbcConnection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql)) {
            while (resultSet.next()) {
                result.add(Arrays.asList(
                    resultSet.getInt("age"),
                    resultSet.getString("name")));
            }
        }
        Assertions.assertFalse(result.isEmpty());
    }

    /** Opens the JDBC connection used by the test through the host-mapped port. */
    private void initializeJdbcConnection() throws SQLException {
        jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
            GREENPLUM_USER, GREENPLUM_PASSWORD);
    }

    /** Creates the {@code test} table the sink writes into. */
    private void initializeJdbcTable() throws SQLException {
        try (Statement statement = jdbcConnection.createStatement()) {
            String sql = "CREATE TABLE test (\n" +
                "age INT NOT NULL,\n" +
                "name VARCHAR(255) NOT NULL\n" +
                ")";
            statement.execute(sql);
        }
    }

    /**
     * Closes the JDBC connection and stops the Greenplum container.
     *
     * <p>The container is stopped in a {@code finally} block so that it is released, together
     * with its fixed host port binding, even if closing the connection fails.
     *
     * @throws SQLException if closing the JDBC connection fails
     */
    @AfterEach
    public void closeGreenplumContainer() throws SQLException {
        try {
            if (jdbcConnection != null) {
                jdbcConnection.close();
            }
        } finally {
            if (greenplumServer != null) {
                greenplumServer.stop();
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
}

@AfterEach
public void closeClickHouseContainer() {
public void closePostgreSqlContainer() {
if (psl != null) {
psl.stop();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * End-to-end test that seeds a Greenplum container with rows and runs the
 * {@code /jdbc/jdbc_greenplum_to_console.conf} SeaTunnel Flink job against it.
 */
@Slf4j
public class JdbcGreenplumToConsoleIT extends FlinkContainer {

    private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
    /** Network alias given to the Greenplum container on the shared test network. */
    private static final String GREENPLUM_CONTAINER_HOST = "flink_e2e_greenplum_source";
    private static final int GREENPLUM_CONTAINER_PORT = 5432;
    private static final String GREENPLUM_HOST = "localhost";
    /** Fixed host port bound to the container port; used by the test's own JDBC connection. */
    private static final int GREENPLUM_PORT = 5435;
    private static final String GREENPLUM_USER = "tester";
    private static final String GREENPLUM_PASSWORD = "pivotal";
    private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
    private static final String GREENPLUM_JDBC_URL = String.format(
        "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);

    private GenericContainer<?> greenplumServer;
    private Connection jdbcConnection;

    /**
     * Starts the Greenplum container, waits until a JDBC connection can be opened, then creates
     * and populates the source table.
     *
     * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
     * @throws SQLException if creating or populating the table fails
     */
    @BeforeEach
    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
        greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
            .withNetwork(NETWORK)
            .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
            .withLogConsumer(new Slf4jLogConsumer(log));
        greenplumServer.setPortBindings(Lists.newArrayList(
            String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
        Startables.deepStart(Stream.of(greenplumServer)).join();
        log.info("Greenplum container started");
        // The container being up does not mean the database accepts connections yet,
        // so keep retrying the connection (ignoring failures) for up to 180 seconds.
        Class.forName(GREENPLUM_DRIVER);
        given().ignoreExceptions()
            .await()
            .atMost(180, TimeUnit.SECONDS)
            .untilAsserted(this::initializeJdbcConnection);
        initializeJdbcTable();
        batchInsertData();
    }

    /** Runs the job and asserts that it exits with code 0. */
    @Test
    public void testJdbcGreenplumToConsoleSink() throws IOException, InterruptedException {
        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_greenplum_to_console.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
    }

    /** Opens the JDBC connection used by the test through the host-mapped port. */
    private void initializeJdbcConnection() throws SQLException {
        jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
            GREENPLUM_USER, GREENPLUM_PASSWORD);
    }

    /** Creates the {@code test} table the job reads from. */
    private void initializeJdbcTable() throws SQLException {
        try (Statement statement = jdbcConnection.createStatement()) {
            String sql = "CREATE TABLE test (\n" +
                "age INT NOT NULL,\n" +
                "name VARCHAR(255) NOT NULL\n" +
                ")";
            statement.execute(sql);
        }
    }

    /**
     * Inserts 100 rows ({@code age = i}, {@code name = "test_" + i}) as one JDBC batch inside a
     * single transaction.
     *
     * @throws SQLException if the insert or commit fails; a failure of the subsequent rollback
     *     is attached as a suppressed exception rather than replacing the original error
     */
    private void batchInsertData() throws SQLException {
        int batchSize = 100;
        String sql = "insert into test(age, name) values(?, ?)";

        try {
            jdbcConnection.setAutoCommit(false);
            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
                for (int i = 1; i <= batchSize; i++) {
                    preparedStatement.setInt(1, i);
                    preparedStatement.setString(2, String.format("test_%s", i));
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
            }
            jdbcConnection.commit();
        } catch (SQLException e) {
            try {
                jdbcConnection.rollback();
            } catch (SQLException rollbackFailure) {
                // Keep the original failure as the primary error.
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        }
    }

    /**
     * Closes the JDBC connection and stops the Greenplum container.
     *
     * <p>The container is stopped in a {@code finally} block so that it is released, together
     * with its fixed host port binding, even if closing the connection fails.
     *
     * @throws SQLException if closing the JDBC connection fails
     */
    @AfterEach
    public void closeGreenplumContainer() throws SQLException {
        try {
            if (jdbcConnection != null) {
                jdbcConnection.close();
            }
        } finally {
            if (greenplumServer != null) {
                greenplumServer.stop();
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
# You can set flink configuration here
# The job runs in batch mode with a parallelism of 1; the checkpoint options below are left disabled.
execution.parallelism = 1
job.mode = "BATCH"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
# Its output is registered as the table "fake" with the fields "age" and "name".
FakeSource {
result_table_name = "fake"
field_name = "age, name"
}

# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
}

transform {
# Selects the "age" and "name" columns from the "fake" table declared in the source section.
sql {
sql = "select age, name from fake"
}

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
# Writes each row into the "test" table with a parameterized insert, using the PostgreSQL JDBC driver.
# The URL host "flink_e2e_greenplum_sink" is the network alias the e2e test assigns to the Greenplum container.
Jdbc {
driver = org.postgresql.Driver
url = "jdbc:postgresql://flink_e2e_greenplum_sink:5432/testdb?loggerLevel=OFF"
user = tester
password = pivotal
query = "insert into test(age, name) values(?, ?)"
}

# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}
Loading