Skip to content

Commit

Permalink
Refactor testagent fixture
Browse files Browse the repository at this point in the history
Pull it out of the snapshot integration module so we can reuse it for
the client tests.
  • Loading branch information
Kyle-Verhoog committed Feb 17, 2024
1 parent 07b485b commit 7b8c965
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 93 deletions.
62 changes: 60 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import asyncio
import json
import os
from pathlib import Path
import random
import socket
import subprocess
from typing import AsyncGenerator
from typing import Awaitable
from typing import Dict
from typing import Generator
Expand All @@ -11,6 +15,9 @@
from typing import Set
from typing import cast

import aiohttp
from aiohttp.client_exceptions import ClientConnectorError
from aiohttp.client_exceptions import ClientOSError
from aiohttp.web import Response
from ddsketch import LogCollapsingLowestDenseDDSketch
from ddsketch.pb.proto import DDSketchProto
Expand Down Expand Up @@ -519,9 +526,60 @@ def fn(token: Optional[str] = None) -> Awaitable[Response]:


@pytest.fixture
def available_port():
def available_port() -> str:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0)) # Bind to a free port provided by the host.
port = s.getsockname()[1] # Get the port number assigned.
s.close() # Release the socket.
yield port
return str(port)


@pytest.fixture
def testagent_port(available_port: str) -> str:
return available_port


@pytest.fixture
def testagent_url(testagent_port: str) -> str:
return "http://localhost:%s" % testagent_port


@pytest.fixture(scope="module")
def testagent_snapshot_ci_mode() -> bool:
# Default all tests in this module to be run in CI mode
# unless a special env var is passed to make generating
# the snapshots easier.
return os.getenv("GENERATE_SNAPSHOTS") != "1"


@pytest.fixture
async def testagent(
loop: asyncio.BaseEventLoop, testagent_port: str, testagent_snapshot_ci_mode: bool
) -> AsyncGenerator[aiohttp.ClientSession, None]:
env = os.environ.copy()
env.update(
{
"PORT": testagent_port,
"SNAPSHOT_CI": "1" if testagent_snapshot_ci_mode else "0",
"SNAPSHOT_DIR": os.path.join(os.path.dirname(__file__), "integration_snapshots"),
}
)
p = subprocess.Popen(["ddapm-test-agent"], env=env)

# Wait for server to start
try:
async with aiohttp.ClientSession() as session:
for _ in range(100):
try:
r = await session.get(f"http://localhost:{testagent_port}")
except (ClientConnectorError, ClientOSError):
pass
else:
if r.status == 404:
break
await asyncio.sleep(0.05)
else:
assert 0
yield session
finally:
p.terminate()
121 changes: 30 additions & 91 deletions tests/test_snapshot_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,72 +6,18 @@
import asyncio
import os
import subprocess
from typing import AsyncGenerator
from typing import Callable
from typing import Dict
from typing import Generator

import aiohttp
from aiohttp.client_exceptions import ClientConnectorError
from aiohttp.client_exceptions import ClientOSError
from ddtrace import Tracer
from ddtrace.profiling import Profiler
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.sampler import DatadogSampler
import pytest


@pytest.fixture
def testagent_port() -> str:
return "8126"


@pytest.fixture
def testagent_url(testagent_port: str) -> str:
return "http://localhost:%s" % testagent_port


@pytest.fixture(scope="module")
def testagent_snapshot_ci_mode() -> bool:
# Default all tests in this module to be run in CI mode
# unless a special env var is passed to make generating
# the snapshots easier.
return os.getenv("GENERATE_SNAPSHOTS") != "1"


@pytest.fixture
async def testagent(
loop: asyncio.BaseEventLoop, testagent_port: str, testagent_snapshot_ci_mode: bool
) -> AsyncGenerator[aiohttp.ClientSession, None]:
env = os.environ.copy()
env.update(
{
"PORT": testagent_port,
"SNAPSHOT_CI": "1" if testagent_snapshot_ci_mode else "0",
"SNAPSHOT_DIR": os.path.join(os.path.dirname(__file__), "integration_snapshots"),
}
)
p = subprocess.Popen(["ddapm-test-agent"], env=env)

# Wait for server to start
try:
async with aiohttp.ClientSession() as session:
for _ in range(100):
try:
r = await session.get(f"http://localhost:{testagent_port}")
except (ClientConnectorError, ClientOSError):
pass
else:
if r.status == 404:
break
await asyncio.sleep(0.05)
else:
assert 0
yield session
finally:
p.terminate()


