Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: increase task throughput in Android using thread pool executor #4981

Merged
merged 24 commits into from
Apr 4, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.SynchronousQueue;

import javax.annotation.OverridingMethodsMustInvokeSuper;

public class UniversalFirebaseModule {
private static final int MAXIMUM_POOL_SIZE = 20;
private static final int KEEP_ALIVE_SECONDS = 3;
mikehardy marked this conversation as resolved.
Show resolved Hide resolved
private static Map<String, ExecutorService> executors = new HashMap<>();

private final Context context;
Expand All @@ -46,11 +53,45 @@ public Context getApplicationContext() {
}

protected ExecutorService getExecutor() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) return existingSingleThreadExecutor;
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
executors.put(getName(), newSingleThreadExecutor);
return newSingleThreadExecutor;
return getExecutor(false);
}

protected ExecutorService getTransactionalExecutor() {
return getExecutor(true);
}

private ExecutorService getExecutor(boolean isTransactional) {
String executorName = getExecutorName(isTransactional);
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor != null) return existingExecutor;
ExecutorService newExecutor = getNewExecutor(isTransactional);
executors.put(executorName, newExecutor);
return newExecutor;
}

private ExecutorService getNewExecutor(boolean isTransactional) {
if (isTransactional == true) {
return Executors.newSingleThreadExecutor();
} else {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
threadPoolExecutor.setRejectedExecutionHandler(executeInFallback);
return threadPoolExecutor;
}
}

private RejectedExecutionHandler executeInFallback = new RejectedExecutionHandler() {
mikehardy marked this conversation as resolved.
Show resolved Hide resolved
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
ExecutorService fallbackExecutor = getTransactionalExecutor();
fallbackExecutor.execute(r);
};
};

public String getExecutorName(boolean isTransactional) {
String name = getName();
if (isTransactional == true) {
return name + "TransactionalExecutor";
}
return name + "Executor";
}

public String getName() {
Expand All @@ -59,10 +100,21 @@ public String getName() {

@OverridingMethodsMustInvokeSuper
public void onTearDown() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) {
existingSingleThreadExecutor.shutdownNow();
executors.remove(getName());
String name = getName();
Set<String> existingExecutorNames = executors.keySet();
existingExecutorNames.removeIf((executorName) -> {
return executorName.startsWith(name) == false;
});
existingExecutorNames.forEach((executorName) -> {
removeExecutor(executorName);
});
}

public void removeExecutor(String executorName) {
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor != null) {
existingExecutor.shutdownNow();
executors.remove(executorName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@
import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.SynchronousQueue;

public class ReactNativeFirebaseModule extends ReactContextBaseJavaModule implements ContextProvider {
private static final int MAXIMUM_POOL_SIZE = 20;
private static final int KEEP_ALIVE_SECONDS = 3;
private static Map<String, ExecutorService> executors = new HashMap<>();
private String moduleName;

Expand Down Expand Up @@ -74,19 +81,65 @@ public ReactContext getContext() {
}

public ExecutorService getExecutor() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) return existingSingleThreadExecutor;
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
executors.put(getName(), newSingleThreadExecutor);
return newSingleThreadExecutor;
return getExecutor(false, "");
}

public ExecutorService getTransactionalExecutor() {
return getExecutor(true, "");
}

public ExecutorService getTransactionalExecutor(String identifier) {
return getExecutor(true, identifier);
}

public ExecutorService getExecutor(boolean isTransactional, String identifier) {
String executorName = getExecutorName(isTransactional, identifier);
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor != null) return existingExecutor;
ExecutorService newExecutor = getNewExecutor(isTransactional);
executors.put(executorName, newExecutor);
return newExecutor;
}

private ExecutorService getNewExecutor(boolean isTransactional) {
if (isTransactional == true) {
return Executors.newSingleThreadExecutor();
} else {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
threadPoolExecutor.setRejectedExecutionHandler(executeInFallback);
return threadPoolExecutor;
}
}

private RejectedExecutionHandler executeInFallback = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
ExecutorService fallbackExecutor = getTransactionalExecutor("");
fallbackExecutor.execute(r);
};
};
mikehardy marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void onCatalystInstanceDestroy() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) {
existingSingleThreadExecutor.shutdownNow();
executors.remove(getName());
String name = getName();
Set<String> existingExecutorNames = executors.keySet();
existingExecutorNames.removeIf((executorName) -> {
return executorName.startsWith(name) == false;
});
existingExecutorNames.forEach((executorName) -> {
removeExecutor(executorName);
});
}

