From 9b5992748bf368dfab3e15de2c1ffc76c3900c1b Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Thu, 23 Jul 2020 15:38:38 -0400 Subject: [PATCH 1/6] Integrate Exemplars with Python SDK --- .../statistical_exemplars-checkpoint.ipynb | 359 ++++++++++++++ docs/examples/exemplars/README.rst | 40 ++ docs/examples/exemplars/semantic_exemplars.py | 68 +++ .../exemplars/statistical_exemplars.ipynb | 340 +++++++++++++ .../exemplars/statistical_exemplars.py | 132 +++++ .../sdk/metrics/export/__init__.py | 3 +- .../sdk/metrics/export/aggregate.py | 53 +- .../sdk/metrics/export/exemplars.py | 277 +++++++++++ .../src/opentelemetry/sdk/metrics/view.py | 5 +- .../tests/metrics/export/test_exemplars.py | 460 ++++++++++++++++++ 10 files changed, 1725 insertions(+), 12 deletions(-) create mode 100644 docs/examples/exemplars/.ipynb_checkpoints/statistical_exemplars-checkpoint.ipynb create mode 100644 docs/examples/exemplars/README.rst create mode 100644 docs/examples/exemplars/semantic_exemplars.py create mode 100644 docs/examples/exemplars/statistical_exemplars.ipynb create mode 100644 docs/examples/exemplars/statistical_exemplars.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py create mode 100644 opentelemetry-sdk/tests/metrics/export/test_exemplars.py diff --git a/docs/examples/exemplars/.ipynb_checkpoints/statistical_exemplars-checkpoint.ipynb b/docs/examples/exemplars/.ipynb_checkpoints/statistical_exemplars-checkpoint.ipynb new file mode 100644 index 00000000000..deb0f27457f --- /dev/null +++ b/docs/examples/exemplars/.ipynb_checkpoints/statistical_exemplars-checkpoint.ipynb @@ -0,0 +1,359 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This example will build an exemplar sample set on a \"bytes in\" counter aggregator, which just sums up the number of bytes sent into our \"application\".\n", + "We will use these statistical exemplars to generate insights into the data that was aggregated away.\n", + 
"\n", + "We'll start by importing everything we will need from opentelemetry to create the metrics:" + ] + }, + { + "cell_type": "code", + "execution_count": 48, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "import random\n", + "\n", + "from collections import defaultdict\n", + "\n", + "from opentelemetry import metrics\n", + "from opentelemetry.sdk.metrics import Counter, MeterProvider\n", + "from opentelemetry.sdk.metrics.export.aggregate import SumAggregator\n", + "from opentelemetry.sdk.metrics.export.controller import PushController\n", + "from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter\n", + "from opentelemetry.sdk.metrics.view import View, ViewConfig" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can then set up an in-memory metrics exporter so we can analyze the exemplar data in-service:" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Overriding current MeterProvider\n" + ] + } + ], + "source": [ + "## set up opentelemetry\n", + "\n", + "# Sets the global MeterProvider instance\n", + "metrics.set_meter_provider(MeterProvider())\n", + "\n", + "meter = metrics.get_meter(__name__)\n", + "\n", + "# Export to a python list so we can do stats with the data\n", + "exporter = InMemoryMetricsExporter()\n", + "\n", + "# instead of waiting for the controller to tick over time, we will just tick it ourselves\n", + "controller = PushController(meter, exporter, 500)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We now need to create the bytes in metric, and assign it a view (this is where we set up exemplars):" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "# Create the metric that we will use\n", + 
"bytes_counter = meter.create_metric(\n", + " name=\"bytes_counter\",\n", + " description=\"Number of bytes received by service\",\n", + " unit=\"By\",\n", + " value_type=int,\n", + " metric_type=Counter,\n", + ")\n", + "\n", + "# Every time interval we will collect 100 exemplars statistically (selected without bias)\n", + "aggregator_config = {\"num_exemplars\": 100, \"statistical_exemplars\": True}\n", + "\n", + "# Assign a Sum aggregator to `bytes_counter` that collects exemplars\n", + "counter_view = View(\n", + " bytes_counter,\n", + " SumAggregator(config=aggregator_config),\n", + " label_keys=[\"environment\"],\n", + " config=ViewConfig.LABEL_KEYS,\n", + ")\n", + "\n", + "meter.register_view(counter_view)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The last thing we need to do before we can start working with exemplars is generating a large set of data for metrics.\n", + "If the dataset is too small, we won't be able to collect a large enough subset of the input to analyze with exemplars.\n", + "\n", + "If this was a real application, the data would be generated through requests to/from the server." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 51, + "metadata": {}, + "outputs": [], + "source": [ + "## generate the random metric data\n", + "\n", + "def unknown_customer_calls():\n", + " \"\"\"Generate customer call data to our application\"\"\"\n", + "\n", + " # set a random seed for consistency of data for example purposes\n", + " np.random.seed(1)\n", + " # Make exemplar selection consistent for example purposes\n", + " random.seed(1)\n", + "\n", + " # customer 123 is a big user, and made 1000 requests in this timeframe\n", + " requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, standard deviation 250\n", + "\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 123})\n", + "\n", + " # customer 247 is another big user, making fewer, but bigger requests\n", + " requests = np.random.normal(5000, 1250, 200) # 200 requests with average size of 5k bytes\n", + "\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 247})\n", + "\n", + " # There are many other smaller customers\n", + " for customer_id in range(250):\n", + " requests = np.random.normal(1000, 250, np.random.randint(1, 10))\n", + " method = \"REST\" if np.random.randint(2) else \"gRPC\"\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": method, \"customer_id\": customer_id})\n", + "\n", + "unknown_customer_calls()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Analyzing the Exemplars" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's export our metric and collect the exemplars from the outputted aggregation:" + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "metadata": {}, + "outputs": [], + "source": [ + "# Tick the controller so it sends 
metrics to the exporter\n", + "controller.tick()\n", + "\n", + "# collect metrics from our exporter\n", + "metric_data = exporter.get_exported_metrics()\n", + "\n", + "# get the exemplars from the bytes_in counter aggregator\n", + "aggregator = metric_data[0].aggregator\n", + "exemplars = aggregator.checkpoint_exemplars" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "One of the key values of exemplars is its ability to handle dropped labels (labels that are too high cardinality to create a new metric record for each value). \n", + "In our application, we drop the \"customer_id\" label since there is an unbounded number of possible labels. However, with exemplars, we can still estimate stats related\n", + "to the customer ids, for example the relative size of each customer:" + ] + }, + { + "cell_type": "code", + "execution_count": 53, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Sum up the total bytes in per customer from all of the exemplars collected\n", + "customer_bytes_map = defaultdict(int)\n", + "for exemplar in exemplars:\n", + " customer_bytes_map[exemplar.dropped_labels] += exemplar.value\n", + "\n", + "\n", + "customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)\n", + "\n", + "# Save our top 5 customers and sum all of the rest into \"Others\".\n", + "top_5_customers = [(\"Customer {}\".format(dict(val[0])[\"customer_id\"]), val[1]) for val in customer_bytes_list[:5]] + [(\"Other Customers\", sum([val[1] for val in customer_bytes_list[5:]]))]\n", + "\n", + "# unzip the data into X (sizes of each customer's contribution) and labels\n", + "labels, X = zip(*top_5_customers)\n", + "\n", + "# create the chart with matplotlib and show it\n", + "plt.pie(X, labels=labels)\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Running this shows that the usage of our service is relatively closely split between customer 247, customer 123, and everyone else, which lines up closely with the data that was generated.\n", + "The more exemplars we sample, the more accurate this data will be, but also the more costly (in terms of memory usage) the metric would be.\n", + "\n", + "We can use the \"sample_count\" property of exemplars to predict the actual number of bytes customers sent (vs the percentage)\n", + "For example, to predict the number of bytes customer 123 sent:" + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "sample count 25.33 custmer 40474\n", + "Customer 123 sent about 1025206 bytes this interval\n" + ] + } + ], + "source": [ + "# Estimate how many bytes customer 123 sent\n", + "customer_123_bytes = customer_bytes_map[((\"customer_id\", 123), 
(\"method\", \"REST\"))]\n", + "\n", + "# Since the exemplars were randomly sampled, all sample_counts will be the same\n", + "sample_count = exemplars[0].sample_count\n", + "print(\"sample count\", sample_count, \"custmer\", customer_123_bytes)\n", + "full_customer_123_bytes = sample_count * customer_123_bytes\n", + "\n", + "# With seed == 1 we get 1025206 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)\n", + "print(\"Customer 123 sent about {} bytes this interval\".format(int(full_customer_123_bytes)))\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "We could also estimate the percentage of our top 25 customers that use gRPC (another dropped label):" + ] + }, + { + "cell_type": "code", + "execution_count": 58, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "~44% of the top 25 customers (by bytes in) used gRPC this interval\n" + ] + } + ], + "source": [ + "# Determine the top 25 customers by how many bytes they sent in exemplars\n", + "top_25_customers = customer_bytes_list[:25]\n", + "\n", + "# out of those 25 customers, determine how many used grpc, and come up with a ratio\n", + "percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == \"gRPC\", top_25_customers))) / len(top_25_customers)\n", + "\n", + "print(\"~{}% of the top 25 customers (by bytes in) used gRPC this interval\".format(int(percent_grpc*100)))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The value of exemplars goes beyond just handling dropped labels, however. 
We can also estimate the input distribution to the `bytes_counter` metric, through histograms or quantiles:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 57, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "50th Percentile Bytes In: 1031\n", + "90th Percentile Bytes In: 1624\n", + "99th Percentile Bytes In: 6094\n" + ] + } + ], + "source": [ + "# Determine the 50th, 90th, and 99th percentile of byte size sent in\n", + "quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99])\n", + "print(\"50th Percentile Bytes In:\", int(quantiles[0]))\n", + "print(\"90th Percentile Bytes In:\", int(quantiles[1]))\n", + "print(\"99th Percentile Bytes In:\", int(quantiles[2]))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This is only a small subset of the things that can be done with exemplars - almost any statistic \n", + "that could be created through an aggregator on the original data can be estimated through exemplars." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "jupyter3_Python_3", + "language": "python", + "name": "jupyter3_python_3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/docs/examples/exemplars/README.rst b/docs/examples/exemplars/README.rst new file mode 100644 index 00000000000..b49a02b8de6 --- /dev/null +++ b/docs/examples/exemplars/README.rst @@ -0,0 +1,40 @@ +OpenTelemetry Exemplars Example +=============================== + +Exemplars are example measurements for aggregations. 
While they are simple conceptually, exemplars can estimate any statistic about the input distribution, can provide links to sample traces for high latency requests, and much more. +For more information about exemplars and how they work in OpenTelemetry, see the `spec `_ + +Examples +-------- + +Installation + +.. code-block:: sh + + pip install opentelemetry-api + pip install opentelemetry-sdk + pip install matplotlib # may have to install Qt as well + pip install numpy + + pip install opentelemetry-exporter-cloud-monitoring # if you want to export exemplars to cloud monitoring + +Statistical exemplars +^^^^^^^^^^^^^^^^^^^^^ + +The opentelemetry SDK provides a way to sample exemplars statistically: + + - Exemplars will be picked to represent the input distribution, without unquantifiable bias + - A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents + +See 'statistical_exemplars.ipynb' (in this directory) for the example. + +Semantic exemplars +^^^^^^^^^^^^^^^^^^ + +Semantic exemplars are exemplars that have not been sampled statistically, +but instead aim to provide value as individual exemplars. +They will have a trace id/span id attached for the active trace when the exemplar was recorded, +and they may focus on measurements with abnormally high/low values. + +'semantic_exemplars.py' shows how to generate exemplars for a histogram aggregation. +Currently only the Google Cloud Monitoring exporter supports uploading these exemplars. diff --git a/docs/examples/exemplars/semantic_exemplars.py b/docs/examples/exemplars/semantic_exemplars.py new file mode 100644 index 00000000000..5d14dd3bea5 --- /dev/null +++ b/docs/examples/exemplars/semantic_exemplars.py @@ -0,0 +1,68 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This example shows how to generate "semantic" exemplars for a histogram, and how to export them to Google Cloud Monitoring. +""" + +import random +import time + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import ( + MeterProvider, + ValueRecorder, +) +from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter +from opentelemetry.sdk.metrics.export.aggregate import ( + HistogramAggregator, +) +from opentelemetry.sdk.metrics.view import View, ViewConfig + +# Set up OpenTelemetry metrics +metrics.set_meter_provider(MeterProvider(stateful=False)) +meter = metrics.get_meter(__name__) + +# Export to the console here; the Google Cloud Monitoring exporter is currently the only one that can upload exemplars +metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 10) + +# Create our duration metric +request_duration = meter.create_metric( + name="request_duration", + description="duration (ms) of incoming requests", + unit="ms", + value_type=int, + metric_type=ValueRecorder, +) + +# Add a Histogram view to our duration metric, and make it generate 1 exemplar per bucket +duration_view = View( + request_duration, + # Latency in buckets: + # [>=0ms, >=25ms, >=50ms, >=75ms, >=100ms, >=200ms, >=400ms, >=600ms, >=800ms, >=1s, >=2s, >=4s, >=6s] + # We want to generate 1 exemplar per bucket, where each exemplar has a linked trace that was recorded. 
+ # So we need to set num_exemplars to 1 and not specify statistical_exemplars (defaults to false) + HistogramAggregator, + aggregator_config={"bounds": [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000], + "num_exemplars": 1}, + label_keys=["environment"], + view_config=ViewConfig.LABEL_KEYS, +) + +meter.register_view(duration_view) + +for i in range(100): + # Generate some random data for the histogram with a dropped label "customer_id" + request_duration.record(random.randint(1, 8000), {"environment": "staging", "customer_id": random.randint(1, 100)}) + time.sleep(1) diff --git a/docs/examples/exemplars/statistical_exemplars.ipynb b/docs/examples/exemplars/statistical_exemplars.ipynb new file mode 100644 index 00000000000..5f3659e41e8 --- /dev/null +++ b/docs/examples/exemplars/statistical_exemplars.ipynb @@ -0,0 +1,340 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This example will build an exemplar sample set on a \"bytes in\" counter aggregator, which just sums up the number of bytes sent into our \"application\".\n", + "We will use these statistical exemplars to generate insights into the data that was aggregated away.\n", + "\n", + "We'll start by importing everything we will need from opentelemetry to create the metrics:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "import random\n", + "\n", + "from collections import defaultdict\n", + "\n", + "from opentelemetry import metrics\n", + "from opentelemetry.sdk.metrics import Counter, MeterProvider\n", + "from opentelemetry.sdk.metrics.export.aggregate import SumAggregator\n", + "from opentelemetry.sdk.metrics.export.controller import PushController\n", + "from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter\n", + "from opentelemetry.sdk.metrics.view import View, ViewConfig" + ] + }, + 
{ + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can then set up an in-memory metrics exporter so we can analyze the exemplar data in-service:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "## set up opentelemetry\n", + "\n", + "# Sets the global MeterProvider instance\n", + "metrics.set_meter_provider(MeterProvider())\n", + "\n", + "meter = metrics.get_meter(__name__)\n", + "\n", + "# Export to a python list so we can do stats with the data\n", + "exporter = InMemoryMetricsExporter()\n", + "\n", + "# instead of waiting for the controller to tick over time, we will just tick it ourselves\n", + "controller = PushController(meter, exporter, 500)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We now need to create the bytes in metric, and assign it a view (this is where we set up exemplars):" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "# Create the metric that we will use\n", + "bytes_counter = meter.create_metric(\n", + " name=\"bytes_counter\",\n", + " description=\"Number of bytes received by service\",\n", + " unit=\"By\",\n", + " value_type=int,\n", + " metric_type=Counter,\n", + ")\n", + "\n", + "# Every time interval we will collect 100 exemplars statistically (selected without bias)\n", + "aggregator_config = {\"num_exemplars\": 100, \"statistical_exemplars\": True}\n", + "\n", + "# Assign a Sum aggregator to `bytes_counter` that collects exemplars\n", + "counter_view = View(\n", + " bytes_counter,\n", + " SumAggregator,\n", + " aggregator_config=aggregator_config,\n", + " label_keys=[\"environment\"],\n", + " view_config=ViewConfig.LABEL_KEYS,\n", + ")\n", + "\n", + "meter.register_view(counter_view)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The last thing we need to do before we can start working with exemplars is generating a large set of 
data for metrics.\n", + "If the dataset is too small, we won't be able to collect a large enough subset of the input to analyze with exemplars.\n", + "\n", + "If this was a real application, the data would be generated through requests to/from the server." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "## generate the random metric data\n", + "\n", + "def unknown_customer_calls():\n", + " \"\"\"Generate customer call data to our application\"\"\"\n", + "\n", + " # set a random seed for consistency of data for example purposes\n", + " np.random.seed(1)\n", + " # Make exemplar selection consistent for example purposes\n", + " random.seed(1)\n", + "\n", + " # customer 123 is a big user, and made 1000 requests in this timeframe\n", + " requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, standard deviation 250\n", + "\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 123})\n", + "\n", + " # customer 247 is another big user, making fewer, but bigger requests\n", + " requests = np.random.normal(5000, 1250, 200) # 200 requests with average size of 5k bytes\n", + "\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 247})\n", + "\n", + " # There are many other smaller customers\n", + " for customer_id in range(250):\n", + " requests = np.random.normal(1000, 250, np.random.randint(1, 10))\n", + " method = \"REST\" if np.random.randint(2) else \"gRPC\"\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": method, \"customer_id\": customer_id})\n", + "\n", + "unknown_customer_calls()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Analyzing the Exemplars" + ] + }, + { + "cell_type": "markdown", + 
"metadata": {}, + "source": [ + "Let's export our metric and collect the exemplars from the outputted aggregation:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# Tick the controller so it sends metrics to the exporter\n", + "controller.tick()\n", + "\n", + "# collect metrics from our exporter\n", + "metric_data = exporter.get_exported_metrics()\n", + "\n", + "# get the exemplars from the bytes_in counter aggregator\n", + "aggregator = metric_data[0].aggregator\n", + "exemplars = aggregator.checkpoint_exemplars" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "One of the key values of exemplars is its ability to handle dropped labels (labels that are too high cardinality to create a new metric record for each value). \n", + "In our application, we drop the \"customer_id\" label since there is an unbounded number of possible labels. However, with exemplars, we can still estimate stats related\n", + "to the customer ids, for example the sizes of our top customers:" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "output_type": "display_data", + "data": { + "text/plain": "
", + "image/svg+xml": "\n\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n\n", + "image/png": "\n" + }, + "metadata": {} + } + ], + "source": [ + "# Sum up the total bytes in per customer from all of the exemplars collected\n", + "customer_bytes_map = defaultdict(int)\n", + "for exemplar in exemplars:\n", + " customer_bytes_map[exemplar.dropped_labels] += exemplar.value\n", + "\n", + "\n", + "customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)\n", + "\n", + "# Save our top 5 customers and sum all of the rest into \"Others\".\n", + "top_3_customers = [(\"Customer {}\".format(dict(val[0])[\"customer_id\"]), val[1]) for val in customer_bytes_list[:3]] + [(\"Other Customers\", sum([val[1] for val in customer_bytes_list[3:]]))]\n", + "\n", + "# unzip the data into X (sizes of each customer's contribution) and labels\n", + "labels, X = zip(*top_3_customers)\n", + "\n", + "# create the chart with matplotlib and show it\n", + "plt.pie(X)\n", + "plt.legend(labels, loc = \"upper right\") \n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Running this shows that the usage of our service is relatively closely split between customer 247, customer 123, and everyone else, which lines up closely with the data that was generated.\n", + "The more exemplars we sample, the more accurate this data will be, but also the more costly (in terms of memory usage) the metric would be.\n", + "\n", + "We can use the \"sample_count\" property of exemplars to predict the actual number of bytes customers sent (vs the percentage)\n", + "For example, 
to predict the number of bytes customer 123 sent:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": "Customer 123 sent about 1025206 bytes this interval\n" + } + ], + "source": [ + "# Estimate how many bytes customer 123 sent\n", + "customer_123_bytes = customer_bytes_map[((\"customer_id\", 123), (\"method\", \"REST\"))]\n", + "\n", + "# Since the exemplars were randomly sampled, all sample_counts will be the same\n", + "sample_count = exemplars[0].sample_count\n", + "full_customer_123_bytes = sample_count * customer_123_bytes\n", + "\n", + "# With seed == 1 we get 1025206 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)\n", + "print(\"Customer 123 sent about {} bytes this interval\".format(int(full_customer_123_bytes)))\n" + ] + }, + { + "source": [ + "We could also estimate the percentage of our top 25 customers that use gRPC (another dropped label):" + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": "~44% of the top 25 customers (by bytes in) used gRPC this interval\n" + } + ], + "source": [ + "# Determine the top 25 customers by how many bytes they sent in exemplars\n", + "top_25_customers = customer_bytes_list[:25]\n", + "\n", + "# out of those 25 customers, determine how many used grpc, and come up with a ratio\n", + "percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == \"gRPC\", top_25_customers))) / len(top_25_customers)\n", + "\n", + "print(\"~{}% of the top 25 customers (by bytes in) used gRPC this interval\".format(int(percent_grpc*100)))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The value of exemplars goes beyond just handling dropped labels, however. 
We can also estimate the input distribution to the `bytes_counter` metric, through histograms or quantiles:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": "50th Percentile Bytes In: 1031\n90th Percentile Bytes In: 1624\n99th Percentile Bytes In: 6094\n" + } + ], + "source": [ + "# Determine the 50th, 90th, and 99th percentile of byte size sent in\n", + "quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99])\n", + "print(\"50th Percentile Bytes In:\", int(quantiles[0]))\n", + "print(\"90th Percentile Bytes In:\", int(quantiles[1]))\n", + "print(\"99th Percentile Bytes In:\", int(quantiles[2]))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This is only a small subset of the things that can be done with exemplars - almost any statistic \n", + "that could be created through an aggregator on the original data can be estimated through exemplars." 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "jupyter3_Python_3", + "language": "python", + "name": "jupyter3_python_3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/docs/examples/exemplars/statistical_exemplars.py b/docs/examples/exemplars/statistical_exemplars.py new file mode 100644 index 00000000000..353e516cb4e --- /dev/null +++ b/docs/examples/exemplars/statistical_exemplars.py @@ -0,0 +1,132 @@ +import numpy as np +import matplotlib.pyplot as plt +import random + +from collections import defaultdict + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import Counter, MeterProvider +from opentelemetry.sdk.metrics.export.aggregate import SumAggregator +from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter +from opentelemetry.sdk.metrics.view import View, ViewConfig + +## set up opentelemetry + +# Sets the global MeterProvider instance +metrics.set_meter_provider(MeterProvider()) + +meter = metrics.get_meter(__name__) + +# Export to a python list so we can do stats with the data +exporter = InMemoryMetricsExporter() + +# instead of waiting for the controller to tick over time, we will just tick it ourselves +controller = PushController(meter, exporter, 500) + +# Create the metric that we will use +bytes_counter = meter.create_metric( + name="bytes_counter", + description="Number of bytes received by service", + unit="By", + value_type=int, + metric_type=Counter, +) + +# Every time interval we will collect 100 exemplars statistically (selected without bias) +aggregator_config = {"num_exemplars": 100, 
"statistical_exemplars": True} + +# Assign a Sum aggregator to `bytes_counter` that collects exemplars +counter_view = View( + bytes_counter, + SumAggregator, + aggregator_config=aggregator_config, + label_keys=["environment"], + view_config=ViewConfig.LABEL_KEYS, +) + +meter.register_view(counter_view) + +## generate the random metric data + +def unknown_customer_calls(): + """Generate customer call data to our application""" + + # set a random seed for consistency of data for example purposes + np.random.seed(1) + # Make exemplar selection consistent for example purposes + random.seed(1) + + # customer 123 is a big user, and made 1000 requests in this timeframe + requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, standard deviation 250 + + for request in requests: + bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 123}) + + # customer 247 is another big user, making fewer, but bigger requests + requests = np.random.normal(5000, 1250, 200) # 200 requests with average size of 5k bytes + + for request in requests: + bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 247}) + + # There are many other smaller customers + for customer_id in range(250): + requests = np.random.normal(1000, 250, np.random.randint(1, 10)) + method = "REST" if np.random.randint(2) else "gRPC" + for request in requests: + bytes_counter.add(int(request), {"environment": "production", "method": method, "customer_id": customer_id}) + +unknown_customer_calls() + +# Tick the controller so it sends metrics to the exporter +controller.tick() + +# collect metrics from our exporter +metric_data = exporter.get_exported_metrics() + +# get the exemplars from the bytes_in counter aggregator +aggregator = metric_data[0].aggregator +exemplars = aggregator.checkpoint_exemplars + +# Sum up the total bytes in per customer from all of the exemplars collected +customer_bytes_map = defaultdict(int) 
+for exemplar in exemplars: + customer_bytes_map[exemplar.dropped_labels] += exemplar.value + + +customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True) + +# Save our top 5 customers and sum all of the rest into "Others". +top_5_customers = [("Customer {}".format(dict(val[0])["customer_id"]), val[1]) for val in customer_bytes_list[:5]] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))] + +# unzip the data into X (sizes of each customer's contribution) and labels +labels, X = zip(*top_5_customers) + +# create the chart with matplotlib and show it +plt.pie(X, labels=labels) +plt.show() + +# Estimate how many bytes customer 123 sent +customer_123_bytes = customer_bytes_map[(("customer_id", 123), ("method", "REST"))] + +# Since the exemplars were randomly sampled, all sample_counts will be the same +sample_count = exemplars[0].sample_count +print("sample count", sample_count, "custmer", customer_123_bytes) +full_customer_123_bytes = sample_count * customer_123_bytes + +# With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! 
(more exemplars would make this estimation even more accurate) +print("Customer 123 sent about {} bytes this interval".format(int(full_customer_123_bytes))) + +# Determine the top 25 customers by how many bytes they sent in exemplars +top_25_customers = customer_bytes_list[:25] + +# out of those 25 customers, determine how many used grpc, and come up with a ratio +percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == "gRPC", top_25_customers))) / len(top_25_customers) + +print("~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(int(percent_grpc*100))) + +# Determine the 50th, 90th, and 99th percentile of byte size sent in +quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99]) +print("50th Percentile Bytes In:", int(quantiles[0])) +print("90th Percentile Bytes In:", int(quantiles[1])) +print("99th Percentile Bytes In:", int(quantiles[2])) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py index 16911f94efb..8abada0a3c7 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py @@ -77,11 +77,12 @@ def export( ) -> "MetricsExportResult": for record in metric_records: print( - '{}(data="{}", labels="{}", value={})'.format( + '{}(data="{}", labels="{}", value={}, exemplars={})'.format( type(self).__name__, record.instrument, record.labels, record.aggregator.checkpoint, + record.aggregator.checkpoint_exemplars ) ) return MetricsExportResult.SUCCESS diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 121f39a98b6..a0a8db346bc 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -16,8 +16,17 @@ 
import logging import threading from collections import OrderedDict, namedtuple +import itertools +from collections import namedtuple, OrderedDict from opentelemetry.util import time_ns +from opentelemetry.sdk.metrics.export.exemplars import ( + Exemplar, + RandomExemplarSampler, + MinMaxExemplarSampler, + BucketedExemplarSampler, + ExemplarManager +) logger = logging.getLogger(__name__) @@ -36,9 +45,10 @@ def __init__(self, config=None): self.config = config else: self.config = {} + self.checkpoint_exemplars = list() @abc.abstractmethod - def update(self, value): + def update(self, value, dropped_labels=None): """Updates the current with the new value.""" @abc.abstractmethod @@ -59,15 +69,19 @@ def __init__(self, config=None): self.checkpoint = 0 self._lock = threading.Lock() self.last_update_timestamp = None + self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler) - def update(self, value): + def update(self, value, dropped_labels=None): with self._lock: self.current += value self.last_update_timestamp = time_ns() + self.exemplar_manager.sample(value, dropped_labels) + def take_checkpoint(self): with self._lock: self.checkpoint = self.current + self.checkpoint_exemplars = self.exemplar_manager.take_checkpoint() self.current = 0 def merge(self, other): @@ -77,6 +91,7 @@ def merge(self, other): self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp ) + self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars) class MinMaxSumCountAggregator(Aggregator): @@ -105,8 +120,11 @@ def __init__(self, config=None): self._lock = threading.Lock() self.last_update_timestamp = None - def update(self, value): + self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler) + + def update(self, value, dropped_labels=None): with self._lock: + if self.current is self._EMPTY: self.current = self._TYPE(value, 
value, value, 1) else: @@ -118,9 +136,12 @@ def update(self, value): ) self.last_update_timestamp = time_ns() + self.exemplar_manager.sample(value, dropped_labels) + def take_checkpoint(self): with self._lock: self.checkpoint = self.current + self.checkpoint_exemplars = self.exemplar_manager.take_checkpoint() self.current = self._EMPTY def merge(self, other): @@ -132,6 +153,7 @@ def merge(self, other): self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp ) + self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars) class HistogramAggregator(Aggregator): @@ -151,6 +173,8 @@ def __init__(self, config=None): self.current = OrderedDict([(bb, 0) for bb in self._boundaries]) self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries]) + self.exemplar_manager = ExemplarManager(config, BucketedExemplarSampler, BucketedExemplarSampler, boundaries=self._boundaries) + self.current[">"] = 0 self.checkpoint[">"] = 0 @@ -178,18 +202,21 @@ def _merge_checkpoint(cls, val1, val2): logger.warning("Cannot merge histograms with different buckets.") return val1 - def update(self, value): + def update(self, value, dropped_labels=None): with self._lock: if self.current is None: self.current = [0 for ii in range(len(self._boundaries) + 1)] # greater than max value if value >= self._boundaries[len(self._boundaries) - 1]: self.current[">"] += 1 + self.exemplar_manager.sample(value, dropped_labels, bucket_index=len(self._boundaries)) else: - for bb in self._boundaries: + for index, bb in enumerate(self._boundaries): # find first bucket that value is less than if value < bb: self.current[bb] += 1 + + self.exemplar_manager.sample(value, dropped_labels, bucket_index=index) break self.last_update_timestamp = time_ns() @@ -197,6 +224,9 @@ def take_checkpoint(self): with self._lock: self.checkpoint = self.current self.current = OrderedDict([(bb, 0) for bb in self._boundaries]) + + 
self.checkpoint_exemplars = self.exemplar_manager.take_checkpoint() + self.current[">"] = 0 def merge(self, other): @@ -205,6 +235,9 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) + + self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars) + self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp ) @@ -218,7 +251,7 @@ def __init__(self, config=None): self._lock = threading.Lock() self.last_update_timestamp = None - def update(self, value): + def update(self, value, dropped_labels=None): with self._lock: self.current = value self.last_update_timestamp = time_ns() @@ -245,19 +278,20 @@ class ValueObserverAggregator(Aggregator): def __init__(self, config=None): super().__init__(config=config) - self.mmsc = MinMaxSumCountAggregator() + self.mmsc = MinMaxSumCountAggregator(config=config) self.current = None self.checkpoint = self._TYPE(None, None, None, 0, None) self.last_update_timestamp = None - def update(self, value): - self.mmsc.update(value) + def update(self, value, dropped_labels=None): + self.mmsc.update(value, dropped_labels=dropped_labels) self.current = value self.last_update_timestamp = time_ns() def take_checkpoint(self): self.mmsc.take_checkpoint() self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,))) + self.checkpoint_exemplars = self.mmsc.checkpoint_exemplars def merge(self, other): if verify_type(self, other): @@ -269,6 +303,7 @@ def merge(self, other): if self.last_update_timestamp == other.last_update_timestamp: last = other.checkpoint.last self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,))) + self.checkpoint_exemplars = self.mmsc.checkpoint_exemplars def get_latest_timestamp(time_stamp, other_timestamp): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py new 
# Patch metadata carried over from the collapsed diff (not part of the module):
# file mode 100644 index 00000000000..9fc74cebe57 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py @@ -0,0 +1,277 @@

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Exemplars are sample data points for aggregators. For more information, see
the OpenTelemetry metrics specification.

