Skip to content

Commit

Permalink
[cdc-connector][postgres] Fix potential PostgresSQL database connecti…
Browse files Browse the repository at this point in the history
…on leak (#3013)

This closes #2980.
  • Loading branch information
loserwang1024 authored Jan 24, 2024
1 parent 20e44e6 commit 3be3417
Showing 1 changed file with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicReference;

import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
Expand All @@ -84,8 +83,7 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private PostgresTaskContext taskContext;
private ChangeEventQueue<DataChangeEvent> queue;
private PostgresConnection jdbcConnection;
private final AtomicReference<ReplicationConnection> replicationConnection =
new AtomicReference<>();
private ReplicationConnection replicationConnection;
private PostgresOffsetContext offsetContext;
private PostgresPartition partition;
private PostgresSchema schema;
Expand Down Expand Up @@ -190,13 +188,14 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.partition = new PostgresPartition(dbzConfig.getLogicalName());
this.taskContext = PostgresObjectUtils.newTaskContext(dbzConfig, schema, topicSelector);

this.replicationConnection.compareAndSet(
null,
createReplicationConnection(
this.taskContext,
jdbcConnection,
this.snapShotter.shouldSnapshot(),
dbzConfig));
if (replicationConnection == null) {
replicationConnection =
createReplicationConnection(
this.taskContext,
jdbcConnection,
this.snapShotter.shouldSnapshot(),
dbzConfig);
}

this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
Expand Down Expand Up @@ -302,8 +301,13 @@ public Offset getStreamOffset(SourceRecord sourceRecord) {
}

@Override
public void close() {
jdbcConnection.close();
public void close() throws Exception {
if (jdbcConnection != null) {
jdbcConnection.close();
}
if (replicationConnection != null) {
replicationConnection.close();
}
}

public PostgresConnection getConnection() {
Expand All @@ -315,7 +319,7 @@ public PostgresTaskContext getTaskContext() {
}

public ReplicationConnection getReplicationConnection() {
return replicationConnection.get();
return replicationConnection;
}

public SnapshotChangeEventSourceMetrics<PostgresPartition>
Expand Down

0 comments on commit 3be3417

Please sign in to comment.