Skip to content

Commit

Permalink
[Improve] Refactor CatalogTable and add `SeaTunnelSource::getProduced…
Browse files Browse the repository at this point in the history
…CatalogTables` (apache#5562)
  • Loading branch information
Hisoka-X authored Sep 28, 2023
1 parent 36754cc commit 4117335
Show file tree
Hide file tree
Showing 37 changed files with 395 additions and 336 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

import java.io.Serializable;
import java.util.List;

/**
* The interface for Source. It acts like a factory class that helps construct the {@link
Expand All @@ -49,9 +51,23 @@ public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends S
/**
* Get the data type of the records produced by this source.
*
* @deprecated Please use {@link #getProducedCatalogTables}
* @return SeaTunnel data type.
*/
SeaTunnelDataType<T> getProducedType();
@Deprecated
default SeaTunnelDataType<T> getProducedType() {
throw new UnsupportedOperationException("getProducedType method has not been implemented.");
}

/**
* Get the catalog tables output by this source, It is recommended that all connectors implement
* this method instead of {@link #getProducedType}. CatalogTable contains more information to
* help downstream support more accurate and complete synchronization capabilities.
*/
default List<CatalogTable> getProducedCatalogTables() {
throw new UnsupportedOperationException(
"getProducedCatalogTables method has not been implemented.");
}

/**
* Create source reader, used to produce data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.utils.JsonUtils;
Expand Down Expand Up @@ -138,9 +140,14 @@ public static List<CatalogTable> getCatalogTables(Config config, ClassLoader cla
@Deprecated
public static List<CatalogTable> getCatalogTablesFromConfig(
ReadonlyConfig readonlyConfig, ClassLoader classLoader) {

// We use plugin_name as factoryId, so MySQL-CDC should be MySQL
String factoryId = readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
return getCatalogTablesFromConfig(factoryId, readonlyConfig, classLoader);
}

@Deprecated
public static List<CatalogTable> getCatalogTablesFromConfig(
String factoryId, ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
// Highest priority: specified schema
Map<String, String> schemaMap = readonlyConfig.get(CatalogTableUtil.SCHEMA);
if (schemaMap != null) {
Expand Down Expand Up @@ -188,6 +195,20 @@ public static CatalogTable buildWithConfig(Config config) {
return buildWithConfig(readonlyConfig);
}

public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
List<CatalogTable> catalogTables) {
if (catalogTables.size() == 1) {
return catalogTables.get(0).getTableSchema().toPhysicalRowDataType();
} else {
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
for (CatalogTable catalogTable : catalogTables) {
String tableId = catalogTable.getTableId().toTablePath().toString();
rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
}
return new MultipleRowType(rowTypeMap);
}
}

public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
if (readonlyConfig.get(SCHEMA) == null) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceOptions;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

import org.slf4j.Logger;
Expand Down Expand Up @@ -55,10 +62,11 @@ public final class FactoryUtil {

private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);

static final String DEFAULT_ID = "default-identifier";

public static <T, SplitT extends SourceSplit, StateT extends Serializable>
List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>>>
createAndPrepareSource(
List<CatalogTable> multipleTables,
ReadonlyConfig options,
ClassLoader classLoader,
String factoryIdentifier) {
Expand All @@ -67,32 +75,44 @@ public final class FactoryUtil {
final TableSourceFactory factory =
discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>>> sources =
new ArrayList<>(multipleTables.size());
if (factory instanceof SupportMultipleTable) {
List<CatalogTable> remainingTables = multipleTables;
while (!remainingTables.isEmpty()) {
TableFactoryContext context =
new TableFactoryContext(remainingTables, options, classLoader);
SupportMultipleTable.Result result =
((SupportMultipleTable) factory).applyTables(context);
List<CatalogTable> acceptedTables = result.getAcceptedTables();
sources.add(
new Tuple2<>(
createAndPrepareSource(
factory, acceptedTables, options, classLoader),
acceptedTables));
remainingTables = result.getRemainingTables();
}
} else {
for (CatalogTable catalogTable : multipleTables) {
List<CatalogTable> acceptedTables = Collections.singletonList(catalogTable);
sources.add(
new Tuple2<>(
createAndPrepareSource(
factory, acceptedTables, options, classLoader),
acceptedTables));
new ArrayList<>();
SeaTunnelSource<T, SplitT, StateT> source =
createAndPrepareSource(factory, options, classLoader);
List<CatalogTable> catalogTables;
try {
catalogTables = source.getProducedCatalogTables();
} catch (UnsupportedOperationException e) {
// TODO remove it when all connector use `getProducedCatalogTables`
SeaTunnelDataType<T> seaTunnelDataType = source.getProducedType();
final String tableId =
options.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
if (seaTunnelDataType instanceof MultipleRowType) {
catalogTables = new ArrayList<>();
for (String id : ((MultipleRowType) seaTunnelDataType).getTableIds()) {
catalogTables.add(
CatalogTableUtil.getCatalogTable(
id, ((MultipleRowType) seaTunnelDataType).getRowType(id)));
}
} else {
catalogTables =
Collections.singletonList(
CatalogTableUtil.getCatalogTable(
tableId, (SeaTunnelRowType) seaTunnelDataType));
}
}
LOG.info(
"get the CatalogTable from source {}: {}",
source.getPluginName(),
catalogTables.stream()
.map(CatalogTable::getTableId)
.map(TableIdentifier::toString)
.collect(Collectors.joining(",")));
if (options.get(SourceOptions.DAG_PARSING_MODE) == ParsingMode.SHARDING) {
CatalogTable catalogTable = catalogTables.get(0);
catalogTables.clear();
catalogTables.add(catalogTable);
}
sources.add(new Tuple2<>(source, catalogTables));
return sources;
} catch (Throwable t) {
throw new FactoryException(
Expand All @@ -104,22 +124,13 @@ public final class FactoryUtil {

private static <T, SplitT extends SourceSplit, StateT extends Serializable>
SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
TableSourceFactory factory,
List<CatalogTable> acceptedTables,
ReadonlyConfig options,
ClassLoader classLoader) {
TableFactoryContext context = new TableFactoryContext(acceptedTables, options, classLoader);
TableSourceFactory factory, ReadonlyConfig options, ClassLoader classLoader) {
TableSourceFactoryContext context = new TableSourceFactoryContext(options, classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
TableSource<T, SplitT, StateT> tableSource = factory.createSource(context);
validateAndApplyMetadata(acceptedTables, tableSource);
return tableSource.createSource();
}

private static void validateAndApplyMetadata(
List<CatalogTable> catalogTables, TableSource<?, ?, ?> tableSource) {
// TODO: handle reading metadata
}

public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSink(
CatalogTable catalogTable,
Expand All @@ -129,9 +140,8 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSi
try {
TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> factory =
discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier);
TableFactoryContext context =
new TableFactoryContext(
Collections.singletonList(catalogTable), options, classLoader);
TableSinkFactoryContext context =
new TableSinkFactoryContext(catalogTable, options, classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createSink(context).createSink();
} catch (Throwable t) {
Expand Down Expand Up @@ -293,8 +303,8 @@ public static SeaTunnelTransform<?> createAndPrepareTransform(
String factoryIdentifier) {
final TableTransformFactory factory =
discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier);
TableFactoryContext context =
new TableFactoryContext(
TableTransformFactoryContext context =
new TableTransformFactoryContext(
Collections.singletonList(catalogTable), options, classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createTransform(context).createTransform();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,17 @@
package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;

import lombok.Getter;

import java.util.List;
@Getter
public abstract class TableFactoryContext {

public class TableFactoryContext {

private final List<CatalogTable> catalogTables;
@Getter private final ReadonlyConfig options;
private final ReadonlyConfig options;
private final ClassLoader classLoader;

public TableFactoryContext(
List<CatalogTable> catalogTables, ReadonlyConfig options, ClassLoader classLoader) {
this.catalogTables = catalogTables;
public TableFactoryContext(ReadonlyConfig options, ClassLoader classLoader) {
this.options = options;
this.classLoader = classLoader;
}

public ClassLoader getClassLoader() {
return this.classLoader;
}

/**
* Returns a list of tables that need to be processed.
*
* <p>By default, return only single table.
*
* <p>If you need multiple tables, implement {@link SupportMultipleTable}.
*/
public List<CatalogTable> getCatalogTables() {
return catalogTables;
}

/** @return single table. */
public CatalogTable getCatalogTable() {
return catalogTables.get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT
* @return
*/
default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(
TableFactoryContext context) {
TableSinkFactoryContext context) {
throw new UnsupportedOperationException(
"The Factory has not been implemented and the deprecated Plugin will be used.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;

import lombok.Getter;

@Getter
public class TableSinkFactoryContext extends TableFactoryContext {

private final CatalogTable catalogTable;

public TableSinkFactoryContext(
CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) {
super(options, classLoader);
this.catalogTable = catalogTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface TableSourceFactory extends Factory {
* @param context TableFactoryContext
*/
default <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
throw new UnsupportedOperationException(
"The Factory has not been implemented and the deprecated Plugin will be used.");
}
Expand Down
Loading

0 comments on commit 4117335

Please sign in to comment.