Every synchronous aggregator is instrumented with two exemplar recorders:

1. A "semantic" exemplar sampler, which only samples exemplars if they have a
   sampled trace context (and can pick exemplars with other biases, i.e. min
   and max).
2. A "statistical" exemplar sampler, which samples exemplars without bias
   (i.e. no preference for traced exemplars).

To use an exemplar recorder, pass two arguments in the aggregator config of a
view (see the "Exemplars" example):

"num_exemplars": The number of exemplars to record (if applicable, in each
    bucket). Note that in non-statistical mode the recorder may not use
    "num_exemplars".
"statistical_exemplars": Whether exemplars should be recorded statistically.

For exemplars to be recorded, `num_exemplars` must be greater than 0.
"""

import abc
import bisect
import itertools
import random

from opentelemetry.context import get_current
from opentelemetry.util import time_ns


def _exemplar_value(exemplar):
    """Return the measurement used to order ``exemplar``.

    Objects without a ``value`` attribute (e.g. bare numbers, which the
    sampler unit tests pass to ``merge``) are ordered by themselves.
    """
    return getattr(exemplar, "value", exemplar)


class Exemplar:
    """
    A sample data point for an aggregator. Exemplars represent individual
    measurements recorded.
    """

    def __init__(
        self,
        value,
        timestamp,
        dropped_labels=None,
        span_id=None,
        trace_id=None,
        sample_count=None,
    ):
        self._value = value
        self._timestamp = timestamp
        self._span_id = span_id
        self._trace_id = trace_id
        self._sample_count = sample_count
        self._dropped_labels = dropped_labels

    def __repr__(self):
        labels = dict(self._dropped_labels) if self._dropped_labels else None
        return (
            f"Exemplar(value={self._value}, timestamp={self._timestamp}, "
            f"labels={labels}, "
            f"context={{'span_id':{self._span_id}, 'trace_id':{self._trace_id}}})"
        )

    @property
    def value(self):
        """The current value of the Exemplar point"""
        return self._value

    @property
    def timestamp(self):
        """The time that this Exemplar's value was recorded"""
        return self._timestamp

    @property
    def span_id(self):
        """The span ID of the context when the exemplar was recorded"""
        return self._span_id

    @property
    def trace_id(self):
        """The trace ID of the context when the exemplar was recorded"""
        return self._trace_id

    @property
    def dropped_labels(self):
        """Labels that were dropped by the aggregator but still passed by the user"""
        return self._dropped_labels

    @property
    def sample_count(self):
        """For statistical exemplars, how many measurements a single exemplar represents"""
        return self._sample_count

    def set_sample_count(self, count):
        """Set how many measurements this exemplar represents."""
        self._sample_count = count


