Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset CDS Profile fetch counters + Introduce generic PeriodTaskExecutor #901

Merged
merged 14 commits into from
Apr 10, 2019
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.microsoft.applicationinsights.internal.profile;

import com.microsoft.applicationinsights.internal.logger.InternalLogger;

/**
* Class Responsible for configuration of Cds Profile Fetch.
*/
public final class CdsProfileFetcherPolicy {

/**
* Maximum number of instant retries to CDS to resolve ikey to AppId.
*/
public int maxInstantRetries;
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved

/**
* The interval in minutes for retry counters and pending tasks to be cleaned.
*/
public long resetPeriodInMinutes;

/**
* Cached instance to be reused across SDK for CDS Profile fetch calls.
*/
private static CdsProfileFetcherPolicy instance;
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved

public int getMaxInstantRetries() {
return maxInstantRetries;
}

public long getResetPeriodInMinutes() {
return resetPeriodInMinutes;
}

public void setMaxInstantRetries(int maxInstantRetries) {
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved
this.maxInstantRetries = maxInstantRetries;
}

public void setResetPeriodInMinutes(long resetPeriodInMinutes) {
this.resetPeriodInMinutes = resetPeriodInMinutes;
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Private Constructor that sets the default value of maxInstantRetries to 3
* and default resetPeriodInMinutes to 240.
*/
private CdsProfileFetcherPolicy() {
maxInstantRetries = 3;
resetPeriodInMinutes = 240;
}

/**
* Returns an instance of CdsProfileFetcherPolicy
* @return instance of CdsProfileFetcherPolicy
*/
public static CdsProfileFetcherPolicy getInstance() {
if (instance == null) {
instance = new CdsProfileFetcherPolicy();
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved
}
return instance;
}

/**
* Resets the CDS configuration policy to default.
*/
public void resetConfiguration() {
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved
if (instance != null) {
InternalLogger.INSTANCE.warn(String.format("Resetting instance of CdsProfileFetcherRetryConfiguration - maxInstantRetries, " +
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved
"resetPeriodInMinutes to %d, % d minutes",maxInstantRetries, resetPeriodInMinutes));
maxInstantRetries = 3;
resetPeriodInMinutes = 240;
} else {
InternalLogger.INSTANCE.warn("No instance of CdsProfileFetcherRetryConfiguration is created");
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.microsoft.applicationinsights.internal.profile;

import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@RunWith(JUnit4.class)
public class CdsProfileFetcherPolicyTests {

@After
public void clear() {
CdsProfileFetcherPolicy.getInstance().resetConfiguration();
}

@Test
public void testInstanceOfCdsProfileFetcherIsCreated() {
CdsProfileFetcherPolicy.getInstance();
}

@Test
public void callingGetInstanceMultipleTimesReturnsCachedInstance() {
CdsProfileFetcherPolicy policy = CdsProfileFetcherPolicy.getInstance();
CdsProfileFetcherPolicy policy1 = CdsProfileFetcherPolicy.getInstance();
assertThat(policy, equalTo(policy1));
}

@Test
public void defaultConfigurationIsSetWhenNewInstanceIsCreated() {
CdsProfileFetcherPolicy policy = CdsProfileFetcherPolicy.getInstance();
assertThat(policy.getMaxInstantRetries(), equalTo(3));
assertThat(policy.getResetPeriodInMinutes(), equalTo(240L));
}

@Test
public void defaultConfigurationCanBeOverriden() {
CdsProfileFetcherPolicy policy = CdsProfileFetcherPolicy.getInstance();
assertThat(policy.getMaxInstantRetries(), equalTo(3));
assertThat(policy.getResetPeriodInMinutes(), equalTo(240L));

policy.setResetPeriodInMinutes(1);
policy.setMaxInstantRetries(1);
assertThat(policy.getMaxInstantRetries(), equalTo(1));
assertThat(policy.getResetPeriodInMinutes(), equalTo(1L));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.microsoft.applicationinsights.internal.logger.InternalLogger;
import com.microsoft.applicationinsights.internal.profile.CdsProfileFetcherPolicy;
import com.microsoft.applicationinsights.internal.shutdown.SDKShutdownActivity;

import com.microsoft.applicationinsights.internal.util.ThreadPoolUtils;
import org.apache.http.HttpResponse;
import org.apache.http.ParseException;
import org.apache.http.client.config.RequestConfig;
Expand All @@ -48,13 +51,23 @@ public class CdsProfileFetcher implements AppProfileFetcher {
private String endpointAddress;
private static final String ProfileQueryEndpointAppIdFormat = "%s/api/profiles/%s/appId";
private static final String DefaultProfileQueryEndpointAddress = "https://dc.services.visualstudio.com";
private static final int MAX_RETRIES = 3;

/**
* Instance that holds the configuration for Cds Profile Fetch
*/
private final CdsProfileFetcherPolicy policyConfiguration;

/**
* Executor service that rests the retry counter and pending unresolved tasks periodically
* based on configuration provided.
*/
private final ScheduledExecutorService resetService;

// cache of tasks per ikey
private final ConcurrentMap<String, Future<HttpResponse>> tasks;
/* Visible for Testing */ final ConcurrentMap<String, Future<HttpResponse>> tasks;

// failure counters per ikey
private final Map<String, Integer> failureCounters;
/* Visible for Testing */ final ConcurrentMap<String, Integer> failureCounters;

public CdsProfileFetcher() {
RequestConfig requestConfig = RequestConfig.custom()
Expand All @@ -67,13 +80,21 @@ public CdsProfileFetcher() {
.setDefaultRequestConfig(requestConfig)
.useSystemProperties()
.build());


this.policyConfiguration = CdsProfileFetcherPolicy.getInstance();
resetService = Executors.newSingleThreadScheduledExecutor(
ThreadPoolUtils.createDaemonThreadFactory(CdsProfileFetcher.class, "CdsProfilePurgeService"));
long cachePurgeInterval = policyConfiguration.getResetPeriodInMinutes();
resetService.scheduleAtFixedRate(new CachePurgingRunnable(), cachePurgeInterval, cachePurgeInterval,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dhaval24 I wonder if we need an additional executor for purging the cache. If fetchProfile is used frequently, you could store the last time it was called and purge the cache after elapsed time is over the threshold.

Copy link
Contributor Author

@dhaval24 dhaval24 Apr 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@littleaj there is a slight confusion due to naming. This should more be resetInterval. I have pushed the changes. I do not purge cache and that is not needed. We only reset the counter. If cache part of InstrumentationKeyResolver already has the appId resolved there is no need to retry.

Regarding your comment on do we need additional service here. I would prefer to have seperate service as it adds clean seperation of what task is designated to which service.

TimeUnit.MINUTES);
this.httpClient.start();

this.tasks = new ConcurrentHashMap<String, Future<HttpResponse>>();
this.failureCounters = new HashMap<String, Integer>();
this.tasks = new ConcurrentHashMap<>();
this.failureCounters = new ConcurrentHashMap<>();

this.endpointAddress = DefaultProfileQueryEndpointAddress;
SDKShutdownActivity.INSTANCE.register(this);
SDKShutdownActivity.INSTANCE.register(resetService);
}

@Override
Expand All @@ -86,9 +107,11 @@ public ProfileFetcherResult fetchAppProfile(String instrumentationKey) throws In
ProfileFetcherResult result = new ProfileFetcherResult(null, ProfileFetcherResultTaskStatus.PENDING);

// check if we have tried resolving this ikey too many times. If so, quit to save on perf.
Integer failureCounter = this.failureCounters.get(instrumentationKey);
if (failureCounter != null && failureCounter.intValue() >= MAX_RETRIES) {
InternalLogger.INSTANCE.warn("The profile fetch task will not execute. Max number of retries reached.");
if (failureCounters.containsKey(instrumentationKey) && failureCounters.get(instrumentationKey) >=
policyConfiguration.getMaxInstantRetries()) {
InternalLogger.INSTANCE.warn(String.format(
"The profile fetch task will not execute for next %d hours. Max number of retries reached.",
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved
policyConfiguration.getResetPeriodInMinutes()));
return result;
}

Expand Down Expand Up @@ -134,9 +157,8 @@ public ProfileFetcherResult fetchAppProfile(String instrumentationKey) throws In
}
}

public void setHttpClient(CloseableHttpAsyncClient client) {
void setHttpClient(CloseableHttpAsyncClient client) {
this.httpClient = client;
SDKShutdownActivity.INSTANCE.register(this.httpClient);
}

public void setEndpointAddress(String endpoint) throws MalformedURLException {
Expand All @@ -152,17 +174,29 @@ private Future<HttpResponse> createFetchTask(String instrumentationKey) {
return this.httpClient.execute(request, null);
}

private synchronized void incrementFailureCount(String instrumentationKey) {
Integer failureCounter = this.failureCounters.get(instrumentationKey);
if (failureCounter == null) {
this.failureCounters.put(instrumentationKey, new Integer(1));
} else {
this.failureCounters.put(instrumentationKey, new Integer(failureCounter.intValue() + 1));
private void incrementFailureCount(String instrumentationKey) {
if (!this.failureCounters.containsKey(instrumentationKey)) {
this.failureCounters.put(instrumentationKey, 0);
}
this.failureCounters.put(instrumentationKey, this.failureCounters.get(instrumentationKey) + 1);

}

@Override
public void close() throws IOException {
this.httpClient.close();
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Runnable that is used to clear the retry counters and pending unresolved tasks.
*/
private class CachePurgingRunnable implements Runnable {
@Override
public void run() {
tasks.clear();
failureCounters.clear();
InternalLogger.INSTANCE.info("CDS Profile fetch retry counter has been Reset. Pending fetch tasks" +
"have been abandoned.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@

package com.microsoft.applicationinsights.web.internal.correlation;

import com.microsoft.applicationinsights.web.internal.correlation.mocks.*;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import com.microsoft.applicationinsights.internal.profile.CdsProfileFetcherPolicy;
import com.microsoft.applicationinsights.web.internal.correlation.mocks.MockHttpAsyncClientWrapper;
import org.apache.http.ParseException;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;

public class CdsProfileFetcherTests {

@Test
Expand Down Expand Up @@ -158,20 +165,60 @@ public void testFetchApplicationIdFailureWithNon200StatusCode() throws Interrupt
Assert.assertNull(result.getAppId());
}

/*@Test
public void testFetchApplicationIdAgainstRealService() throws InterruptedException, ExecutionException, ParseException, IOException {
@Test
public void testCachePurgeServiceClearsRetryCounters() throws InterruptedException, ExecutionException, IOException {
//setup
MockHttpAsyncClientWrapper clientWrapper = new MockHttpAsyncClientWrapper();
clientWrapper.setAppId("AppId");
clientWrapper.setFailureOn(false);

CdsProfileFetcher.INSTANCE.setEndpointAddress("https://dc.services.visualstudio.com/v2/track");

ProfileFetcherResult result = CdsProfileFetcher.INSTANCE.fetchAppProfile("d3207117-0df4-4674-ad6e-a43d3eb5a2df");
System.out.println(result.getAppId());
System.out.println(result.getStatus());

Thread.sleep(2000);
System.out.println("Wake up!");

result = CdsProfileFetcher.INSTANCE.fetchAppProfile("d3207117-0df4-4674-ad6e-a43d3eb5a2df");
System.out.println(result.getAppId());
System.out.println(result.getStatus());
}*/
}
CdsProfileFetcherPolicy configuration = CdsProfileFetcherPolicy.getInstance();
configuration.setResetPeriodInMinutes(1);
CdsProfileFetcher fetcher = new CdsProfileFetcher();
fetcher.setHttpClient(clientWrapper.getClient());

clientWrapper.setTaskAsPending();
ProfileFetcherResult result = fetcher.fetchAppProfile("ikey");
Assert.assertEquals(ProfileFetcherResultTaskStatus.PENDING, result.getStatus());
Assert.assertNull(result.getAppId());

// mimic task completion
clientWrapper.setTaskAsComplete();
clientWrapper.setStatusCode(500);
result = fetcher.fetchAppProfile("ikey");
Assert.assertEquals(ProfileFetcherResultTaskStatus.FAILED, result.getStatus());

assertThat(fetcher.failureCounters.size(), not(0));

TimeUnit.MINUTES.sleep(2);

assertThat(fetcher.failureCounters.size(), equalTo(0));
dhaval24 marked this conversation as resolved.
Show resolved Hide resolved
assertThat(fetcher.tasks.size(), equalTo(0));
}

@Test
public void testCachePurgeServiceClearsTasksCache() throws InterruptedException, ExecutionException, IOException {
//setup
MockHttpAsyncClientWrapper clientWrapper = new MockHttpAsyncClientWrapper();
clientWrapper.setAppId("AppId");
clientWrapper.setFailureOn(false);

CdsProfileFetcherPolicy configuration = CdsProfileFetcherPolicy.getInstance();
configuration.setResetPeriodInMinutes(1);
CdsProfileFetcher fetcher = new CdsProfileFetcher();
fetcher.setHttpClient(clientWrapper.getClient());

clientWrapper.setTaskAsPending();
ProfileFetcherResult result = fetcher.fetchAppProfile("ikey");
Assert.assertEquals(ProfileFetcherResultTaskStatus.PENDING, result.getStatus());
Assert.assertNull(result.getAppId());

assertThat(fetcher.tasks.size(), not(0));

TimeUnit.MINUTES.sleep(2);

assertThat(fetcher.failureCounters.size(), equalTo(0));
assertThat(fetcher.tasks.size(), equalTo(0));
}

}