Publish ingestion metrics on statsd (#7875)
* Add statsd service
* Add celery task to publish stats
* Add task to prod misc worker
* Fix beat task
* Aggregate stats
* Publish on statsd
* Fix date range filter
* Allow changing statsd host and port (settings sketched below)
* Add a prefix for stats publication
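For context, the task below reads four Django settings. A minimal sketch of what they might look like (the setting names come from the diff; the defaults and environment-variable wiring here are illustrative assumptions, not part of the commit):

```python
import os

# Connection details consumed by treeherder.workers.stats.get_stats_client()
# (the default values below are assumptions, not from this commit)
STATSD_HOST = os.environ.get('STATSD_HOST', 'statsd')
STATSD_PORT = int(os.environ.get('STATSD_PORT', '8125'))
STATSD_PREFIX = os.environ.get('STATSD_PREFIX', 'treeherder')

# Width of the aggregation window, in minutes; the beat schedule is
# expected to fire the task at this same interval
CELERY_STATS_PUBLICATION_DELAY = 10
```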
Showing 10 changed files with 180 additions and 25 deletions.
The ignore-file change (the `ui/partials/` hunk context suggests `.prettierignore`) excludes markdown files and the new statsd configuration:

```diff
@@ -9,3 +9,6 @@ ui/partials/

 # Ignore markdown
 *.md
+
+# Statsd configuration file
+docker/statsd_config.js
```
The new file `docker/statsd_config.js` configures the statsd daemon:

```js
{
  debug: true,
  healthStatus: true,
  dumpMessages: true,
  port: 8125,
  // InfluxDB backend should be added to read stats
  backends: []
}
```
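Since `backends` is empty, metrics are only echoed by the daemon itself (`dumpMessages: true`). A quick smoke test, assuming the container's UDP port 8125 is reachable from the host, can use the same `statsd` Python package the worker relies on:

```python
import statsd

# 'localhost' is an assumption; from another container the docker-compose
# service name would be used as the host instead
client = statsd.StatsClient('localhost', 8125, prefix='smoke')
client.incr('test_metric')  # should appear in the statsd container logs
```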
A new 52-line test module covers both the empty case and a populated database:

```python
import pytest
from unittest.mock import patch, MagicMock, call
from treeherder.workers.stats import publish_stats
from treeherder.model.models import Push, Job
from django.utils import timezone
from datetime import timedelta


@pytest.mark.django_db
@patch('treeherder.workers.stats.get_stats_client')
def test_publish_stats_nothing_to_do(get_worker_mock, django_assert_num_queries, caplog):
    statsd_client = MagicMock()
    get_worker_mock.return_value = statsd_client
    assert Push.objects.count() == 0
    assert Job.objects.count() == 0
    with django_assert_num_queries(2):
        publish_stats()
    assert [(level, message) for _, level, message in caplog.record_tuples] == [
        (20, 'Publishing runtime statistics to statsd'),
        (20, 'Ingested 0 pushes'),
        (20, 'Ingested 0 jobs in total'),
    ]
    assert statsd_client.call_args_list == []


@pytest.mark.django_db
@patch('treeherder.workers.stats.get_stats_client')
def test_publish_stats(
    get_worker_mock, eleven_jobs_stored_new_date, django_assert_num_queries, caplog, settings
):
    "Test statsd statistics publication task"
    settings.CELERY_STATS_PUBLICATION_DELAY = 10
    statsd_client = MagicMock()
    get_worker_mock.return_value = statsd_client
    assert Push.objects.count() == 10
    assert Job.objects.count() == 11
    Push.objects.update(time=timezone.now() - timedelta(minutes=10))
    Job.objects.update(end_time=timezone.now() - timedelta(minutes=10))

    with django_assert_num_queries(2):
        publish_stats()
    assert [(level, message) for _, level, message in caplog.record_tuples] == [
        (20, 'Publishing runtime statistics to statsd'),
        (20, 'Ingested 10 pushes'),
        (20, 'Ingested 11 jobs in total'),
    ]
    assert statsd_client.incr.call_args_list == [
        call('push', 10),
        call('jobs', 11),
        call('jobs_repo.mozilla-central', 11),
        call('jobs_state.completed', 11),
    ]
```
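The tests invoke `publish_stats()` synchronously, and the same works for a manual check; a sketch, assuming a configured Django shell:

```python
# ./manage.py shell
from treeherder.workers.stats import publish_stats

# Aggregates the most recent window and pushes counters to statsd;
# in production Celery beat schedules this instead
publish_stats()
```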
The task itself lives in the new file `treeherder/workers/stats.py`:

```python
from celery import shared_task
from django.conf import settings
from django.utils import timezone
from datetime import timedelta
from django.db.models import Count
from treeherder.model.models import Push, Job
from itertools import groupby
import statsd
import logging

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_stats_client():
    return statsd.StatsClient(
        settings.STATSD_HOST, settings.STATSD_PORT, prefix=settings.STATSD_PREFIX
    )


@shared_task(name='publish-stats')
def publish_stats():
    """
    Publish runtime stats on statsd
    """
    stats_client = get_stats_client()
    logger.info('Publishing runtime statistics to statsd')
    end_date = timezone.now()
    # Round the date down to the current date range; runs should not
    # overlap, as the beat fires the task on a relative cron-style delay
    # of CELERY_STATS_PUBLICATION_DELAY minutes
    end_date = end_date - timedelta(
        minutes=end_date.minute % settings.CELERY_STATS_PUBLICATION_DELAY,
        seconds=end_date.second,
        microseconds=end_date.microsecond,
    )

    start_date = end_date - timedelta(minutes=settings.CELERY_STATS_PUBLICATION_DELAY)
    logger.debug(f'Reading data ingested from {start_date} to {end_date}')

    # Number of pushes
    pushes_count = Push.objects.filter(time__lte=end_date, time__gt=start_date).count()
    logger.info(f'Ingested {pushes_count} pushes')
    if pushes_count:
        stats_client.incr('push', pushes_count)

    # Compute stats for jobs in a single request
    jobs_stats = (
        Job.objects.filter(end_time__lte=end_date, end_time__gt=start_date)
        .values('push__repository__name', 'state')
        .annotate(count=Count('id'))
        .values_list('push__repository__name', 'state', 'count')
    )
    # Total number of jobs
    jobs_total = sum(ct for _, _, ct in jobs_stats)
    logger.info(f'Ingested {jobs_total} jobs in total')
    if jobs_total:
        stats_client.incr('jobs', jobs_total)

    # Number of jobs per repository
    jobs_per_repo = {
        key: sum(ct for _, ct in vals)
        for key, vals in groupby(sorted((repo, ct) for repo, _, ct in jobs_stats), lambda x: x[0])
    }
    logger.debug(f'Jobs per repo: {jobs_per_repo}')
    for key, value in jobs_per_repo.items():
        stats_client.incr(f'jobs_repo.{key}', value)

    # Number of jobs per state
    jobs_per_state = {
        key: sum(ct for _, ct in vals)
        for key, vals in groupby(sorted((state, ct) for _, state, ct in jobs_stats), lambda x: x[0])
    }
    logger.debug(f'Jobs per state: {jobs_per_state}')
    for key, value in jobs_per_state.items():
        stats_client.incr(f'jobs_state.{key}', value)
```
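The window arithmetic in `publish_stats` rounds `end_date` down to the previous multiple of `CELERY_STATS_PUBLICATION_DELAY` minutes, so consecutive runs cover adjacent, non-overlapping ranges. A standalone sketch of that rounding with an illustrative 10-minute delay:

```python
from datetime import datetime, timedelta

DELAY = 10  # minutes, standing in for settings.CELERY_STATS_PUBLICATION_DELAY

now = datetime(2022, 1, 5, 12, 37, 42)
# Drop the seconds/microseconds and snap the minutes to the window grid
end_date = now - timedelta(
    minutes=now.minute % DELAY, seconds=now.second, microseconds=now.microsecond
)
start_date = end_date - timedelta(minutes=DELAY)

print(start_date, end_date)  # 2022-01-05 12:20:00  2022-01-05 12:30:00
```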