Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to Open Telemetry #6

Merged
merged 6 commits into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 9 additions & 143 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,96 +9,20 @@

Welcome to reactive-streams-telemetry!

The goal of this [Scala](https://www.scala-lang.org/) library is to provide an [Reactive Streams](http://www.reactive-streams.org/) interface to
an application instance's working set of metrics and traces, thereby providing control over resource usage.
The goal of this [Scala](https://www.scala-lang.org/)/Java library is to provide a [Reactive Streams](http://www.reactive-streams.org/) interface to [Open Telemetry](https://github.com/open-telemetry/opentelemetry-java) metrics and traces so that low memory utilization can be attained.

Metrics and traces are presented using streams where elements pertain to a [metric](https://metrics.dropwizard.io/3.1.0/manual/core/)
and [span](https://opentracing.io/docs/overview/spans/) respectively.
Applications can then consume these streams and present them
Metrics and traces are presented using streams where elements pertain to Open Telemetry's [MetricData](https://github.com/open-telemetry/opentelemetry-java/blob/master/sdk/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java)
and [SpanData](https://github.com/open-telemetry/opentelemetry-java/blob/master/sdk/src/main/java/io/opentelemetry/sdk/trace/data/SpanData.java) classes respectively.
Applications can consume these streams and present them
however they wish e.g. [Akka HTTP](https://doc.akka.io/docs/akka-http/current/) can be used to serve a snapshot of metrics and traces as JSON
with an HTTP `GET` route. Another example could be where metrics and traces are published
over UDP to your favorite collection engine.
over UDP to your favorite collection engine.

[Akka Streams](https://doc.akka.io/docs/akka/2.5/stream/)
is used as the Reactive Streams interface and implementation
with reporting for [Drop Wizard Metrics](https://metrics.dropwizard.io/4.0.0/) and
[Open Tracing](https://opentracing.io/) via [Jaeger Tracing](https://www.jaegertracing.io/).
We also provide a JSON encoding as a convenience and use [spray-json](https://github.com/spray/spray-json)
for this purpose.
[Akka Streams](https://doc.akka.io/docs/akka/2.6/stream/)
is used as the Reactive Streams interface and implementation.

Other than the libraries declared above, there are no additional dependencies.

## Teaser

Serve up the latest telemetry gathered given an [Alpakka Unix Domain Socket](https://doc.akka.io/docs/alpakka/current/unix-domain-socket.html)
and the establishment of the `metrics` and `traces` sources (described following this):

```scala
val source =
metrics
.map { snapshot =>
import MetricsJsonProtocol._
JsObject("metricsSnapshot" -> snapshot.toJson).compactPrint
}
.merge(
traces
.map { span =>
import TracingJsonProtocol._
JsObject("span" -> span.toJson).compactPrint
}
)
.map(s => ByteString(s + "\n")) // Having a newline delim helps consumers such as 'jq' to parse it

UnixDomainSocket()
.bindAndHandle(Flow.fromSinkAndSourceCoupled(Sink.ignore, source),
new File("/var/run/mysocket.sock"))
```

Then, using [`nc`](https://linux.die.net/man/1/nc) and [`jq`](https://stedolan.github.io/jq/manual/),
you can connect to your app and stream out prettified JSON:

```bash
$ nc -v -U /var/run/mysocket.sock | jq
{
"metricsSnapshot": {
"counters": {
"lora-server.messages-appended": 0,
"lora-server.downlink-packets-rx": 14,
"lora-server.unsupported-payload": 0,
"lora-server.uplink-packets-tx": 7
},
"gauges": {},
"histograms": {},
"meters": {
"lora-server.valid-data-up": {
"fifteenMinuteRate": 0,
"count": 0,
"fiveMinuteRate": 0,
"oneMinuteRate": 0,
"meanRate": 0
}
},
"timers": {}
}
}
{
"span": {
"baggage": [],
"duration": 1667,
"logs": [],
"operationName": "uplink-rx-to-ack",
"references": [],
"spanId": -5323514506617469000,
"start": 1551332653982000,
"tags": {
"sampler.type": "const",
"sampler.param": "true"
},
"traceId": -5323514506617469000
}
}
```

## Download

Builds are published to Maven Central. Please substitute `version` accordingly.
Expand All @@ -107,67 +31,9 @@ Builds are published to Maven Central. Please substitute `version` accordingly.
"au.com.titanclass" %% "reactive-streams-telemetry" % version
```

## Metrics setup

```scala
import com.codahale.metrics._
import au.com.titanclass.streams.telemetry._
import java.util.concurrent.TimeUnit

val metricRegistry = new MetricRegistry()

val reporter = new MetricsReporter(
metricRegistry,
MetricFilter.ALL,
TimeUnit.HOURS,
TimeUnit.MILLISECONDS,
None
)
```

## Tracing setup

```scala
import io.jaegertracing.internal.{ JaegerTracer => Tracer }
import au.com.titanclass.streams.telemetry._

val reporter = new TracingReporter(1)

val tracer = new Tracer.Builder("my-tracing-service")
.withReporter(reporter)
.build()
```

## JSON serialization

To use the JSON serialization you import `MetricsJsonProtocol` or `TracingJsonProtocol` for metrics
and tracing respectively. Here's an example of how to serialize `Span` objects to JSON:

```scala
import au.com.titanclass.streams.telemetry.{TracingJsonProtocol, TracingReporter}
import io.jaegertracing.internal.{ JaegerTracer => Tracer }
import io.jaegertracing.internal.samplers.ConstSampler
import akka.stream.scaladsl.Sink

val reporter = new TracingReporter(1)
## Usage

val sampler = new ConstSampler(true)

val tracer = new Tracer.Builder("tracing-reporter-tests")
.withReporter(reporter)
.withSampler(sampler)
.build()

val span = tracer.buildSpan("some-span").start()
span.log(0, "hello-world")
span.finish()

import TracingJsonProtocol._

reporter.source
.runWith(Sink.head)
.map (_.toJson)
```
Please check out the tests for sample usage.

## Contribution policy ##

Expand Down
27 changes: 13 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ lazy val `reactive-streams-telemetry` =
.settings(
libraryDependencies ++= Seq(
library.akkaStream,
library.dropWizardMetricsCore,
library.jaegerCore,
library.sprayJson,
library.utest % Test
library.openTelemetryProto,
library.openTelemetrySdk,
library.utest % Test,
library.akkaStreamTestkit % Test
)
)

Expand All @@ -24,17 +24,16 @@ lazy val `reactive-streams-telemetry` =
lazy val library =
new {
object Version {
val akka = "2.6.7"
val dropWizardMetrics = "4.1.2"
val jaeger = "1.3.1"
val sprayJson = "1.3.5"
val utest = "0.7.2"
val akka = "2.6.7"
val openTelemetryProto = "0.3.0"
val openTelemetrySdk = "0.6.0"
val utest = "0.7.2"
}
val akkaStream = "com.typesafe.akka" %% "akka-stream" % Version.akka
val dropWizardMetricsCore = "io.dropwizard.metrics" % "metrics-core" % Version.dropWizardMetrics
val jaegerCore = "io.jaegertracing" % "jaeger-core" % Version.jaeger
val sprayJson = "io.spray" %% "spray-json" % Version.sprayJson
val utest = "com.lihaoyi" %% "utest" % Version.utest
val akkaStream = "com.typesafe.akka" %% "akka-stream" % Version.akka
val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % Version.akka
val openTelemetryProto = "io.opentelemetry" % "opentelemetry-proto" % Version.openTelemetryProto
val openTelemetrySdk = "io.opentelemetry" % "opentelemetry-sdk" % Version.openTelemetrySdk
val utest = "com.lihaoyi" %% "utest" % Version.utest
}

// *****************************************************************************
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package au.com.titanclass.streams.telemetry

import io.opentelemetry.proto.metrics.v1.{ Metric, MetricDescriptor }
import io.opentelemetry.sdk.metrics.data.MetricData

/**
* Encodes metric data to the Open Telemetry protobuf form
*/
object MetricProtobufMarshalling {
implicit class Marshaller(metricData: MetricData) {
def toProtobuf: Metric.Builder =
Metric
.newBuilder()
.setMetricDescriptor(
MetricDescriptor.newBuilder().setName(metricData.getDescriptor.getName)
)
}
}

This file was deleted.

Loading