diff --git a/tests/conftest.py b/tests/conftest.py index 34e5113..2e1cc88 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,11 @@ +import asyncio import json +import os from pathlib import Path import random import socket +import subprocess +from typing import AsyncGenerator from typing import Awaitable from typing import Dict from typing import Generator @@ -11,6 +15,9 @@ from typing import Set from typing import cast +import aiohttp +from aiohttp.client_exceptions import ClientConnectorError +from aiohttp.client_exceptions import ClientOSError from aiohttp.web import Response from ddsketch import LogCollapsingLowestDenseDDSketch from ddsketch.pb.proto import DDSketchProto @@ -519,9 +526,60 @@ def fn(token: Optional[str] = None) -> Awaitable[Response]: @pytest.fixture -def available_port(): +def available_port() -> str: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(("", 0)) # Bind to a free port provided by the host. port = s.getsockname()[1] # Get the port number assigned. s.close() # Release the socket. - yield port + return str(port) + + +@pytest.fixture +def testagent_port(available_port: str) -> str: + return available_port + + +@pytest.fixture +def testagent_url(testagent_port: str) -> str: + return "http://localhost:%s" % testagent_port + + +@pytest.fixture(scope="module") +def testagent_snapshot_ci_mode() -> bool: + # Default all tests in this module to be run in CI mode + # unless a special env var is passed to make generating + # the snapshots easier. + return os.getenv("GENERATE_SNAPSHOTS") != "1" + + +@pytest.fixture +async def testagent( + loop: asyncio.BaseEventLoop, testagent_port: str, testagent_snapshot_ci_mode: bool +) -> AsyncGenerator[aiohttp.ClientSession, None]: + env = os.environ.copy() + env.update( + { + "PORT": testagent_port, + "SNAPSHOT_CI": "1" if testagent_snapshot_ci_mode else "0", + "SNAPSHOT_DIR": os.path.join(os.path.dirname(__file__), "integration_snapshots"), + } + ) + p = subprocess.Popen(["ddapm-test-agent"], env=env) + + # Wait for server to start + try: + async with aiohttp.ClientSession() as session: + for _ in range(100): + try: + r = await session.get(f"http://localhost:{testagent_port}") + except (ClientConnectorError, ClientOSError): + pass + else: + if r.status == 404: + break + await asyncio.sleep(0.05) + else: + assert 0 + yield session + finally: + p.terminate() diff --git a/tests/test_snapshot_integration.py b/tests/test_snapshot_integration.py index 216497f..77e2aa5 100644 --- a/tests/test_snapshot_integration.py +++ b/tests/test_snapshot_integration.py @@ -6,14 +6,11 @@ import asyncio import os import subprocess -from typing import AsyncGenerator from typing import Callable from typing import Dict from typing import Generator import aiohttp -from aiohttp.client_exceptions import ClientConnectorError -from aiohttp.client_exceptions import ClientOSError from ddtrace import Tracer from ddtrace.profiling import Profiler from ddtrace.propagation.http import HTTPPropagator @@ -21,57 +18,6 @@ import pytest -@pytest.fixture -def testagent_port() -> str: - return "8126" - - -@pytest.fixture -def testagent_url(testagent_port: str) -> str: - return "http://localhost:%s" % testagent_port - - -@pytest.fixture(scope="module") -def testagent_snapshot_ci_mode() -> bool: - # Default all tests in this module to be run in CI mode - # unless a special env var is passed to make generating - # the snapshots easier. - return os.getenv("GENERATE_SNAPSHOTS") != "1" - - -@pytest.fixture -async def testagent( - loop: asyncio.BaseEventLoop, testagent_port: str, testagent_snapshot_ci_mode: bool -) -> AsyncGenerator[aiohttp.ClientSession, None]: - env = os.environ.copy() - env.update( - { - "PORT": testagent_port, - "SNAPSHOT_CI": "1" if testagent_snapshot_ci_mode else "0", - "SNAPSHOT_DIR": os.path.join(os.path.dirname(__file__), "integration_snapshots"), - } - ) - p = subprocess.Popen(["ddapm-test-agent"], env=env) - - # Wait for server to start - try: - async with aiohttp.ClientSession() as session: - for _ in range(100): - try: - r = await session.get(f"http://localhost:{testagent_port}") - except (ClientConnectorError, ClientOSError): - pass - else: - if r.status == 404: - break - await asyncio.sleep(0.05) - else: - assert 0 - yield session - finally: - p.terminate() - - @pytest.fixture def tracer(testagent_port: str, testagent: aiohttp.ClientSession) -> Tracer: tracer = Tracer(url=f"http://localhost:{testagent_port}") @@ -129,6 +75,7 @@ def stats_tracer(tracer: Tracer, trace_sample_rate: float) -> Generator[Tracer, ) async def test_single_trace( testagent, + testagent_url, tracer, operation_name, service, @@ -139,8 +86,8 @@ async def test_single_trace( metrics, response_code, ): - await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_single_trace") - tracer = Tracer(url="http://localhost:8126") + await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_single_trace") + tracer = Tracer(url=testagent_url) with tracer.trace(operation_name, service=service, resource=resource, span_type=span_type) as span: if error is not None: span.error = error @@ -149,13 +96,13 @@ async def test_single_trace( for k, v in metrics.items(): span.set_metric(k, v) tracer.shutdown() - resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_single_trace") + resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_single_trace") assert resp.status == response_code -async def test_multi_trace(testagent, tracer): - await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_multi_trace") +async def test_multi_trace(testagent_url, testagent, tracer): + await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_multi_trace") with tracer.trace("root0"): with tracer.trace("child0"): pass @@ -163,11 +110,11 @@ async def test_multi_trace(testagent, tracer): with tracer.trace("child1"): pass tracer.flush() - resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_multi_trace") + resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_multi_trace") assert resp.status == 200 # Run the snapshot test again. - await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_multi_trace") + await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_multi_trace") with tracer.trace("root0"): with tracer.trace("child0"): pass @@ -175,24 +122,22 @@ async def test_multi_trace(testagent, tracer): with tracer.trace("child1"): pass tracer.flush() - resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_multi_trace") + resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_multi_trace") assert resp.status == 200 # Simulate a failed snapshot with a missing trace. - await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_multi_trace") + await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_multi_trace") with tracer.trace("root0"): with tracer.trace("child0"): pass tracer.flush() - resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_multi_trace") + resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_multi_trace") assert resp.status == 400 tracer.shutdown() -async def test_trace_distributed_same_payload(testagent, tracer): - await testagent.get( - "http://localhost:8126/test/session/start?test_session_token=test_trace_distributed_same_payload" - ) +async def test_trace_distributed_same_payload(testagent_url, testagent, tracer): + await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_distributed_same_payload") with tracer.trace("root0"): with tracer.trace("child0") as span: ctx = span.context @@ -203,13 +148,13 @@ async def test_trace_distributed_same_payload(testagent, tracer): pass tracer.flush() resp = await testagent.get( - "http://localhost:8126/test/session/snapshot?test_session_token=test_trace_distributed_same_payload" + f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_distributed_same_payload" ) assert resp.status == 200 -async def test_trace_distributed_propagated(testagent, tracer): - await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_trace_distributed_propagated") +async def test_trace_distributed_propagated(testagent_url, testagent, tracer): + await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_distributed_propagated") headers = { "x-datadog-trace-id": "1234", "x-datadog-parent-id": "5678", @@ -222,34 +167,26 @@ async def test_trace_distributed_propagated(testagent, tracer): pass tracer.flush() resp = await testagent.get( - "http://localhost:8126/test/session/snapshot?test_session_token=test_trace_distributed_propagated" + f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_distributed_propagated" ) assert resp.status == 200 -async def test_trace_missing_received(testagent, tracer): - resp = await testagent.get( - "http://localhost:8126/test/session/start?test_session_token=test_trace_missing_received" - ) +async def test_trace_missing_received(testagent_url, testagent, tracer): + resp = await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_missing_received") assert resp.status == 200, await resp.text() with tracer.trace("root0"): with tracer.trace("child0"): pass tracer.flush() - resp = await testagent.get( - "http://localhost:8126/test/session/snapshot?test_session_token=test_trace_missing_received" - ) + resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_missing_received") assert resp.status == 200 # Do another snapshot without sending any traces. - resp = await testagent.get( - "http://localhost:8126/test/session/start?test_session_token=test_trace_missing_received" - ) + resp = await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_missing_received") assert resp.status == 200, await resp.text() - resp = await testagent.get( - "http://localhost:8126/test/session/snapshot?test_session_token=test_trace_missing_received" - ) + resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_missing_received") assert resp.status == 400 @@ -293,6 +230,7 @@ def _tracestats_traces_extra_trace(tracer: Tracer) -> None: ], ) async def test_tracestats( + testagent_url: str, testagent: aiohttp.ClientSession, stats_tracer: Tracer, testagent_snapshot_ci_mode: bool, @@ -300,22 +238,23 @@ async def test_tracestats( do_traces: Callable[[Tracer], None], fail: bool, ) -> None: - await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_trace_stats") + await testagent.get(f"{testagent_url}/test/session/start?test_session_token=test_trace_stats") do_traces(stats_tracer) stats_tracer.shutdown() # force out the stats - resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_trace_stats") + resp = await testagent.get(f"{testagent_url}/test/session/snapshot?test_session_token=test_trace_stats") if fail: assert resp.status == 400 else: assert resp.status == 200 -async def test_cmd(testagent: aiohttp.ClientSession, tracer: Tracer) -> None: +async def test_cmd(testagent_url: str, testagent: aiohttp.ClientSession, tracer: Tracer) -> None: """Test the commands provided with the library. Note that this test reuses the trace/snapshot from test_single_trace above. """ env = os.environ.copy() + env["DD_TRACE_AGENT_URL"] = testagent_url p = subprocess.run( ["ddapm-test-agent-session-start", "--test-session-token=test_single_trace"], env=env, @@ -340,11 +279,11 @@ async def test_cmd(testagent: aiohttp.ClientSession, tracer: Tracer) -> None: assert p.returncode == 1 -async def test_profiling_endpoint(testagent: aiohttp.ClientSession, testagent_port: int) -> None: - p = Profiler(url="http://localhost:%s" % testagent_port) +async def test_profiling_endpoint(testagent_url: str, testagent: aiohttp.ClientSession) -> None: + p = Profiler(url=testagent_url) p.start() p.stop(flush=True) - resp = await testagent.get("http://localhost:8126/test/session/requests") + resp = await testagent.get(f"{testagent_url}/test/session/requests") assert resp.status == 200 data = await resp.json() assert len(data) >= 1