Skip to content

Commit

Permalink
Allow to configure different implementations for Pulsar functions sta…
Browse files Browse the repository at this point in the history
…te store (#12646)
  • Loading branch information
merlimat authored Nov 6, 2021
1 parent 837fa68 commit 08a49c0
Show file tree
Hide file tree
Showing 16 changed files with 515 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ void shutdown() throws Exception {
}
}

private WorkerConfig createWorkerConfig(ServiceConfiguration config) {
protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {

System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
Expand Down Expand Up @@ -560,7 +560,7 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl, int parallelis
}
}

private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
protected void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
testE2EPulsarFunctionLocalRun(jarFilePathUrl, 1);
}

Expand Down Expand Up @@ -1133,7 +1133,7 @@ private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) thr
}
}

private void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
protected void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(pulsarApiExamplesClassLoader);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* Test Pulsar sink on function
*/
@Slf4j
@Test
public class PulsarFunctionMetadataStoreTest extends PulsarFunctionLocalRunTest {


protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {
WorkerConfig wc = super.createWorkerConfig(config);
wc.setStateStorageProviderImplementation(PulsarMetadataStateStoreProviderImpl.class.getName());
wc.setStateStorageServiceUrl("memory://local");
return wc;
}

@Test
public void testE2EPulsarFunctionLocalRun() throws Throwable {
runWithPulsarFunctionsClassLoader(() -> testE2EPulsarFunctionLocalRun(null));
}
}
6 changes: 6 additions & 0 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-metadata</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private LogAppender logAppender;

// provide tables for storing states
private final String stateStorageImplClass;
private final String stateStorageServiceUrl;
private StateStoreProvider stateStoreProvider;
private StateManager stateManager;
Expand Down Expand Up @@ -144,6 +145,7 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
ClientBuilder clientBuilder,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
String stateStorageImplClass,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
Expand All @@ -152,6 +154,7 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
this.clientBuilder = clientBuilder;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageImplClass = stateStorageImplClass;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.functionClassLoader = functionClassLoader;
Expand Down Expand Up @@ -322,8 +325,8 @@ private void setupStateStore() throws Exception {
if (null == stateStorageServiceUrl) {
stateStoreProvider = StateStoreProvider.NULL;
} else {
stateStoreProvider = new BKStateStoreProviderImpl();
Map<String, Object> stateStoreProviderConfig = new HashMap();
stateStoreProvider = getStateStoreProvider();
Map<String, Object> stateStoreProviderConfig = new HashMap<>();
stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, stateStorageServiceUrl);
stateStoreProvider.init(stateStoreProviderConfig, instanceConfig.getFunctionDetails());

Expand All @@ -339,6 +342,14 @@ private void setupStateStore() throws Exception {
}
}

private StateStoreProvider getStateStoreProvider() throws Exception {
if (stateStorageImplClass == null) {
return new BKStateStoreProviderImpl();
} else {
return (StateStoreProvider) Class.forName(stateStorageImplClass).getConstructor().newInstance();
}
}

private void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
if (result.getUserException() != null) {
Exception t = result.getUserException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
@Slf4j
public class BKStateStoreProviderImpl implements StateStoreProvider {

public static final String STATE_STORAGE_SERVICE_URL = "stateStorageServiceUrl";

private String stateStorageServiceUrl;
private Map<String, StorageClient> clients;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance.state;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.StateStoreContext;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;

public class PulsarMetadataStateStoreImpl implements DefaultStateStore {

private final MetadataStore store;
private final String prefixPath;
private final MetadataCache<Long> countersCache;

private final String namespace;
private final String tenant;
private final String name;
private final String fqsn;

PulsarMetadataStateStoreImpl(MetadataStore store, String prefix, String tenant, String namespace, String name) {
this.store = store;
this.tenant = tenant;
this.namespace = namespace;
this.name = name;
this.fqsn = tenant + '/' + namespace + '/' + name;

this.prefixPath = prefix + '/' + fqsn + '/';
this.countersCache = store.getMetadataCache(Long.class);
}

@Override
public String tenant() {
return tenant;
}

@Override
public String namespace() {
return namespace;
}

@Override
public String name() {
return name;
}

@Override
public String fqsn() {
return fqsn;
}

@Override
public void init(StateStoreContext ctx) {
}

@Override
public void close() {
}

@Override
public void put(String key, ByteBuffer value) {
putAsync(key, value).join();
}

@Override
public CompletableFuture<Void> putAsync(String key, ByteBuffer value) {
byte[] bytes = new byte[value.remaining()];
value.get(bytes);
return store.put(getPath(key), bytes, Optional.empty())
.thenApply(__ -> null);
}

@Override
public void delete(String key) {
deleteAsync(key).join();
}

@Override
public CompletableFuture<Void> deleteAsync(String key) {
return store.delete(getPath(key), Optional.empty());
}

@Override
public ByteBuffer get(String key) {
return getAsync(key).join();
}

@Override
public CompletableFuture<ByteBuffer> getAsync(String key) {
return store.get(getPath(key))
.thenApply(optRes ->
optRes.map(x -> ByteBuffer.wrap(x.getValue()))
.orElse(null));
}

@Override
public void incrCounter(String key, long amount) {
incrCounterAsync(key, amount);
}

@Override
public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
return countersCache.readModifyUpdateOrCreate(getPath(key), optValue ->
optValue.orElse(0L) + amount
).thenApply(__ -> null);
}

@Override
public long getCounter(String key) {
return getCounterAsync(key).join();
}

@Override
public CompletableFuture<Long> getCounterAsync(String key) {
return countersCache.get(getPath(key))
.thenApply(optValue -> optValue.orElse(0L));
}

private String getPath(String key) {
return prefixPath + key;
}
}
Loading

0 comments on commit 08a49c0

Please sign in to comment.