@pytest.fixture
def tracer(testagent_port: str, testagent: aiohttp.ClientSession) -> Tracer:
tracer = Tracer(url=f"http://localhost:{testagent_port}")
Expand Down Expand Up @@ -129,6 +75,7 @@ def stats_tracer(tracer: Tracer, trace_sample_rate: float) -> Generator[Tracer,
)
async def test_single_trace(
testagent,
testagent_url,
tracer,
operation_name,
service,
Expand All @@ -139,8 +86,8 @@ async def test_single_trace(
metrics,
response_code,
):
await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_single_trace")
tracer = Tracer(url="http://localhost:8126")
await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_single_trace")
tracer = Tracer(url=testagent_url)
with tracer.trace(operation_name, service=service, resource=resource, span_type=span_type) as span:
if error is not None:
span.error = error
Expand All @@ -149,50 +96,48 @@ async def test_single_trace(
for k, v in metrics.items():
span.set_metric(k, v)
tracer.shutdown()
resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_single_trace")
resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_single_trace")

assert resp.status == response_code


async def test_multi_trace(testagent, tracer):
await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_multi_trace")
async def test_multi_trace(testagent_url, testagent, tracer):
await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_multi_trace")
with tracer.trace("root0"):
with tracer.trace("child0"):
pass
with tracer.trace("root1"):
with tracer.trace("child1"):
pass
tracer.flush()
resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_multi_trace")
resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_multi_trace")
assert resp.status == 200

# Run the snapshot test again.
await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_multi_trace")
await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_multi_trace")
with tracer.trace("root0"):
with tracer.trace("child0"):
pass
with tracer.trace("root1"):
with tracer.trace("child1"):
pass
tracer.flush()
resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_multi_trace")
resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_multi_trace")
assert resp.status == 200

# Simulate a failed snapshot with a missing trace.
await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_multi_trace")
await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_multi_trace")
with tracer.trace("root0"):
with tracer.trace("child0"):
pass
tracer.flush()
resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_multi_trace")
resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_multi_trace")
assert resp.status == 400
tracer.shutdown()


async def test_trace_distributed_same_payload(testagent, tracer):
await testagent.get(
"http://localhost:8126/test/session/start?test_session_token=test_trace_distributed_same_payload"
)
async def test_trace_distributed_same_payload(testagent_url, testagent, tracer):
await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_distributed_same_payload")
with tracer.trace("root0"):
with tracer.trace("child0") as span:
ctx = span.context
Expand All @@ -203,13 +148,13 @@ async def test_trace_distributed_same_payload(testagent, tracer):
pass
tracer.flush()
resp = await testagent.get(
"http://localhost:8126/test/session/snapshot?test_session_token=test_trace_distributed_same_payload"
f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_distributed_same_payload"
)
assert resp.status == 200


async def test_trace_distributed_propagated(testagent, tracer):
await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_trace_distributed_propagated")
async def test_trace_distributed_propagated(testagent_url, testagent, tracer):
await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_distributed_propagated")
headers = {
"x-datadog-trace-id": "1234",
"x-datadog-parent-id": "5678",
Expand All @@ -222,34 +167,26 @@ async def test_trace_distributed_propagated(testagent, tracer):
pass
tracer.flush()
resp = await testagent.get(
"http://localhost:8126/test/session/snapshot?test_session_token=test_trace_distributed_propagated"
f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_distributed_propagated"
)
assert resp.status == 200


async def test_trace_missing_received(testagent, tracer):
resp = await testagent.get(
"http://localhost:8126/test/session/start?test_session_token=test_trace_missing_received"
)
async def test_trace_missing_received(testagent_url, testagent, tracer):
resp = await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_missing_received")
assert resp.status == 200, await resp.text()

with tracer.trace("root0"):
with tracer.trace("child0"):
pass
tracer.flush()
resp = await testagent.get(
"http://localhost:8126/test/session/snapshot?test_session_token=test_trace_missing_received"
)
resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_missing_received")
assert resp.status == 200

# Do another snapshot without sending any traces.
resp = await testagent.get(
"http://localhost:8126/test/session/start?test_session_token=test_trace_missing_received"
)
resp = await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_missing_received")
assert resp.status == 200, await resp.text()
resp = await testagent.get(
"http://localhost:8126/test/session/snapshot?test_session_token=test_trace_missing_received"
)
resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_missing_received")
assert resp.status == 400


Expand Down Expand Up @@ -293,29 +230,31 @@ def _tracestats_traces_extra_trace(tracer: Tracer) -> None:
],
)
async def test_tracestats(
testagent_url: str,
testagent: aiohttp.ClientSession,
stats_tracer: Tracer,
testagent_snapshot_ci_mode: bool,
trace_sample_rate: float,
do_traces: Callable[[Tracer], None],
fail: bool,
) -> None:
await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_trace_stats")
await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_stats")
do_traces(stats_tracer)
stats_tracer.shutdown() # force out the stats
resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_trace_stats")
resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_stats")
if fail:
assert resp.status == 400
else:
assert resp.status == 200


async def test_cmd(testagent: aiohttp.ClientSession, tracer: Tracer) -> None:
async def test_cmd(testagent_url: str, testagent: aiohttp.ClientSession, tracer: Tracer) -> None:
"""Test the commands provided with the library.
Note that this test reuses the trace/snapshot from test_single_trace above.
"""
env = os.environ.copy()
env["DD_TRACE_AGENT_URL"] = testagent_url
p = subprocess.run(
["ddapm-test-agent-session-start", "--test-session-token=test_single_trace"],
env=env,
Expand All @@ -340,11 +279,11 @@ async def test_cmd(testagent: aiohttp.ClientSession, tracer: Tracer) -> None:
assert p.returncode == 1


async def test_profiling_endpoint(testagent: aiohttp.ClientSession, testagent_port: int) -> None:
p = Profiler(url="http://localhost:%s" % testagent_port)
async def test_profiling_endpoint(testagent_url: str, testagent: aiohttp.ClientSession) -> None:
p = Profiler(url=testagent_url)
p.start()
p.stop(flush=True)
resp = await testagent.get("http://localhost:8126/test/session/requests")
resp = await testagent.get(f"{testagent_url}/test/session/requests")
assert resp.status == 200
data = await resp.json()
assert len(data) >= 1
Expand Down

0 comments on commit 7b8c965

Please sign in to comment.