Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix custom metric on Function Consumption #2946

Merged
merged 20 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public class StatusFile {
static final String HOME_ENV_VAR = "HOME";

// visible for testing
static final String DEFAULT_LOGDIR = "/LogFiles";
static final String DEFAULT_LOGDIR = File.separator + "LogFiles";

// visible for testing
static final String DEFAULT_APPLICATIONINSIGHTS_LOGDIR = "/ApplicationInsights";
static final String DEFAULT_APPLICATIONINSIGHTS_LOGDIR = File.separator + "ApplicationInsights";
heyams marked this conversation as resolved.
Show resolved Hide resolved

// visible for testing
static final String WINDOWS_DEFAULT_HOME_DIR =
"/home" + DEFAULT_LOGDIR + DEFAULT_APPLICATIONINSIGHTS_LOGDIR;
File.separator + "home" + DEFAULT_LOGDIR + DEFAULT_APPLICATIONINSIGHTS_LOGDIR;

// visible for testing
static String logDir;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ private static String getDefaultPath() {
}
if (DiagnosticsHelper.useAppSvcRpIntegrationLogging()
|| DiagnosticsHelper.useFunctionsRpIntegrationLogging()) {
return StatusFile.getLogDir() + "/" + DEFAULT_NAME;
return StatusFile.getLogDir() + File.separator + DEFAULT_NAME;
}
// azure spring cloud
return DEFAULT_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ private void configureFunctions() {
Appender<ILoggingEvent> diagnosticAppender = configureConsoleAppender();
diagnosticLogger.addAppender(diagnosticAppender);

// todo add appdender filter
heyams marked this conversation as resolved.
Show resolved Hide resolved
ApplicationInsightsDiagnosticsLogFilter filter = new ApplicationInsightsDiagnosticsLogFilter();
filter.setContext(loggerContext);
filter.start();
diagnosticAppender.addFilter(filter);

// errors reported by other loggers should also go to diagnostic log
// (level filter for these is applied in ApplicationInsightsDiagnosticsLogFilter)
rootLogger.addAppender(diagnosticAppender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public class RuntimeConfiguration {
public String instrumentationLoggingLevel;

public String selfDiagnosticsLevel;

public boolean profilerEnabled;

public long heartbeatIntervalSeconds;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@

package com.microsoft.applicationinsights.agent.internal.init;

import static java.util.concurrent.TimeUnit.MINUTES;

import ch.qos.logback.classic.LoggerContext;
import com.azure.monitor.opentelemetry.exporter.implementation.heartbeat.HeartbeatExporter;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings;
import com.microsoft.applicationinsights.agent.internal.classicsdk.BytecodeUtilImpl;
import com.microsoft.applicationinsights.agent.internal.configuration.Configuration;
import com.microsoft.applicationinsights.agent.internal.exporter.AgentLogExporter;
import com.microsoft.applicationinsights.agent.internal.legacyheaders.DelegatingPropagator;
import com.microsoft.applicationinsights.agent.internal.profiler.ProfilingInitializer;
import com.microsoft.applicationinsights.agent.internal.sampling.DelegatingSampler;
import com.microsoft.applicationinsights.agent.internal.sampling.Samplers;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
Expand All @@ -27,14 +33,20 @@ public class RuntimeConfigurator {
private final TelemetryClient telemetryClient;
private final Supplier<AgentLogExporter> agentLogExporter;
private volatile RuntimeConfiguration currentConfig;
@Nullable private final Consumer<List<TelemetryItem>> heartbeatTelemetryItemsConsumer;
@Nullable private final ProfilingInitializer profilingInitializer;
heyams marked this conversation as resolved.
Show resolved Hide resolved

RuntimeConfigurator(
TelemetryClient telemetryClient,
Supplier<AgentLogExporter> agentLogExporter,
Configuration initialConfig) {
Configuration initialConfig,
Consumer<List<TelemetryItem>> heartbeatTelemetryItemConsumer,
ProfilingInitializer profilingInitializer) {
this.telemetryClient = telemetryClient;
this.agentLogExporter = agentLogExporter;
currentConfig = captureInitialConfig(initialConfig);
this.heartbeatTelemetryItemsConsumer = heartbeatTelemetryItemConsumer;
this.profilingInitializer = profilingInitializer;
}

private static RuntimeConfiguration captureInitialConfig(Configuration initialConfig) {
Expand All @@ -58,6 +70,9 @@ private static RuntimeConfiguration captureInitialConfig(Configuration initialCo

runtimeConfig.instrumentationLoggingLevel = initialConfig.instrumentation.logging.level;
runtimeConfig.selfDiagnosticsLevel = initialConfig.selfDiagnostics.level;

runtimeConfig.profilerEnabled = initialConfig.preview.profiler.enabled;
runtimeConfig.heartbeatIntervalSeconds = initialConfig.heartbeat.intervalSeconds;
return runtimeConfig;
}

Expand All @@ -79,6 +94,9 @@ private static RuntimeConfiguration copy(RuntimeConfiguration config) {

copy.instrumentationLoggingLevel = config.instrumentationLoggingLevel;
copy.selfDiagnosticsLevel = config.selfDiagnosticsLevel;

copy.profilerEnabled = config.profilerEnabled;
copy.heartbeatIntervalSeconds = config.heartbeatIntervalSeconds;
return copy;
}

Expand Down Expand Up @@ -112,6 +130,21 @@ public void apply(RuntimeConfiguration runtimeConfig) {
updateSampling(enabled, runtimeConfig.sampling, runtimeConfig.samplingPreview);
}

// initialize Profiler
if (runtimeConfig.profilerEnabled && profilingInitializer != null) {
trask marked this conversation as resolved.
Show resolved Hide resolved
profilingInitializer.initialize();
}

// enable Heartbeat
if (telemetryClient.getConnectionString() != null) {
long intervalSeconds =
heyams marked this conversation as resolved.
Show resolved Hide resolved
Math.min(runtimeConfig.heartbeatIntervalSeconds, MINUTES.toSeconds(15));
HeartbeatExporter.start(
intervalSeconds, telemetryClient::populateDefaults, heartbeatTelemetryItemsConsumer);
}

// TODO (heya) enable Statsbeat and need to refactor RuntimeConfiguration

updateInstrumentationLoggingLevel(runtimeConfig.instrumentationLoggingLevel);
updateSelfDiagnosticsLevel(runtimeConfig.selfDiagnosticsLevel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
.setDiskPersistenceMaxSizeMb(configuration.preview.diskPersistenceMaxSizeMb)
.build();

// interval longer than 15 minutes is not allowed since we use this data for usage telemetry
long intervalSeconds = Math.min(configuration.heartbeat.intervalSeconds, MINUTES.toSeconds(15));
Consumer<List<TelemetryItem>> telemetryItemsConsumer =
Consumer<List<TelemetryItem>> heartbeatTelemetryItemConsumer =
telemetryItems -> {
for (TelemetryItem telemetryItem : telemetryItems) {
TelemetryObservers.INSTANCE
Expand All @@ -146,13 +144,35 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
telemetryClient.getMetricsBatchItemProcessor().trackAsync(telemetryItem);
}
};
HeartbeatExporter.start(
intervalSeconds, telemetryClient::populateDefaults, telemetryItemsConsumer);

// interval longer than 15 minutes is not allowed since we use this data for usage telemetry
if (telemetryClient.getConnectionString() != null) {
startupLogger.verbose("connection string is not null, start HeartbeatExporter");
long intervalSeconds =
Math.min(configuration.heartbeat.intervalSeconds, MINUTES.toSeconds(15));
heyams marked this conversation as resolved.
Show resolved Hide resolved
HeartbeatExporter.start(
intervalSeconds, telemetryClient::populateDefaults, heartbeatTelemetryItemConsumer);
}

TelemetryClient.setActive(telemetryClient);

ProfilingInitializer profilingInitializer = null;
if (configuration.preview.profiler.enabled) {
try {
profilingInitializer =
ProfilingInitializer.initialize(tempDir, configuration, telemetryClient);
heyams marked this conversation as resolved.
Show resolved Hide resolved
} catch (RuntimeException e) {
startupLogger.warning("Failed to initialize profiler", e);
}
}

RuntimeConfigurator runtimeConfigurator =
new RuntimeConfigurator(telemetryClient, () -> agentLogExporter, configuration);
new RuntimeConfigurator(
telemetryClient,
() -> agentLogExporter,
configuration,
heartbeatTelemetryItemConsumer,
profilingInitializer);

if (configuration.sampling.percentage != null) {
BytecodeUtilImpl.samplingPercentage = configuration.sampling.percentage.floatValue();
Expand All @@ -164,14 +184,6 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
BytecodeUtilImpl.connectionStringConfiguredAtRuntime =
configuration.connectionStringConfiguredAtRuntime;

if (configuration.preview.profiler.enabled) {
try {
ProfilingInitializer.initialize(tempDir, configuration, telemetryClient);
} catch (RuntimeException e) {
startupLogger.warning("Failed to initialize profiler", e);
}
}

if (ConfigurationBuilder.inAzureFunctionsConsumptionWorker()) {
AzureFunctions.setup(
() -> telemetryClient.getConnectionString() != null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public synchronized void initialize() {
"disable profiler or use a writable file system");
}

if (configuration.preview.profiler.enabled) {
if (configuration.preview.profiler.enabled && telemetryClient.getConnectionString() != null) {
performInit();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import static com.azure.monitor.opentelemetry.exporter.implementation.utils.AzureMonitorMsgId.BATCH_ITEM_PROCESSOR_ERROR;

import com.azure.core.util.logging.ClientLogger;
import com.azure.monitor.opentelemetry.exporter.implementation.logging.OperationLogger;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.pipeline.TelemetryItemExporter;
Expand All @@ -26,6 +27,8 @@
// copied from io.opentelemetry.sdk.trace.export.BatchSpanProcessor
public final class BatchItemProcessor {

private static final ClientLogger logger = new ClientLogger(BatchItemProcessor.class);

private static final String WORKER_THREAD_NAME =
BatchItemProcessor.class.getSimpleName() + "_WorkerThread";

Expand Down Expand Up @@ -62,8 +65,14 @@ public static BatchItemProcessorBuilder builder(TelemetryItemExporter exporter)
queue,
queue.capacity(),
queueName);
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();

try {
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.setUncaughtExceptionHandler((t, e) -> logger.error(e.getMessage(), e));
workerThread.start();
} catch (RuntimeException ex) {
logger.error("An error occurs when running the batch worker thread", ex);
}
trask marked this conversation as resolved.
Show resolved Hide resolved
}

public void trackAsync(TelemetryItem item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void shouldUpdate() throws URISyntaxException {

// when
RuntimeConfigurator runtimeConfigurator =
new RuntimeConfigurator(telemetryClient, () -> null, config);
new RuntimeConfigurator(telemetryClient, () -> null, config, null, null);
new RpConfigurationPolling(rpConfiguration, runtimeConfigurator).run();

// then
Expand Down Expand Up @@ -117,7 +117,7 @@ void shouldBePopulatedByEnvVars() throws URISyntaxException {

// when
RuntimeConfigurator runtimeConfigurator =
new RuntimeConfigurator(telemetryClient, () -> null, config);
new RuntimeConfigurator(telemetryClient, () -> null, config, null, null);
new RpConfigurationPolling(rpConfiguration, runtimeConfigurator).run();

// then
Expand Down