class ExemplarSampler(abc.ABC):
    """
    Abstract class to sample exemplars through a stream of incoming
    measurements.

    Deriving from ``abc.ABC`` is what makes the ``abstractmethod`` markers
    below effective: a subclass must implement all of them to be instantiable.
    """

    def __init__(self, k, statistical=False):
        self._k = k
        self._statistical = statistical
        self._sample_set = []

    @abc.abstractmethod
    def sample(self, exemplar, **kwargs):
        """Given an exemplar, determine if it should be sampled or not."""

    @property
    @abc.abstractmethod
    def sample_set(self):
        """Return the list of exemplars that have been sampled."""

    @abc.abstractmethod
    def merge(self, set1, set2):
        """
        Given two lists of sampled exemplars, merge them while maintaining the
        invariants specified by this sampler.
        """

    @abc.abstractmethod
    def reset(self):
        """Reset the sampler."""


class RandomExemplarSampler(ExemplarSampler):
    """
    Randomly sample a set of k exemplars from a stream. Each measurement in
    the stream will have an equal chance of being sampled.

    If `RandomExemplarSampler` is specified to be statistical, it will add a
    sample_count to every exemplar it records. This value will be equal to the
    number of measurements recorded per every exemplar measured - all
    exemplars will have the same sample_count value.
    """

    def __init__(self, k, statistical=False):
        super().__init__(k, statistical=statistical)
        # Number of measurements offered since the last reset.
        self.rand_count = 0

    def sample(self, exemplar, **kwargs):
        """Offer ``exemplar`` to the reservoir of at most k exemplars."""
        self.rand_count += 1

        # Work on the backing list directly: the ``sample_set`` property
        # rewrites every exemplar's sample_count on each access, which is
        # wasted work on the per-measurement path.
        if len(self._sample_set) < self._k:
            self._sample_set.append(exemplar)
            return

        # Reservoir replacement: the new exemplar takes slot j only when j
        # falls inside the reservoir.
        j = random.randint(0, self.rand_count - 1)
        if j < self._k:
            self._sample_set[j] = exemplar

    def merge(self, set1, set2):
        """Concatenate both sets, down-sampling randomly to k if needed."""
        combined = set1 + set2
        if len(combined) <= self._k:
            return combined
        return random.sample(combined, self._k)

    @property
    def sample_set(self):
        """
        The sampled exemplars. In statistical mode every exemplar's
        sample_count is refreshed to ``rand_count / len(sample set)`` first.
        """
        if self._statistical and self._sample_set:
            per_exemplar = self.rand_count / len(self._sample_set)
            for exemplar in self._sample_set:
                exemplar.set_sample_count(per_exemplar)
        return self._sample_set

    def reset(self):
        """Drop all sampled exemplars and restart the measurement count."""
        self._sample_set = []
        self.rand_count = 0


class MinMaxExemplarSampler(ExemplarSampler):
    """
    Sample the minimum and maximum measurements recorded only
    """

    def __init__(self, k, statistical=False):
        # K will always be 2 (min and max), and selecting min and max can
        # never be statistical, so both arguments are deliberately ignored.
        super().__init__(2, statistical=False)
        self._sample_set = []

    def sample(self, exemplar, **kwargs):
        """Keep ``exemplar`` only if it is a new minimum or maximum."""
        candidates = self._sample_set + [exemplar]
        lowest = min(candidates, key=_exemplar_value)
        highest = max(candidates, key=_exemplar_value)
        # A single exemplar is both min and max; store it once.
        if lowest == highest:
            self._sample_set = [lowest]
        else:
            self._sample_set = [lowest, highest]

    @property
    def sample_set(self):
        """The sampled exemplars, ordered ``[min]`` or ``[min, max]``."""
        return self._sample_set

    def merge(self, set1, set2):
        """
        Return the minimum and maximum of both sets combined.

        Ordering is by measurement value; exemplar objects themselves define
        no ordering, so comparing them directly would raise ``TypeError``.
        """
        merged_set = set1 + set2
        if len(merged_set) <= 2:
            return sorted(merged_set, key=_exemplar_value)

        return [
            min(merged_set, key=_exemplar_value),
            max(merged_set, key=_exemplar_value),
        ]

    def reset(self):
        """Drop the recorded minimum and maximum."""
        self._sample_set = []


class BucketedExemplarSampler(ExemplarSampler):
    """
    Randomly sample k exemplars for each bucket in the aggregator.

    If `BucketedExemplarSampler` is specified to be statistical, it will add a
    sample_count to every exemplar it records. This value will be equal to
    `bucket.count / len(bucket.exemplars)`, that is the number of measurements
    each exemplar represents.
    """

    def __init__(self, k, statistical=False, boundaries=None):
        super().__init__(k, statistical=statistical)
        # NOTE(review): boundaries are assumed to be in ascending order, as
        # bucket lookup in ``merge`` relies on it - confirm against the
        # aggregator that supplies them.
        self._boundaries = list(boundaries) if boundaries is not None else []
        # One reservoir per bucket, plus one for values at or above the last
        # boundary.
        self._sample_set = [
            RandomExemplarSampler(k, statistical=statistical)
            for _ in range(len(self._boundaries) + 1)
        ]

    def sample(self, exemplar, **kwargs):
        """
        Offer ``exemplar`` to the reservoir of the bucket given by the
        ``bucket_index`` keyword; without it the exemplar is ignored.
        """
        bucket_index = kwargs.get("bucket_index")
        if bucket_index is None:
            return

        self._sample_set[bucket_index].sample(exemplar)

    @property
    def sample_set(self):
        """All sampled exemplars, flattened in bucket order."""
        return list(
            itertools.chain.from_iterable(
                sampler.sample_set for sampler in self._sample_set
            )
        )

    def merge(self, set1, set2):
        """
        Re-bucket the exemplars of both sets by value and keep at most k per
        bucket, returning them flattened in bucket order.
        """
        buckets = [[] for _ in range(len(self._boundaries) + 1)]
        for exemplar in itertools.chain(set1, set2):
            # Index of the first boundary strictly greater than the value;
            # values at or above the last boundary land in the final bucket.
            # Looked up per exemplar, so input order does not matter.
            index = bisect.bisect_right(self._boundaries, exemplar.value)
            buckets[index].append(exemplar)

        for index, bucket in enumerate(buckets):
            if len(bucket) > self._k:
                buckets[index] = random.sample(bucket, self._k)
        return list(itertools.chain.from_iterable(buckets))

    def reset(self):
        """Reset every per-bucket reservoir."""
        for sampler in self._sample_set:
            sampler.reset()


class ExemplarManager:
    """
    Manages two different exemplar samplers:

    1. A "semantic" exemplar sampler, which only samples exemplars if they
       have a sampled trace context.
    2. A "statistical" exemplar sampler, which samples exemplars without bias
       (i.e. no preference for traced exemplars).
    """

    def __init__(
        self,
        config,
        default_exemplar_sampler,
        statistical_exemplar_sampler=None,
        **kwargs
    ):
        # Defaults so every attribute exists even when no config is given.
        self.exemplars_count = 0
        self.record_exemplars = False
        self.statistical_exemplars = False
        self.exemplar_sampler = None

        if not config:
            return

        self.exemplars_count = config.get("num_exemplars", 0)
        self.record_exemplars = self.exemplars_count > 0
        self.statistical_exemplars = config.get("statistical_exemplars", False)

        if self.statistical_exemplars and statistical_exemplar_sampler:
            sampler_class = statistical_exemplar_sampler
        else:
            sampler_class = default_exemplar_sampler
        self.exemplar_sampler = sampler_class(
            self.exemplars_count,
            statistical=self.statistical_exemplars,
            **kwargs
        )

    def sample(self, value, dropped_labels, **kwargs):
        """
        Record ``value`` as an exemplar if exemplars are enabled and either
        the current span is sampled or statistical mode is on.

        Extra keyword arguments are forwarded to the sampler.
        """
        if not self.record_exemplars:
            return

        context = get_current()
        # The current context may be empty or carry no span at all.
        span = None
        if context and "current-span" in context:
            span = context["current-span"]
        span_context = span.get_context() if span is not None else None

        is_sampled = (
            span_context is not None and span_context.trace_flags.sampled
        )

        # if not statistical, we want to gather traced exemplars only - so
        # otherwise don't sample
        if not (is_sampled or self.statistical_exemplars):
            return

        span_id = span_context.span_id if span_context is not None else None
        trace_id = span_context.trace_id if span_context is not None else None
        self.exemplar_sampler.sample(
            Exemplar(value, time_ns(), dropped_labels, span_id, trace_id),
            **kwargs
        )

    def take_checkpoint(self):
        """Return the exemplars sampled so far and reset the sampler."""
        if self.record_exemplars:
            ret = self.exemplar_sampler.sample_set
            self.exemplar_sampler.reset()
            return ret
        return []

    def merge(self, checkpoint_exemplars, other_checkpoint_exemplars):
        """
        Merge two exemplar checkpoints using the active sampler's rules;
        returns an empty list when exemplars are not being recorded.
        """
        if self.record_exemplars:
            return self.exemplar_sampler.merge(
                checkpoint_exemplars, other_checkpoint_exemplars
            )
        return []


