Skip to content

Commit

Permalink
GEOMESA-3400 Configurable Micrometer metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Oct 1, 2024
1 parent cbc28c4 commit 6e5587d
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 4 deletions.
10 changes: 7 additions & 3 deletions build/cqs.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ io.dropwizard.metrics:metrics-core 3.2.6 compile
io.dropwizard.metrics:metrics-graphite 3.2.6 compile
io.dropwizard.metrics:metrics-jvm 3.2.6 compile
io.github.azagniotov:dropwizard-metrics-cloudwatch 1.0.13 compile
io.micrometer:micrometer-commons 1.12.2 compile
io.micrometer:micrometer-core 1.12.2 compile
io.micrometer:micrometer-observation 1.12.2 compile
io.micrometer:micrometer-commons 1.13.4 compile
io.micrometer:micrometer-core 1.13.4 compile
io.micrometer:micrometer-observation 1.13.4 compile
io.netty:netty-all 4.1.106.Final compile
io.netty:netty-buffer 4.1.106.Final compile
io.netty:netty-codec 4.1.106.Final compile
Expand Down Expand Up @@ -289,6 +289,10 @@ com.datastax.cassandra:cassandra-driver-core 3.11.5 provided
com.datastax.cassandra:cassandra-driver-mapping 3.11.5 provided
io.confluent:kafka-avro-serializer 6.2.9 provided
io.confluent:kafka-schema-registry-client 6.2.9 provided
io.micrometer:micrometer-registry-cloudwatch2 1.13.4 provided
io.micrometer:micrometer-registry-prometheus 1.13.4 provided
io.prometheus:prometheus-metrics-exporter-httpserver 1.3.1 provided
io.prometheus:prometheus-metrics-exporter-pushgateway 1.3.1 provided
javax.media:jai_core 1.1.3 provided
org.apache.accumulo:accumulo-start 2.1.3 provided
org.apache.commons:commons-text 1.10.0 provided
Expand Down
64 changes: 64 additions & 0 deletions docs/user/appendix/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,67 @@ Extensions
Additional reporters can be added at runtime by implementing
``org.locationtech.geomesa.metrics.core.ReporterFactory`` and registering the new class as a
`service provider <https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html>`__.

Micrometer Metrics
==================

GeoMesa also has initial support for `Micrometer <https://docs.micrometer.io/micrometer/reference/>`__ metrics. Metric
implementations can be configured at runtime through `Typesafe Config <https://github.com/lightbend/config/tree/main>`__:

.. code-block:: scala
org.locationtech.geomesa.metrics.micrometer.MicrometerConfig.setup()
Configuration should be under the key ``geomesa.metrics``, and takes the following config:

::

geomesa.metrics = {
reporters = []
# enable various metrics
bindings = {
classloader = false # enable jvm classloader metrics
memory = false # enable jvm memory usage metrics
gc = false # enable jvm garbage collection metrics
processor = false # enable jvm processor usage metrics
threads = false # enable jvm thread usage metrics
}
}

The following reporters are supported:

Prometheus
----------

::

{
type = "prometheus"
enabled = true
# use prometheus "standard" names - see https://docs.micrometer.io/micrometer/reference/implementations/prometheus.html#_the_prometheus_rename_filter
rename = false
commonTags = { "application" = "my-app" }
port = 9090
# additional config can also be done via sys props - see https://prometheus.github.io/client_java/config/config/
properties = {}
# omit if not using pushgateway
pushGateway = {
host = "localhost:9091"
job = "my-job"
scheme = "http"
format = "PROMETHEUS_PROTOBUF" # or PROMETHEUS_TEXT
}
}

Cloudwatch
----------

::

