Skip to content

Commit

Permalink
[improve][fn] Add configuration for connector & functions package url…
Browse files Browse the repository at this point in the history
… sources (apache#22184)

(cherry picked from commit 207335a)
(cherry picked from commit b107387)
(cherry picked from commit b83dab5)

 Conflicts:
	pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
	pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
	pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
	pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
  • Loading branch information
lhotari authored and nikhil-ctds committed Mar 11, 2024
1 parent 2ce5a5e commit b6e23d9
Show file tree
Hide file tree
Showing 19 changed files with 318 additions and 112 deletions.
8 changes: 8 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,15 @@ saslJaasServerRoleTokenSignerSecretPath:
########################

connectorsDirectory: ./connectors
# Whether to enable referencing connectors directory files by file url in connector (sink/source) creation
enableReferencingConnectorDirectoryFiles: true
# Regex patterns for enabling creation of connectors by referencing packages in matching http/https urls
additionalEnabledConnectorUrlPatterns: []
functionsDirectory: ./functions
# Whether to enable referencing functions directory files by file url in functions creation
enableReferencingFunctionsDirectoryFiles: true
# Regex patterns for enabling creation of functions by referencing packages in matching http/https urls
additionalEnabledFunctionsUrlPatterns: []

# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.jsonwebtoken.SignatureAlgorithm;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -266,6 +267,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());

List<String> urlPatterns =
Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -261,6 +261,10 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns = Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -71,6 +73,7 @@ public class PulsarFunctionTlsTest {
protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
protected PulsarService leaderPulsar;
protected PulsarAdmin leaderAdmin;
protected WorkerService[] fnWorkerServices = new WorkerService[BROKER_COUNT];
protected String testCluster = "my-cluster";
protected String testTenant = "my-tenant";
protected String testNamespace = testTenant + "/my-ns";
Expand Down Expand Up @@ -137,12 +140,18 @@ void setup() throws Exception {
workerConfig.setBrokerClientAuthenticationEnabled(true);
workerConfig.setTlsEnabled(true);
workerConfig.setUseTls(true);
WorkerService fnWorkerService = WorkerServiceLoader.load(workerConfig);
File packagePath = new File(
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParentFile();
List<String> urlPatterns =
Arrays.asList(packagePath.toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);
fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);

configurations[i] = config;

pulsarServices[i] = new PulsarService(
config, workerConfig, Optional.of(fnWorkerService), code -> {});
config, workerConfig, Optional.of(fnWorkerServices[i]), code -> {});
pulsarServices[i].start();

// Sleep until pulsarServices[0] becomes leader, this way we can spy namespace bundle assignment easily.
Expand Down Expand Up @@ -181,6 +190,9 @@ void tearDown() throws Exception {
if (pulsarAdmins[i] != null) {
pulsarAdmins[i].close();
}
if (fnWorkerServices[i] != null) {
fnWorkerServices[i].stop();
}
if (pulsarServices[i] != null) {
pulsarServices[i].close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
Expand All @@ -35,10 +35,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
Expand Down Expand Up @@ -71,8 +71,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;

public abstract class AbstractPulsarE2ETest {

public static final Logger log = LoggerFactory.getLogger(AbstractPulsarE2ETest.class);
Expand Down Expand Up @@ -296,6 +294,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns =
Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The directory where nar packages are extractors"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Whether to enable referencing connectors directory files by file url in connector (sink/source) "
+ "creation. Default is true."
)
private Boolean enableReferencingConnectorDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of connectors by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledConnectorUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Enables extended validation for connector config with fine-grain annotation based validation "
Expand All @@ -254,6 +266,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The path to the location to locate builtin functions"
)
private String functionsDirectory = "./functions";
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Whether to enable referencing functions directory files by file url in functions creation. "
+ "Default is true."
)
private Boolean enableReferencingFunctionsDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of functions by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledFunctionsUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The Pulsar topic used for storing function metadata"
Expand Down
Loading

0 comments on commit b6e23d9

Please sign in to comment.