diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index dc1876ad13cbc..7fc339a5dcea0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -37,61 +37,69 @@ public class CmdFunctions extends CmdBase { private final LocalRunner cmdRunner; @Getter - @Parameters(commandDescription = "Run function locally") - class LocalRunner extends CliCommand { - + abstract class FunctionsCommand extends CliCommand { @Parameter(names = "--name", description = "Function Name\n") - private String name; + protected String name; @Parameter(names = "--function-classname", description = "Function Class Name\n") - private String className; + protected String className; @Parameter( - names = "--function-classpath", - description = "Function Classpath\n", - listConverter = StringConverter.class) - private List jarFiles; + names = "--function-classpath", + description = "Function Classpath\n", + listConverter = StringConverter.class) + protected List jarFiles; @Parameter(names = "--source-topic", description = "Input Topic Name\n") - private String sourceTopicName; + protected String sourceTopicName; @Parameter(names = "--sink-topic", description = "Output Topic Name\n") - private String sinkTopicName; + protected String sinkTopicName; @Parameter(names = "--serde-classname", description = "SerDe\n") - private String serDeClassName; + protected String serDeClassName; + protected SerDe serDe; @Parameter(names = "--function-config", description = "Function Config\n") - private String fnConfigFile; + protected String fnConfigFile; + protected FunctionConfig functionConfig; @Override void run() throws Exception { - FunctionConfig fc; - SerDe serDe = null; if (null != fnConfigFile) { - fc = FunctionConfig.load(fnConfigFile); + functionConfig = FunctionConfig.load(fnConfigFile); } else { - fc = new FunctionConfig(); + functionConfig = new FunctionConfig(); } if (null != sourceTopicName) { - fc.setSourceTopic(sourceTopicName); + functionConfig.setSourceTopic(sourceTopicName); } if (null != sinkTopicName) { - fc.setSinkTopic(sinkTopicName); + functionConfig.setSinkTopic(sinkTopicName); } if (null != name) { - fc.setName(name); + functionConfig.setName(name); } if (null != className) { - fc.setClassName(className); + functionConfig.setClassName(className); } if (null != serDeClassName) { serDe = createSerDe(serDeClassName); } if (null != jarFiles) { - fc.setJarFiles(jarFiles); + functionConfig.setJarFiles(jarFiles); } else { - fc.setJarFiles(Lists.newArrayList()); + functionConfig.setJarFiles(Lists.newArrayList()); } - // Construct the spawner + run_functions_cmd(); + } + + abstract void run_functions_cmd() throws Exception; + } + @Getter + @Parameters(commandDescription = "Run function locally") + class LocalRunner extends FunctionsCommand { + + @Override + void run_functions_cmd() throws Exception { LimitsConfig limitsConfig = new LimitsConfig( 60000, // 60 seconds 1024, // 1GB @@ -99,7 +107,7 @@ void run() throws Exception { ); Spawner spawner = Spawner.createSpawner( - fc, + functionConfig, limitsConfig, serDe, admin.getServiceUrl().toString());