public void removeEventListeningExecutor(String identifier) {
String executorName = getExecutorName(true, identifier);
removeExecutor(executorName);
}

public void removeExecutor(String executorName) {
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor != null) {
existingExecutor.shutdownNow();
executors.remove(executorName);
}
}

Expand All @@ -98,6 +151,14 @@ public Activity getActivity() {
return getCurrentActivity();
}

public String getExecutorName(boolean isTransactional, String identifier) {
String name = getName();
if (isTransactional == true) {
return name + "TransactionalExecutor" + identifier;
}
return name + "Executor" + identifier;
}

@Nonnull
@Override
public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ private void handleDatabaseEvent(
DataSnapshot dataSnapshot,
@Nullable String previousChildName
) {
Tasks.call(getExecutor(), () -> {
final String eventRegistrationKey = registration.getString("eventRegistrationKey");
Tasks.call(getTransactionalExecutor(eventRegistrationKey), () -> {
if (eventType.equals("value")) {
return snapshotToMap(dataSnapshot);
} else {
Expand Down Expand Up @@ -407,6 +408,7 @@ public void off(String queryKey, String eventRegistrationKey) {

if (databaseQuery != null) {
databaseQuery.removeEventListener(eventRegistrationKey);
removeEventListeningExecutor(eventRegistrationKey);

if (!databaseQuery.hasListeners()) {
queryMap.remove(queryKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public class ReactNativeFirebaseDatabaseReferenceModule extends ReactNativeFireb
@ReactMethod
public void set(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props).get("value"))
.call(getTransactionalExecutor(), () -> toHashMap(props).get("value"))
.onSuccessTask(aValue -> module.set(app, dbURL, path, aValue))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -56,9 +56,9 @@ public void set(String app, String dbURL, String path, ReadableMap props, Promis
@ReactMethod
public void update(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props).get("values"))
.call(getTransactionalExecutor(), () -> toHashMap(props).get("values"))
.onSuccessTask(aMap -> module.update(app, dbURL, path, (Map<String, Object>) aMap))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -70,9 +70,9 @@ public void update(String app, String dbURL, String path, ReadableMap props, Pro
@ReactMethod
public void setWithPriority(String app, String dbURL, String path, ReadableMap props, Promise promise) {
Tasks
.call(getExecutor(), () -> toHashMap(props))
.call(getTransactionalExecutor(), () -> toHashMap(props))
.onSuccessTask(aMap -> module.setWithPriority(app, dbURL, path, aMap.get("value"), aMap.get("priority")))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -85,7 +85,7 @@ public void setWithPriority(String app, String dbURL, String path, ReadableMap p
public void remove(String app, String dbURL, String path, Promise promise) {
// continuation tasks not needed for this as no data
module.remove(app, dbURL, path)
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand All @@ -98,7 +98,7 @@ public void remove(String app, String dbURL, String path, Promise promise) {
public void setPriority(String app, String dbURL, String path, ReadableMap props, Promise promise) {
// continuation tasks not needed for this as minimal data
module.setPriority(app, dbURL, path, toHashMap(props).get("priority"))
.addOnCompleteListener(getExecutor(), task -> {
.addOnCompleteListener(getTransactionalExecutor(), task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void collectionOffSnapshot(
if (listenerRegistration != null) {
listenerRegistration.remove();
collectionSnapshotListeners.remove(listenerId);
removeEventListeningExecutor(Integer.toString(listenerId));
}
}

Expand Down Expand Up @@ -159,7 +160,7 @@ public void collectionGet(
}

private void sendOnSnapshotEvent(String appName, int listenerId, QuerySnapshot querySnapshot, MetadataChanges metadataChanges) {
Tasks.call(getExecutor(), () -> snapshotToWritableMap("onSnapshot", querySnapshot, metadataChanges)).addOnCompleteListener(task -> {
Tasks.call(getTransactionalExecutor(Integer.toString(listenerId)), () -> snapshotToWritableMap("onSnapshot", querySnapshot, metadataChanges)).addOnCompleteListener(task -> {
if (task.isSuccessful()) {
WritableMap body = Arguments.createMap();
body.putMap("snapshot", task.getResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void documentGet(String appName, String path, ReadableMap getOptions, Pro
public void documentDelete(String appName, String path, Promise promise) {
FirebaseFirestore firebaseFirestore = getFirestoreForApp(appName);
DocumentReference documentReference = getDocumentForFirestore(firebaseFirestore, path);
Tasks.call(getExecutor(), documentReference::delete).addOnCompleteListener(task -> {
Tasks.call(getTransactionalExecutor(), documentReference::delete).addOnCompleteListener(task -> {
if (task.isSuccessful()) {
promise.resolve(null);
} else {
Expand All @@ -160,7 +160,7 @@ public void documentSet(String appName, String path, ReadableMap data, ReadableM
DocumentReference documentReference = getDocumentForFirestore(firebaseFirestore, path);


Tasks.call(getExecutor(), () -> parseReadableMap(firebaseFirestore, data)).continueWithTask(getExecutor(), task -> {
Tasks.call(getTransactionalExecutor(), () -> parseReadableMap(firebaseFirestore, data)).continueWithTask(getTransactionalExecutor(), task -> {
Task<Void> setTask;
Map<String, Object> settableData = Objects.requireNonNull(task.getResult());

Expand Down Expand Up @@ -193,8 +193,8 @@ public void documentUpdate(String appName, String path, ReadableMap data, Promis
FirebaseFirestore firebaseFirestore = getFirestoreForApp(appName);
DocumentReference documentReference = getDocumentForFirestore(firebaseFirestore, path);

Tasks.call(getExecutor(), () -> parseReadableMap(firebaseFirestore, data))
.continueWithTask(getExecutor(), task -> documentReference.update(Objects.requireNonNull(task.getResult())))
Tasks.call(getTransactionalExecutor(), () -> parseReadableMap(firebaseFirestore, data))
.continueWithTask(getTransactionalExecutor(), task -> documentReference.update(Objects.requireNonNull(task.getResult())))
.addOnCompleteListener(task -> {
if (task.isSuccessful()) {
promise.resolve(null);
Expand All @@ -208,8 +208,8 @@ public void documentUpdate(String appName, String path, ReadableMap data, Promis
public void documentBatch(String appName, ReadableArray writes, Promise promise) {
FirebaseFirestore firebaseFirestore = getFirestoreForApp(appName);

Tasks.call(getExecutor(), () -> parseDocumentBatches(firebaseFirestore, writes))
.continueWithTask(getExecutor(), task -> {
Tasks.call(getTransactionalExecutor(), () -> parseDocumentBatches(firebaseFirestore, writes))
.continueWithTask(getTransactionalExecutor(), task -> {
WriteBatch batch = firebaseFirestore.batch();
List<Object> writesArray = task.getResult();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void transactionGetDocument(String appName, int transactionId, String pat
DocumentReference documentReference = getDocumentForFirestore(firebaseFirestore, path);

Tasks
.call(getExecutor(), () -> snapshotToWritableMap(transactionHandler.getDocument(documentReference)))
.call(getTransactionalExecutor(), () -> snapshotToWritableMap(transactionHandler.getDocument(documentReference)))
.addOnCompleteListener(task -> {
if (task.isSuccessful()) {
promise.resolve(task.getResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ public void writeToFile(
reference,
appName
);
storageTask.begin(getExecutor(), localFilePath);
storageTask.addOnCompleteListener(getExecutor(), promise);
storageTask.begin(getTransactionalExecutor(), localFilePath);
storageTask.addOnCompleteListener(getTransactionalExecutor(), promise);
}

/**
Expand All @@ -244,8 +244,8 @@ public void putString(
reference,
appName
);
storageTask.begin(getExecutor(), string, format, metadataMap);
storageTask.addOnCompleteListener(getExecutor(),promise);
storageTask.begin(getTransactionalExecutor(), string, format, metadataMap);
storageTask.addOnCompleteListener(getTransactionalExecutor(),promise);
}

/**
Expand All @@ -266,8 +266,8 @@ public void putFile(
reference,
appName
);
storageTask.begin(getExecutor(),localFilePath, metadata);
storageTask.addOnCompleteListener(getExecutor(), promise);
storageTask.begin(getTransactionalExecutor(),localFilePath, metadata);
storageTask.addOnCompleteListener(getTransactionalExecutor(), promise);
}

@ReactMethod
Expand Down