{
type = "cloudwatch"
enabled = true
namespace = "geomesa"
# properties for the cloudwatch client
properties = {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ object SimpleFeatureConverter extends StrictLogging {
*/
def apply(sft: SimpleFeatureType, config: Config): SimpleFeatureConverter = {
factories.toStream.flatMap(_.apply(sft, config)).headOption.getOrElse {
throw new IllegalArgumentException(s"Cannot find factory for ${sft.getTypeName}")
throw new IllegalArgumentException(s"Cannot find converter factory for ${sft.getTypeName}")
}
}

Expand Down
69 changes: 69 additions & 0 deletions geomesa-metrics/geomesa-metrics-micrometer/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<parent>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-metrics_2.12</artifactId>
<version>5.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>geomesa-metrics-micrometer_2.12</artifactId>
<name>GeoMesa Metrics Micrometer</name>

<dependencies>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-utils_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
</dependency>
<dependency>
<groupId>com.github.pureconfig</groupId>
<artifactId>pureconfig_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>prometheus-metrics-exporter-httpserver</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>prometheus-metrics-exporter-pushgateway</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-cloudwatch2</artifactId>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-junit_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.metrics.micrometer

import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
import io.micrometer.cloudwatch2.CloudWatchMeterRegistry
import io.micrometer.core.instrument.binder.jvm.{ClassLoaderMetrics, JvmGcMetrics, JvmMemoryMetrics, JvmThreadMetrics}
import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.core.instrument.{Clock, MeterRegistry, Metrics, Tag}
import io.micrometer.prometheusmetrics.{PrometheusMeterRegistry, PrometheusRenameFilter}
import io.prometheus.metrics.exporter.httpserver.HTTPServer
import io.prometheus.metrics.exporter.pushgateway.{Format, PushGateway, Scheme}
import org.locationtech.geomesa.utils.io.CloseWithLogging
import pureconfig.{ConfigReader, ConfigSource}
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient

import java.io.Closeable
import java.util.Locale
import java.util.concurrent.atomic.AtomicReference

object MicrometerConfig {

import pureconfig.generic.semiauto._

import scala.collection.JavaConverters._

private val registries = scala.collection.mutable.Map.empty[String, String]
private val bindings = scala.collection.mutable.Set.empty[String]

val ConfigPath = "geomesa.metrics"

/**
* Add registries to the global registry list, based on the default configuration paths
*
* @param conf conf
*/
def setup(conf: Config = ConfigFactory.load()): Unit = synchronized {
if (conf.hasPath(ConfigPath)) {
// noinspection ScalaUnusedSymbol
implicit val bindingsReader: ConfigReader[MetricsBindings] = deriveReader[MetricsBindings]
implicit val metricsReader: ConfigReader[MetricsConfig] = deriveReader[MetricsConfig]
implicit val registryReader: ConfigReader[RegistryConfig] = deriveReader[RegistryConfig]
val metricsConfig = ConfigSource.fromConfig(conf.getConfig(ConfigPath)).loadOrThrow[MetricsConfig]
metricsConfig.registries.foreach { registryConfig =>
val config = ConfigSource.fromConfig(registryConfig).loadOrThrow[RegistryConfig]
if (config.enabled) {
val configString = registryConfig.root().render(ConfigRenderOptions.concise())
if (registries.contains(config.`type`)) {
val existing = registries(config.`type`)
if (existing != configString) {
throw new IllegalArgumentException(
s"Registry type ${config.`type`} already registered with a different configuration:" +
s"\n existing: $existing\n update: $configString")
}
} else {
val registry = createRegistry(conf)
sys.addShutdownHook(registry.close())
Metrics.addRegistry(registry)
registries.put(config.`type`, configString)
}
}
}

if (metricsConfig.bindings.classloader && bindings.add("classloader")) {
new ClassLoaderMetrics().bindTo(Metrics.globalRegistry)
}
if (metricsConfig.bindings.memory && bindings.add("memory")) {
new JvmMemoryMetrics().bindTo(Metrics.globalRegistry)
}
if (metricsConfig.bindings.gc && bindings.add("gc")) {
new JvmGcMetrics().bindTo(Metrics.globalRegistry)
}
if (metricsConfig.bindings.processor && bindings.add("processor")) {
new ProcessorMetrics().bindTo(Metrics.globalRegistry)
}
if (metricsConfig.bindings.threads && bindings.add("threads")) {
new JvmThreadMetrics().bindTo(Metrics.globalRegistry)
}
}
}

/**
* Create a new registry
*
* @param conf configuration for the registry
* @return
*/
def createRegistry(conf: Config): MeterRegistry = {
implicit val reader: ConfigReader[RegistryConfig] = deriveReader[RegistryConfig]
val config = ConfigSource.fromConfig(conf).loadOrThrow[RegistryConfig]
config.`type`.toLowerCase(Locale.US) match {
case "prometheus" => createPrometheusRegistry(conf)
case "cloudwatch" => createCloudwatchRegistry(conf)
case t => throw new IllegalArgumentException(s"No registry type defined for '$t' - valid values are: prometheus, cloudwatch")
}
}

private def createPrometheusRegistry(conf: Config): PrometheusMeterRegistry = {
// noinspection ScalaUnusedSymbol
implicit val gatewayReader: ConfigReader[PushGatewayConfig] = deriveReader[PushGatewayConfig]
implicit val prometheusReader: ConfigReader[PrometheusConfig] = deriveReader[PrometheusConfig]
val config = ConfigSource.fromConfig(conf).loadOrThrow[PrometheusConfig]
val dependentClose = new AtomicReference[Closeable]()
val registry = new PrometheusMeterRegistry(k => config.properties.getOrElse(k, null)) {
override def close(): Unit = {
CloseWithLogging(Option(dependentClose.get()))
super.close()
}
}
registry.throwExceptionOnRegistrationFailure()
if (config.rename) {
registry.config().meterFilter(new PrometheusRenameFilter())
}
if (config.commonTags.nonEmpty) {
val tags = config.commonTags.map { case (k, v) => Tag.of(k, v) }
registry.config.commonTags(tags.asJava)
}
config.pushGateway match {
case None =>
val server =
HTTPServer.builder()
.port(config.port)
.registry(registry.getPrometheusRegistry)
.buildAndStart()
dependentClose.set(server)

case Some(pg) =>
val builder = PushGateway.builder().registry(registry.getPrometheusRegistry).address(pg.host)
pg.job.foreach(builder.job)
pg.format.foreach(v => builder.format(Format.valueOf(v.toUpperCase(Locale.US))))
pg.scheme.foreach(v => builder.scheme(Scheme.fromString(v.toLowerCase(Locale.US))))
val pushGateway = builder.build()
dependentClose.set(() => pushGateway.pushAdd())
}
registry
}

private def createCloudwatchRegistry(conf: Config): CloudWatchMeterRegistry = {
implicit val reader: ConfigReader[CloudwatchConfig] = deriveReader[CloudwatchConfig]
val config = ConfigSource.fromConfig(conf).loadOrThrow[CloudwatchConfig]
new CloudWatchMeterRegistry(k => config.properties.getOrElse(k, null), Clock.SYSTEM, CloudWatchAsyncClient.create())
}

private case class MetricsConfig(
registries: Seq[Config],
bindings: MetricsBindings
)

private case class MetricsBindings(
classloader: Boolean = false,
memory: Boolean = false,
gc: Boolean = false,
processor: Boolean = false,
threads: Boolean = false,
)

private case class RegistryConfig(
`type`: String,
enabled: Boolean = true,
)

private case class PrometheusConfig(
rename: Boolean = false,
commonTags: Map[String, String] = Map.empty,
port: Int = 9090,
// additional config can also be done via sys props - see https://prometheus.github.io/client_java/config/config/
properties: Map[String, String] = Map.empty,
pushGateway: Option[PushGatewayConfig],
)

private case class PushGatewayConfig(
host: String,
scheme: Option[String],
job: Option[String],
format: Option[String],
)

private case class CloudwatchConfig(
namespace: String = "geomesa",
properties: Map[String, String] = Map.empty
)
}
Loading

0 comments on commit 6e5587d

Please sign in to comment.