Skip to content

Commit

Permalink
chore(test): Add integration test for flask tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
alithethird committed Jan 10, 2025
1 parent d5b34a4 commit 1a44207
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 0 deletions.
86 changes: 86 additions & 0 deletions tests/integration/flask/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,92 @@ async def flask_async_app_fixture(build_charm: str, model: Model, test_async_fla
return app


@pytest_asyncio.fixture(scope="module", name="flask_tracing_app")
async def flask_tracing_app_fixture(build_charm: str, model: Model, test_tracing_flask_image: str):
"""Build and deploy the flask charm with test-tracing-flask image."""
app_name = "flask-tracing-k8s"

resources = {
"flask-app-image": test_tracing_flask_image,
}
app = await model.deploy(
build_charm, resources=resources, application_name=app_name, series="jammy"
)
await model.wait_for_idle(raise_on_blocked=True)
return app

async def deploy_and_configure_minio(ops_test: OpsTest) -> None:
"""Deploy and set up minio and s3-integrator needed for s3-like storage backend in the HA charms."""
config = {
"access-key": "accesskey",
"secret-key": "secretkey",
}
await ops_test.model.deploy("minio", channel="edge", trust=True, config=config)
await ops_test.model.wait_for_idle(
apps=["minio"], status="active", timeout=2000, idle_period=45
)
minio_addr = await unit_address(ops_test, "minio", 0)

mc_client = Minio(
f"{minio_addr}:9000",
access_key="accesskey",
secret_key="secretkey",
secure=False,
)

# create tempo bucket
found = mc_client.bucket_exists("tempo")
if not found:
mc_client.make_bucket("tempo")

# configure s3-integrator
s3_integrator_app: Application = ops_test.model.applications["s3-integrator"]
s3_integrator_leader: Unit = s3_integrator_app.units[0]

await s3_integrator_app.set_config(
{
"endpoint": f"minio-0.minio-endpoints.{ops_test.model.name}.svc.cluster.local:9000",
"bucket": "tempo",
}
)

action = await s3_integrator_leader.run_action("sync-s3-credentials", **config)
action_result = await action.wait()
assert action_result.status == "completed"

@pytest_asyncio.fixture(scope="module", name="tempo_app")
async def deploy_tempo_cluster(ops_test: OpsTest):
"""Deploys tempo in its HA version together with minio and s3-integrator."""
tempo_app = "tempo"
worker_app = "tempo-worker"
tempo_worker_charm_url, worker_channel = "tempo-worker-k8s", "edge"
tempo_coordinator_charm_url, coordinator_channel = "tempo-coordinator-k8s", "edge"
await ops_test.model.deploy(
tempo_worker_charm_url, application_name=worker_app, channel=worker_channel, trust=True
)
app = await ops_test.model.deploy(
tempo_coordinator_charm_url,
application_name=tempo_app,
channel=coordinator_channel,
trust=True,
)
await ops_test.model.deploy("s3-integrator", channel="edge")

await ops_test.model.integrate(tempo_app + ":s3", "s3-integrator" + ":s3-credentials")
await ops_test.model.integrate(tempo_app + ":tempo-cluster", worker_app + ":tempo-cluster")

await deploy_and_configure_minio(ops_test)
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=[tempo_app, worker_app, "s3-integrator"],
status="active",
timeout=2000,
idle_period=30,
# TODO: remove when https://github.com/canonical/tempo-coordinator-k8s-operator/issues/90 is fixed
raise_on_error=False,
)
return app

@pytest_asyncio.fixture(scope="module", name="traefik_app")
async def deploy_traefik_fixture(
model: Model,
Expand Down
1 change: 1 addition & 0 deletions tests/integration/flask/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
ops >= 1.5.0
pytest-operator >= 0.32.0
aiohttp == 3.11.7
tenacity
58 changes: 58 additions & 0 deletions tests/integration/flask/test_workload_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Integration tests for Flask workers and schedulers."""

import asyncio
import logging
import time

import aiohttp
import pytest
from juju.application import Application
from juju.model import Model
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)


@pytest.mark.usefixtures("flask_tracing_app")
@pytest.mark.usefixtures("tempo_app")
async def test_workload_tracing(
ops_test: OpsTest,
model: Model,
flask_tracing_app: Application,
tempo_app: Application,
get_unit_ips,
):
"""
arrange: Flask is deployed with async enabled rock. Change gunicorn worker class.
act: Do 15 requests that would take 2 seconds each.
assert: All 15 requests should be served in under 3 seconds.
"""
await ops_test.model.integrate(f"{flask_tracing_app.name}:tracing", f"{tempo_app.name}:tracing")

await ops_test.model.wait_for_idle(
apps=[flask_tracing_app.name, tempo_app.name], status="active", timeout=300
)
# the flask unit is not important. Take the first one
flask_unit_ip = (await get_unit_ips(flask_tracing_app.name))[0]

async def _fetch_page(session):
params = {"duration": 2}
async with session.get(f"http://{flask_unit_ip}:8000", params=params) as response:
return await response.text()

async with aiohttp.ClientSession() as session:
page = _fetch_page(session)
await asyncio.gather([page])

print("--------------------------")
print(f"{flask_tracing_app.name}-app")
print("--------------------------")
# verify workload traces are ingested into Tempo
assert await get_traces_patiently(
await get_application_ip(ops_test, tempo_app.name),
service_name=f"{flask_tracing_app.name}-app",
tls=False,
)
26 changes: 26 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import pathlib
import uuid
import zipfile
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

import yaml

Expand Down Expand Up @@ -44,3 +46,27 @@ def inject_charm_config(charm: pathlib.Path | str, config: dict, tmp_dir: pathli
with open(charm, "wb") as new_charm_file:
new_charm_file.write(new_charm.getvalue())
return str(charm)


def get_traces(tempo_host: str, service_name: str, tls=False):
"""Get traces directly from Tempo REST API."""
url = f"{'https' if tls else 'http'}://{tempo_host}:3200/api/search?tags=service.name={service_name}"
req = requests.get(
url,
verify=False,
)
assert req.status_code == 200
traces = json.loads(req.text)["traces"]
return traces


@retry(stop=stop_after_attempt(15), wait=wait_exponential(multiplier=1, min=4, max=10))
async def get_traces_patiently(tempo_host, service_name="tracegen-otlp_http", tls=False):
"""Get traces directly from Tempo REST API, but also try multiple times.
Useful for cases when Tempo might not return the traces immediately (its API is known for returning data in
random order).
"""
traces = get_traces(tempo_host, service_name=service_name, tls=tls)
assert len(traces) > 0
return traces

0 comments on commit 1a44207

Please sign in to comment.