From 92431b4ef22d89f078e9e64d4e63c719454095c7 Mon Sep 17 00:00:00 2001 From: Jim Park Date: Wed, 22 Apr 2020 20:38:43 -0700 Subject: [PATCH] feat: submit histogram of metrics on a minute-by-minute basis --- slowlog_check.rb | 203 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 161 insertions(+), 42 deletions(-) diff --git a/slowlog_check.rb b/slowlog_check.rb index f75a482..4facaed 100755 --- a/slowlog_check.rb +++ b/slowlog_check.rb @@ -7,7 +7,9 @@ require 'dogapi' LOGGER = Logger.new($stdout) -LOGGER.level = Logger::INFO +LOGGER.level = Logger::DEBUG + +METRICNAME = 'scribddev.redis.slowlog.micros' REDIS = Redis.new( host: ENV.fetch('REDIS_HOST'), @@ -26,7 +28,7 @@ def log_context LOGGER.debug(@event) end -def time +def event_time # DateTime because Time does not natively parse AWS CloudWatch Event time DateTime.rfc3339(@event.fetch("time", DateTime.now.rfc3339)) end @@ -40,49 +42,48 @@ def replication_group end end +# TODO: Rather than hard code a day lookback, +# look back at an increasing increment until hitting some max value def last_datadog_metrics_submitted_by_me_in_the_last_day resp = DDOG.get_points( - "scribd.slowlog_check.slowlog{replication_group:#{replication_group}}", + "#{METRICNAME}{replication_group:#{replication_group}}", Time.now - 86400, Time.now ) - raise "Error getting last datadog metric submitted by me" unless resp[0] == "200" + raise "Error getting last datadog metric submitted by me" unless resp[1].fetch("status") == "ok" resp end -def last_datadog_metric +def minute_precision(time) Time.at( - last_datadog_metrics_submitted_by_me_in_the_last_day[1] - .fetch("series") - .first - .fetch("pointlist") - .map {|x| x[0]} - .max - .to_i / 1000 + time.to_i - (time.to_i % 60) ) end +def last_datadog_metric + series = last_datadog_metrics_submitted_by_me_in_the_last_day[1].fetch("series") + if series == [] # First invocation + return Time.at(0) + else + minute_precision( + Time.at( + series + .first + .fetch("pointlist") + .map {|x| x[0]} + .max + .to_i / 1000 + ) + ) + end +end + def last_time_submitted return @last_time_submitted if defined? @last_time_submitted @last_time_submitted = last_datadog_metric end -def emit_point(time, value, tags) - LOGGER.info "Sending slowlog entry: #{value}µs executing #{tags[:command]} at #{time}." - resp = DDOG.emit_points( - 'redis.slowlog.micros.avg', - [[time, value]], - { - host: replication_group, - tags: tags - } - ) - raise "Error submitting metric for #{replication_group}" unless resp[0] == "202" - @last_time_submitted = time - resp -end - def slowlog_time(slowlog) Time.at slowlog[1] end @@ -91,15 +92,86 @@ def slowlog_microseconds(slowlog) slowlog[2] end -def client_ip(ip_and_port) - ip_and_port.split(':')[0] +def reporting_interval + now_i = Time.now.to_i + start_time_i = last_time_submitted.to_i + 60 + times = (start_time_i..now_i).step(60).to_a + Hash[times.collect {|time| [Time.at(time), nil]}] end -def slowlog_tags(slowlog) +def _95percentile(sorted_values) + index = (sorted_values.length * 0.95) - 1 + sorted_values[index] +end + +def add_metric_to_bucket(prior, new) + new_values = prior[:values].push(new) + new_count = prior[:count] += 1 + new_avg = (prior[:avg] * prior[:count] + new) / new_count + + sorted_values = new_values.sort + new_median = sorted_values[sorted_values.count / 2] + new_95percentile = _95percentile(sorted_values) + new_min = sorted_values[0] + new_max = sorted_values[-1] + new_sum = sorted_values.reduce(:+) + + { + values: new_values, + avg: new_avg, + count: new_count, + median: new_median, + _95percentile: new_95percentile, + min: new_min, + max: new_max, + sum: new_sum + } +end + +def slowlogs_by_flush_interval + result = reporting_interval + REDIS.slowlog('get').each do |slowlog| + time = slowlog_time(slowlog) + break if minute_precision(time) <= minute_precision(last_time_submitted) + + command = slowlog[3][0] + value = slowlog_microseconds(slowlog) + bucket = minute_precision(time) + + if result[bucket].nil? + result[bucket] = { + command => { + values: [value], + avg: value, + count: 1, + median: value, + _95percentile: value, + min: value, + max: value, + sum: value + } + } + elsif result[bucket][command].nil? + result[bucket][command] = { + values: [value], + avg: value, + count: 1, + median: value, + _95percentile: value, + min: value, + max: value, + sum: value + } + else + result[bucket][command] = add_metric_to_bucket(result[bucket][command], value) + end + end + + result +end + +def default_tags { - command: slowlog[3][0], - client: client_ip(slowlog[4]), - client_name: slowlog[5], replication_group: replication_group, service: replication_group, namespace: ENV.fetch('NAMESPACE'), @@ -108,26 +180,69 @@ def slowlog_tags(slowlog) } end +def emit_point(params) + metric = METRICNAME + '.' + params.fetch(:metric) + type = params.fetch(:type, 'gauge') + interval = params.fetch(:interval, 60) + points = params.fetch(:points) + host = params.fetch(:host, replication_group) + tags = params.fetch(:tags, default_tags) + + LOGGER.info "Sending slowlog entry: #{points.first[1]}µs executing #{tags[:command]} at #{points.first[0]}." + resp = DDOG.emit_points( + metric, + points, + { + type: type, + interval: interval, + host: host, + tags: tags + } + ) + raise "Error submitting metric for #{replication_group}" unless resp[1].fetch("status") == "ok" + @last_time_submitted = Time.at(points.first[0]) + resp +end + def ship_slowlogs - REDIS.slowlog('get').each do |slowlog| - break if slowlog_time(slowlog) <= last_time_submitted - emit_point( - slowlog_time(slowlog), - slowlog_microseconds(slowlog), - slowlog_tags(slowlog) - ) + slowlogs = slowlogs_by_flush_interval + slowlogs.keys.sort.each do |timestamp| + timebucket = slowlogs.fetch(timestamp) + next if timebucket.nil? + + timebucket.keys.each do |command| + all_metrics = timebucket.fetch(command) + + # Emit most metrics + [:avg, :count, :median, :min, :max].each do |metric| + emit_point( + metric: metric.to_s, + type: metric == :count ? 'rate' : 'gauge', + points: [[timestamp, all_metrics.fetch(metric)]], + tags: default_tags.merge(command: command) + ) + end + + # Stupid symbol's cannot start with a number + emit_point( + metric: '95percentile', + points: [[timestamp, all_metrics.fetch(:_95percentile)]], + tags: default_tags.merge(command: command) + ) + + end end end + def lambda_handler(event: {}, context: {}) @event = event log_context - LOGGER.info "Event time: #{time}." + LOGGER.info "Event time: #{event_time}." begin REDIS.ping - ship_slowlogs rescue StandardError => e LOGGER.error e.inspect # => # @@ -140,4 +255,8 @@ def lambda_handler(event: {}, context: {}) if __FILE__ == $0 lambda_handler + + require 'pry' + binding.pry + end