diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml index e86d7301a03..f3efb218c20 100644 --- a/seatunnel-api/pom.xml +++ b/seatunnel-api/pom.xml @@ -1,4 +1,22 @@ + diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java index 9cd8efbef1d..8461e7dc1a7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java @@ -19,7 +19,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; -public interface TableSink { +public interface TableSink { - SeaTunnelSink createSink(); + SeaTunnelSink createSink(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java index 7727735a730..edb8ee69dfe 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java @@ -18,8 +18,9 @@ package org.apache.seatunnel.api.table.connector; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; -public interface TableSource { +public interface TableSource { - SeaTunnelSource createSource(); + SeaTunnelSource createSource(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index b0677676647..c260c2640e7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSource; @@ -41,37 +42,40 @@ public final class FactoryUtil { private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); - public static List createAndPrepareSource( - List multipleTables, - Map options, - ClassLoader classLoader, - String factoryIdentifier) { + public static List> createAndPrepareSource( + List multipleTables, + Map options, + ClassLoader classLoader, + String factoryIdentifier) { try { - final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier); - List sources = new ArrayList<>(multipleTables.size()); + List> sources = new ArrayList<>(multipleTables.size()); if (factory instanceof SupportMultipleTable) { TableFactoryContext context = new TableFactoryContext(multipleTables, options, classLoader); SupportMultipleTable multipleTableSourceFactory = (SupportMultipleTable) factory; // TODO: create all source SupportMultipleTable.Result result = multipleTableSourceFactory.applyTables(context); - TableSource multipleTableSource = factory.createSource(new TableFactoryContext(result.getAcceptedTables(), options, classLoader)); + TableSource multipleTableSource = factory.createSource( + new TableFactoryContext(result.getAcceptedTables(), options, classLoader)); // TODO: handle reading metadata - SeaTunnelSource source = multipleTableSource.createSource(); + SeaTunnelSource source = multipleTableSource.createSource(); sources.add(source); } return sources; } catch (Throwable t) { throw new FactoryException( - String.format( - "Unable to create a source for identifier '%s'.", factoryIdentifier), - t); + String.format( + "Unable to create a source for identifier '%s'.", factoryIdentifier), + t); } } - public static List createAndPrepareSink() { - return null; + public static SeaTunnelSink createAndPrepareSink( + ClassLoader classLoader, String factoryIdentifier) { + // todo: do we need to set table? + TableSinkFactory factory = discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier); + return factory.createSink(null).createSink(); } public static Catalog createCatalog(String catalogName, diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java index bb92a76a0d7..bdc3d1a642b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java @@ -19,7 +19,7 @@ import org.apache.seatunnel.api.table.connector.TableSink; -public interface TableSinkFactory extends Factory { +public interface TableSinkFactory extends Factory { - TableSink createSink(TableFactoryContext context); + TableSink createSink(TableFactoryContext context); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java index 241deeb039b..2206a6bb9ac 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java @@ -17,9 +17,10 @@ package org.apache.seatunnel.api.table.factory; +import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.connector.TableSource; public interface TableSourceFactory extends Factory { - TableSource createSource(TableFactoryContext context); + TableSource createSource(TableFactoryContext context); } diff --git a/seatunnel-connectors/pom.xml b/seatunnel-connectors/pom.xml index 69a02445745..482cf788422 100644 --- a/seatunnel-connectors/pom.xml +++ b/seatunnel-connectors/pom.xml @@ -35,6 +35,8 @@ seatunnel-connectors-flink-dist seatunnel-connectors-spark seatunnel-connectors-spark-dist + seatunnel-connectors-seatunnel + seatunnel-connectors-seatunnel-dist diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml new file mode 100644 index 00000000000..b487fe1a7df --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml @@ -0,0 +1,67 @@ + + + + + seatunnel-connectors + org.apache.seatunnel + ${revision} + + 4.0.0 + + seatunnel-connectors-seatunnel-dist + + + + org.apache.seatunnel + seatunnel-connectors-seatunnel-fake + ${project.version} + + + org.apache.seatunnel + seatunnel-connectors-seatunnel-console + ${project.version} + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-connector + package + + copy-dependencies + + + jar + jar + ${project.build.directory}/lib + + + + + + + \ No newline at end of file diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml new file mode 100644 index 00000000000..7897a316c4d --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml @@ -0,0 +1,37 @@ + + + + + seatunnel-connectors + org.apache.seatunnel + ${revision} + + 4.0.0 + pom + + seatunnel-connectors-seatunnel + + + seatunnel-connectors-seatunnel-console + seatunnel-connectors-seatunnel-fake + + \ No newline at end of file diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml new file mode 100644 index 00000000000..9bc8680f675 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/pom.xml @@ -0,0 +1,38 @@ + + + + + org.apache.seatunnel + seatunnel-connectors-seatunnel + ${revision} + + 4.0.0 + + seatunnel-connectors-seatunnel-console + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + + \ No newline at end of file diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java new file mode 100644 index 00000000000..d866e084490 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.console.sink; + +public class ConsoleAggregatedCommitInfo { +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java new file mode 100644 index 00000000000..6c41be0ca68 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.console.sink; + +public class ConsoleCommitInfo { +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java new file mode 100644 index 00000000000..947b312569f --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.console.sink; + +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.api.sink.SinkCommitter; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState; + +import java.util.List; +import java.util.Optional; + +public class ConsoleSink implements SeaTunnelSink { + + @Override + public SinkWriter createWriter(SinkWriter.Context context) { + return new ConsoleSinkWriter(); + } + + @Override + public SinkWriter restoreWriter( + SinkWriter.Context context, List states) { + return restoreWriter(context, states); + } + + @Override + public Optional> getWriterStateSerializer() { + return getWriterStateSerializer(); + } + + @Override + public Optional> createCommitter() { + return createCommitter(); + } + + @Override + public Optional> getCommitInfoSerializer() { + return getCommitInfoSerializer(); + } + + @Override + public Optional> createAggregatedCommitter() { + return createAggregatedCommitter(); + } + + @Override + public Optional> getAggregatedCommitInfoSerializer() { + return getAggregatedCommitInfoSerializer(); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java new file mode 100644 index 00000000000..e395ad3e2a0 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.console.sink; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConsoleSinkWriter implements SinkWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSinkWriter.class); + + @Override + @SuppressWarnings("checkstyle:RegexpSingleline") + public void write(SeaTunnelRow element) { + System.out.println(element.toString()); + } + + @Override + public ConsoleCommitInfo prepareCommit() { + return null; + } + + @Override + public void close() { + + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java new file mode 100644 index 00000000000..b2abaeff9b6 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.console.state; + +import org.apache.seatunnel.api.state.State; + +public class ConsoleState implements State { + +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml new file mode 100644 index 00000000000..cc2daab7435 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/pom.xml @@ -0,0 +1,41 @@ + + + + + org.apache.seatunnel + seatunnel-connectors-seatunnel + ${revision} + + 4.0.0 + + seatunnel-connectors-seatunnel-fake + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + + + \ No newline at end of file diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java new file mode 100644 index 00000000000..0c3fb31b7c9 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState; + +public class FakeSource implements SeaTunnelSource { + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new FakeSourceReader(readerContext); + } + + @Override + public Serializer getSplitSerializer() { + return new ObjectSerializer<>(); + } + + @Override + public SourceSplitEnumerator createEnumerator( + SourceSplitEnumerator.Context enumeratorContext) { + return new FakeSourceSplitEnumerator(enumeratorContext); + } + + @Override + public SourceSplitEnumerator restoreEnumerator( + SourceSplitEnumerator.Context enumeratorContext, FakeState checkpointState) { + // todo: + return null; + } + + @Override + public Serializer getEnumeratorStateSerializer() { + return new ObjectSerializer<>(); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java new file mode 100644 index 00000000000..76e0a3eb7c7 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import org.apache.seatunnel.api.source.SourceEvent; + +public class FakeSourceEvent implements SourceEvent { + + private final String name; + private final int age; + private final long timestamp; + + public FakeSourceEvent(String name, int age, long timestamp) { + this.name = name; + this.age = age; + this.timestamp = timestamp; + } + + public String getName() { + return name; + } + + public int getAge() { + return age; + } + + public long getTimestamp() { + return timestamp; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java new file mode 100644 index 00000000000..fa1c28c4ab3 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class FakeSourceReader implements SourceReader { + + private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class); + + private final SourceReader.Context context; + + public FakeSourceReader(SourceReader.Context context) { + this.context = context; + } + + @Override + public void open() { + + } + + @Override + public void close() { + + } + + @Override + @SuppressWarnings("magicnumber") + public void pollNext(Collector output) { + output.collect(new FakeSourceEvent("Tom", 19, System.currentTimeMillis())); + } + + @Override + public List snapshotState(long checkpointId) { + return null; + } + + @Override + public void addSplits(List splits) { + + } + + @Override + public void handleNoMoreSplits() { + + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java new file mode 100644 index 00000000000..05e85061846 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import org.apache.seatunnel.api.source.SourceSplit; + +import java.io.Serializable; + +public class FakeSourceSplit implements SourceSplit, Serializable { + + private static final long serialVersionUID = -1L; + + private final String splitId; + + public FakeSourceSplit(String splitId) { + this.splitId = splitId; + } + + @Override + public String splitId() { + return splitId; + } + +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java new file mode 100644 index 00000000000..e41d96c0815 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState; + +import java.io.IOException; +import java.util.List; + +public class FakeSourceSplitEnumerator implements SourceSplitEnumerator { + + private final SourceSplitEnumerator.Context enumeratorContext; + + public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context enumeratorContext) { + this.enumeratorContext = enumeratorContext; + } + + @Override + public void open() { + + } + + @Override + public void run() { + + } + + @Override + public void close() throws IOException { + + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + + } + + @Override + public void handleSplitRequest(int subtaskId) { + + } + + @Override + public void registerReader(int subtaskId) { + + } + + @Override + public FakeState snapshotState(long checkpointId) throws Exception { + return null; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java new file mode 100644 index 00000000000..f7a9b098a43 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import org.apache.seatunnel.api.source.SupportCoordinate; + +public class FakeSupportCoordinate implements SupportCoordinate { +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java new file mode 100644 index 00000000000..bd53b4a2596 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import org.apache.seatunnel.api.serialization.Serializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +public class ObjectSerializer implements Serializer { + + @Override + public byte[] serialize(T obj) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) { + objectOutputStream.writeObject(obj); + return byteArrayOutputStream.toByteArray(); + } + } + + @Override + public T deserialize(byte[] serialized) throws IOException { + try { + return (T) new ObjectInputStream(new ByteArrayInputStream(serialized)).readObject(); + } catch (ClassNotFoundException e) { + throw new RuntimeException("deserialize split error", e); + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java new file mode 100644 index 00000000000..a920f830820 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.state; + +import org.apache.seatunnel.api.state.State; + +public class FakeState implements State { +}