Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align to use virtual threads #540

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion core-api/src/main/java/com/optimizely/ab/Optimizely.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.optimizely.ab.optimizelyconfig.OptimizelyConfigService;
import com.optimizely.ab.optimizelydecision.*;
import com.optimizely.ab.optimizelyjson.OptimizelyJSON;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,6 +102,8 @@ public class Optimizely implements AutoCloseable {
@Nullable
private final ODPManager odpManager;

private final ReentrantLock lock = new ReentrantLock();

private Optimizely(@Nonnull EventHandler eventHandler,
@Nonnull EventProcessor eventProcessor,
@Nonnull ErrorHandler errorHandler,
Expand Down Expand Up @@ -1451,8 +1454,11 @@ public List<String> fetchQualifiedSegments(String userId, @Nonnull List<ODPSegme
return null;
}
if (odpManager != null) {
synchronized (odpManager) {
lock.lock();
try {
return odpManager.getSegmentManager().getQualifiedSegments(userId, segmentOptions);
} finally {
lock.unlock();
}
}
logger.error("Audience segments fetch failed (ODP is not enabled).");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.optimizely.ab.optimizelyconfig.OptimizelyConfig;
import com.optimizely.ab.optimizelyconfig.OptimizelyConfigManager;
import com.optimizely.ab.optimizelyconfig.OptimizelyConfigService;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -60,6 +62,7 @@ public abstract class PollingProjectConfigManager implements ProjectConfigManage
private volatile String sdkKey;
private volatile boolean started;
private ScheduledFuture<?> scheduledFuture;
private ReentrantLock lock = new ReentrantLock();

public PollingProjectConfigManager(long period, TimeUnit timeUnit) {
this(period, timeUnit, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new NotificationCenter());
Expand All @@ -70,6 +73,15 @@ public PollingProjectConfigManager(long period, TimeUnit timeUnit, NotificationC
}

public PollingProjectConfigManager(long period, TimeUnit timeUnit, long blockingTimeoutPeriod, TimeUnit blockingTimeoutUnit, NotificationCenter notificationCenter) {
this(period, timeUnit, blockingTimeoutPeriod, blockingTimeoutUnit, notificationCenter, null);
}

public PollingProjectConfigManager(long period,
TimeUnit timeUnit,
long blockingTimeoutPeriod,
TimeUnit blockingTimeoutUnit,
NotificationCenter notificationCenter,
@Nullable ThreadFactory customThreadFactory) {
this.period = period;
this.timeUnit = timeUnit;
this.blockingTimeoutPeriod = blockingTimeoutPeriod;
Expand All @@ -78,7 +90,7 @@ public PollingProjectConfigManager(long period, TimeUnit timeUnit, long blocking
if (TimeUnit.SECONDS.convert(period, this.timeUnit) < 30) {
logger.warn("Polling intervals below 30 seconds are not recommended.");
}
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
final ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : Executors.defaultThreadFactory();
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = threadFactory.newThread(runnable);
thread.setDaemon(true);
Expand Down Expand Up @@ -176,43 +188,58 @@ public String getSDKKey() {
return this.sdkKey;
}

public synchronized void start() {
if (started) {
logger.warn("Manager already started.");
return;
}
public void start() {
lock.lock();
try {
if (started) {
logger.warn("Manager already started.");
return;
}

if (scheduledExecutorService.isShutdown()) {
logger.warn("Not starting. Already in shutdown.");
return;
}
if (scheduledExecutorService.isShutdown()) {
logger.warn("Not starting. Already in shutdown.");
return;
}

Runnable runnable = new ProjectConfigFetcher();
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, 0, period, timeUnit);
started = true;
Runnable runnable = new ProjectConfigFetcher();
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, 0, period, timeUnit);
started = true;
} finally {
lock.unlock();
}
}

public synchronized void stop() {
if (!started) {
logger.warn("Not pausing. Manager has not been started.");
return;
}
public void stop() {
lock.lock();
try {
if (!started) {
logger.warn("Not pausing. Manager has not been started.");
return;
}

if (scheduledExecutorService.isShutdown()) {
logger.warn("Not pausing. Already in shutdown.");
return;
}
if (scheduledExecutorService.isShutdown()) {
logger.warn("Not pausing. Already in shutdown.");
return;
}

logger.info("pausing project watcher");
scheduledFuture.cancel(true);
started = false;
logger.info("pausing project watcher");
scheduledFuture.cancel(true);
started = false;
} finally {
lock.unlock();
}
}

@Override
public synchronized void close() {
stop();
scheduledExecutorService.shutdownNow();
started = false;
public void close() {
lock.lock();
try {
stop();
scheduledExecutorService.shutdownNow();
started = false;
} finally {
lock.unlock();
}
}

protected void setSdkKey(String sdkKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.optimizely.ab.event.internal.UserEvent;
import com.optimizely.ab.internal.PropertyUtils;
import com.optimizely.ab.notification.NotificationCenter;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,6 +68,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {

private Future<?> future;
private boolean isStarted = false;
private final ReentrantLock lock = new ReentrantLock();

private BatchEventProcessor(BlockingQueue<Object> eventQueue, EventHandler eventHandler, Integer batchSize, Long flushInterval, Long timeoutMillis, ExecutorService executor, NotificationCenter notificationCenter) {
this.eventHandler = eventHandler;
Expand All @@ -78,15 +80,20 @@ private BatchEventProcessor(BlockingQueue<Object> eventQueue, EventHandler event
this.executor = executor;
}

public synchronized void start() {
if (isStarted) {
logger.info("Executor already started.");
return;
}
public void start() {
lock.lock();
try {
if (isStarted) {
logger.info("Executor already started.");
return;
}

isStarted = true;
EventConsumer runnable = new EventConsumer();
future = executor.submit(runnable);
isStarted = true;
EventConsumer runnable = new EventConsumer();
future = executor.submit(runnable);
} finally {
lock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import com.optimizely.ab.annotations.VisibleForTesting;

import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

public class DefaultLRUCache<T> implements Cache<T> {

private final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();

private final Integer maxSize;

Expand Down Expand Up @@ -51,8 +52,11 @@ public void save(String key, T value) {
return;
}

synchronized (lock) {
lock.lock();
try {
linkedHashMap.put(key, new CacheEntity(value));
} finally {
lock.unlock();
}
}

Expand All @@ -62,7 +66,8 @@ public T lookup(String key) {
return null;
}

synchronized (lock) {
lock.lock();
try {
if (linkedHashMap.containsKey(key)) {
CacheEntity entity = linkedHashMap.get(key);
Long nowMs = new Date().getTime();
Expand All @@ -75,12 +80,17 @@ public T lookup(String key) {
linkedHashMap.remove(key);
}
return null;
} finally {
lock.unlock();
}
}

public void reset() {
synchronized (lock) {
lock.lock();
try {
linkedHashMap.clear();
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.optimizely.ab.notification;

import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +37,7 @@ public class NotificationManager<T> {

private final Map<Integer, NotificationHandler<T>> handlers = Collections.synchronizedMap(new LinkedHashMap<>());
private final AtomicInteger counter;
private final ReentrantLock lock = new ReentrantLock();

public NotificationManager() {
this(new AtomicInteger());
Expand All @@ -48,13 +50,16 @@ public NotificationManager(AtomicInteger counter) {
public int addHandler(NotificationHandler<T> newHandler) {

// Prevent registering a duplicate listener.
synchronized (handlers) {
lock.lock();
try {
for (NotificationHandler<T> handler : handlers.values()) {
if (handler.equals(newHandler)) {
logger.warn("Notification listener was already added");
return -1;
}
}
} finally {
lock.unlock();
}

int notificationId = counter.incrementAndGet();
Expand All @@ -64,14 +69,17 @@ public int addHandler(NotificationHandler<T> newHandler) {
}

public void send(final T message) {
synchronized (handlers) {
lock.lock();
try {
for (Map.Entry<Integer, NotificationHandler<T>> handler: handlers.entrySet()) {
try {
handler.getValue().handle(message);
} catch (Exception e) {
logger.warn("Catching exception sending notification for class: {}, handler: {}", message.getClass(), handler.getKey());
}
}
} finally {
lock.unlock();
}
}

Expand Down
Loading
Loading