Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test analytics table output #1

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions awx/main/tests/functional/analytics/test_collectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import pytest
import tempfile
import os
import shutil
import csv

from django.utils.timezone import now
from django.db.backends.sqlite3.base import SQLiteCursorWrapper

from awx.main.analytics import collectors

from awx.main.models import (
ProjectUpdate,
InventorySource,
)


@pytest.fixture
def sqlite_copy_expert(request):
# copy_expert is postgres-specific, and SQLite doesn't support it; mock its
# behavior to test that it writes a file that contains stdout from events
path = tempfile.mkdtemp(prefix='copied_tables')

def write_stdout(self, sql, fd):
# Would be cool if we instead properly disected the SQL query and verified
# it that way. But instead, we just take the nieve approach here.
assert sql.startswith('COPY (')
assert sql.endswith(') TO STDOUT WITH CSV HEADER')

sql = sql.replace('COPY (', '')
sql = sql.replace(') TO STDOUT WITH CSV HEADER', '')

# Remove JSON style queries
# TODO: could replace JSON style queries with sqlite kind of equivalents
sql_new = []
for line in sql.split('\n'):
if line.find('main_jobevent.event_data::') == -1:
sql_new.append(line)
sql = '\n'.join(sql_new)

self.execute(sql)
results = self.fetchall()
headers = [i[0] for i in self.description]

csv_handle = csv.writer(fd, delimiter=',', quoting=csv.QUOTE_ALL, escapechar='\\', lineterminator='\n')
csv_handle.writerow(headers)
csv_handle.writerows(results)


setattr(SQLiteCursorWrapper, 'copy_expert', write_stdout)
request.addfinalizer(lambda: shutil.rmtree(path))
request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, 'copy_expert'))
return path


@pytest.mark.django_db
def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, job_template):
'''
Ensure that various unified job types are in the output of the query.
'''

time_start = now()
inv_src = InventorySource.objects.create(name="inventory_update1", inventory=inventory, source='gce')

project_update_name = ProjectUpdate.objects.create(project=project, name="project_update1").name
inventory_update_name = inv_src.create_unified_job().name
job_name = job_template.create_unified_job().name

with tempfile.TemporaryDirectory() as tmpdir:
collectors.copy_tables(time_start, tmpdir)
with open(os.path.join(tmpdir, 'unified_jobs_table.csv')) as f:
lines = ''.join([l for l in f])

assert project_update_name in lines
assert inventory_update_name in lines
assert job_name in lines