Skip to content

Commit

Permalink
[Improve] [Seatunnel-Engine] remove seatunnel-api from engine stora…
Browse files Browse the repository at this point in the history
…ge. (#3834)

* remove `seatunnel-api` from engine storage
  • Loading branch information
liugddx authored Jan 4, 2023
1 parent ca06ea1 commit dfceabf
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.seatunnel.engine.common.utils;

import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;

import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationTargetException;
import java.util.LinkedList;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

@Slf4j
public class FactoryUtil<T> {

public static <T> T discoverFactory(ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
try {
final List<T> result = new LinkedList<>();
ServiceLoader.load(factoryClass, classLoader)
.iterator()
.forEachRemaining(result::add);

List<T> foundFactories = result.stream().filter(f -> factoryClass.isAssignableFrom(f.getClass()))
.filter(t -> {
try {
return t.getClass().getMethod("factoryIdentifier").invoke(t).equals(factoryIdentifier);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new SeaTunnelEngineException("Failed to call factoryIdentifier method.");
}
})
.collect(Collectors.toList());

if (foundFactories.isEmpty()) {
throw new SeaTunnelEngineException(
String.format(
"Could not find any factories that implement '%s' in the classpath.",
factoryClass.getName()));
}

if (foundFactories.size() > 1) {
throw new SeaTunnelEngineException(
String.format(
"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
+ "Ambiguous factory classes are:\n\n"
+ "%s",
factoryIdentifier,
factoryClass.getName(),
foundFactories.stream()
.map(f -> f.getClass().getName())
.sorted()
.collect(Collectors.joining("\n"))));
}

return foundFactories.get(0);
} catch (ServiceConfigurationError e) {
log.error("Could not load service provider for factories.", e);
throw new SeaTunnelEngineException("Could not load service provider for factories.", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;

import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.dag.actions.Action;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;

import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@

<artifactId>checkpoint-storage-api</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
<!--protostuff-->
<dependency>
<groupId>io.protostuff</groupId>
Expand All @@ -47,4 +42,4 @@
</dependency>
</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,23 @@

package org.apache.seatunnel.engine.checkpoint.storage.api;

import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;

import java.util.Map;

/**
* All checkpoint storage plugins need to implement it
*/
public interface CheckpointStorageFactory extends Factory {
public interface CheckpointStorageFactory {

/**
* Returns a unique identifier among same factory interfaces.
*
* <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
* kafka}). If multiple factories exist for different versions, a version should be appended
* using "-" (e.g. {@code elasticsearch-7}).
*/
String factoryIdentifier();

/**
* create storage plugin instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

package org.apache.seatunnel.engine.checkpoint.storage.hdfs;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
Expand Down Expand Up @@ -50,18 +48,13 @@
* fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
* </pre>
*/
@AutoService(Factory.class)
@AutoService(CheckpointStorageFactory.class)
public class HdfsStorageFactory implements CheckpointStorageFactory {
@Override
public String factoryIdentifier() {
return "hdfs";
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().build();
}

@Override
public CheckpointStorage create(Map<String, String> configuration) throws CheckpointStorageException {
if (HdfsFileStorageInstance.isFsNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

package org.apache.seatunnel.engine.checkpoint.storage.localfile;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;

Expand All @@ -36,19 +34,14 @@
* deprecated: use @see org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorageFactory instead
*/
@Deprecated
@AutoService(Factory.class)
@AutoService(CheckpointStorageFactory.class)
public class LocalFileStorageFactory implements CheckpointStorageFactory {

@Override
public String factoryIdentifier() {
return "localfile";
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().build();
}

@Override
public CheckpointStorage create(Map<String, String> configuration) {
return new LocalFileStorage(configuration);
Expand Down

0 comments on commit dfceabf

Please sign in to comment.