Skip to content

Commit

Permalink
fix: load pk from the downstream instead of Risingwave (risingwavelab…
Browse files Browse the repository at this point in the history
…s#8457)

Signed-off-by: tabVersion <tabvision@bupt.icu>
  • Loading branch information
tabVersion authored Mar 16, 2023
1 parent 25a4809 commit bd9d156
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/intergration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- schema-registry
- mysql-cdc
- postgres-cdc
#- mysql-sink
- mysql-sink
- postgres-sink
- iceberg-sink
format: ["json", "protobuf"]
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/mysql-sink/mysql_prepare.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE target_count (
target_id VARCHAR(128),
target_id VARCHAR(128) primary key,
target_count BIGINT
);
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem
.collect(Collectors.toList()));
}

/** @deprecated pk here is from Risingwave, it may not match the pk in the database */
@Deprecated
public List<String> getPrimaryKeys() {
return primaryKeys;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.risingwave.proto.Data;
import io.grpc.Status;
import java.sql.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
Expand All @@ -30,10 +32,13 @@ public class JDBCSink extends SinkBase {
public static final String INSERT_TEMPLATE = "INSERT INTO %s (%s) VALUES (%s)";
private static final String DELETE_TEMPLATE = "DELETE FROM %s WHERE %s";
private static final String UPDATE_TEMPLATE = "UPDATE %s SET %s WHERE %s";
private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s";

private final String tableName;
private final Connection conn;
private final String jdbcUrl;
private final List<String> pkColumnNames;
public static final String JDBC_COLUMN_NAME_KEY = "COLUMN_NAME";

private String updateDeleteConditionBuffer;
private Object[] updateDeleteValueBuffer;
Expand All @@ -48,16 +53,38 @@ public JDBCSink(String tableName, String jdbcUrl, TableSchema tableSchema) {
try {
this.conn = DriverManager.getConnection(jdbcUrl);
this.conn.setAutoCommit(false);
this.pkColumnNames = getPkColumnNames(conn, tableName);
} catch (SQLException e) {
throw Status.INTERNAL.withCause(e).asRuntimeException();
throw Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
}

private static List<String> getPkColumnNames(Connection conn, String tableName) {
List<String> pkColumnNames = new ArrayList<>();
try {
var pks = conn.getMetaData().getPrimaryKeys(null, null, tableName);
while (pks.next()) {
pkColumnNames.add(pks.getString(JDBC_COLUMN_NAME_KEY));
}
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
LOG.info("detected pk {}", pkColumnNames);
return pkColumnNames;
}

public JDBCSink(Connection conn, TableSchema tableSchema, String tableName) {
super(tableSchema);
this.tableName = tableName;
this.jdbcUrl = null;
this.conn = conn;
this.pkColumnNames = getPkColumnNames(conn, tableName);
}

private PreparedStatement prepareStatement(SinkRow row) {
Expand All @@ -79,35 +106,75 @@ private PreparedStatement prepareStatement(SinkRow row) {
}
return stmt;
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL.withCause(e).asRuntimeException();
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(
ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
case DELETE:
String deleteCondition =
getTableSchema().getPrimaryKeys().stream()
.map(key -> key + " = ?")
.collect(Collectors.joining(" AND "));
String deleteCondition;
if (this.pkColumnNames.isEmpty()) {
deleteCondition =
IntStream.range(0, getTableSchema().getNumColumns())
.mapToObj(
index ->
getTableSchema().getColumnNames()[index]
+ " = ?")
.collect(Collectors.joining(" AND "));
} else {
deleteCondition =
this.pkColumnNames.stream()
.map(key -> key + " = ?")
.collect(Collectors.joining(" AND "));
}
String deleteStmt = String.format(DELETE_TEMPLATE, tableName, deleteCondition);
try {
int placeholderIdx = 1;
PreparedStatement stmt =
conn.prepareStatement(deleteStmt, Statement.RETURN_GENERATED_KEYS);
for (String primaryKey : getTableSchema().getPrimaryKeys()) {
for (String primaryKey : this.pkColumnNames) {
Object fromRow = getTableSchema().getFromRow(primaryKey, row);
stmt.setObject(placeholderIdx++, fromRow);
}
return stmt;
} catch (SQLException e) {
throw Status.INTERNAL.withCause(e).asRuntimeException();
throw Status.INTERNAL
.withDescription(
String.format(
ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
case UPDATE_DELETE:
updateDeleteConditionBuffer =
getTableSchema().getPrimaryKeys().stream()
.map(key -> key + " = ?")
.collect(Collectors.joining(" AND "));
updateDeleteValueBuffer =
getTableSchema().getPrimaryKeys().stream()
.map(key -> getTableSchema().getFromRow(key, row))
.toArray();
if (this.pkColumnNames.isEmpty()) {
updateDeleteConditionBuffer =
IntStream.range(0, getTableSchema().getNumColumns())
.mapToObj(
index ->
getTableSchema().getColumnNames()[index]
+ " = ?")
.collect(Collectors.joining(" AND "));
updateDeleteValueBuffer =
IntStream.range(0, getTableSchema().getNumColumns())
.mapToObj(
index ->
getTableSchema()
.getFromRow(
getTableSchema()
.getColumnNames()[
index],
row))
.toArray();
} else {
updateDeleteConditionBuffer =
this.pkColumnNames.stream()
.map(key -> key + " = ?")
.collect(Collectors.joining(" AND "));
updateDeleteValueBuffer =
this.pkColumnNames.stream()
.map(key -> getTableSchema().getFromRow(key, row))
.toArray();
}
LOG.debug(
"update delete condition: {} on values {}",
updateDeleteConditionBuffer,
Expand Down Expand Up @@ -144,7 +211,11 @@ private PreparedStatement prepareStatement(SinkRow row) {
updateDeleteValueBuffer = null;
return stmt;
} catch (SQLException e) {
throw Status.INTERNAL.withCause(e).asRuntimeException();
throw Status.INTERNAL
.withDescription(
String.format(
ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
default:
throw Status.INVALID_ARGUMENT
Expand All @@ -163,10 +234,14 @@ public void write(Iterator<SinkRow> rows) {
}
if (stmt != null) {
try {
LOG.debug("Executing statement: " + stmt);
LOG.debug("Executing statement: {}", stmt);
stmt.executeUpdate();
} catch (SQLException e) {
throw Status.INTERNAL.withCause(e).asRuntimeException();
throw Status.INTERNAL
.withDescription(
String.format(
ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
} else {
throw Status.INTERNAL
Expand All @@ -187,7 +262,10 @@ public void sync() {
try {
conn.commit();
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL.withCause(e).asRuntimeException();
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
}

Expand All @@ -196,7 +274,10 @@ public void drop() {
try {
conn.close();
} catch (SQLException e) {
throw io.grpc.Status.INTERNAL.withCause(e).asRuntimeException();
throw io.grpc.Status.INTERNAL
.withDescription(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
}

Expand Down

0 comments on commit bd9d156

Please sign in to comment.