Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

regression-test: automatically fetch connection candidates #37384

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-ci/connectors/live-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ The traffic recorded on the control connector is passed to the target connector

## Changelog

### 0.15.0
Automatic retrieval of connection objects for regression tests. The connection id is not required anymore.

### 0.14.2
Fix KeyError when target & control streams differ.

Expand Down
834 changes: 643 additions & 191 deletions airbyte-ci/connectors/live-tests/poetry.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions airbyte-ci/connectors/live-tests/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "live-tests"
version = "0.14.2"
version = "0.15.0"
description = "Contains utilities for testing connectors against live data."
authors = ["Airbyte <contact@airbyte.io>"]
license = "MIT"
Expand All @@ -26,7 +26,6 @@ pytest = "^8.1.1"
pydash = "~=7.0.7"
docker = ">=6,<7"
asyncclick = "^8.1.7.1"
# TODO: when this is open-sourced, don't require connection-retriever
connection-retriever = {git = "git@github.com:airbytehq/airbyte-platform-internal", subdirectory = "tools/connection-retriever"}
duckdb = "^0.10.0"
pandas = "^2.2.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

import json
import logging
import os
from pathlib import Path
from typing import Dict, Optional, Set

import rich
from connection_retriever import ConnectionObject, retrieve_objects # type: ignore
from connection_retriever.errors import NotPermittedError # type: ignore

from .models import AirbyteCatalog, Command, ConfiguredAirbyteCatalog, ConnectionObjects, SecretDict

LOGGER = logging.getLogger(__name__)
console = rich.get_console()


def parse_config(config: Dict | str | None) -> Optional[SecretDict]:
Expand All @@ -32,14 +35,17 @@ def parse_catalog(catalog: Dict | str | None) -> Optional[AirbyteCatalog]:


def parse_configured_catalog(
configured_catalog: Dict | str | None,
configured_catalog: Dict | str | None, selected_streams: Set[str] | None = None
) -> Optional[ConfiguredAirbyteCatalog]:
if not configured_catalog:
return None
if isinstance(configured_catalog, str):
return ConfiguredAirbyteCatalog.parse_obj(json.loads(configured_catalog))
catalog = ConfiguredAirbyteCatalog.parse_obj(json.loads(configured_catalog))
else:
return ConfiguredAirbyteCatalog.parse_obj(configured_catalog)
catalog = ConfiguredAirbyteCatalog.parse_obj(configured_catalog)
if selected_streams:
return ConfiguredAirbyteCatalog(streams=[stream for stream in catalog.streams if stream.stream.name in selected_streams])
return catalog


def parse_state(state: Dict | str | None) -> Optional[Dict]:
Expand All @@ -59,8 +65,8 @@ def get_state_from_path(state_path: Path) -> Optional[Dict]:
return parse_state(state_path.read_text())


def get_configured_catalog_from_path(path: Path) -> Optional[ConfiguredAirbyteCatalog]:
return parse_configured_catalog(path.read_text())
def get_configured_catalog_from_path(path: Path, selected_streams: Optional[Set[str]] = None) -> Optional[ConfiguredAirbyteCatalog]:
return parse_configured_catalog(path.read_text(), selected_streams)


