When I package cdc-3.3.0, my IDEA build reports errors; add maven-gpg-plugin #3889

Open · wants to merge 3 commits into base: master
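The pom.xml change the title refers to does not appear in the file list below. For orientation, a typical maven-gpg-plugin declaration looks like the following sketch (illustrative only, not necessarily the exact change in this PR); a local build that fails at the signing step can usually skip it with mvn clean package -Dgpg.skip=true.

```xml
<!-- Illustrative sketch only; the actual pom.xml change in this PR is not shown below. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-gpg-plugin</artifactId>
  <version>3.1.0</version>
  <executions>
    <execution>
      <id>sign-artifacts</id>
      <phase>verify</phase>
      <goals>
        <goal>sign</goal>
      </goals>
    </execution>
  </executions>
  <!-- Local builds without a GPG key can bypass signing with -Dgpg.skip=true. -->
</plugin>
```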
Changes from all commits
12 changes: 6 additions & 6 deletions docs/config.toml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-baseURL = '//nightlies.apache.org/flink/flink-cdc-docs-master'
+baseURL = '//nightlies.apache.org/flink/flink-cdc-docs-release-3.3'
languageCode = 'en-us'
title = 'Apache Flink CDC'
enableGitInfo = false
@@ -24,7 +24,7 @@ pygmentsUseClasses = true
[params]
# Flag whether this is a stable version or not.
# Used for the quickstart page.
-IsStable = false
+IsStable = true

# Flag to indicate whether an outdated warning should be shown.
ShowOutDatedWarning = false
@@ -34,14 +34,14 @@ pygmentsUseClasses = true
# where we change the version for the complete docs when forking of a release
# branch etc.
# The full version string as referenced in Maven (e.g. 1.2.1)
Version = "3.3-SNAPSHOT"
Version = "3.3.0"

# For stable releases, leave the bugfix version out (e.g. 1.2). For snapshot
# release this should be the same as the regular version
VersionTitle = "3.3-SNAPSHOT"
VersionTitle = "3.3"

# The branch for this version of Apache Flink CDC
Branch = "master"
Branch = "release-3.3"

# The GitHub repository for Apache Flink CDC
Repo = "//github.com/apache/flink-cdc"
@@ -54,7 +54,7 @@ pygmentsUseClasses = true
# of the menu
MenuLinks = [
["Project Homepage", "//flink.apache.org"],
["JavaDocs", "//nightlies.apache.org/flink/flink-cdc-docs-master/api/java/"],
["JavaDocs", "//nightlies.apache.org/flink/flink-cdc-docs-release-3.3/api/java/"],
]

PreviousDocs = [
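Taken together, the settings above pin the documentation build to the release branch. Restating only the + lines of this file, nothing new:

```toml
baseURL = '//nightlies.apache.org/flink/flink-cdc-docs-release-3.3'
IsStable = true
Version = "3.3.0"
VersionTitle = "3.3"
Branch = "release-3.3"
```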
4 changes: 2 additions & 2 deletions docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -169,7 +169,7 @@ pipeline:
<td>String</td>
<td>Parameters of StreamLoad.
For example: <code> sink.properties.strict_mode: true</code>.
-See more about <a href="https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/Data-Manipulation-Statements/Load/STREAM-LOAD/"> StreamLoad properties</a></td>
+See more about <a href="https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual"> StreamLoad properties</a></td>
</td>
</tr>
<tr>
@@ -179,7 +179,7 @@
<td>String</td>
<td>Properties used when creating the table.
For example: <code> table.create.properties.replication_num: 1</code>.
-See more about <a href="https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/Data-Definition-Statements/Create/CREATE-TABLE/"> Doris table properties</a></td>
+See more about <a href="https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/table-and-view/table/CREATE-TABLE"> Doris table properties</a></td>
</td>
</tr>
</tbody>
4 changes: 2 additions & 2 deletions docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -169,7 +169,7 @@ pipeline:
<td>String</td>
<td> Parameters of StreamLoad.
For example: <code> sink.properties.strict_mode: true</code>.
-See more about <a href="https://doris.apache.org/docs/dev/sql-manual/sql-statements/Data-Manipulation-Statements/Load/STREAM-LOAD/"> StreamLoad Properties</a></td>
+See more about <a href="https://doris.apache.org/docs/dev/data-operate/import/import-way/stream-load-manual"> StreamLoad Properties</a></td>
</td>
</tr>
<tr>
@@ -179,7 +179,7 @@
<td>String</td>
<td>Create the Properties configuration of the table.
For example: <code> table.create.properties.replication_num: 1</code>.
-See more about <a href="https://doris.apache.org/docs/dev/sql-manual/sql-statements/Data-Definition-Statements/Create/CREATE-TABLE/"> Doris Table Properties</a></td>
+See more about <a href="https://doris.apache.org/docs/dev/sql-manual/sql-statements/table-and-view/table/CREATE-TABLE"> Doris Table Properties</a></td>
</td>
</tr>
</tbody>
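Both option families documented above are pass-through prefixes. A hedged sketch of how they appear in a pipeline definition (the FE address, credentials, and values are placeholders):

```yaml
sink:
  type: doris
  fenodes: 127.0.0.1:8030   # placeholder FE address
  username: root
  password: ""
  # forwarded to Stream Load:
  sink.properties.strict_mode: "true"
  # forwarded to CREATE TABLE:
  table.create.properties.replication_num: "1"
```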
JdbcDataSourceDialect.java
@@ -37,11 +37,11 @@
@Experimental
public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfig> {

/** Discovers the list of tables to capture. */
@Override
List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig);

/** Discovers the captured tables' schemas by {@link SourceConfig}. */
@Override
Map<TableId, TableChange> discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig);

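A minimal sketch of what table discovery behind this interface can look like, using plain JDBC catalog metadata. Real dialects issue database-specific queries; the helper below is hypothetical:

```java
import io.debezium.relational.TableId;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class TableDiscoverySketch {
    /** Lists the tables of one database via JDBC metadata (hypothetical helper). */
    static List<TableId> discoverTables(Connection connection, String database) throws SQLException {
        List<TableId> tables = new ArrayList<>();
        try (ResultSet rs =
                connection.getMetaData().getTables(database, null, "%", new String[] {"TABLE"})) {
            while (rs.next()) {
                // Result columns 1-3: catalog, schema, table name.
                tables.add(new TableId(rs.getString(1), rs.getString(2), rs.getString(3)));
            }
        }
        return tables;
    }
}
```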
DebeziumSourceFunction.java
@@ -17,6 +17,13 @@

package org.apache.flink.cdc.debezium;

import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -27,50 +34,28 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.debezium.internal.DebeziumChangeConsumer;
import org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher;
import org.apache.flink.cdc.debezium.internal.DebeziumOffset;
import org.apache.flink.cdc.debezium.internal.DebeziumOffsetSerializer;
import org.apache.flink.cdc.debezium.internal.FlinkDatabaseHistory;
import org.apache.flink.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
import org.apache.flink.cdc.debezium.internal.FlinkOffsetBackingStore;
import org.apache.flink.cdc.debezium.internal.Handover;
import org.apache.flink.cdc.debezium.internal.SchemaRecord;
import org.apache.flink.cdc.debezium.internal.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import org.apache.commons.collections.map.LinkedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import static org.apache.flink.cdc.debezium.internal.Handover.ClosedException.isGentlyClosedException;
import static org.apache.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
LogMinerAdapter.java
@@ -63,6 +63,7 @@
*
* <p>Line 356: Replace < condition with <= to be able to catch ongoing transactions during snapshot
* if current SCN points to START/INSERT/DELETE/UPDATE event.
*/
public class LogMinerAdapter extends AbstractStreamingAdapter {

@@ -98,6 +99,18 @@ public OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader() {
return new LogMinerOracleOffsetContextLoader(connectorConfig);
}

/**
* Returns the change event source that emits events from the database log (for example,
* MySQL's binlog or, as here, Oracle redo logs read via LogMiner).
* @param connection the Oracle connection
* @param dispatcher the event dispatcher
* @param errorHandler the error handler
* @param clock the clock used for timestamps
* @param schema the database schema
* @param taskContext the connector task context
* @param jdbcConfig the JDBC configuration
* @param streamingMetrics the streaming metrics
* @return the streaming change event source
*/
@Override
public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSource(
OracleConnection connection,
@@ -119,6 +132,14 @@ public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSourc
streamingMetrics);
}

/**
* Determines the snapshot offset.
* @param ctx the relational snapshot context, should never be {@code null}
* @param connectorConfig the connector configuration, should never be {@code null}
* @param connection the database connection, should never be {@code null}
* @return the snapshot offset context
* @throws SQLException if a database error occurs
*/
@Override
public OracleOffsetContext determineSnapshotOffset(
RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx,
@@ -161,6 +182,13 @@ public OracleOffsetContext determineSnapshotOffset(
}
}

/**
* Gets the current SCN (System Change Number): a marker that identifies the committed
* version of the database at a given point in time.
* @param latestTableDdlScn the SCN of the latest table DDL
* @param connection the database connection
* @return the current SCN, if one could be read
* @throws SQLException if a database error occurs
*/
private Optional<Scn> getCurrentScn(Scn latestTableDdlScn, OracleConnection connection)
throws SQLException {
final String query = "SELECT CURRENT_SCN FROM V$DATABASE";
@@ -175,6 +203,15 @@ private Optional<Scn> getCurrentScn(Scn latestTableDdlScn, OracleConnection conn
return Optional.ofNullable(currentScn);
}

/**
* Gets the pending (in-flight) transactions.
* @param latestTableDdlScn the SCN of the latest table DDL
* @param connection the database connection
* @param transactions the collection that receives the pending transaction ids
* @param transactionTableName the name of the transaction table to query
* @return the earliest SCN among the pending transactions, if any
* @throws SQLException if a database error occurs
*/
private Optional<Scn> getPendingTransactions(
Scn latestTableDdlScn,
OracleConnection connection,
@@ -285,6 +322,12 @@ private OracleOffsetContext determineSnapshotOffset(
.build();
}

/**
* Adds the given logs to the mining session.
* @param logs the log files to add
* @param connection the database connection
* @throws SQLException if a database error occurs
*/
private void addLogsToSession(List<LogFile> logs, OracleConnection connection)
throws SQLException {
for (LogFile logFile : logs) {
@@ -294,6 +337,11 @@ private void addLogsToSession(List<LogFile> logs, OracleConnection connection)
}
}

/**
* Starts a mining session.
* @param connection the database connection
* @throws SQLException if a database error occurs
*/
private void startSession(OracleConnection connection) throws SQLException {
// We explicitly use the ONLINE data dictionary mode here.
// Since we are only concerned about non-SQL columns, it is safe to always use this mode
@@ -319,6 +367,9 @@ private void stopSession(OracleConnection connection) throws SQLException {
}
}

/**
* Returns the oldest SCN that is still available in the logs.
*/
private Scn getOldestScnAvailableInLogs(
OracleConnectorConfig config, OracleConnection connection) throws SQLException {
final Duration archiveLogRetention = config.getLogMiningArchiveLogRetention();
LogMinerStreamingChangeEventSource.java
@@ -69,6 +69,7 @@
*
* <p>Diff: Make createProcessor method as protected to produce a LogMinerEventProcessor with
* enhanced processRow method to distinguish whether is bounded.
*
*/
public class LogMinerStreamingChangeEventSource
implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
@@ -131,7 +132,7 @@ public LogMinerStreamingChangeEventSource(

/**
* This is the loop to get changes from LogMiner.
*
* @param context change event source context
*/
@Override
@@ -144,7 +145,7 @@ public void execute(
return;
}
try {
// We explicitly expect auto-commit to be disabled
jdbcConnection.setAutoCommit(false);

startScn = offsetContext.getScn();
@@ -617,7 +618,7 @@ private void pauseBetweenMiningSessions() throws InterruptedException {

/**
* Sets the NLS parameters for the mining session.
*
* @param connection database connection, should not be {@code null}
* @throws SQLException if a database exception occurred
*/
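"NLS parameters" here means session-level date and time formats, fixed so that LogMiner output parses deterministically. A hedged sketch of the kind of statement involved; the connector's exact parameter set may differ:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

class NlsSessionSketch {
    /** Fixes the session's date/time rendering before mining (illustrative values). */
    static void setNlsParameters(Connection connection) throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            stmt.execute(
                    "ALTER SESSION SET "
                            + "NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS' "
                            + "NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'");
        }
    }
}
```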
OracleSource.java
@@ -31,16 +31,18 @@

/**
* A builder to build a SourceFunction which can read a snapshot and then continue to consume LogMiner change events.
*
*/
public class OracleSource {


private static final String DATABASE_SERVER_NAME = "oracle_logminer";

public static <T> Builder<T> builder() {
return new Builder<>();
}

/** Builder class of {@link OracleSource}. */
public static class Builder<T> {

private Integer port = 1521; // default 1521 port
@@ -136,6 +138,7 @@ public Builder<T> startupOptions(StartupOptions startupOptions) {
return this;
}

// Builds the source function that reads Oracle redo-log data
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
@@ -195,6 +198,7 @@ public DebeziumSourceFunction<T> build() {
}

return new DebeziumSourceFunction<>(
// Validate the supplied database configuration parameters
deserializer, props, null, new OracleValidator(props));
}
}
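A usage sketch for the builder above. Host, credentials, and table names are placeholders; JsonDebeziumDeserializationSchema is the stock JSON deserializer shipped with the connector:

```java
import org.apache.flink.cdc.connectors.oracle.OracleSource;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OracleSourceExample {
    public static void main(String[] args) throws Exception {
        DebeziumSourceFunction<String> source =
                OracleSource.<String>builder()
                        .hostname("localhost")
                        .port(1521)
                        .database("ORCLCDB")           // monitored database
                        .schemaList("FLINKUSER")       // monitored schemas
                        .tableList("FLINKUSER.ORDERS") // monitored tables
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("Oracle CDC sketch");
    }
}
```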
OracleValidator.java
@@ -30,7 +30,7 @@
import java.util.List;
import java.util.Properties;

/** Validates the version of the Oracle database being connected to. */
public class OracleValidator implements Validator {

private static final long serialVersionUID = 1L;
@@ -43,6 +43,7 @@ public OracleValidator(Properties properties) {
this.properties = properties;
}

// Check whether the database version meets the connector's requirements
@Override
public void validate() {
try (Connection connection = openConnection(properties)) {
@@ -62,11 +63,13 @@ public void validate() {
}
}

// Establish the database connection
public static Connection openConnection(Properties properties) throws SQLException {
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
String url = OracleJdbcUrlUtils.getConnectionUrlWithSid(properties);
String userName = properties.getProperty("database.user");
String userpwd = properties.getProperty("database.password");
System.out.println("======== database connection ===");
return DriverManager.getConnection(url, userName, userpwd);
}
}
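A usage sketch for the validator. The property keys follow the Debezium-style names read in this file; database.user and database.password appear above, the remaining keys are assumed to be what OracleJdbcUrlUtils expects:

```java
import java.util.Properties;

public class OracleValidatorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("database.hostname", "localhost"); // assumed key, per OracleJdbcUrlUtils
        props.setProperty("database.port", "1521");          // assumed key
        props.setProperty("database.dbname", "ORCLCDB");     // assumed key (SID)
        props.setProperty("database.user", "flinkuser");     // key read above
        props.setProperty("database.password", "flinkpw");   // key read above
        new OracleValidator(props).validate(); // throws if the Oracle version is unsupported
    }
}
```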