Skip to content

Commit

Permalink
[FLINK-36504][table-common]Remove all deprecated methods ModuleFactory (
Browse files Browse the repository at this point in the history
#25963)

Co-authored-by: shiwei10 <shiwei10@staff.weibo.com>
  • Loading branch information
xishuaidelin and Edward-Gavin authored Jan 15, 2025
1 parent 5e87e8a commit 8af0259
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -427,41 +426,32 @@ public static Module createModule(
MODULE_TYPE.key(), options.get(MODULE_TYPE.key())));
}

final DefaultModuleContext discoveryContext =
new DefaultModuleContext(options, configuration, classLoader);
try {
final Map<String, String> optionsWithType = new HashMap<>(options);
optionsWithType.put(MODULE_TYPE.key(), moduleName);

final ModuleFactory legacyFactory =
TableFactoryService.find(ModuleFactory.class, optionsWithType, classLoader);
return legacyFactory.createModule(optionsWithType);
} catch (NoMatchingTableFactoryException e) {
final DefaultModuleContext discoveryContext =
final ModuleFactory factory =
discoverFactory(
((ModuleFactory.Context) discoveryContext).getClassLoader(),
ModuleFactory.class,
moduleName);

final DefaultModuleContext context =
new DefaultModuleContext(options, configuration, classLoader);
try {
final ModuleFactory factory =
discoverFactory(
((ModuleFactory.Context) discoveryContext).getClassLoader(),
ModuleFactory.class,
moduleName);

final DefaultModuleContext context =
new DefaultModuleContext(options, configuration, classLoader);
return factory.createModule(context);
} catch (Throwable t) {
throw new ValidationException(
String.format(
"Unable to create module '%s'.%n%nModule options are:%n%s",
moduleName,
options.entrySet().stream()
.map(
optionEntry ->
stringifyOption(
optionEntry.getKey(),
optionEntry.getValue()))
.sorted()
.collect(Collectors.joining("\n"))),
t);
}
return factory.createModule(context);
} catch (Throwable t) {
throw new ValidationException(
String.format(
"Unable to create module '%s'.%n%nModule options are:%n%s",
moduleName,
options.entrySet().stream()
.map(
optionEntry ->
stringifyOption(
optionEntry.getKey(),
optionEntry.getValue()))
.sorted()
.collect(Collectors.joining("\n"))),
t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleException;

import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -38,20 +37,7 @@
* instead.
*/
@PublicEvolving
public interface ModuleFactory extends TableFactory, Factory {

/**
* Creates and configures a {@link Module} using the given properties.
*
* @param properties normalized properties describing a module.
* @return the configured module.
* @deprecated Use {@link #createModule(Context)} instead and implement {@link Factory} instead
* of {@link TableFactory}.
*/
@Deprecated
default Module createModule(Map<String, String> properties) {
throw new ModuleException("Module factories must implement createModule().");
}
public interface ModuleFactory extends Factory {

/** Creates and configures a {@link Module}. */
default Module createModule(Context context) {
Expand Down Expand Up @@ -79,49 +65,9 @@ interface Context {
ClassLoader getClassLoader();
}

default String factoryIdentifier() {
if (requiredContext() == null || supportedProperties() == null) {
throw new ModuleException("Module factories must implement factoryIdentifier()");
}

return null;
}

default Set<ConfigOption<?>> requiredOptions() {
if (requiredContext() == null || supportedProperties() == null) {
throw new ModuleException("Module factories must implement requiredOptions()");
}

return null;
}
String factoryIdentifier();

default Set<ConfigOption<?>> optionalOptions() {
if (requiredContext() == null || supportedProperties() == null) {
throw new ModuleException("Module factories must implement optionalOptions()");
}
Set<ConfigOption<?>> requiredOptions();

return null;
}

// --------------------------------------------------------------------------------------------
// Default implementations for legacy {@link TableFactory} stack.
// --------------------------------------------------------------------------------------------

/**
* @deprecated Implement the {@link Factory} based stack instead.
*/
@Deprecated
default Map<String, String> requiredContext() {
// Default implementation for modules implementing the new {@link Factory} stack instead.
return null;
}

/**
* @deprecated Implement the {@link Factory} based stack instead.
*/
@Deprecated
default List<String> supportedProperties() {
// Default implementation for modules implementing the new {@link Factory} stack instead.
return null;
}
Set<ConfigOption<?>> optionalOptions();
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@
# limitations under the License.

org.apache.flink.table.factories.TestTableSinkFactory
org.apache.flink.table.factories.module.LegacyDummyModuleFactory
Original file line number Diff line number Diff line change
Expand Up @@ -1631,12 +1631,6 @@ class TableEnvironmentTest {
validateShowModules(("core", false))
}

@Test
def testLegacyModule(): Unit = {
tableEnv.executeSql("LOAD MODULE LegacyModule")
validateShowModules(("core", true), ("LegacyModule", true))
}

@Test
def testExecuteSqlWithCreateDropView(): Unit = {
createTableForTests()
Expand Down

0 comments on commit 8af0259

Please sign in to comment.