Skip to content

Commit

Permalink
feat: submit histogram of metrics on a minute-by-minute basis
Browse files Browse the repository at this point in the history
  • Loading branch information
jim80net committed Apr 27, 2020
1 parent a3bb944 commit 92431b4
Showing 1 changed file with 161 additions and 42 deletions.
203 changes: 161 additions & 42 deletions slowlog_check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
require 'dogapi'

LOGGER = Logger.new($stdout)
LOGGER.level = Logger::INFO
LOGGER.level = Logger::DEBUG

METRICNAME = 'scribddev.redis.slowlog.micros'

REDIS = Redis.new(
host: ENV.fetch('REDIS_HOST'),
Expand All @@ -26,7 +28,7 @@ def log_context
LOGGER.debug(@event)
end

def time
# Timestamp of the event being handled, as a DateTime.
# Reads the "time" entry of @event (an RFC 3339 string) and falls back to the
# current time when the event carries no "time" key.
# DateTime because Time does not natively parse AWS CloudWatch Event time.
def event_time
  fallback = DateTime.now.rfc3339
  raw_time = @event.fetch("time", fallback)
  DateTime.rfc3339(raw_time)
end
Expand All @@ -40,49 +42,48 @@ def replication_group
end
end

# TODO: Rather than hard code a day lookback,
# look back at an increasing increment until hitting some max value
def last_datadog_metrics_submitted_by_me_in_the_last_day
resp = DDOG.get_points(
"scribd.slowlog_check.slowlog{replication_group:#{replication_group}}",
"#{METRICNAME}{replication_group:#{replication_group}}",
Time.now - 86400,
Time.now
)

raise "Error getting last datadog metric submitted by me" unless resp[0] == "200"
raise "Error getting last datadog metric submitted by me" unless resp[1].fetch("status") == "ok"
resp
end

def last_datadog_metric
def minute_precision(time)
Time.at(
last_datadog_metrics_submitted_by_me_in_the_last_day[1]
.fetch("series")
.first
.fetch("pointlist")
.map {|x| x[0]}
.max
.to_i / 1000
time.to_i - (time.to_i % 60)
)
end

# Time of the newest point found in the Datadog query response, rounded down
# to the start of its minute.
# Returns Time.at(0) when the response contains no series at all, which is
# what happens on the very first invocation.
def last_datadog_metric
  series = last_datadog_metrics_submitted_by_me_in_the_last_day[1].fetch("series")
  return Time.at(0) if series == [] # First invocation

  # Each pointlist entry is indexed as [timestamp, ...]; the timestamp is
  # divided by 1000 before use -- presumably milliseconds, confirm against the API.
  timestamps = series.first.fetch("pointlist").map { |point| point[0] }
  newest_seconds = timestamps.max.to_i / 1000
  minute_precision(Time.at(newest_seconds))
end

# Memoized timestamp of the last point known to be in Datadog.
# The lookup through last_datadog_metric happens only while the instance
# variable is still undefined; once it exists (even if nil) it is returned as is.
def last_time_submitted
  @last_time_submitted = last_datadog_metric unless defined?(@last_time_submitted)
  @last_time_submitted
end

# Sends one slowlog measurement to Datadog under 'redis.slowlog.micros.avg'.
#
# time  - timestamp of the point
# value - measured duration (logged as microseconds)
# tags  - tag hash; its :command entry is used in the log line
#
# Raises a RuntimeError unless the first element of the response is "202".
# On success the submitted time is remembered in @last_time_submitted and the
# response is returned.
def emit_point(time, value, tags)
  LOGGER.info "Sending slowlog entry: #{value}µs executing #{tags[:command]} at #{time}."
  options = { host: replication_group, tags: tags }
  response = DDOG.emit_points('redis.slowlog.micros.avg', [[time, value]], options)
  if response[0] != "202"
    raise "Error submitting metric for #{replication_group}"
  end

  @last_time_submitted = time
  response
end

# Converts the second field of a slowlog entry (index 1) into a Time.
def slowlog_time(slowlog)
  epoch_seconds = slowlog[1]
  Time.at(epoch_seconds)
end
Expand All @@ -91,15 +92,86 @@ def slowlog_microseconds(slowlog)
slowlog[2]
end

def client_ip(ip_and_port)
ip_and_port.split(':')[0]
# Builds the empty set of buckets that still have to be reported.
# Keys are Time objects 60 seconds apart, starting one minute after
# last_time_submitted and not exceeding the current time; every value is nil.
# The result is an empty hash when that start lies in the future.
def reporting_interval
  now_i = Time.now.to_i
  first_bucket = last_time_submitted.to_i + 60
  first_bucket.step(now_i, 60).each_with_object({}) do |epoch, buckets|
    buckets[Time.at(epoch)] = nil
  end
end

def slowlog_tags(slowlog)
# Returns the 95th percentile of an ascending-sorted array using the
# nearest-rank method: the element at rank ceil(0.95 * n), counted from 1.
#
# sorted_values - Array already sorted in ascending order.
#
# Returns the selected element, or nil for an empty array.
#
# The rank is computed with integer arithmetic. A fractional index would be
# truncated by Array#[], which selects one rank too low whenever 0.95 * n is
# not a whole number (for two samples it would pick the minimum).
def _95percentile(sorted_values)
  return nil if sorted_values.empty?

  rank = (sorted_values.length * 95 + 99) / 100 # ceil(n * 0.95) without floats
  sorted_values[rank - 1]
end

# Folds one more measurement into a bucket's statistics and returns the
# updated statistics as a new hash.
#
# prior - Hash with at least a :values array holding the measurements seen so far.
# new   - the measurement to add.
#
# Returns a Hash with :values, :avg, :count, :median, :_95percentile, :min,
# :max and :sum, all recomputed from the full list of values.
#
# The average is derived from the recomputed sum and count. Updating the
# stored count in place before using it in a running-average formula would
# weight the old average by the new count and overstate the result.
# `prior` is left unmodified.
def add_metric_to_bucket(prior, new)
  values = prior[:values] + [new]
  sorted = values.sort
  count = sorted.length
  sum = sorted.reduce(:+)

  {
    values: values,
    avg: sum / count, # same division as before: integer result for integer inputs
    count: count,
    median: sorted[count / 2], # upper middle element when count is even
    _95percentile: _95percentile(sorted),
    min: sorted.first,
    max: sorted.last,
    sum: sum
  }
end

# Groups the current Redis slowlog entries into per-minute, per-command
# statistics.
#
# Starts from the empty buckets produced by reporting_interval and walks the
# entries returned by REDIS.slowlog('get') in the order given. The walk stops
# at the first entry whose minute is not later than the minute of
# last_time_submitted.
#
# Returns a Hash of minute (Time) => nil for minutes without entries, or
# minute => { command => stats hash } otherwise.
def slowlogs_by_flush_interval
  buckets = reporting_interval

  REDIS.slowlog('get').each do |entry|
    entry_time = slowlog_time(entry)
    break if minute_precision(entry_time) <= minute_precision(last_time_submitted)

    command = entry[3][0] # first word of the logged command
    micros = slowlog_microseconds(entry)
    minute = minute_precision(entry_time)

    buckets[minute] = {} if buckets[minute].nil?
    per_command = buckets[minute]

    per_command[command] =
      if per_command[command].nil?
        # First sample for this command in this minute: every statistic is the sample itself.
        {
          values: [micros],
          avg: micros,
          count: 1,
          median: micros,
          _95percentile: micros,
          min: micros,
          max: micros,
          sum: micros
        }
      else
        add_metric_to_bucket(per_command[command], micros)
      end
  end

  buckets
end

def default_tags
{
command: slowlog[3][0],
client: client_ip(slowlog[4]),
client_name: slowlog[5],
replication_group: replication_group,
service: replication_group,
namespace: ENV.fetch('NAMESPACE'),
Expand All @@ -108,26 +180,69 @@ def slowlog_tags(slowlog)
}
end

# Submits a list of points for one slowlog metric to Datadog.
#
# params - Hash of options:
#   :metric   (required) suffix appended to METRICNAME with a dot
#   :points   (required) list of [timestamp, value] pairs
#   :type     defaults to 'gauge'
#   :interval defaults to 60
#   :host     defaults to replication_group
#   :tags     defaults to default_tags
#
# Raises KeyError when :metric or :points is missing, and a RuntimeError
# unless the response body reports status "ok". On success the timestamp of
# the first point is stored in @last_time_submitted and the response returned.
def emit_point(params)
  metric_suffix = params.fetch(:metric)
  full_metric = METRICNAME + '.' + metric_suffix
  options = {
    type: params.fetch(:type, 'gauge'),
    interval: params.fetch(:interval, 60)
  }
  points = params.fetch(:points)
  # Both defaults are evaluated even when the caller supplies a value, as before.
  options[:host] = params.fetch(:host, replication_group)
  options[:tags] = params.fetch(:tags, default_tags)
  head = points.first

  LOGGER.info "Sending slowlog entry: #{head[1]}µs executing #{options[:tags][:command]} at #{head[0]}."
  response = DDOG.emit_points(full_metric, points, options)
  status = response[1].fetch("status")
  raise "Error submitting metric for #{replication_group}" if status != "ok"

  @last_time_submitted = Time.at(head[0])
  response
end

def ship_slowlogs
REDIS.slowlog('get').each do |slowlog|
break if slowlog_time(slowlog) <= last_time_submitted
emit_point(
slowlog_time(slowlog),
slowlog_microseconds(slowlog),
slowlog_tags(slowlog)
)
slowlogs = slowlogs_by_flush_interval
slowlogs.keys.sort.each do |timestamp|
timebucket = slowlogs.fetch(timestamp)
next if timebucket.nil?

timebucket.keys.each do |command|
all_metrics = timebucket.fetch(command)

# Emit most metrics
[:avg, :count, :median, :min, :max].each do |metric|
emit_point(
metric: metric.to_s,
type: metric == :count ? 'rate' : 'gauge',
points: [[timestamp, all_metrics.fetch(metric)]],
tags: default_tags.merge(command: command)
)
end

# A symbol literal cannot start with a digit, so the 95th percentile is stored under :_95percentile and emitted separately as '95percentile'
emit_point(
metric: '95percentile',
points: [[timestamp, all_metrics.fetch(:_95percentile)]],
tags: default_tags.merge(command: command)
)

end
end
end



def lambda_handler(event: {}, context: {})
@event = event

log_context
LOGGER.info "Event time: #{time}."
LOGGER.info "Event time: #{event_time}."
begin
REDIS.ping
ship_slowlogs
rescue StandardError => e
LOGGER.error e.inspect
# => #<Redis::CannotConnectError: Timed out connecting to Redis on 10.0.1.1:6380>
Expand All @@ -140,4 +255,8 @@ def lambda_handler(event: {}, context: {})

# Script entry point: when this file is executed directly (rather than being
# required from elsewhere), run the handler once with its default arguments.
if __FILE__ == $0
  lambda_handler

  # The interactive Pry session is opt-in. An unconditional breakpoint would
  # block every non-interactive run of the script and fail with LoadError
  # where pry is not installed. Set SLOWLOG_CHECK_DEBUG to inspect state
  # after the handler has run.
  if ENV['SLOWLOG_CHECK_DEBUG']
    require 'pry'
    binding.pry
  end
end

0 comments on commit 92431b4

Please sign in to comment.