Skip to content

Commit

Permalink
perf: increase task throughput in Android using thread pool executor (#…
Browse files Browse the repository at this point in the history
…4981)

* implement thread pool executor service
* add getTransactionalExecutor method
* use transactional executor during write action
* transactional executor with identifier
* execute database event in serial
* execute firestore event in serial
* absctract task excutor
* do not re-excute rejected task while shutdown
* add documentation
* tests configuration
* disable identified executor when maximum pool size is zero
* update document
* Avoid race condition in executors
  • Loading branch information
kmsbernard authored Apr 4, 2021
1 parent eeeba2e commit 0e4e331
Show file tree
Hide file tree
Showing 13 changed files with 207 additions and 47 deletions.
20 changes: 20 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,26 @@ setting present in `/android/gradle.properties`:
org.gradle.jvmargs=-Xmx2048m -XX:MaxPermSize=512m -XX:+HeapDumpOnOutOfMemoryError -Dfile.encoding=UTF-8
```

### Android Performance

On Android, React Native Firebase uses [thread pool executor](https://developer.android.com/reference/java/util/concurrent/ThreadPoolExecutor) to provide improved performance and managed resources.
To increase throughput, you can tune the thread pool executor via `firebase.json` file within the root of your project:

```json
// <project-root>/firebase.json
{
"react-native": {
"android_task_executor_maximum_pool_size": 10,
"android_task_executor_keep_alive_seconds": 3,
}
}
```

| Key | Description |
| ------------ | ----------------------------------------------- |
| `android_task_executor_maximum_pool_size` | Maximum pool size of ThreadPoolExecutor. Defaults to `1`. Larger values typically improve performance when executing large numbers of asynchronous tasks, e.g. Firestore queries. Setting this value to `0` completely disables the pooled executor and all tasks execute in serial per module. |
| `android_task_executor_keep_alive_seconds` | Keep-alive time of ThreadPoolExecutor, in seconds. Defaults to `3`. Excess threads in the pool executor will be terminated if they have been idle for more than the keep-alive time. This value doesn't have any effect when the maximum pool size is lower than `2`. |

### Allow iOS Static Frameworks

If you are using Static Frameworks on iOS, you need to manually enable this for the project. To enable Static Framework
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.invertase.firebase.common;

/*
* Copyright (c) 2016-present Invertase Limited & Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this library except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import io.invertase.firebase.common.ReactNativeFirebaseJSON;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.SynchronousQueue;

public class TaskExecutorService {
private static final String MAXIMUM_POOL_SIZE_KEY = "android_task_executor_maximum_pool_size";
private static final String KEEP_ALIVE_SECONDS_KEY = "android_task_executor_keep_alive_seconds";

private final String name;
private final int maximumPoolSize;
private final int keepAliveSeconds;
private static Map<String, ExecutorService> executors = new HashMap<>();

TaskExecutorService(String name) {
this.name = name;
ReactNativeFirebaseJSON json = ReactNativeFirebaseJSON.getSharedInstance();
this.maximumPoolSize = json.getIntValue(MAXIMUM_POOL_SIZE_KEY, 1);
this.keepAliveSeconds = json.getIntValue(KEEP_ALIVE_SECONDS_KEY, 3);
}

public ExecutorService getExecutor() {
boolean isTransactional = maximumPoolSize <= 1;
return getExecutor(isTransactional, "");
}

public ExecutorService getTransactionalExecutor() {
return getExecutor(true, "");
}

public ExecutorService getTransactionalExecutor(String identifier) {
String executorIdentifier = maximumPoolSize != 0 ? identifier : "";
return getExecutor(true, executorIdentifier);
}

public ExecutorService getExecutor(boolean isTransactional, String identifier) {
String executorName = getExecutorName(isTransactional, identifier);
synchronized(executors) {
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor == null) {
ExecutorService newExecutor = getNewExecutor(isTransactional);
executors.put(executorName, newExecutor);
return newExecutor;
}
return existingExecutor;
}
}

private ExecutorService getNewExecutor(boolean isTransactional) {
if (isTransactional == true) {
return Executors.newSingleThreadExecutor();
} else {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
threadPoolExecutor.setRejectedExecutionHandler(executeInFallback);
return threadPoolExecutor;
}
}

private RejectedExecutionHandler executeInFallback = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
return;
}
ExecutorService fallbackExecutor = getTransactionalExecutor();
fallbackExecutor.execute(r);
};
};

public String getExecutorName(boolean isTransactional, String identifier) {
if (isTransactional == true) {
return name + "TransactionalExecutor" + identifier;
}
return name + "Executor" + identifier;
}

public void shutdown() {
Set<String> existingExecutorNames = executors.keySet();
existingExecutorNames.removeIf((executorName) -> {
return executorName.startsWith(name) == false;
});
existingExecutorNames.forEach((executorName) -> {
removeExecutor(executorName);
});
}

public void removeExecutor(String executorName) {
synchronized(executors) {
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor != null) {
existingExecutor.shutdownNow();
executors.remove(executorName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
*/

import android.content.Context;
import io.invertase.firebase.common.TaskExecutorService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.OverridingMethodsMustInvokeSuper;

public class UniversalFirebaseModule {
private static Map<String, ExecutorService> executors = new HashMap<>();
private final TaskExecutorService executorService;

private final Context context;
private final String serviceName;

protected UniversalFirebaseModule(Context context, String serviceName) {
this.context = context;
this.serviceName = serviceName;
this.executorService = new TaskExecutorService(getName());
}

public Context getContext() {
Expand All @@ -46,11 +47,7 @@ public Context getApplicationContext() {
}

protected ExecutorService getExecutor() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) return existingSingleThreadExecutor;
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
executors.put(getName(), newSingleThreadExecutor);
return newSingleThreadExecutor;
return executorService.getExecutor();
}

public String getName() {
Expand All @@ -59,11 +56,7 @@ public String getName() {

@OverridingMethodsMustInvokeSuper
public void onTearDown() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) {
existingSingleThreadExecutor.shutdownNow();
executors.remove(getName());
}
executorService.shutdown();
}

public Map<String, Object> getConstants() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public boolean getBooleanValue(String key, boolean defaultValue) {
return jsonObject.optBoolean(key, defaultValue);
}

public int getIntValue(String key, int defaultValue) {
if (jsonObject == null) return defaultValue;
return jsonObject.optInt(key, defaultValue);
}

public long getLongValue(String key, long defaultValue) {
if (jsonObject == null) return defaultValue;
return jsonObject.optLong(key, defaultValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import android.content.Context;
import com.facebook.react.bridge.*;
import io.invertase.firebase.interfaces.ContextProvider;
import io.invertase.firebase.common.TaskExecutorService;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReactNativeFirebaseModule extends ReactContextBaseJavaModule implements ContextProvider {
private static Map<String, ExecutorService> executors = new HashMap<>();
private final TaskExecutorService executorService;

private String moduleName;

public ReactNativeFirebaseModule(
Expand All @@ -38,6 +39,7 @@ public ReactNativeFirebaseModule(
) {
super(reactContext);
this.moduleName = moduleName;
this.executorService = new TaskExecutorService(getName());
}

public static void rejectPromiseWithExceptionMap(Promise promise, Exception exception) {
Expand Down Expand Up @@ -74,20 +76,25 @@ public ReactContext getContext() {
}

public ExecutorService getExecutor() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) return existingSingleThreadExecutor;
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
executors.put(getName(), newSingleThreadExecutor);
return newSingleThreadExecutor;
return executorService.getExecutor();
}

public ExecutorService getTransactionalExecutor() {
return executorService.getTransactionalExecutor();
}

public ExecutorService getTransactionalExecutor(String identifier) {
return executorService.getTransactionalExecutor(identifier);
}

@Override
public void onCatalystInstanceDestroy() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) {
existingSingleThreadExecutor.shutdownNow();
executors.remove(getName());
}
executorService.shutdown();
}

public void removeEventListeningExecutor(String identifier) {
String executorName = executorService.getExecutorName(true, identifier);
executorService.removeExecutor(executorName);
}

public Context getApplicationContext() {
Expand Down
8 changes: 8 additions & 0 deletions packages/app/firebase-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@
"perf_auto_collection_enabled": {
"description": "Disable or enable auto collection of performance monitoring data collection.\n This is useful for opt-in-first data flows, for example when dealing with GDPR compliance.\nThis can be overridden in JavaScript.",
"type": "boolean"
},
"android_task_executor_maximum_pool_size": {
"description": "Maximum pool size of ThreadPoolExecutor used by RNFirebase for Android. Defaults to `1`.\n Larger values typically improve performance when executing large numbers of asynchronous tasks, e.g. Firestore queries.",
"type": "number"
},
"android_task_executor_keep_alive_seconds": {
"description": "Keep-alive time of ThreadPoolExecutor used by RNFirebase for Android, in seconds. Defaults to `3`.\n Excess threads in the pool executor will be terminated if they have been idle for more than the keep-alive time.",
"type": "number"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ private void handleDatabaseEvent(
DataSnapshot dataSnapshot,
@Nullable String previousChildName
) {
Tasks.call(getExecutor(), () -> {
final String eventRegistrationKey = registration.getString("eventRegistrationKey");
Tasks.call(getTransactionalExecutor(eventRegistrationKey), () -> {
if (eventType.equals("value")) {
return snapshotToMap(dataSnapshot);
} else {
Expand Down Expand Up @@ -407,6 +408,7 @@ public void off(String queryKey, String eventRegistrationKey) {

if (databaseQuery != null) {
databaseQuery.removeEventListener(eventRegistrationKey);
removeEventListeningExecutor(eventRegistrationKey);

if (!databaseQuery.hasListeners()) {
queryMap.remove(queryKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public class ReactNativeFirebaseDatabaseReferenceModule extends ReactNativeFireb
@ReactMethod
public void set(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props).get("value"))
.call(getTransactionalExecutor(), () -> toHashMap(props).get("value"))
.onSuccessTask(aValue -> module.set(app, dbURL, path, aValue))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -56,9 +56,9 @@ public void set(String app, String dbURL, String path, ReadableMap props, Promis
@ReactMethod
public void update(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props).get("values"))
.call(getTransactionalExecutor(), () -> toHashMap(props).get("values"))
.onSuccessTask(aMap -> module.update(app, dbURL, path, (Map<String, Object>) aMap))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -70,9 +70,9 @@ public void update(String app, String dbURL, String path, ReadableMap props, Pro
@ReactMethod
public void setWithPriority(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props))
.call(getTransactionalExecutor(), () -> toHashMap(props))
.onSuccessTask(aMap -> module.setWithPriority(app, dbURL, path, aMap.get("value"), aMap.get("priority")))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -85,7 +85,7 @@ public void setWithPriority(String app, String dbURL, String path, ReadableMap p
public void remove(String app, String dbURL, String path, Promise promise) {
// continuation tasks not needed for this as no data
module.remove(app, dbURL, path)
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -98,7 +98,7 @@ public void remove(String app, String dbURL, String path, Promise promise) {
public void setPriority(String app, String dbURL, String path, ReadableMap props, Promise promise) {
// continuation tasks not needed for this as minimal data
module.setPriority(app, dbURL, path, toHashMap(props).get("priority"))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void collectionOffSnapshot(
if (listenerRegistration != null) {
listenerRegistration.remove();
collectionSnapshotListeners.remove(listenerId);
removeEventListeningExecutor(Integer.toString(listenerId));
}
}

Expand Down Expand Up @@ -159,7 +160,7 @@ public void collectionGet(
}

private void sendOnSnapshotEvent(String appName, int listenerId, QuerySnapshot querySnapshot, MetadataChanges metadataChanges) {
Tasks.call(getExecutor(), () -> snapshotToWritableMap("onSnapshot", querySnapshot, metadataChanges)).addOnCompleteListener(task -> {
Tasks.call(getTransactionalExecutor(Integer.toString(listenerId)), () -> snapshotToWritableMap("onSnapshot", querySnapshot, metadataChanges)).addOnCompleteListener(task -> {
if (task.isSuccessful()) {
WritableMap body = Arguments.createMap();
body.putMap("snapshot", task.getResult());
Expand Down
Loading

1 comment on commit 0e4e331

@vercel
Copy link

@vercel vercel bot commented on 0e4e331 Apr 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.