diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
index e86d7301a03..f3efb218c20 100644
--- a/seatunnel-api/pom.xml
+++ b/seatunnel-api/pom.xml
@@ -1,4 +1,22 @@
+
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
index 9cd8efbef1d..8461e7dc1a7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
@@ -19,7 +19,7 @@
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-public interface TableSink {
+public interface TableSink {
- SeaTunnelSink, ?, ?, ?> createSink();
+ SeaTunnelSink createSink();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index 7727735a730..edb8ee69dfe 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -18,8 +18,9 @@
package org.apache.seatunnel.api.table.connector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
-public interface TableSource {
+public interface TableSource {
- SeaTunnelSource, ?, ?> createSource();
+ SeaTunnelSource createSource();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index b0677676647..c260c2640e7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSource;
@@ -41,37 +42,40 @@ public final class FactoryUtil {
private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
- public static List createAndPrepareSource(
- List multipleTables,
- Map options,
- ClassLoader classLoader,
- String factoryIdentifier) {
+ public static List> createAndPrepareSource(
+ List multipleTables,
+ Map options,
+ ClassLoader classLoader,
+ String factoryIdentifier) {
try {
-
final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
- List sources = new ArrayList<>(multipleTables.size());
+ List> sources = new ArrayList<>(multipleTables.size());
if (factory instanceof SupportMultipleTable) {
TableFactoryContext context = new TableFactoryContext(multipleTables, options, classLoader);
SupportMultipleTable multipleTableSourceFactory = (SupportMultipleTable) factory;
// TODO: create all source
SupportMultipleTable.Result result = multipleTableSourceFactory.applyTables(context);
- TableSource multipleTableSource = factory.createSource(new TableFactoryContext(result.getAcceptedTables(), options, classLoader));
+ TableSource multipleTableSource = factory.createSource(
+ new TableFactoryContext(result.getAcceptedTables(), options, classLoader));
// TODO: handle reading metadata
- SeaTunnelSource, ?, ?> source = multipleTableSource.createSource();
+ SeaTunnelSource source = multipleTableSource.createSource();
sources.add(source);
}
return sources;
} catch (Throwable t) {
throw new FactoryException(
- String.format(
- "Unable to create a source for identifier '%s'.", factoryIdentifier),
- t);
+ String.format(
+ "Unable to create a source for identifier '%s'.", factoryIdentifier),
+ t);
}
}
- public static List createAndPrepareSink() {
- return null;
+ public static SeaTunnelSink createAndPrepareSink(
+ ClassLoader classLoader, String factoryIdentifier) {
+ // todo: do we need to set table?
+ TableSinkFactory factory = discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier);
+ return factory.createSink(null).createSink();
}
public static Catalog createCatalog(String catalogName,
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index bb92a76a0d7..bdc3d1a642b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -19,7 +19,7 @@
import org.apache.seatunnel.api.table.connector.TableSink;
-public interface TableSinkFactory extends Factory {
+public interface TableSinkFactory extends Factory {
- TableSink createSink(TableFactoryContext context);
+ TableSink createSink(TableFactoryContext context);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 241deeb039b..2206a6bb9ac 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -17,9 +17,10 @@
package org.apache.seatunnel.api.table.factory;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;
public interface TableSourceFactory extends Factory {
- TableSource createSource(TableFactoryContext context);
+ TableSource createSource(TableFactoryContext context);
}
diff --git a/seatunnel-connectors/pom.xml b/seatunnel-connectors/pom.xml
index 69a02445745..482cf788422 100644
--- a/seatunnel-connectors/pom.xml
+++ b/seatunnel-connectors/pom.xml
@@ -35,6 +35,8 @@
seatunnel-connectors-flink-dist
seatunnel-connectors-spark
seatunnel-connectors-spark-dist
+ seatunnel-connectors-seatunnel
+ seatunnel-connectors-seatunnel-dist
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
new file mode 100644
index 00000000000..b487fe1a7df
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -0,0 +1,67 @@
+
+
+
+
+ seatunnel-connectors
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ seatunnel-connectors-seatunnel-dist
+
+
+
+ org.apache.seatunnel
+ seatunnel-connectors-seatunnel-fake
+ ${project.version}
+
+
+ org.apache.seatunnel
+ seatunnel-connectors-seatunnel-console
+ ${project.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-connector
+ package
+
+ copy-dependencies
+
+
+ jar
+ jar
+ ${project.build.directory}/lib
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
new file mode 100644
index 00000000000..7897a316c4d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+
+ seatunnel-connectors
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+ pom
+
+ seatunnel-connectors-seatunnel
+
+
+ seatunnel-connectors-seatunnel-console
+ seatunnel-connectors-seatunnel-fake
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml
new file mode 100644
index 00000000000..9bc8680f675
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml
@@ -0,0 +1,38 @@
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-connectors-seatunnel
+ ${revision}
+
+ 4.0.0
+
+ seatunnel-connectors-seatunnel-console
+
+
+
+ org.apache.seatunnel
+ seatunnel-api
+ ${project.version}
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
new file mode 100644
index 00000000000..d866e084490
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
+
+public class ConsoleAggregatedCommitInfo {
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
new file mode 100644
index 00000000000..6c41be0ca68
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
+
+public class ConsoleCommitInfo {
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
new file mode 100644
index 00000000000..947b312569f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+
+import java.util.List;
+import java.util.Optional;
+
+public class ConsoleSink implements SeaTunnelSink {
+
+ @Override
+ public SinkWriter createWriter(SinkWriter.Context context) {
+ return new ConsoleSinkWriter();
+ }
+
+ @Override
+ public SinkWriter restoreWriter(
+ SinkWriter.Context context, List states) {
+ return restoreWriter(context, states);
+ }
+
+ @Override
+ public Optional> getWriterStateSerializer() {
+ return getWriterStateSerializer();
+ }
+
+ @Override
+ public Optional> createCommitter() {
+ return createCommitter();
+ }
+
+ @Override
+ public Optional> getCommitInfoSerializer() {
+ return getCommitInfoSerializer();
+ }
+
+ @Override
+ public Optional> createAggregatedCommitter() {
+ return createAggregatedCommitter();
+ }
+
+ @Override
+ public Optional> getAggregatedCommitInfoSerializer() {
+ return getAggregatedCommitInfoSerializer();
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
new file mode 100644
index 00000000000..e395ad3e2a0
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsoleSinkWriter implements SinkWriter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSinkWriter.class);
+
+ @Override
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ public void write(SeaTunnelRow element) {
+ System.out.println(element.toString());
+ }
+
+ @Override
+ public ConsoleCommitInfo prepareCommit() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
new file mode 100644
index 00000000000..b2abaeff9b6
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.console.state;
+
+import org.apache.seatunnel.api.state.State;
+
+public class ConsoleState implements State {
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml
new file mode 100644
index 00000000000..cc2daab7435
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml
@@ -0,0 +1,41 @@
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-connectors-seatunnel
+ ${revision}
+
+ 4.0.0
+
+ seatunnel-connectors-seatunnel-fake
+
+
+
+ org.apache.seatunnel
+ seatunnel-api
+ ${project.version}
+
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
new file mode 100644
index 00000000000..0c3fb31b7c9
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
+
+public class FakeSource implements SeaTunnelSource {
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader createReader(SourceReader.Context readerContext) {
+ return new FakeSourceReader(readerContext);
+ }
+
+ @Override
+ public Serializer getSplitSerializer() {
+ return new ObjectSerializer<>();
+ }
+
+ @Override
+ public SourceSplitEnumerator createEnumerator(
+ SourceSplitEnumerator.Context enumeratorContext) {
+ return new FakeSourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator restoreEnumerator(
+ SourceSplitEnumerator.Context enumeratorContext, FakeState checkpointState) {
+ // todo:
+ return null;
+ }
+
+ @Override
+ public Serializer getEnumeratorStateSerializer() {
+ return new ObjectSerializer<>();
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
new file mode 100644
index 00000000000..76e0a3eb7c7
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+
+public class FakeSourceEvent implements SourceEvent {
+
+ private final String name;
+ private final int age;
+ private final long timestamp;
+
+ public FakeSourceEvent(String name, int age, long timestamp) {
+ this.name = name;
+ this.age = age;
+ this.timestamp = timestamp;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
new file mode 100644
index 00000000000..fa1c28c4ab3
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class FakeSourceReader implements SourceReader {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
+
+ private final SourceReader.Context context;
+
+ public FakeSourceReader(SourceReader.Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public void open() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ @SuppressWarnings("magicnumber")
+ public void pollNext(Collector output) {
+ output.collect(new FakeSourceEvent("Tom", 19, System.currentTimeMillis()));
+ }
+
+ @Override
+ public List snapshotState(long checkpointId) {
+ return null;
+ }
+
+ @Override
+ public void addSplits(List splits) {
+
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
new file mode 100644
index 00000000000..05e85061846
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import java.io.Serializable;
+
+public class FakeSourceSplit implements SourceSplit, Serializable {
+
+ private static final long serialVersionUID = -1L;
+
+ private final String splitId;
+
+ public FakeSourceSplit(String splitId) {
+ this.splitId = splitId;
+ }
+
+ @Override
+ public String splitId() {
+ return splitId;
+ }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
new file mode 100644
index 00000000000..e41d96c0815
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
+
+import java.io.IOException;
+import java.util.List;
+
+public class FakeSourceSplitEnumerator implements SourceSplitEnumerator {
+
+ private final SourceSplitEnumerator.Context enumeratorContext;
+
+ public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context enumeratorContext) {
+ this.enumeratorContext = enumeratorContext;
+ }
+
+ @Override
+ public void open() {
+
+ }
+
+ @Override
+ public void run() {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void addSplitsBack(List splits, int subtaskId) {
+
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+
+ }
+
+ @Override
+ public FakeState snapshotState(long checkpointId) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
new file mode 100644
index 00000000000..f7a9b098a43
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SupportCoordinate;
+
+public class FakeSupportCoordinate implements SupportCoordinate {
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
new file mode 100644
index 00000000000..bd53b4a2596
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+public class ObjectSerializer implements Serializer {
+
+ @Override
+ public byte[] serialize(T obj) throws IOException {
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
+ objectOutputStream.writeObject(obj);
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
+ @Override
+ public T deserialize(byte[] serialized) throws IOException {
+ try {
+ return (T) new ObjectInputStream(new ByteArrayInputStream(serialized)).readObject();
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("deserialize split error", e);
+ }
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
new file mode 100644
index 00000000000..a920f830820
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.state;
+
+import org.apache.seatunnel.api.state.State;
+
+public class FakeState implements State {
+}