# Patch metadata carried over from the collapsed diff (start of the next file's diff):
# diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py index
0dd75c6887b..2f85c95573c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py @@ -39,12 +39,13 @@ class ViewData: - def __init__(self, labels: Tuple[Tuple[str, str]], aggregator: Aggregator): + def __init__(self, labels: Tuple[Tuple[str, str]], aggregator: Aggregator, dropped_labels: Tuple[Tuple[str, str]] = None): self.labels = labels self.aggregator = aggregator + self.dropped_labels = dropped_labels def record(self, value: ValueT): - self.aggregator.update(value) + self.aggregator.update(value, dropped_labels=self.dropped_labels) # Uniqueness is based on labels and aggregator type def __hash__(self): diff --git a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py new file mode 100644 index 00000000000..95412d2b8e9 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py @@ -0,0 +1,460 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +from unittest.mock import patch +from time import time + +from opentelemetry.sdk.metrics.export.aggregate import ( + SumAggregator, + MinMaxSumCountAggregator, + HistogramAggregator, + Exemplar, + RandomExemplarSampler, + MinMaxExemplarSampler, + BucketedExemplarSampler, + ExemplarManager, + ValueObserverAggregator +) +from opentelemetry.sdk.metrics import ( + MeterProvider, + ValueRecorder, +) +from opentelemetry import trace, metrics +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.trace.sampling import ALWAYS_OFF, ALWAYS_ON + +from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter +from opentelemetry.sdk.metrics.view import View, ViewConfig +from opentelemetry.sdk.metrics.export.controller import PushController + +class TestRandomExemplarSampler(unittest.TestCase): + def test_sample(self): + sampler = RandomExemplarSampler(2, statistical=True) + exemplar1 = Exemplar(1, time()) + exemplar2 = Exemplar(2, time()) + exemplar3 = Exemplar(3, time()) + + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 1) + self.assertEqual(sampler.sample_set[0], exemplar1) + self.assertEqual(exemplar1.sample_count, 1) + + sampler.sample(exemplar2) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[1], exemplar2) + self.assertEqual(exemplar1.sample_count, 1) + self.assertEqual(exemplar2.sample_count, 1) + + def _patched_randint(mn, mx): + return mn + + with patch("random.randint", _patched_randint): + sampler.sample(exemplar3) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[0], exemplar3) + self.assertEqual(exemplar3.sample_count, 1.5) + self.assertEqual(exemplar2.sample_count, 1.5) + + def _patched_randint(mn, mx): + return 1 + + with patch("random.randint", _patched_randint): + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[1], exemplar1) + 
self.assertEqual(exemplar1.sample_count, 2) + + def test_reset(self): + sampler = RandomExemplarSampler(2) + exemplar1 = Exemplar(1, time()) + exemplar2 = Exemplar(2, time()) + + sampler.sample(exemplar1) + sampler.sample(exemplar2) + + sampler.reset() + self.assertEqual(len(sampler.sample_set), 0) + + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 1) + + def test_merge(self): + set1 = [1, 2, 3] + set2 = [4, 5, 6] + sampler = RandomExemplarSampler(6) + self.assertEqual(set1+set2, sampler.merge(set1, set2)) + sampler = RandomExemplarSampler(8) + self.assertEqual(set1+set2, sampler.merge(set1, set2)) + sampler = RandomExemplarSampler(4) + self.assertEqual(4, len(sampler.merge(set1, set2))) + + +class TestMinMaxExemplarSampler(unittest.TestCase): + def test_sample(self): + sampler = MinMaxExemplarSampler(2) + exemplar1 = Exemplar(1, time()) + exemplar2 = Exemplar(2, time()) + exemplar3 = Exemplar(3, time()) + + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 1) + self.assertEqual(sampler.sample_set[0], exemplar1) + + sampler.sample(exemplar2) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[0], exemplar1) + self.assertEqual(sampler.sample_set[1], exemplar2) + + sampler.sample(exemplar3) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[0], exemplar1) + self.assertEqual(sampler.sample_set[1], exemplar3) + + def test_reset(self): + sampler = MinMaxExemplarSampler(2) + exemplar1 = Exemplar(1, time()) + exemplar2 = Exemplar(2, time()) + + sampler.sample(exemplar1) + sampler.sample(exemplar2) + + sampler.reset() + self.assertEqual(len(sampler.sample_set), 0) + + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 1) + + def test_merge(self): + set1 = [1, 2, 3] + set2 = [4, 5, 6] + sampler = MinMaxExemplarSampler(2) + self.assertEqual([1, 6], sampler.merge(set1, set2)) + + +class TestBucketedExemplarSampler(unittest.TestCase): + def 
test_exemplars(self): + sampler = BucketedExemplarSampler(1, boundaries=[2, 4, 7], statistical=True) + sampler.sample(Exemplar(3, time()), bucket_index=1) + self.assertEqual(len(sampler.sample_set), 1) + self.assertEqual(sampler.sample_set[0].value, 3) + + sampler.sample(Exemplar(5, time()), bucket_index=2) + + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[1].value, 5) + self.assertEqual(sampler.sample_set[1].sample_count, 1) + + def _patched_randint(mn, mx): + return 0 + + with patch("random.randint", _patched_randint): + sampler.sample(Exemplar(6, time()), bucket_index=2) + + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[1].value, 6) + self.assertEqual(sampler.sample_set[1].sample_count, 2) + + sampler.sample(Exemplar(1, time()), bucket_index=0) + sampler.sample(Exemplar(9, time()), bucket_index=3) + + self.assertEqual(len(sampler.sample_set), 4) + self.assertEqual(sampler.sample_set[0].sample_count, 1) + self.assertEqual(sampler.sample_set[1].sample_count, 1) + self.assertEqual(sampler.sample_set[2].sample_count, 2) + self.assertEqual(sampler.sample_set[3].sample_count, 1) + + def test_merge(self): + sampler = BucketedExemplarSampler(1, boundaries=[3, 4, 6]) + + self.assertEqual(len(sampler.merge([Exemplar(1, time())], [Exemplar(2, time())])), 1) + + self.assertEqual(len(sampler.merge([Exemplar(1, time()), Exemplar(5, time())], [Exemplar(2, time())])), 2) + + +class TestExemplarManager(unittest.TestCase): + def test_statistical(self): + config = {"statistical_exemplars": True, "num_exemplars": 1} + manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler) + self.assertIsInstance(manager.exemplar_sampler, RandomExemplarSampler) + manager.sample(5, {"dropped_label": "value"}) + self.assertEqual(len(manager.exemplar_sampler.sample_set), 1) + self.assertEqual(manager.exemplar_sampler.sample_set[0].value, 5) + 
self.assertEqual(manager.exemplar_sampler.sample_set[0].dropped_labels, {"dropped_label": "value"}) + + checkpoint = manager.take_checkpoint() + self.assertEqual(len(checkpoint), 1) + self.assertEqual(checkpoint[0].value, 5) + + self.assertEqual(len(manager.exemplar_sampler.sample_set), 0) + + merged = manager.merge([Exemplar(2, time())], [Exemplar(3, time())]) + self.assertEqual(len(merged), 1) + + def test_semantic(self): + config = {"statistical_exemplars": True, "num_exemplars": 1} + manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler) + self.assertIsInstance(manager.exemplar_sampler, RandomExemplarSampler) + manager.sample(5, {}) + self.assertEqual(len(manager.exemplar_sampler.sample_set), 1) + self.assertEqual(manager.exemplar_sampler.sample_set[0].value, 5) + + checkpoint = manager.take_checkpoint() + self.assertEqual(len(checkpoint), 1) + self.assertEqual(checkpoint[0].value, 5) + + self.assertEqual(len(manager.exemplar_sampler.sample_set), 0) + + merged = manager.merge([Exemplar(2, time())], [Exemplar(3, time())]) + self.assertEqual(len(merged), 1) + + +class TestStandardExemplars(unittest.TestCase): + def _no_exemplars_test(self, aggregator): + config = {} + agg = aggregator(config=config) + agg.update(3) + agg.update(5) + agg.take_checkpoint() + self.assertEqual(agg.checkpoint_exemplars, []) + + other_agg = aggregator(config={"num_exemplars": 2, "statistical_exemplars": True}) + other_agg.update(2) + other_agg.update(4) + other_agg.take_checkpoint() + self.assertEqual(len(other_agg.checkpoint_exemplars), 2) + agg.merge(other_agg) + self.assertEqual(agg.checkpoint_exemplars, []) + + def _simple_exemplars_test(self, aggregator): + config = {"num_exemplars": 2, "statistical_exemplars": True} + agg = aggregator(config=config) + agg.update(2, dropped_labels={"dropped_label": "value"}) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 1) + self.assertEqual(agg.checkpoint_exemplars[0].value, 2) + 
self.assertEqual(agg.checkpoint_exemplars[0].dropped_labels, {"dropped_label": "value"}) + + agg.update(2) + agg.update(5) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 2) + self.assertEqual(agg.checkpoint_exemplars[1].value, 5) + + agg.update(2) + agg.update(5) + + def _patched_randint(mn, mx): + return 1 + with patch("random.randint", _patched_randint): + agg.update(7) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 2) + self.assertEqual(agg.checkpoint_exemplars[0].value, 2) + self.assertEqual(agg.checkpoint_exemplars[1].value, 7) + + def _record_traces_only_test(self, aggregator): + config = {"num_exemplars": 2} + agg = aggregator(config=config) + + agg.update(2) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 0) + + # Test with sampler on/off + tp = TracerProvider(sampler=ALWAYS_ON) + tracer = tp.get_tracer(__name__) + + span = tracer.start_span("Test Span ON") + with tracer.use_span(span): + agg.update(5) + agg.update(7) + agg.update(6) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 2) + + self.assertEqual(agg.checkpoint_exemplars[0].span_id, span.context.span_id) + self.assertEqual(agg.checkpoint_exemplars[0].value, 5) + self.assertEqual(agg.checkpoint_exemplars[1].value, 7) + + tp = TracerProvider(sampler=ALWAYS_OFF) + tracer = tp.get_tracer(__name__) + + with tracer.start_as_current_span("Test Span OFF"): + agg.update(5) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 0) + + def _merge_aggregators_test(self, aggregator): + config = {"num_exemplars": 2, "statistical_exemplars": True} + + agg1 = aggregator(config=config) + agg2 = aggregator(config=config) + + agg1.update(1) + agg1.take_checkpoint() + + agg2.update(2) + agg2.take_checkpoint() + + self.assertEqual(len(agg1.checkpoint_exemplars), 1) + self.assertEqual(len(agg2.checkpoint_exemplars), 1) + + agg1.merge(agg2) + + self.assertEqual(len(agg1.checkpoint_exemplars), 
2) + + def test_sum_aggregator(self): + self._no_exemplars_test(SumAggregator) + self._simple_exemplars_test(SumAggregator) + self._record_traces_only_test(SumAggregator) + self._merge_aggregators_test(SumAggregator) + + def test_mmsc_aggregator(self): + self._no_exemplars_test(MinMaxSumCountAggregator) + self._simple_exemplars_test(MinMaxSumCountAggregator) + self._record_traces_only_test(MinMaxSumCountAggregator) + self._merge_aggregators_test(MinMaxSumCountAggregator) + + def test_observer_aggregator(self): + self._no_exemplars_test(ValueObserverAggregator) + self._simple_exemplars_test(ValueObserverAggregator) + self._record_traces_only_test(ValueObserverAggregator) + self._merge_aggregators_test(ValueObserverAggregator) + + +class TestHistogramExemplars(unittest.TestCase): + def test_no_exemplars(self): + config = {"bounds": [2, 4, 6]} + agg = HistogramAggregator(config=config) + agg.update(3) + agg.update(5) + agg.take_checkpoint() + self.assertEqual(agg.checkpoint_exemplars, []) + + other_agg = HistogramAggregator(config=dict(config, **{"num_exemplars": 1, "statistical_exemplars": True})) + + other_agg.update(3) + other_agg.update(5) + other_agg.take_checkpoint() + self.assertEqual(len(other_agg.checkpoint_exemplars), 2) + + agg.merge(other_agg) + self.assertEqual(agg.checkpoint_exemplars, []) + + def test_simple_exemplars(self): + config = {"bounds": [2, 4, 7], "num_exemplars": 1, "statistical_exemplars": True} + agg = HistogramAggregator(config=config) + agg.update(2, dropped_labels={"dropped_label": "value"}) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 1) + self.assertEqual(agg.checkpoint_exemplars[0].value, 2) + self.assertEqual(agg.checkpoint_exemplars[0].dropped_labels, {"dropped_label": "value"}) + + agg.update(2) + agg.update(5) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 2) + self.assertEqual(agg.checkpoint_exemplars[1].value, 5) + + agg.update(5) + + def _patched_randint(mn, mx): + return 
0 + + with patch("random.randint", _patched_randint): + agg.update(6) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 1) + self.assertEqual(agg.checkpoint_exemplars[0].value, 6) + + agg.update(1) + agg.update(3) + agg.update(6) + agg.update(9) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 4) + + def test_record_traces_only(self): + config = {"bounds": [2, 4, 6], "num_exemplars": 2, "statistical_exemplars": False} + agg = HistogramAggregator(config=config) + + agg.update(2) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 0) + + # Test with sampler on/off + tp = TracerProvider(sampler=ALWAYS_ON) + tracer = tp.get_tracer(__name__) + + span = tracer.start_span("Test Span ON") + with tracer.use_span(span): + agg.update(5) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 1) + + self.assertEqual(agg.checkpoint_exemplars[0].span_id, span.context.span_id) + + tp = TracerProvider(sampler=ALWAYS_OFF) + tracer = tp.get_tracer(__name__) + + with tracer.start_as_current_span("Test Span OFF"): + agg.update(5) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 0) + +class TestFullPipelineExemplars(unittest.TestCase): + def test_histogram(self): + # Use the meter type provided by the SDK package + metrics.set_meter_provider(MeterProvider()) + meter = metrics.get_meter(__name__) + exporter = InMemoryMetricsExporter() + controller = PushController(meter, exporter, 5) + + requests_size = meter.create_metric( + name="requests_size", + description="size of requests", + unit="1", + value_type=int, + metric_type=ValueRecorder, + ) + + size_view = View( + requests_size, + HistogramAggregator(config={"bounds": [20, 40, 60, 80, 100], "num_exemplars": 1, "statistical_exemplars": True}), + label_keys=["environment"], + config=ViewConfig.LABEL_KEYS, + ) + + meter.register_view(size_view) + + # Since this is using the HistogramAggregator, the bucket counts will be 
reflected + # with each record + requests_size.record(25, {"environment": "staging", "test": "value"}) + requests_size.record(1, {"environment": "staging", "test": "value2"}) + requests_size.record(200, {"environment": "staging", "test": "value3"}) + + controller.tick() + metrics_list = exporter.get_exported_metrics() + self.assertEqual(len(metrics_list), 1) + exemplars = metrics_list[0].aggregator.checkpoint_exemplars + self.assertEqual(len(exemplars), 3) + self.assertEqual([(exemplar.value, exemplar.dropped_labels) for exemplar in exemplars], + [(1, (("test", "value2"),)), (25, (("test", "value"),)), (200, (("test", "value3"),))]) From c2727b4ee65089498832eba533baaea6a89fca63 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Thu, 23 Jul 2020 17:11:18 -0400 Subject: [PATCH 2/6] linting --- docs/examples/exemplars/semantic_exemplars.py | 38 +++-- .../exemplars/statistical_exemplars.py | 90 ++++++++--- .../src/opentelemetry/sdk/metrics/__init__.py | 2 +- .../sdk/metrics/export/__init__.py | 2 +- .../sdk/metrics/export/aggregate.py | 46 ++++-- .../sdk/metrics/export/exemplars.py | 142 ++++++++++++----- .../src/opentelemetry/sdk/metrics/view.py | 15 +- .../tests/metrics/export/test_exemplars.py | 147 +++++++++++++----- .../tests/metrics/export/test_export.py | 2 +- .../tests/metrics/test_metrics.py | 4 +- 10 files changed, 358 insertions(+), 130 deletions(-) diff --git a/docs/examples/exemplars/semantic_exemplars.py b/docs/examples/exemplars/semantic_exemplars.py index 5d14dd3bea5..bf80ede1f6b 100644 --- a/docs/examples/exemplars/semantic_exemplars.py +++ b/docs/examples/exemplars/semantic_exemplars.py @@ -20,14 +20,9 @@ import time from opentelemetry import metrics -from opentelemetry.sdk.metrics import ( - MeterProvider, - ValueRecorder, -) +from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter -from opentelemetry.sdk.metrics.export.aggregate import ( - HistogramAggregator, -) +from 
opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator from opentelemetry.sdk.metrics.view import View, ViewConfig # Set up OpenTelemetry metrics @@ -35,7 +30,9 @@ meter = metrics.get_meter(__name__) # Use the Google Cloud Monitoring Metrics Exporter since its the only one that currently supports exemplars -metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 10) +metrics.get_meter_provider().start_pipeline( + meter, ConsoleMetricsExporter(), 10 +) # Create our duration metric request_duration = meter.create_metric( @@ -54,8 +51,24 @@ # We want to generate 1 exemplar per bucket, where each exemplar has a linked trace that was recorded. # So we need to set num_exemplars to 1 and not specify statistical_exemplars (defaults to false) HistogramAggregator, - aggregator_config={"bounds": [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000], - "num_exemplars": 1}, + aggregator_config={ + "bounds": [ + 0, + 25, + 50, + 75, + 100, + 200, + 400, + 600, + 800, + 1000, + 2000, + 4000, + 6000, + ], + "num_exemplars": 1, + }, label_keys=["environment"], view_config=ViewConfig.LABEL_KEYS, ) @@ -64,5 +77,8 @@ for i in range(100): # Generate some random data for the histogram with a dropped label "customer_id" - request_duration.record(random.randint(1, 8000), {"environment": "staging", "customer_id": random.randint(1, 100)}) + request_duration.record( + random.randint(1, 8000), + {"environment": "staging", "customer_id": random.randint(1, 100)}, + ) time.sleep(1) diff --git a/docs/examples/exemplars/statistical_exemplars.py b/docs/examples/exemplars/statistical_exemplars.py index 353e516cb4e..25fd29e82ba 100644 --- a/docs/examples/exemplars/statistical_exemplars.py +++ b/docs/examples/exemplars/statistical_exemplars.py @@ -1,17 +1,18 @@ -import numpy as np -import matplotlib.pyplot as plt import random - from collections import defaultdict +import matplotlib.pyplot as plt +import numpy as np from opentelemetry import metrics from 
opentelemetry.sdk.metrics import Counter, MeterProvider from opentelemetry.sdk.metrics.export.aggregate import SumAggregator from opentelemetry.sdk.metrics.export.controller import PushController -from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter +from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import ( + InMemoryMetricsExporter, +) from opentelemetry.sdk.metrics.view import View, ViewConfig -## set up opentelemetry +# set up opentelemetry # Sets the global MeterProvider instance metrics.set_meter_provider(MeterProvider()) @@ -47,7 +48,8 @@ meter.register_view(counter_view) -## generate the random metric data +# generate the random metric data + def unknown_customer_calls(): """Generate customer call data to our application""" @@ -58,23 +60,49 @@ def unknown_customer_calls(): random.seed(1) # customer 123 is a big user, and made 1000 requests in this timeframe - requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, covariance 100 + requests = np.random.normal( + 1000, 250, 1000 + ) # 1000 requests with average 1000 bytes, covariance 100 for request in requests: - bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 123}) + bytes_counter.add( + int(request), + { + "environment": "production", + "method": "REST", + "customer_id": 123, + }, + ) # customer 247 is another big user, making fewer, but bigger requests - requests = np.random.normal(5000, 1250, 200) # 200 requests with average size of 5k bytes + requests = np.random.normal( + 5000, 1250, 200 + ) # 200 requests with average size of 5k bytes for request in requests: - bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 247}) + bytes_counter.add( + int(request), + { + "environment": "production", + "method": "REST", + "customer_id": 247, + }, + ) # There are many other smaller customers for customer_id in range(250): requests = 
np.random.normal(1000, 250, np.random.randint(1, 10)) method = "REST" if np.random.randint(2) else "gRPC" for request in requests: - bytes_counter.add(int(request), {"environment": "production", "method": method, "customer_id": customer_id}) + bytes_counter.add( + int(request), + { + "environment": "production", + "method": method, + "customer_id": customer_id, + }, + ) + unknown_customer_calls() @@ -94,10 +122,15 @@ def unknown_customer_calls(): customer_bytes_map[exemplar.dropped_labels] += exemplar.value -customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True) +customer_bytes_list = sorted( + list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True +) # Save our top 5 customers and sum all of the rest into "Others". -top_5_customers = [("Customer {}".format(dict(val[0])["customer_id"]), val[1]) for val in customer_bytes_list[:5]] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))] +top_5_customers = [ + ("Customer {}".format(dict(val[0])["customer_id"]), val[1]) + for val in customer_bytes_list[:5] +] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))] # unzip the data into X (sizes of each customer's contribution) and labels labels, X = zip(*top_5_customers) @@ -107,7 +140,9 @@ def unknown_customer_calls(): plt.show() # Estimate how many bytes customer 123 sent -customer_123_bytes = customer_bytes_map[(("customer_id", 123), ("method", "REST"))] +customer_123_bytes = customer_bytes_map[ + (("customer_id", 123), ("method", "REST")) +] # Since the exemplars were randomly sampled, all sample_counts will be the same sample_count = exemplars[0].sample_count @@ -115,18 +150,35 @@ def unknown_customer_calls(): full_customer_123_bytes = sample_count * customer_123_bytes # With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! 
(more exemplars would make this estimation even more accurate) -print("Customer 123 sent about {} bytes this interval".format(int(full_customer_123_bytes))) +print( + "Customer 123 sent about {} bytes this interval".format( + int(full_customer_123_bytes) + ) +) # Determine the top 25 customers by how many bytes they sent in exemplars top_25_customers = customer_bytes_list[:25] # out of those 25 customers, determine how many used grpc, and come up with a ratio -percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == "gRPC", top_25_customers))) / len(top_25_customers) - -print("~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(int(percent_grpc*100))) +percent_grpc = len( + list( + filter( + lambda customer_value: customer_value[0][1][1] == "gRPC", + top_25_customers, + ) + ) +) / len(top_25_customers) + +print( + "~{}% of the top 25 customers (by bytes in) used gRPC this interval".format( + int(percent_grpc * 100) + ) +) # Determine the 50th, 90th, and 99th percentile of byte size sent in -quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99]) +quantiles = np.quantile( + [exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99] +) print("50th Percentile Bytes In:", int(quantiles[0])) print("90th Percentile Bytes In:", int(quantiles[1])) print("99th Percentile Bytes In:", int(quantiles[2])) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 2af8a551ee1..9bad705b9c2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -75,7 +75,7 @@ def update(self, value: metrics_api.ValueT): with self._view_datas_lock: # record the value for each view_data belonging to this aggregator for view_data in self.view_datas: - view_data.record(value) + view_data.record(value, self._labels) def release(self): 
self.decrease_ref_count() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py index 8abada0a3c7..ddd08df13c8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py @@ -82,7 +82,7 @@ def export( record.instrument, record.labels, record.aggregator.checkpoint, - record.aggregator.checkpoint_exemplars + record.aggregator.checkpoint_exemplars, ) ) return MetricsExportResult.SUCCESS diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index a0a8db346bc..998ee23b358 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -16,17 +16,14 @@ import logging import threading from collections import OrderedDict, namedtuple -import itertools -from collections import namedtuple, OrderedDict -from opentelemetry.util import time_ns from opentelemetry.sdk.metrics.export.exemplars import ( - Exemplar, - RandomExemplarSampler, - MinMaxExemplarSampler, BucketedExemplarSampler, - ExemplarManager + ExemplarManager, + MinMaxExemplarSampler, + RandomExemplarSampler, ) +from opentelemetry.util import time_ns logger = logging.getLogger(__name__) @@ -69,7 +66,9 @@ def __init__(self, config=None): self.checkpoint = 0 self._lock = threading.Lock() self.last_update_timestamp = None - self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler) + self.exemplar_manager = ExemplarManager( + config, MinMaxExemplarSampler, RandomExemplarSampler + ) def update(self, value, dropped_labels=None): with self._lock: @@ -91,7 +90,9 @@ def merge(self, other): self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp ) - 
self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars) + self.checkpoint_exemplars = self.exemplar_manager.merge( + self.checkpoint_exemplars, other.checkpoint_exemplars + ) class MinMaxSumCountAggregator(Aggregator): @@ -120,7 +121,9 @@ def __init__(self, config=None): self._lock = threading.Lock() self.last_update_timestamp = None - self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler) + self.exemplar_manager = ExemplarManager( + config, MinMaxExemplarSampler, RandomExemplarSampler + ) def update(self, value, dropped_labels=None): with self._lock: @@ -153,7 +156,9 @@ def merge(self, other): self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp ) - self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars) + self.checkpoint_exemplars = self.exemplar_manager.merge( + self.checkpoint_exemplars, other.checkpoint_exemplars + ) class HistogramAggregator(Aggregator): @@ -173,7 +178,12 @@ def __init__(self, config=None): self.current = OrderedDict([(bb, 0) for bb in self._boundaries]) self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries]) - self.exemplar_manager = ExemplarManager(config, BucketedExemplarSampler, BucketedExemplarSampler, boundaries=self._boundaries) + self.exemplar_manager = ExemplarManager( + config, + BucketedExemplarSampler, + BucketedExemplarSampler, + boundaries=self._boundaries, + ) self.current[">"] = 0 self.checkpoint[">"] = 0 @@ -209,14 +219,18 @@ def update(self, value, dropped_labels=None): # greater than max value if value >= self._boundaries[len(self._boundaries) - 1]: self.current[">"] += 1 - self.exemplar_manager.sample(value, dropped_labels, bucket_index=len(self._boundaries)) + self.exemplar_manager.sample( + value, dropped_labels, bucket_index=len(self._boundaries) + ) else: for index, bb in enumerate(self._boundaries): # find 
first bucket that value is less than if value < bb: self.current[bb] += 1 - self.exemplar_manager.sample(value, dropped_labels, bucket_index=index) + self.exemplar_manager.sample( + value, dropped_labels, bucket_index=index + ) break self.last_update_timestamp = time_ns() @@ -236,7 +250,9 @@ def merge(self, other): self.checkpoint, other.checkpoint ) - self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars) + self.checkpoint_exemplars = self.exemplar_manager.merge( + self.checkpoint_exemplars, other.checkpoint_exemplars + ) self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py index 9fc74cebe57..9eb82a5aed8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py @@ -27,17 +27,27 @@ """ import abc -import random import itertools +import random from opentelemetry.context import get_current from opentelemetry.util import time_ns + class Exemplar: """ A sample data point for an aggregator. Exemplars represent individual measurements recorded. 
""" - def __init__(self, value, timestamp, dropped_labels=None, span_id=None, trace_id=None, sample_count=None): + + def __init__( + self, + value, + timestamp, + dropped_labels=None, + span_id=None, + trace_id=None, + sample_count=None, + ): self._value = value self._timestamp = timestamp self._span_id = span_id @@ -46,7 +56,13 @@ def __init__(self, value, timestamp, dropped_labels=None, span_id=None, trace_id self._dropped_labels = dropped_labels def __repr__(self): - return f"Exemplar(value={self._value}, timestamp={self._timestamp}, labels={dict(self._dropped_labels) if self._dropped_labels else None}, context={{'span_id':{self._span_id}, 'trace_id':{self._trace_id}}})" + return "Exemplar(value={}, timestamp={}, labels={}, context={{'span_id':{}, 'trace_id':{}}})".format( + self._value, + self._timestamp, + dict(self._dropped_labels) if self._dropped_labels else None, + self._span_id, + self._trace_id, + ) @property def value(self): @@ -67,24 +83,26 @@ def span_id(self): def trace_id(self): """The trace ID of the context when the exemplar was recorded""" return self._trace_id - + @property def dropped_labels(self): """Labels that were dropped by the aggregator but still passed by the user""" return self._dropped_labels - + @property def sample_count(self): """For statistical exemplars, how many measurements a single exemplar represents""" return self._sample_count - + def set_sample_count(self, count): self._sample_count = count + class ExemplarSampler: """ Abstract class to sample exemplars through a stream of incoming measurements """ + def __init__(self, k, statistical=False): self._k = k self._statistical = statistical @@ -95,7 +113,6 @@ def sample(self, exemplar, **kwargs): """ Given an exemplar, determine if it should be sampled or not """ - pass @property @abc.abstractmethod @@ -103,21 +120,19 @@ def sample_set(self): """ Return the list of exemplars that have been sampled """ - pass @abc.abstractmethod def merge(self, set1, set2): """ Given two lists of 
sampled exemplars, merge them while maintaining the invariants specified by this sampler """ - pass @abc.abstractmethod def reset(self): """ Reset the sampler """ - pass + class RandomExemplarSampler(ExemplarSampler): """ @@ -127,6 +142,7 @@ class RandomExemplarSampler(ExemplarSampler): If RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records. This value will be equal to the number of measurements recorded per every exemplar measured - all exemplars will have the same sample_count value. """ + def __init__(self, k, statistical=False): super().__init__(k, statistical=statistical) self.rand_count = 0 @@ -138,40 +154,52 @@ def sample(self, exemplar, **kwargs): self.sample_set.append(exemplar) return - j = random.randint(0, self.rand_count-1) + replace_index = random.randint(0, self.rand_count - 1) - if j < self._k: - self.sample_set[j] = exemplar + if replace_index < self._k: + self.sample_set[replace_index] = exemplar def merge(self, set1, set2): combined = set1 + set2 if len(combined) <= self._k: return combined - else: - return random.sample(combined, self._k) + return random.sample(combined, self._k) @property def sample_set(self): if self._statistical: for exemplar in self._sample_set: - exemplar.set_sample_count(self.rand_count / len(self._sample_set)) + exemplar.set_sample_count( + self.rand_count / len(self._sample_set) + ) return self._sample_set def reset(self): self._sample_set = [] self.rand_count = 0 + class MinMaxExemplarSampler(ExemplarSampler): """ Sample the minimum and maximum measurements recorded only """ + def __init__(self, k, statistical=False): # K will always be 2 (min and max), and selecting min and max can never be statistical super().__init__(2, statistical=False) self._sample_set = [] def sample(self, exemplar, **kwargs): - self._sample_set = [min(self._sample_set + [exemplar], key=lambda exemplar: exemplar.value), max(self._sample_set + [exemplar], key=lambda exemplar: 
exemplar.value)] + self._sample_set = [ + min( + self._sample_set + [exemplar], + key=lambda exemplar: exemplar.value, + ), + max( + self._sample_set + [exemplar], + key=lambda exemplar: exemplar.value, + ), + ] if self._sample_set[0] == self._sample_set[1]: self._sample_set = [self._sample_set[0]] @@ -189,6 +217,7 @@ def merge(self, set1, set2): def reset(self): self._sample_set = [] + class BucketedExemplarSampler(ExemplarSampler): """ Randomly sample k exemplars for each bucket in the aggregator. @@ -196,10 +225,14 @@ class BucketedExemplarSampler(ExemplarSampler): If `BucketedExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records. This value will be equal to `len(bucket.exemplars) / bucket.count`, that is the number of measurements each exemplar represents. """ + def __init__(self, k, statistical=False, boundaries=None): super().__init__(k) self._boundaries = boundaries - self._sample_set = [RandomExemplarSampler(k, statistical=statistical) for _ in range(len(self._boundaries) + 1)] + self._sample_set = [ + RandomExemplarSampler(k, statistical=statistical) + for _ in range(len(self._boundaries) + 1) + ] def sample(self, exemplar, **kwargs): bucket_index = kwargs.get("bucket_index") @@ -210,10 +243,15 @@ def sample(self, exemplar, **kwargs): @property def sample_set(self): - return list(itertools.chain.from_iterable([sampler.sample_set for sampler in self._sample_set])) + return list( + itertools.chain.from_iterable( + [sampler.sample_set for sampler in self._sample_set] + ) + ) def merge(self, set1, set2): exemplar_set = [list() for _ in range(len(self._boundaries) + 1)] + # Sort both sets back into buckets for setx in [set1, set2]: bucket_idx = 0 for exemplar in setx: @@ -224,16 +262,18 @@ def merge(self, set1, set2): while exemplar.value >= self._boundaries[bucket_idx]: bucket_idx += 1 exemplar_set[bucket_idx].append(exemplar) - - for i, inner_set in enumerate(exemplar_set): + + # Pick only k exemplars for 
every bucket + for index, inner_set in enumerate(exemplar_set): if len(inner_set) > self._k: - exemplar_set[i] = random.sample(inner_set, self._k) + exemplar_set[index] = random.sample(inner_set, self._k) return list(itertools.chain.from_iterable(exemplar_set)) - + def reset(self): for sampler in self._sample_set: sampler.reset() + class ExemplarManager: """ Manages two different exemplar samplers: @@ -241,28 +281,58 @@ class ExemplarManager: 2. A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) """ - def __init__(self, config, default_exemplar_sampler, statistical_exemplar_sampler=None, **kwargs): + def __init__( + self, + config, + default_exemplar_sampler, + statistical_exemplar_sampler, + **kwargs + ): if config: - self.exemplars_count = config.get('num_exemplars', 0) + self.exemplars_count = config.get("num_exemplars", 0) self.record_exemplars = self.exemplars_count > 0 - self.statistical_exemplars = config.get('statistical_exemplars', False) - if self.statistical_exemplars and statistical_exemplar_sampler: - self.exemplar_sampler = statistical_exemplar_sampler(self.exemplars_count, statistical=self.statistical_exemplars, **kwargs) + self.statistical_exemplars = config.get( + "statistical_exemplars", False + ) + if self.statistical_exemplars: + self.exemplar_sampler = statistical_exemplar_sampler( + self.exemplars_count, + statistical=self.statistical_exemplars, + **kwargs + ) else: - self.exemplar_sampler = default_exemplar_sampler(self.exemplars_count, statistical=self.statistical_exemplars, **kwargs) + self.exemplar_sampler = default_exemplar_sampler( + self.exemplars_count, + statistical=self.statistical_exemplars, + **kwargs + ) else: self.record_exemplars = False def sample(self, value, dropped_labels, **kwargs): context = get_current() - is_sampled = 'current-span' in context and context['current-span'].get_context().trace_flags.sampled if context else False + is_sampled = ( + "current-span" 
in context + and context["current-span"].get_context().trace_flags.sampled + if context + else False + ) # if not statistical, we want to gather traced exemplars only - so otherwise don't sample - if self.record_exemplars and (is_sampled or self.statistical_exemplars): - span_id = context['current-span'].context.span_id if context else None - trace_id = context['current-span'].context.trace_id if context else None - self.exemplar_sampler.sample(Exemplar(value, time_ns(), dropped_labels, span_id, trace_id), **kwargs) + if self.record_exemplars and ( + is_sampled or self.statistical_exemplars + ): + span_id = ( + context["current-span"].context.span_id if context else None + ) + trace_id = ( + context["current-span"].context.trace_id if context else None + ) + self.exemplar_sampler.sample( + Exemplar(value, time_ns(), dropped_labels, span_id, trace_id), + **kwargs + ) def take_checkpoint(self): if self.record_exemplars: @@ -273,5 +343,7 @@ def take_checkpoint(self): def merge(self, checkpoint_exemplars, other_checkpoint_exemplars): if self.record_exemplars: - return self.exemplar_sampler.merge(checkpoint_exemplars, other_checkpoint_exemplars) + return self.exemplar_sampler.merge( + checkpoint_exemplars, other_checkpoint_exemplars + ) return [] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py index 2f85c95573c..ec1e9df6f71 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py @@ -39,13 +39,20 @@ class ViewData: - def __init__(self, labels: Tuple[Tuple[str, str]], aggregator: Aggregator, dropped_labels: Tuple[Tuple[str, str]] = None): + def __init__( + self, labels: Tuple[Tuple[str, str]], aggregator: Aggregator, + ): self.labels = labels self.aggregator = aggregator - self.dropped_labels = dropped_labels - def record(self, value: ValueT): - self.aggregator.update(value, dropped_labels=self.dropped_labels) + def 
record(self, value: ValueT, all_labels: Tuple[Tuple[str, str]]): + label_dict = dict(self.labels) + self.aggregator.update( + value, + dropped_labels=tuple( + filter(lambda label: label[0] not in label_dict, all_labels) + ), + ) # Uniqueness is based on labels and aggregator type def __hash__(self): diff --git a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py index 95412d2b8e9..e80dee1a832 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py +++ b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py @@ -13,31 +13,32 @@ # limitations under the License. import unittest -from unittest.mock import patch from time import time +from unittest.mock import patch +from opentelemetry import metrics +from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder from opentelemetry.sdk.metrics.export.aggregate import ( - SumAggregator, - MinMaxSumCountAggregator, HistogramAggregator, - Exemplar, - RandomExemplarSampler, MinMaxExemplarSampler, + MinMaxSumCountAggregator, + SumAggregator, + ValueObserverAggregator, +) +from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.sdk.metrics.export.exemplars import ( BucketedExemplarSampler, + Exemplar, ExemplarManager, - ValueObserverAggregator + RandomExemplarSampler, ) -from opentelemetry.sdk.metrics import ( - MeterProvider, - ValueRecorder, +from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import ( + InMemoryMetricsExporter, ) -from opentelemetry import trace, metrics +from opentelemetry.sdk.metrics.view import View, ViewConfig from opentelemetry.sdk.trace import TracerProvider from opentelemetry.trace.sampling import ALWAYS_OFF, ALWAYS_ON -from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter -from opentelemetry.sdk.metrics.view import View, ViewConfig -from opentelemetry.sdk.metrics.export.controller import PushController class 
TestRandomExemplarSampler(unittest.TestCase): def test_sample(self): @@ -57,8 +58,9 @@ def test_sample(self): self.assertEqual(exemplar1.sample_count, 1) self.assertEqual(exemplar2.sample_count, 1) - def _patched_randint(mn, mx): - return mn + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument + return minimum with patch("random.randint", _patched_randint): sampler.sample(exemplar3) @@ -66,8 +68,9 @@ def _patched_randint(mn, mx): self.assertEqual(sampler.sample_set[0], exemplar3) self.assertEqual(exemplar3.sample_count, 1.5) self.assertEqual(exemplar2.sample_count, 1.5) - - def _patched_randint(mn, mx): + + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument return 1 with patch("random.randint", _patched_randint): @@ -94,9 +97,9 @@ def test_merge(self): set1 = [1, 2, 3] set2 = [4, 5, 6] sampler = RandomExemplarSampler(6) - self.assertEqual(set1+set2, sampler.merge(set1, set2)) + self.assertEqual(set1 + set2, sampler.merge(set1, set2)) sampler = RandomExemplarSampler(8) - self.assertEqual(set1+set2, sampler.merge(set1, set2)) + self.assertEqual(set1 + set2, sampler.merge(set1, set2)) sampler = RandomExemplarSampler(4) self.assertEqual(4, len(sampler.merge(set1, set2))) @@ -145,7 +148,9 @@ def test_merge(self): class TestBucketedExemplarSampler(unittest.TestCase): def test_exemplars(self): - sampler = BucketedExemplarSampler(1, boundaries=[2, 4, 7], statistical=True) + sampler = BucketedExemplarSampler( + 1, boundaries=[2, 4, 7], statistical=True + ) sampler.sample(Exemplar(3, time()), bucket_index=1) self.assertEqual(len(sampler.sample_set), 1) self.assertEqual(sampler.sample_set[0].value, 3) @@ -156,7 +161,8 @@ def test_exemplars(self): self.assertEqual(sampler.sample_set[1].value, 5) self.assertEqual(sampler.sample_set[1].sample_count, 1) - def _patched_randint(mn, mx): + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument return 0 with patch("random.randint", _patched_randint): @@ -178,20 
+184,35 @@ def _patched_randint(mn, mx): def test_merge(self): sampler = BucketedExemplarSampler(1, boundaries=[3, 4, 6]) - self.assertEqual(len(sampler.merge([Exemplar(1, time())], [Exemplar(2, time())])), 1) + self.assertEqual( + len(sampler.merge([Exemplar(1, time())], [Exemplar(2, time())])), 1 + ) - self.assertEqual(len(sampler.merge([Exemplar(1, time()), Exemplar(5, time())], [Exemplar(2, time())])), 2) + self.assertEqual( + len( + sampler.merge( + [Exemplar(1, time()), Exemplar(5, time())], + [Exemplar(2, time())], + ) + ), + 2, + ) class TestExemplarManager(unittest.TestCase): def test_statistical(self): config = {"statistical_exemplars": True, "num_exemplars": 1} - manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler) + manager = ExemplarManager( + config, MinMaxExemplarSampler, RandomExemplarSampler + ) self.assertIsInstance(manager.exemplar_sampler, RandomExemplarSampler) manager.sample(5, {"dropped_label": "value"}) self.assertEqual(len(manager.exemplar_sampler.sample_set), 1) self.assertEqual(manager.exemplar_sampler.sample_set[0].value, 5) - self.assertEqual(manager.exemplar_sampler.sample_set[0].dropped_labels, {"dropped_label": "value"}) + self.assertEqual( + manager.exemplar_sampler.sample_set[0].dropped_labels, + {"dropped_label": "value"}, + ) checkpoint = manager.take_checkpoint() self.assertEqual(len(checkpoint), 1) @@ -204,7 +225,9 @@ def test_statistical(self): def test_semantic(self): config = {"statistical_exemplars": True, "num_exemplars": 1} - manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler) + manager = ExemplarManager( + config, MinMaxExemplarSampler, RandomExemplarSampler + ) self.assertIsInstance(manager.exemplar_sampler, RandomExemplarSampler) manager.sample(5, {}) self.assertEqual(len(manager.exemplar_sampler.sample_set), 1) @@ -229,7 +252,9 @@ def _no_exemplars_test(self, aggregator): agg.take_checkpoint() self.assertEqual(agg.checkpoint_exemplars, []) - other_agg = 
aggregator(config={"num_exemplars": 2, "statistical_exemplars": True}) + other_agg = aggregator( + config={"num_exemplars": 2, "statistical_exemplars": True} + ) other_agg.update(2) other_agg.update(4) other_agg.take_checkpoint() @@ -244,7 +269,10 @@ def _simple_exemplars_test(self, aggregator): agg.take_checkpoint() self.assertEqual(len(agg.checkpoint_exemplars), 1) self.assertEqual(agg.checkpoint_exemplars[0].value, 2) - self.assertEqual(agg.checkpoint_exemplars[0].dropped_labels, {"dropped_label": "value"}) + self.assertEqual( + agg.checkpoint_exemplars[0].dropped_labels, + {"dropped_label": "value"}, + ) agg.update(2) agg.update(5) @@ -255,8 +283,10 @@ def _simple_exemplars_test(self, aggregator): agg.update(2) agg.update(5) - def _patched_randint(mn, mx): + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument return 1 + with patch("random.randint", _patched_randint): agg.update(7) @@ -282,11 +312,13 @@ def _record_traces_only_test(self, aggregator): agg.update(5) agg.update(7) agg.update(6) - + agg.take_checkpoint() self.assertEqual(len(agg.checkpoint_exemplars), 2) - self.assertEqual(agg.checkpoint_exemplars[0].span_id, span.context.span_id) + self.assertEqual( + agg.checkpoint_exemplars[0].span_id, span.context.span_id + ) self.assertEqual(agg.checkpoint_exemplars[0].value, 5) self.assertEqual(agg.checkpoint_exemplars[1].value, 7) @@ -346,7 +378,11 @@ def test_no_exemplars(self): agg.take_checkpoint() self.assertEqual(agg.checkpoint_exemplars, []) - other_agg = HistogramAggregator(config=dict(config, **{"num_exemplars": 1, "statistical_exemplars": True})) + other_agg = HistogramAggregator( + config=dict( + config, **{"num_exemplars": 1, "statistical_exemplars": True} + ) + ) other_agg.update(3) other_agg.update(5) @@ -357,13 +393,20 @@ def test_no_exemplars(self): self.assertEqual(agg.checkpoint_exemplars, []) def test_simple_exemplars(self): - config = {"bounds": [2, 4, 7], "num_exemplars": 1, "statistical_exemplars": True} + config = 
{ + "bounds": [2, 4, 7], + "num_exemplars": 1, + "statistical_exemplars": True, + } agg = HistogramAggregator(config=config) agg.update(2, dropped_labels={"dropped_label": "value"}) agg.take_checkpoint() self.assertEqual(len(agg.checkpoint_exemplars), 1) self.assertEqual(agg.checkpoint_exemplars[0].value, 2) - self.assertEqual(agg.checkpoint_exemplars[0].dropped_labels, {"dropped_label": "value"}) + self.assertEqual( + agg.checkpoint_exemplars[0].dropped_labels, + {"dropped_label": "value"}, + ) agg.update(2) agg.update(5) @@ -373,7 +416,8 @@ def test_simple_exemplars(self): agg.update(5) - def _patched_randint(mn, mx): + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument return 0 with patch("random.randint", _patched_randint): @@ -391,7 +435,11 @@ def _patched_randint(mn, mx): self.assertEqual(len(agg.checkpoint_exemplars), 4) def test_record_traces_only(self): - config = {"bounds": [2, 4, 6], "num_exemplars": 2, "statistical_exemplars": False} + config = { + "bounds": [2, 4, 6], + "num_exemplars": 2, + "statistical_exemplars": False, + } agg = HistogramAggregator(config=config) agg.update(2) @@ -409,7 +457,9 @@ def test_record_traces_only(self): agg.take_checkpoint() self.assertEqual(len(agg.checkpoint_exemplars), 1) - self.assertEqual(agg.checkpoint_exemplars[0].span_id, span.context.span_id) + self.assertEqual( + agg.checkpoint_exemplars[0].span_id, span.context.span_id + ) tp = TracerProvider(sampler=ALWAYS_OFF) tracer = tp.get_tracer(__name__) @@ -420,6 +470,7 @@ def test_record_traces_only(self): agg.take_checkpoint() self.assertEqual(len(agg.checkpoint_exemplars), 0) + class TestFullPipelineExemplars(unittest.TestCase): def test_histogram(self): # Use the meter type provided by the SDK package @@ -438,9 +489,14 @@ def test_histogram(self): size_view = View( requests_size, - HistogramAggregator(config={"bounds": [20, 40, 60, 80, 100], "num_exemplars": 1, "statistical_exemplars": True}), + HistogramAggregator, + aggregator_config={ + 
"bounds": (20, 40, 60, 80, 100), + "num_exemplars": 1, + "statistical_exemplars": True, + }, label_keys=["environment"], - config=ViewConfig.LABEL_KEYS, + view_config=ViewConfig.LABEL_KEYS, ) meter.register_view(size_view) @@ -456,5 +512,14 @@ def test_histogram(self): self.assertEqual(len(metrics_list), 1) exemplars = metrics_list[0].aggregator.checkpoint_exemplars self.assertEqual(len(exemplars), 3) - self.assertEqual([(exemplar.value, exemplar.dropped_labels) for exemplar in exemplars], - [(1, (("test", "value2"),)), (25, (("test", "value"),)), (200, (("test", "value3"),))]) + self.assertEqual( + [ + (exemplar.value, exemplar.dropped_labels) + for exemplar in exemplars + ], + [ + (1, (("test", "value2"),)), + (25, (("test", "value"),)), + (200, (("test", "value3"),)), + ], + ) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 99aa9c4a629..c262a5dd202 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -50,7 +50,7 @@ def test_export(self): labels = {"environment": "staging"} aggregator = SumAggregator() record = MetricRecord(metric, labels, aggregator) - result = '{}(data="{}", labels="{}", value={})'.format( + result = '{}(data="{}", labels="{}", value={}, exemplars=[])'.format( ConsoleMetricsExporter.__name__, metric, labels, diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index b854f2d5db9..d620d5eb6f5 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -496,7 +496,7 @@ def test_add(self): view_datas_mock = mock.Mock() bound_metric.view_datas = [view_datas_mock] bound_metric.add(3) - view_datas_mock.record.assert_called_once_with(3) + view_datas_mock.record.assert_called_once_with(3, ()) def test_add_disabled(self): meter_mock = mock.Mock() @@ -538,7 +538,7 @@ def 
test_record(self): view_datas_mock = mock.Mock() bound_valuerecorder.view_datas = [view_datas_mock] bound_valuerecorder.record(3) - view_datas_mock.record.assert_called_once_with(3) + view_datas_mock.record.assert_called_once_with(3, ()) def test_record_disabled(self): meter_mock = mock.Mock() From f3ed3f36936b6b70704b1c068a530bb95520c741 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Thu, 6 Aug 2020 15:31:36 -0400 Subject: [PATCH 3/6] semantic -> trace, link to wiki --- docs/examples/exemplars/README.rst | 6 +++--- .../{semantic_exemplars.py => trace_exemplars.py} | 2 +- .../opentelemetry/sdk/metrics/export/exemplars.py | 6 ++++-- .../tests/metrics/export/test_exemplars.py | 15 +++------------ 4 files changed, 11 insertions(+), 18 deletions(-) rename docs/examples/exemplars/{semantic_exemplars.py => trace_exemplars.py} (95%) diff --git a/docs/examples/exemplars/README.rst b/docs/examples/exemplars/README.rst index b49a02b8de6..a647071e365 100644 --- a/docs/examples/exemplars/README.rst +++ b/docs/examples/exemplars/README.rst @@ -28,13 +28,13 @@ The opentelemetry SDK provides a way to sample exemplars statistically: See 'statistical_exemplars.ipynb' for the example (TODO: how do I link this?) -Semantic exemplars +Trace exemplars ^^^^^^^^^^^^^^^^^^ -Semantic exemplars are exemplars that have not been sampled statistically, +Trace exemplars are exemplars that have not been sampled statistically, but instead aim to provide value as individual exemplars. They will have a trace id/span id attached for the active trace when the exemplar was recorded, and they may focus on measurements with abnormally high/low values. -'semantic_exemplars.py' shows how to generate exemplars for a histogram aggregation. +'trace_exemplars.py' shows how to generate exemplars for a histogram aggregation. Currently only the Google Cloud Monitoring exporter supports uploading these exemplars. 
diff --git a/docs/examples/exemplars/semantic_exemplars.py b/docs/examples/exemplars/trace_exemplars.py similarity index 95% rename from docs/examples/exemplars/semantic_exemplars.py rename to docs/examples/exemplars/trace_exemplars.py index bf80ede1f6b..735e329d9ac 100644 --- a/docs/examples/exemplars/semantic_exemplars.py +++ b/docs/examples/exemplars/trace_exemplars.py @@ -13,7 +13,7 @@ # limitations under the License. # """ -This example shows how to generate "semantic" exemplars for a histogram, and how to export them to Google Cloud Monitoring. +This example shows how to generate trace exemplars for a histogram, and how to export them to Google Cloud Monitoring. """ import random diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py index 9eb82a5aed8..5f4b8677125 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py @@ -16,7 +16,7 @@ Exemplars are sample data points for aggregators. For more information, see the `spec `_ Every synchronous aggregator is instrumented with two exemplar recorders: - 1. A "semantic" exemplar sampler, which only samples exemplars if they have a sampled trace context (and can pick exemplars with other biases, ie min + max). + 1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context (and can pick exemplars with other biases, ie min + max). 2. 
A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the "Exemplars" example for an example): @@ -154,6 +154,8 @@ def sample(self, exemplar, **kwargs): self.sample_set.append(exemplar) return + # We sample a random subset of a stream using "Algorithm R": + # https://en.wikipedia.org/wiki/Reservoir_sampling#Simple_algorithm replace_index = random.randint(0, self.rand_count - 1) if replace_index < self._k: @@ -277,7 +279,7 @@ def reset(self): class ExemplarManager: """ Manages two different exemplar samplers: - 1. A "semantic" exemplar sampler, which only samples exemplars if they have a sampled trace context. + 1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context. 2. A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) """ diff --git a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py index e80dee1a832..77e34b69247 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py +++ b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py @@ -223,21 +223,12 @@ def test_statistical(self): merged = manager.merge([Exemplar(2, time())], [Exemplar(3, time())]) self.assertEqual(len(merged), 1) - def test_semantic(self): - config = {"statistical_exemplars": True, "num_exemplars": 1} + def test_trace(self): + config = {"statistical_exemplars": False, "num_exemplars": 1} manager = ExemplarManager( config, MinMaxExemplarSampler, RandomExemplarSampler ) - self.assertIsInstance(manager.exemplar_sampler, RandomExemplarSampler) - manager.sample(5, {}) - self.assertEqual(len(manager.exemplar_sampler.sample_set), 1) - self.assertEqual(manager.exemplar_sampler.sample_set[0].value, 5) - - checkpoint = manager.take_checkpoint() - self.assertEqual(len(checkpoint), 1) 
- self.assertEqual(checkpoint[0].value, 5) - - self.assertEqual(len(manager.exemplar_sampler.sample_set), 0) + self.assertIsInstance(manager.exemplar_sampler, MinMaxExemplarSampler) merged = manager.merge([Exemplar(2, time())], [Exemplar(3, time())]) self.assertEqual(len(merged), 1) From a4047059311d9f5e8eae5efa4db41c0f007ba6f2 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Thu, 6 Aug 2020 18:59:40 -0400 Subject: [PATCH 4/6] fixes --- docs/examples/exemplars/README.rst | 4 +- .../exemplars/statistical_exemplars.ipynb | 4 +- .../exemplars/statistical_exemplars.py | 18 +-- .../sdk/metrics/export/exemplars.py | 121 +++++++++--------- .../tests/metrics/export/test_exemplars.py | 24 ++-- 5 files changed, 86 insertions(+), 85 deletions(-) diff --git a/docs/examples/exemplars/README.rst b/docs/examples/exemplars/README.rst index a647071e365..deed297efdb 100644 --- a/docs/examples/exemplars/README.rst +++ b/docs/examples/exemplars/README.rst @@ -1,6 +1,8 @@ OpenTelemetry Exemplars Example =============================== +.. _Exemplars: + Exemplars are example measurements for aggregations. While they are simple conceptually, exemplars can estimate any statistic about the input distribution, can provide links to sample traces for high latency requests, and much more. For more information about exemplars and how they work in OpenTelemetry, see the `spec `_ @@ -24,7 +26,7 @@ Statistical exemplars The opentelemetry SDK provides a way to sample exemplars statistically: - Exemplars will be picked to represent the input distribution, without unquantifiable bias - - A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents + - A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents (for randomly sampled exemplars, this value will be N (total measurements) / num_samples. For histogram exemplars, this value will be specific to each bucket). 
See 'statistical_exemplars.ipynb' for the example (TODO: how do I link this?) diff --git a/docs/examples/exemplars/statistical_exemplars.ipynb b/docs/examples/exemplars/statistical_exemplars.ipynb index 5f3659e41e8..ca7edd1c3db 100644 --- a/docs/examples/exemplars/statistical_exemplars.ipynb +++ b/docs/examples/exemplars/statistical_exemplars.ipynb @@ -122,7 +122,7 @@ " random.seed(1)\n", "\n", " # customer 123 is a big user, and made 1000 requests in this timeframe\n", - " requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, covariance 100\n", + " requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, standard deviation 250\n", "\n", " for request in requests:\n", " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 123})\n", @@ -205,7 +205,7 @@ " customer_bytes_map[exemplar.dropped_labels] += exemplar.value\n", "\n", "\n", - "customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)\n", + "customer_bytes_list = sorted(customer_bytes_map.items(), key=lambda t: t[1], reverse=True)\n", "\n", "# Save our top 5 customers and sum all of the rest into \"Others\".\n", "top_3_customers = [(\"Customer {}\".format(dict(val[0])[\"customer_id\"]), val[1]) for val in customer_bytes_list[:3]] + [(\"Other Customers\", sum([val[1] for val in customer_bytes_list[3:]]))]\n", diff --git a/docs/examples/exemplars/statistical_exemplars.py b/docs/examples/exemplars/statistical_exemplars.py index 25fd29e82ba..b7a3ffbd5cd 100644 --- a/docs/examples/exemplars/statistical_exemplars.py +++ b/docs/examples/exemplars/statistical_exemplars.py @@ -61,8 +61,8 @@ def unknown_customer_calls(): # customer 123 is a big user, and made 1000 requests in this timeframe requests = np.random.normal( - 1000, 250, 1000 - ) # 1000 requests with average 1000 bytes, covariance 100 + 1000, 100, 1000 + ) # 1000 requests with average 1000 bytes, standard 
deviation 100 for request in requests: bytes_counter.add( @@ -123,7 +123,7 @@ def unknown_customer_calls(): customer_bytes_list = sorted( - list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True + customer_bytes_map.items(), key=lambda t: t[1], reverse=True ) # Save our top 5 customers and sum all of the rest into "Others". @@ -146,7 +146,6 @@ def unknown_customer_calls(): # Since the exemplars were randomly sampled, all sample_counts will be the same sample_count = exemplars[0].sample_count -print("sample count", sample_count, "custmer", customer_123_bytes) full_customer_123_bytes = sample_count * customer_123_bytes # With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate) @@ -160,13 +159,10 @@ def unknown_customer_calls(): top_25_customers = customer_bytes_list[:25] # out of those 25 customers, determine how many used grpc, and come up with a ratio -percent_grpc = len( - list( - filter( - lambda customer_value: customer_value[0][1][1] == "gRPC", - top_25_customers, - ) - ) +percent_grpc = sum( + 1 + for customer_value in top_25_customers + if customer_value[0][1][1] == "gRPC" ) / len(top_25_customers) print( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py index 5f4b8677125..3de83fdaefe 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py @@ -19,7 +19,7 @@ 1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context (and can pick exemplars with other biases, ie min + max). 2. 
A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) - To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the "Exemplars" example for an example): + To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the :ref:`Exemplars` example for an example): "num_exemplars": The number of exemplars to record (if applicable, in each bucket). Note that in non-statistical mode the recorder may not use "num_exemplars" "statistical_exemplars": If exemplars should be recorded statistically @@ -29,6 +29,7 @@ import abc import itertools import random +from typing import List, Optional, Tuple, Union from opentelemetry.context import get_current from opentelemetry.util import time_ns @@ -41,12 +42,12 @@ class Exemplar: def __init__( self, - value, - timestamp, - dropped_labels=None, - span_id=None, - trace_id=None, - sample_count=None, + value: Union[int, float], + timestamp: int, + dropped_labels: Optional[Tuple[Tuple[str, str]]] = None, + span_id: Optional[bytes] = None, + trace_id: Optional[bytes] = None, + sample_count: Optional[float] = None, ): self._value = value self._timestamp = timestamp @@ -94,22 +95,22 @@ def sample_count(self): """For statistical exemplars, how many measurements a single exemplar represents""" return self._sample_count - def set_sample_count(self, count): + def set_sample_count(self, count: float): self._sample_count = count -class ExemplarSampler: +class ExemplarSampler(abc.ABC): """ - Abstract class to sample exemplars through a stream of incoming measurements + Abstract class to sample `k` exemplars in some way through a stream of incoming measurements """ - def __init__(self, k, statistical=False): + def __init__(self, k: int, statistical: bool = False): self._k = k self._statistical = statistical - self._sample_set = list() + self._sample_set = [] @abc.abstractmethod - def sample(self, exemplar, **kwargs): + def 
sample(self, exemplar: Exemplar, **kwargs): """ Given an exemplar, determine if it should be sampled or not """ @@ -122,7 +123,7 @@ def sample_set(self): """ @abc.abstractmethod - def merge(self, set1, set2): + def merge(self, set1: List[Exemplar], set2: List[Exemplar]): """ Given two lists of sampled exemplars, merge them while maintaining the invariants specified by this sampler """ @@ -139,19 +140,19 @@ class RandomExemplarSampler(ExemplarSampler): Randomly sample a set of k exemplars from a stream. Each measurement in the stream will have an equal chance of being sampled. - If RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records. + If `RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records. This value will be equal to the number of measurements recorded per every exemplar measured - all exemplars will have the same sample_count value. """ - def __init__(self, k, statistical=False): + def __init__(self, k: int, statistical: bool = False): super().__init__(k, statistical=statistical) self.rand_count = 0 - def sample(self, exemplar, **kwargs): + def sample(self, exemplar: Exemplar, **kwargs): self.rand_count += 1 - if len(self.sample_set) < self._k: - self.sample_set.append(exemplar) + if len(self._sample_set) < self._k: + self._sample_set.append(exemplar) return # We sample a random subset of a stream using "Algorithm R": @@ -159,13 +160,15 @@ def sample(self, exemplar, **kwargs): replace_index = random.randint(0, self.rand_count - 1) if replace_index < self._k: - self.sample_set[replace_index] = exemplar + self._sample_set[replace_index] = exemplar - def merge(self, set1, set2): - combined = set1 + set2 - if len(combined) <= self._k: - return combined - return random.sample(combined, self._k) + def merge(self, set1: List[Exemplar], set2: List[Exemplar]): + """ + Assume that set2 is the latest set of exemplars. 
+ For simplicity, we will just keep set2 and assume set1 has already been exported. + This may need to change with a different SDK implementation. + """ + return set2 @property def sample_set(self): @@ -186,12 +189,12 @@ class MinMaxExemplarSampler(ExemplarSampler): Sample the minimum and maximum measurements recorded only """ - def __init__(self, k, statistical=False): + def __init__(self, k: int, statistical: bool = False): # K will always be 2 (min and max), and selecting min and max can never be statistical super().__init__(2, statistical=False) self._sample_set = [] - def sample(self, exemplar, **kwargs): + def sample(self, exemplar: Exemplar, **kwargs): self._sample_set = [ min( self._sample_set + [exemplar], @@ -209,12 +212,13 @@ def sample(self, exemplar, **kwargs): def sample_set(self): return self._sample_set - def merge(self, set1, set2): - merged_set = set1 + set2 - if len(merged_set) <= 2: - return sorted(merged_set, key=lambda exemplar: exemplar.value) - - return [min(merged_set), max(merged_set)] + def merge(self, set1: List[Exemplar], set2: List[Exemplar]): + """ + Assume that set2 is the latest set of exemplars. + For simplicity, we will just keep set2 and assume set1 has already been exported. + This may need to change with a different SDK implementation. + """ + return set2 def reset(self): self._sample_set = [] @@ -228,7 +232,9 @@ class BucketedExemplarSampler(ExemplarSampler): This value will be equal to `len(bucket.exemplars) / bucket.count`, that is the number of measurements each exemplar represents. 
""" - def __init__(self, k, statistical=False, boundaries=None): + def __init__( + self, k: int, statistical: bool = False, boundaries: list = None + ): super().__init__(k) self._boundaries = boundaries self._sample_set = [ @@ -236,7 +242,7 @@ def __init__(self, k, statistical=False, boundaries=None): for _ in range(len(self._boundaries) + 1) ] - def sample(self, exemplar, **kwargs): + def sample(self, exemplar: Exemplar, **kwargs): bucket_index = kwargs.get("bucket_index") if bucket_index is None: return @@ -251,25 +257,13 @@ def sample_set(self): ) ) - def merge(self, set1, set2): - exemplar_set = [list() for _ in range(len(self._boundaries) + 1)] - # Sort both sets back into buckets - for setx in [set1, set2]: - bucket_idx = 0 - for exemplar in setx: - if exemplar.value >= self._boundaries[-1]: - exemplar_set[-1].append(exemplar) - continue - - while exemplar.value >= self._boundaries[bucket_idx]: - bucket_idx += 1 - exemplar_set[bucket_idx].append(exemplar) - - # Pick only k exemplars for every bucket - for index, inner_set in enumerate(exemplar_set): - if len(inner_set) > self._k: - exemplar_set[index] = random.sample(inner_set, self._k) - return list(itertools.chain.from_iterable(exemplar_set)) + def merge(self, set1: List[Exemplar], set2: List[Exemplar]): + """ + Assume that set2 is the latest set of exemplars. + For simplicity, we will just keep set2 and assume set1 has already been exported. + This may need to change with a different SDK implementation. 
+ """ + return set2 def reset(self): for sampler in self._sample_set: @@ -285,9 +279,9 @@ class ExemplarManager: def __init__( self, - config, - default_exemplar_sampler, - statistical_exemplar_sampler, + config: dict, + default_exemplar_sampler: ExemplarSampler, + statistical_exemplar_sampler: ExemplarSampler, **kwargs ): if config: @@ -311,7 +305,12 @@ def __init__( else: self.record_exemplars = False - def sample(self, value, dropped_labels, **kwargs): + def sample( + self, + value: Union[int, float], + dropped_labels: Tuple[Tuple[str, str]], + **kwargs + ): context = get_current() is_sampled = ( @@ -343,7 +342,11 @@ def take_checkpoint(self): return ret return [] - def merge(self, checkpoint_exemplars, other_checkpoint_exemplars): + def merge( + self, + checkpoint_exemplars: List[Exemplar], + other_checkpoint_exemplars: List[Exemplar], + ): if self.record_exemplars: return self.exemplar_sampler.merge( checkpoint_exemplars, other_checkpoint_exemplars diff --git a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py index 77e34b69247..4e78f4d23e9 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py +++ b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py @@ -97,11 +97,11 @@ def test_merge(self): set1 = [1, 2, 3] set2 = [4, 5, 6] sampler = RandomExemplarSampler(6) - self.assertEqual(set1 + set2, sampler.merge(set1, set2)) + self.assertEqual(set2, sampler.merge(set1, set2)) sampler = RandomExemplarSampler(8) - self.assertEqual(set1 + set2, sampler.merge(set1, set2)) + self.assertEqual(set2, sampler.merge(set1, set2)) sampler = RandomExemplarSampler(4) - self.assertEqual(4, len(sampler.merge(set1, set2))) + self.assertEqual(3, len(sampler.merge(set1, set2))) class TestMinMaxExemplarSampler(unittest.TestCase): @@ -140,10 +140,10 @@ def test_reset(self): self.assertEqual(len(sampler.sample_set), 1) def test_merge(self): - set1 = [1, 2, 3] - set2 = [4, 5, 6] + set1 = [1, 3] + set2 = 
[4, 6] sampler = MinMaxExemplarSampler(2) - self.assertEqual([1, 6], sampler.merge(set1, set2)) + self.assertEqual([4, 6], sampler.merge(set1, set2)) class TestBucketedExemplarSampler(unittest.TestCase): @@ -195,7 +195,7 @@ def test_merge(self): [Exemplar(2, time())], ) ), - 2, + 1, ) @@ -339,7 +339,7 @@ def _merge_aggregators_test(self, aggregator): agg1.merge(agg2) - self.assertEqual(len(agg1.checkpoint_exemplars), 2) + self.assertEqual(len(agg1.checkpoint_exemplars), 1) def test_sum_aggregator(self): self._no_exemplars_test(SumAggregator) @@ -495,8 +495,8 @@ def test_histogram(self): # Since this is using the HistogramAggregator, the bucket counts will be reflected # with each record requests_size.record(25, {"environment": "staging", "test": "value"}) - requests_size.record(1, {"environment": "staging", "test": "value2"}) - requests_size.record(200, {"environment": "staging", "test": "value3"}) + requests_size.record(1, {"environment": "staging", "test": "value"}) + requests_size.record(200, {"environment": "staging", "test": "value"}) controller.tick() metrics_list = exporter.get_exported_metrics() @@ -509,8 +509,8 @@ def test_histogram(self): for exemplar in exemplars ], [ - (1, (("test", "value2"),)), + (1, (("test", "value"),)), (25, (("test", "value"),)), - (200, (("test", "value3"),)), + (200, (("test", "value"),)), ], ) From b158e59b9ecaa3da0266eccf2b6c6ae3b6b6bcf3 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Fri, 7 Aug 2020 12:12:37 -0400 Subject: [PATCH 5/6] readme --- docs/examples/exemplars/README.rst | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/examples/exemplars/README.rst b/docs/examples/exemplars/README.rst index deed297efdb..7151ae36a98 100644 --- a/docs/examples/exemplars/README.rst +++ b/docs/examples/exemplars/README.rst @@ -28,7 +28,11 @@ The opentelemetry SDK provides a way to sample exemplars statistically: - Exemplars will be picked to represent the input distribution, without unquantifiable bias 
- A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents (for randomly sampled exemplars, this value will be N (total measurements) / num_samples. For histogram exemplars, this value will be specific to each bucket). -See 'statistical_exemplars.ipynb' for the example (TODO: how do I link this?) +.. literalinclude:: statistical_exemplars.py + :language: python + :lines: 1- + +For the output of this example, see the corresponding Jupyter notebook. Trace exemplars ^^^^^^^^^^^^^^^^^^ @@ -38,5 +42,7 @@ but instead aim to provide value as individual exemplars. They will have a trace id/span id attached for the active trace when the exemplar was recorded, and they may focus on measurements with abnormally high/low values. -'trace_exemplars.py' shows how to generate exemplars for a histogram aggregation. +.. literalinclude:: trace_exemplars.py + :language: python + :lines: 1- Currently only the Google Cloud Monitoring exporter supports uploading these exemplars. From bb2e3023362f5d5c085d8ebfe8f7cf84ac4fe31e Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Fri, 7 Aug 2020 12:47:45 -0400 Subject: [PATCH 6/6] nits --- docs/examples/exemplars/README.rst | 1 + .../sdk/metrics/export/aggregate.py | 2 +- .../sdk/metrics/export/exemplars.py | 65 ++++++------------- 3 files changed, 23 insertions(+), 45 deletions(-) diff --git a/docs/examples/exemplars/README.rst b/docs/examples/exemplars/README.rst index 7151ae36a98..89af3407b6f 100644 --- a/docs/examples/exemplars/README.rst +++ b/docs/examples/exemplars/README.rst @@ -45,4 +45,5 @@ and they may focus on measurements with abnormally high/low values. .. literalinclude:: trace_exemplars.py :language: python :lines: 1- + Currently only the Google Cloud Monitoring exporter supports uploading these exemplars. 
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 998ee23b358..48908857b3e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -42,7 +42,7 @@ def __init__(self, config=None): self.config = config else: self.config = {} - self.checkpoint_exemplars = list() + self.checkpoint_exemplars = [] @abc.abstractmethod def update(self, value, dropped_labels=None): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py index 3de83fdaefe..98bc6c44660 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py @@ -12,24 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -""" - Exemplars are sample data points for aggregators. For more information, see the `spec `_ +"""Exemplars are sample data points for aggregators. For more information, see the `spec `_ - Every synchronous aggregator is instrumented with two exemplar recorders: - 1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context (and can pick exemplars with other biases, ie min + max). - 2. A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) +Every synchronous aggregator is instrumented with two exemplar recorders: + 1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context (and can pick exemplars with other biases, ie min + max). + 2. 
A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) - To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the :ref:`Exemplars` example for an example): - "num_exemplars": The number of exemplars to record (if applicable, in each bucket). Note that in non-statistical mode the recorder may not use "num_exemplars" - "statistical_exemplars": If exemplars should be recorded statistically +To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the :ref:`Exemplars` example for an example): + "num_exemplars": The number of exemplars to record (if applicable, in each bucket). Note that in non-statistical mode the recorder may not use "num_exemplars" + "statistical_exemplars": If exemplars should be recorded statistically - For exemplars to be recorded, `num_exemplars` must be greater than 0. +For exemplars to be recorded, `num_exemplars` must be greater than 0. """ import abc import itertools import random -from typing import List, Optional, Tuple, Union +from typing import List, Optional, Tuple, Type, Union from opentelemetry.context import get_current from opentelemetry.util import time_ns @@ -95,7 +94,8 @@ def sample_count(self): """For statistical exemplars, how many measurements a single exemplar represents""" return self._sample_count - def set_sample_count(self, count: float): + @sample_count.setter + def sample_count(self, count: float): self._sample_count = count @@ -122,11 +122,14 @@ def sample_set(self): Return the list of exemplars that have been sampled """ - @abc.abstractmethod def merge(self, set1: List[Exemplar], set2: List[Exemplar]): """ - Given two lists of sampled exemplars, merge them while maintaining the invariants specified by this sampler + Assume that set2 is the latest set of exemplars. + For simplicity, we will just keep set2 and assume set1 has already been exported. 
+ This may need to change with a different SDK implementation. """ + # pylint: disable=unused-argument,no-self-use + return set2 @abc.abstractmethod def reset(self): @@ -162,21 +165,11 @@ def sample(self, exemplar: Exemplar, **kwargs): if replace_index < self._k: self._sample_set[replace_index] = exemplar - def merge(self, set1: List[Exemplar], set2: List[Exemplar]): - """ - Assume that set2 is the latest set of exemplars. - For simplicity, we will just keep set2 and assume set1 has already been exported. - This may need to change with a different SDK implementation. - """ - return set2 - @property def sample_set(self): if self._statistical: for exemplar in self._sample_set: - exemplar.set_sample_count( - self.rand_count / len(self._sample_set) - ) + exemplar.sample_count = self.rand_count / len(self._sample_set) return self._sample_set def reset(self): @@ -212,14 +205,6 @@ def sample(self, exemplar: Exemplar, **kwargs): def sample_set(self): return self._sample_set - def merge(self, set1: List[Exemplar], set2: List[Exemplar]): - """ - Assume that set2 is the latest set of exemplars. - For simplicity, we will just keep set2 and assume set1 has already been exported. - This may need to change with a different SDK implementation. - """ - return set2 - def reset(self): self._sample_set = [] @@ -233,7 +218,7 @@ class BucketedExemplarSampler(ExemplarSampler): """ def __init__( - self, k: int, statistical: bool = False, boundaries: list = None + self, k: int, statistical: bool = False, boundaries: List[float] = None ): super().__init__(k) self._boundaries = boundaries @@ -253,18 +238,10 @@ def sample(self, exemplar: Exemplar, **kwargs): def sample_set(self): return list( itertools.chain.from_iterable( - [sampler.sample_set for sampler in self._sample_set] + sampler.sample_set for sampler in self._sample_set ) ) - def merge(self, set1: List[Exemplar], set2: List[Exemplar]): - """ - Assume that set2 is the latest set of exemplars. 
- For simplicity, we will just keep set2 and assume set1 has already been exported. - This may need to change with a different SDK implementation. - """ - return set2 - def reset(self): for sampler in self._sample_set: sampler.reset() @@ -280,8 +257,8 @@ class ExemplarManager: def __init__( self, config: dict, - default_exemplar_sampler: ExemplarSampler, - statistical_exemplar_sampler: ExemplarSampler, + default_exemplar_sampler: Type[ExemplarSampler], + statistical_exemplar_sampler: Type[ExemplarSampler], **kwargs ): if config: