Skip to content

Commit

Permalink
Merge pull request #1 from nickmahilani/master
Browse files Browse the repository at this point in the history
Remove RxJava dependency
  • Loading branch information
nickmahilani authored Jun 27, 2019
2 parents 61dde8a + d16c2f2 commit 0f4f5ba
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 120 deletions.
1 change: 0 additions & 1 deletion mantis-publish-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ ext {

dependencies {
api "io.mantisrx:mql-jvm:$mqlVersion"
api 'io.reactivex:rxjava:1.3.8'
api "io.mantisrx:mantis-discovery-proto:$mantisDiscoveryVersion"

api "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.mantisrx.publish.api.EventPublisher;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.discovery.MantisJobDiscovery;
import com.netflix.spectator.api.Registry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;
import rx.schedulers.Schedulers;


/**
* Initializes the Mantis Realtime Events Publisher and its internal components.
Expand All @@ -57,7 +55,20 @@ public class MrePublishClientInitializer {
private final EventTransmitter eventTransmitter;
private final Tee tee;

private final List<Subscription> subscriptions = new ArrayList<>();
private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
private static final ScheduledThreadPoolExecutor DRAINER_EXECUTOR =
new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "MantisDrainer"));
private static final ScheduledThreadPoolExecutor SUBSCRIPTIONS_EXECUTOR =
new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "MantisSubscriptionsTracker"));

static {
DRAINER_EXECUTOR.setRemoveOnCancelPolicy(true);
SUBSCRIPTIONS_EXECUTOR.setRemoveOnCancelPolicy(true);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
DRAINER_EXECUTOR.shutdown();
SUBSCRIPTIONS_EXECUTOR.shutdown();
}));
}

public MrePublishClientInitializer(
MrePublishConfiguration config,
Expand All @@ -81,57 +92,46 @@ public MrePublishClientInitializer(
* Starts internal components for the Mantis Realtime Events Publisher.
*/
public void start() {
this.subscriptions.add(setupSubscriptionTracker(subscriptionsTracker));
this.subscriptions.add(setupDrainer(streamManager, eventTransmitter, tee));
this.scheduledFutures.add(setupSubscriptionTracker(subscriptionsTracker));
this.scheduledFutures.add(setupDrainer(streamManager, eventTransmitter, tee));
}

/**
* Safely shuts down internal components for the Mantis Realtime Events Publisher.
*/
public void stop() {
Iterator<Subscription> iterator = subscriptions.iterator();
Iterator<ScheduledFuture<?>> iterator = scheduledFutures.iterator();
while (iterator.hasNext()) {
Subscription sub = iterator.next();
if (sub != null && !sub.isUnsubscribed()) {
sub.unsubscribe();
ScheduledFuture<?> next = iterator.next();
if (next != null && !next.isCancelled()) {
next.cancel(false);
}
}
subscriptions.clear();
scheduledFutures.clear();
}

public EventPublisher getEventPublisher() {
return eventPublisher;
}

private Subscription setupDrainer(StreamManager streamManager, EventTransmitter transmitter, Tee tee) {
private ScheduledFuture<?> setupDrainer(StreamManager streamManager, EventTransmitter transmitter, Tee tee) {
EventProcessor eventProcessor = new EventProcessor(config, streamManager, tee);
EventDrainer eventDrainer =
new EventDrainer(config, streamManager, registry, eventProcessor, transmitter, Clock.systemUTC());

return Schedulers
.newThread()
.createWorker()
.schedulePeriodically(eventDrainer::run, 1, config.drainerIntervalMsec(), TimeUnit.MILLISECONDS);
return DRAINER_EXECUTOR.scheduleAtFixedRate(eventDrainer::run,
0, config.drainerIntervalMsec(), TimeUnit.MILLISECONDS);
}

private Subscription setupSubscriptionTracker(SubscriptionTracker subscriptionsTracker) {
final AtomicLong subscriptionsRefreshTimeMs = new AtomicLong(System.currentTimeMillis());
final int subRefreshIntervalMs = config.subscriptionRefreshIntervalSec() * 1000;

return Schedulers
.newThread()
.createWorker()
.schedulePeriodically(() -> {
try {
if ((System.currentTimeMillis() - subscriptionsRefreshTimeMs.get()) > subRefreshIntervalMs) {
subscriptionsTracker.refreshSubscriptions();
subscriptionsRefreshTimeMs.set(System.currentTimeMillis());
}
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("failed to refresh subscriptions", e);
}
}
}, 1, 1, TimeUnit.SECONDS);
private ScheduledFuture<?> setupSubscriptionTracker(SubscriptionTracker subscriptionsTracker) {
return SUBSCRIPTIONS_EXECUTOR.scheduleAtFixedRate(() -> {
try {
subscriptionsTracker.refreshSubscriptions();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("failed to refresh scheduledFutures", e);
}
}
}, 1, config.subscriptionRefreshIntervalSec(), TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,8 @@

package io.mantisrx.publish.internal.discovery.mantisapi;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -59,8 +52,6 @@
import com.netflix.spectator.ipc.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;


public class DefaultMantisApiClient implements MantisApiClient {

Expand Down Expand Up @@ -173,64 +164,4 @@ private JobDiscoveryInfo convertJobSchedInfo(JobSchedulingInfo jobSchedulingInfo
return new JobDiscoveryInfo(jobClusterName, jobSchedulingInfo.getJobId(), jobWorkers);
}

@Override
public Observable<JobDiscoveryInfo> jobDiscoveryInfoStream(final String jobClusterName) {
try {
// Interim impl to get JobDiscoveryInfo from SSE stream, will be replaced by above method once the request response API is added
URL url = new URL(String.format(JOB_DISCOVERY_STREAM_URL_FORMAT, mrePublishConfiguration.discoveryApiHostname(), mrePublishConfiguration.discoveryApiPort(), jobClusterName));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout(CONNECT_TIMEOUT_MS);
conn.setReadTimeout(READ_TIMEOUT_MS);
conn.connect();
int responseCode = conn.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = conn.getInputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
StringBuffer response = new StringBuffer();
String inputLine;
boolean dataElementNotRead = true;
while ((inputLine = bufferedReader.readLine()) != null) {
if (inputLine.startsWith("data:")) {
if (dataElementNotRead) {
dataElementNotRead = false;
// start accumulating response payload till the next data element
response.append(inputLine.substring(5));
logger.debug("appended {}", inputLine);
} else {
// finish reading the data element
logger.debug("finished reading data element");
break;
}
} else {
if (!dataElementNotRead) {
// append next chunk of data for the data element
if (inputLine.isEmpty()) {
logger.debug("finished reading data element");
break;
} else {
logger.debug("appended {}", inputLine);
response.append(inputLine);
}
}
}
}
bufferedReader.close();
String discoveryInfoPayload = response.toString();
JobSchedulingInfo jobSchedulingInfo = mapper.readValue(discoveryInfoPayload, JobSchedulingInfo.class);
JobDiscoveryInfo jobDiscoveryInfo = convertJobSchedInfo(jobSchedulingInfo, jobClusterName);
logger.debug(jobDiscoveryInfo.toString());
return Observable.just(jobDiscoveryInfo);
} else {
logger.debug("resp {}", responseCode);
}
} catch (MalformedURLException e) {
logger.error("invalid URL", e);
} catch (ProtocolException e) {
logger.error("caught protocol exception", e);
} catch (IOException e) {
logger.error("caught IOException", e);
}
return Observable.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.netflix.mantis.discovery.proto.AppJobClustersMap;
import com.netflix.mantis.discovery.proto.JobDiscoveryInfo;
import rx.Observable;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,6 +28,4 @@ public interface MantisApiClient {
CompletableFuture<AppJobClustersMap> getJobClusterMapping(final Optional<String> app);

CompletableFuture<JobDiscoveryInfo> jobDiscoveryInfo(final String jobClusterName);

Observable<JobDiscoveryInfo> jobDiscoveryInfoStream(final String jobClusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.netflix.archaius.DefaultPropertyFactory;
Expand Down Expand Up @@ -55,8 +57,6 @@
import io.mantisrx.publish.netty.pipeline.HttpEventChannelManager;
import io.mantisrx.publish.netty.transmitters.ChoiceOfTwoEventTransmitter;
import io.netty.util.ResourceLeakDetector;
import rx.Subscription;
import rx.schedulers.Schedulers;


public class LocalMrePublishClientInitializer {
Expand Down Expand Up @@ -135,17 +135,15 @@ public static void main(String[] args) throws InterruptedException {
final Event event = new Event();
final CountDownLatch latch = new CountDownLatch(1000);
event.set("testKey", "testValue");
Subscription subscription = Schedulers
.newThread()
.createWorker()
.schedulePeriodically(() -> {
eventPublisher.publish(event);
latch.countDown();
}, 1, 1, TimeUnit.SECONDS);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "EventPublisherTest"));
ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(() -> {
eventPublisher.publish(event);
latch.countDown();
}, 1, 1, TimeUnit.SECONDS);

latch.await();
if (!subscription.isUnsubscribed()) {
subscription.unsubscribe();
if (!scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true);
}
mreClient.stop();
}
Expand Down

0 comments on commit 0f4f5ba

Please sign in to comment.