COMMAND_TO_REQUIRED_OBJECT_TYPES = {
Expand All @@ -85,6 +91,8 @@ def get_connection_objects(
retrieval_reason: Optional[str],
fail_if_missing_objects: bool = True,
connector_image: Optional[str] = None,
auto_select_connection: bool = False,
selected_streams: Optional[Set[str]] = None,
) -> ConnectionObjects:
"""This function retrieves the connection objects values.
It checks that the required objects are available and raises a UsageError if they are not.
Expand All @@ -100,18 +108,26 @@ def get_connection_objects(
retrieval_reason (Optional[str]): The reason to access the connection objects.
fail_if_missing_objects (bool, optional): Whether to raise a ValueError if a required object is missing. Defaults to True.
connector_image (Optional[str]): The image name for the connector under test.
auto_select_connection (bool, optional): Whether to automatically select a connection if no connection id is passed. Defaults to False.
selected_streams (Optional[Set[str]]): The set of selected streams to use when auto selecting a connection.
Raises:
click.UsageError: If a required object is missing for the command.
click.UsageError: If a retrieval reason is missing when passing a connection id.
Returns:
ConnectionObjects: The connection objects values.
"""
if connection_id is None and not auto_select_connection:
raise ValueError("A connection id or auto_select_connection must be provided to retrieve the connection objects.")
if auto_select_connection and not connector_image:
raise ValueError("A connector image must be provided when using auto_select_connection.")

custom_config = get_connector_config_from_path(custom_config_path) if custom_config_path else None
custom_configured_catalog = get_configured_catalog_from_path(custom_configured_catalog_path) if custom_configured_catalog_path else None
custom_configured_catalog = (
get_configured_catalog_from_path(custom_configured_catalog_path, selected_streams) if custom_configured_catalog_path else None
)
custom_state = get_state_from_path(custom_state_path) if custom_state_path else None

if not connection_id:
if not connection_id and not auto_select_connection:
connection_object = ConnectionObjects(
source_config=custom_config,
destination_config=custom_config,
Expand All @@ -121,15 +137,35 @@ def get_connection_objects(
workspace_id=None,
source_id=None,
destination_id=None,
connection_id=None,
source_docker_image=None,
)
else:
if not retrieval_reason:
raise ValueError("A retrieval reason is required to access the connection objects when passing a connection id.")
retrieved_objects = retrieve_objects(connection_id, requested_objects, retrieval_reason=retrieval_reason)
LOGGER.info("Retrieving connection objects from the database...")
if auto_select_connection:
is_ci = os.getenv("CI", False)
connection_id, retrieved_objects = retrieve_objects(
requested_objects,
retrieval_reason=retrieval_reason,
source_docker_repository=connector_image,
prompt_for_connection_selection=not is_ci,
with_streams=selected_streams,
)
else:
connection_id, retrieved_objects = retrieve_objects(
requested_objects,
retrieval_reason=retrieval_reason,
connection_id=connection_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this should also include selected_streams.

with_streams=selected_streams,
)
retrieved_source_config = parse_config(retrieved_objects.get(ConnectionObject.SOURCE_CONFIG))
rerieved_destination_config = parse_config(retrieved_objects.get(ConnectionObject.DESTINATION_CONFIG))
retrieved_catalog = parse_catalog(retrieved_objects.get(ConnectionObject.CATALOG))
retrieved_configured_catalog = parse_configured_catalog(retrieved_objects.get(ConnectionObject.CONFIGURED_CATALOG))
retrieved_configured_catalog = parse_configured_catalog(
retrieved_objects.get(ConnectionObject.CONFIGURED_CATALOG), selected_streams
)
retrieved_state = parse_state(retrieved_objects.get(ConnectionObject.STATE))

retrieved_source_docker_image = retrieved_objects.get(ConnectionObject.SOURCE_DOCKER_IMAGE)
Expand All @@ -149,6 +185,8 @@ def get_connection_objects(
workspace_id=retrieved_objects.get(ConnectionObject.WORKSPACE_ID),
source_id=retrieved_objects.get(ConnectionObject.SOURCE_ID),
destination_id=retrieved_objects.get(ConnectionObject.DESTINATION_ID),
source_docker_image=retrieved_source_docker_image,
connection_id=connection_id,
)
if fail_if_missing_objects:
if not connection_object.source_config and ConnectionObject.SOURCE_CONFIG in requested_objects:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import _collections_abc
import dagger
import requests

# type: ignore
from airbyte_protocol.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog # type: ignore
from airbyte_protocol.models import Type as AirbyteMessageType
from genson import SchemaBuilder # type: ignore
Expand Down Expand Up @@ -429,3 +431,5 @@ class ConnectionObjects:
workspace_id: Optional[str]
source_id: Optional[str]
destination_id: Optional[str]
source_docker_image: Optional[str]
connection_id: Optional[str]
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
import webbrowser
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Callable, Dict, Generator, Iterable, List, Optional
from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Callable, Dict, Generator, Iterable, List, Optional, Set

import dagger
import pytest
Expand Down Expand Up @@ -56,26 +56,30 @@ def pytest_addoption(parser: Parser) -> None:
)
parser.addoption(
"--control-version",
default="latest",
help="The control version used for regression testing. Defaults to latest",
help="The control version used for regression testing.",
)
parser.addoption(
"--target-version",
default="dev",
help="The target version used for regression testing. Defaults to latest",
help="The target version used for regression testing. Defaults to dev.",
)
parser.addoption("--config-path")
parser.addoption("--catalog-path")
parser.addoption("--state-path")
parser.addoption("--connection-id")
parser.addoption(
"--auto-select-connection",
default=True,
help="Automatically select the connection to run the tests on.",
)
parser.addoption("--pr-url", help="The URL of the PR you are testing")
parser.addoption("--stream", help="The stream to run the tests on. (Can be used multiple times)", action="append")


def pytest_configure(config: Config) -> None:
user_email = get_user_email()
prompt_for_confirmation(user_email)
track_usage(user_email, vars(config.option))

config.stash[stash_keys.AIRBYTE_API_KEY] = get_airbyte_api_key()
config.stash[stash_keys.USER] = user_email
start_timestamp = int(time.time())
Expand All @@ -91,18 +95,16 @@ def pytest_configure(config: Config) -> None:
dagger_log_path.touch()
config.stash[stash_keys.DAGGER_LOG_PATH] = dagger_log_path
config.stash[stash_keys.PR_URL] = get_option_or_fail(config, "--pr-url")
config.stash[stash_keys.CONNECTION_ID] = get_option_or_fail(config, "--connection-id")

config.stash[stash_keys.AUTO_SELECT_CONNECTION] = config.getoption("--auto-select-connection")
config.stash[stash_keys.CONNECTOR_IMAGE] = get_option_or_fail(config, "--connector-image")
config.stash[stash_keys.CONTROL_VERSION] = get_option_or_fail(config, "--control-version")
config.stash[stash_keys.TARGET_VERSION] = get_option_or_fail(config, "--target-version")
if config.stash[stash_keys.CONTROL_VERSION] == config.stash[stash_keys.TARGET_VERSION]:
pytest.exit(f"Control and target versions are the same: {control_version}. Please provide different versions.")
custom_source_config_path = config.getoption("--config-path")
custom_configured_catalog_path = config.getoption("--catalog-path")
custom_state_path = config.getoption("--state-path")
config.stash[stash_keys.SELECTED_STREAMS] = set(config.getoption("--stream") or [])

config.stash[stash_keys.SHOULD_READ_WITH_STATE] = prompt_for_read_with_or_without_state()
retrieval_reason = f"Running regression tests on connection {config.stash[stash_keys.CONNECTION_ID]} for connector {config.stash[stash_keys.CONNECTOR_IMAGE]} on the control ({config.stash[stash_keys.CONTROL_VERSION]}) and target versions ({config.stash[stash_keys.TARGET_VERSION]})."
retrieval_reason = f"Running regression tests on connection for connector {config.stash[stash_keys.CONNECTOR_IMAGE]} on target versions ({config.stash[stash_keys.TARGET_VERSION]})."
try:
config.stash[stash_keys.CONNECTION_OBJECTS] = get_connection_objects(
{
Expand All @@ -115,18 +117,30 @@ def pytest_configure(config: Config) -> None:
ConnectionObject.SOURCE_ID,
ConnectionObject.DESTINATION_ID,
},
config.stash[stash_keys.CONNECTION_ID],
config.getoption("--connection-id"),
Path(custom_source_config_path) if custom_source_config_path else None,
Path(custom_configured_catalog_path) if custom_configured_catalog_path else None,
Path(custom_state_path) if custom_state_path else None,
retrieval_reason,
fail_if_missing_objects=False,
connector_image=config.stash[stash_keys.CONNECTOR_IMAGE],
auto_select_connection=config.stash[stash_keys.AUTO_SELECT_CONNECTION],
selected_streams=config.stash[stash_keys.SELECTED_STREAMS],
)
config.stash[stash_keys.IS_PERMITTED_BOOL] = True
except (ConnectionNotFoundError, NotPermittedError) as exc:
clean_up_artifacts(MAIN_OUTPUT_DIRECTORY, LOGGER)
pytest.exit(str(exc))

config.stash[stash_keys.CONNECTION_ID] = config.stash[stash_keys.CONNECTION_OBJECTS].connection_id # type: ignore

if source_docker_image := config.stash[stash_keys.CONNECTION_OBJECTS].source_docker_image:
config.stash[stash_keys.CONTROL_VERSION] = source_docker_image.split(":")[-1]
else:
config.stash[stash_keys.CONTROL_VERSION] = "latest"
Comment on lines +137 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning behind this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning is:

  • If you passed custom config/state/catalog we are not calling the DB and have no clue about which is the current production version of a connector, so we fallback to latest as the control version
  • If we retrieve the connection from the DB we can have an accurate control version in case the connector version was pinned in production to a non latest version.


if config.stash[stash_keys.CONTROL_VERSION] == config.stash[stash_keys.TARGET_VERSION]:
pytest.exit(f"Control and target versions are the same: {control_version}. Please provide different versions.")
if config.stash[stash_keys.CONNECTION_OBJECTS].workspace_id and config.stash[stash_keys.CONNECTION_ID]:
config.stash[stash_keys.CONNECTION_URL] = build_connection_url(
config.stash[stash_keys.CONNECTION_OBJECTS].workspace_id,
Expand Down Expand Up @@ -304,9 +318,12 @@ def actor_id(connection_objects: ConnectionObjects, control_connector: Connector


@pytest.fixture(scope="session")
def configured_catalog(
connection_objects: ConnectionObjects,
) -> ConfiguredAirbyteCatalog:
def selected_streams(request: SubRequest) -> Set[str]:
return request.config.stash[stash_keys.SELECTED_STREAMS]


@pytest.fixture(scope="session")
def configured_catalog(connection_objects: ConnectionObjects, selected_streams: Optional[Set[str]]) -> ConfiguredAirbyteCatalog:
if not connection_objects.configured_catalog:
pytest.skip("Catalog is not provided. The catalog fixture can't be used.")
assert connection_objects.configured_catalog is not None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from pathlib import Path
from typing import List
from typing import List, Set

import pytest
from live_tests.commons.models import ConnectionObjects
from live_tests.regression_tests.report import Report

AIRBYTE_API_KEY = pytest.StashKey[str]()
AUTO_SELECT_CONNECTION = pytest.StashKey[bool]()
CONNECTION_ID = pytest.StashKey[str]()
CONNECTION_OBJECTS = pytest.StashKey[ConnectionObjects]()
CONNECTION_URL = pytest.StashKey[str | None]()
Expand All @@ -20,6 +21,7 @@
PR_URL = pytest.StashKey[str]()
REPORT = pytest.StashKey[Report]()
RETRIEVAL_REASONS = pytest.StashKey[str]()
SELECTED_STREAMS = pytest.StashKey[Set[str]]()
SESSION_START_TIMESTAMP = pytest.StashKey[int]()
SHOULD_READ_WITH_STATE = pytest.StashKey[bool]()
TARGET_VERSION = pytest.StashKey[str]()
Expand Down
Loading