Skip to content

Commit

Permalink
[Feature][CDC][Zeta] Support schema evolution framework(DDL)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Jul 20, 2023
1 parent 02114db commit af7cbaa
Show file tree
Hide file tree
Showing 54 changed files with 2,039 additions and 175 deletions.
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,9 @@
<version>${spotless.version}</version>
<configuration>
<java>
<excludes>
<exclude>src/main/java/org/apache/seatunnel/antlr4/generated/*.*</exclude>
</excludes>
<googleJavaFormat>
<version>1.7</version>
<style>AOSP</style>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -44,6 +45,14 @@ public interface SinkWriter<T, CommitInfoT, StateT> {
*/
void write(T element) throws IOException;

/**
* apply schema change to third party data receiver.
*
* @param event
* @throws IOException
*/
default void applySchemaChange(SchemaChangeEvent event) throws IOException {}

/**
* prepare the commit, will be called before {@link #snapshotState(long checkpointId)}. If you
* need to use 2pc, you can return the commit info in this method, and receive the commit info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.api.source;

import org.apache.seatunnel.api.table.event.SchemaChangeEvent;

/**
* A {@link Collector} is used to collect data from {@link SourceReader}.
*
Expand All @@ -26,6 +28,12 @@ public interface Collector<T> {

void collect(T record);

default void markSchemaChangeBeforeCheckpoint() {}

default void collect(SchemaChangeEvent event) {}

default void markSchemaChangeAfterCheckpoint() {}

/**
* Returns the checkpoint lock.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.event;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.Getter;
import lombok.ToString;

@Getter
@ToString(callSuper = true)
public class AlterTableAddColumnEvent extends AlterTableColumnEvent {
private final Column column;
private final boolean first;
private final String afterColumn;

public AlterTableAddColumnEvent(
TablePath tablePath, Column column, boolean first, String afterColumn) {
super(tablePath);
this.column = column;
this.first = first;
this.afterColumn = afterColumn;
}

public static AlterTableAddColumnEvent addFirst(TablePath tablePath, Column column) {
return new AlterTableAddColumnEvent(tablePath, column, true, null);
}

public static AlterTableAddColumnEvent add(TablePath tablePath, Column column) {
return new AlterTableAddColumnEvent(tablePath, column, false, null);
}

public static AlterTableAddColumnEvent addAfter(
TablePath tablePath, Column column, String afterColumn) {
return new AlterTableAddColumnEvent(tablePath, column, false, afterColumn);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.event;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.Getter;
import lombok.ToString;

@Getter
@ToString(callSuper = true)
public class AlterTableChangeColumnEvent extends AlterTableAddColumnEvent {
private final String oldColumn;

public AlterTableChangeColumnEvent(
TablePath tablePath,
String oldColumn,
Column column,
boolean first,
String afterColumn) {
super(tablePath, column, first, afterColumn);
this.oldColumn = oldColumn;
}

public static AlterTableChangeColumnEvent changeFirst(
TablePath tablePath, String oldColumn, Column column) {
return new AlterTableChangeColumnEvent(tablePath, oldColumn, column, true, null);
}

public static AlterTableChangeColumnEvent change(
TablePath tablePath, String oldColumn, Column column) {
return new AlterTableChangeColumnEvent(tablePath, oldColumn, column, false, null);
}

public static AlterTableChangeColumnEvent changeAfter(
TablePath tablePath, String oldColumn, Column column, String afterColumn) {
return new AlterTableChangeColumnEvent(tablePath, oldColumn, column, false, afterColumn);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.seatunnel.api.table.event;

import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.ToString;

@ToString(callSuper = true)
public abstract class AlterTableColumnEvent extends AlterTableEvent {
public AlterTableColumnEvent(TablePath tablePath) {
super(tablePath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.event;

import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.Getter;
import lombok.ToString;

import java.util.ArrayList;
import java.util.List;

@Getter
@ToString(callSuper = true)
public class AlterTableColumnsEvent extends AlterTableEvent {
private final List<AlterTableColumnEvent> events;

public AlterTableColumnsEvent(TablePath tablePath) {
this(tablePath, new ArrayList<>());
}

public AlterTableColumnsEvent(TablePath tablePath, List<AlterTableColumnEvent> events) {
super(tablePath);
this.events = events;
}

public AlterTableColumnsEvent addEvent(AlterTableColumnEvent event) {
events.add(event);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.event;

import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.Getter;
import lombok.ToString;

@Getter
@ToString(callSuper = true)
public class AlterTableDropColumnEvent extends AlterTableColumnEvent {
private final String column;

public AlterTableDropColumnEvent(TablePath tablePath, String column) {
super(tablePath);
this.column = column;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,15 @@
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.task.record;
package org.apache.seatunnel.api.table.event;

/** Change the schema of the task and flow. */
public class SchemaBarrier implements Barrier {
@Override
public long getId() {
return -1;
}
import org.apache.seatunnel.api.table.catalog.TablePath;

@Override
public boolean snapshot() {
return false;
}
import lombok.ToString;

@Override
public boolean prepareClose() {
return false;
@ToString(callSuper = true)
public abstract class AlterTableEvent extends TableEvent {
public AlterTableEvent(TablePath tablePath) {
super(tablePath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.event;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.Getter;
import lombok.ToString;

@Getter
@ToString(callSuper = true)
public class AlterTableModifyColumnEvent extends AlterTableAddColumnEvent {
public AlterTableModifyColumnEvent(
TablePath tablePath, Column column, boolean first, String afterColumn) {
super(tablePath, column, first, afterColumn);
}

public static AlterTableModifyColumnEvent modifyFirst(TablePath tablePath, Column column) {
return new AlterTableModifyColumnEvent(tablePath, column, true, null);
}

public static AlterTableModifyColumnEvent modify(TablePath tablePath, Column column) {
return new AlterTableModifyColumnEvent(tablePath, column, false, null);
}

public static AlterTableModifyColumnEvent modifyAfter(
TablePath tablePath, Column column, String afterColumn) {
return new AlterTableModifyColumnEvent(tablePath, column, false, afterColumn);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.event;

import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.Getter;
import lombok.ToString;

@Getter
@ToString(callSuper = true)
public class AlterTableNameEvent extends AlterTableColumnEvent {
private final TablePath newTablePath;

public AlterTableNameEvent(TablePath tablePath, TablePath newTablePath) {
super(tablePath);
this.newTablePath = newTablePath;
}
}
Loading

0 comments on commit af7cbaa

Please sign in to comment.