Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPE-5178 Adopt admin-address throughout #516

Merged
merged 15 commits into from
Sep 11, 2024
8 changes: 6 additions & 2 deletions lib/charms/grafana_agent/v0/cos_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
Using the `COSAgentProvider` object only requires instantiating it,
typically in the `__init__` method of your charm (the one which sends telemetry).

The constructor of `COSAgentProvider` has only one required and nine optional parameters:
The constructor of `COSAgentProvider` has only one required and ten optional parameters:

```python
def __init__(
Expand All @@ -36,6 +36,7 @@ def __init__(
log_slots: Optional[List[str]] = None,
dashboard_dirs: Optional[List[str]] = None,
refresh_events: Optional[List] = None,
tracing_protocols: Optional[List[str]] = None,
scrape_configs: Optional[Union[List[Dict], Callable]] = None,
):
```
Expand Down Expand Up @@ -65,6 +66,8 @@ def __init__(

- `refresh_events`: List of events on which to refresh relation data.

- `tracing_protocols`: List of requested tracing protocols that the charm requires to send traces.

- `scrape_configs`: List of standard scrape_configs dicts or a callable that returns the list in
case the configs need to be generated dynamically. The contents of this list will be merged
with the configs from `metrics_endpoints`.
Expand Down Expand Up @@ -108,6 +111,7 @@ def __init__(self, *args):
log_slots=["my-app:slot"],
dashboard_dirs=["./src/dashboards_1", "./src/dashboards_2"],
refresh_events=["update-status", "upgrade-charm"],
tracing_protocols=["otlp_http", "otlp_grpc"],
scrape_configs=[
{
"job_name": "custom_job",
Expand Down Expand Up @@ -249,7 +253,7 @@ class _MetricsEndpointDict(TypedDict):

LIBID = "dc15fa84cef84ce58155fb84f6c6213a"
LIBAPI = 0
LIBPATCH = 10
LIBPATCH = 11

PYDEPS = ["cosl", "pydantic"]

Expand Down
236 changes: 136 additions & 100 deletions lib/charms/mysql/v0/mysql.py

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
)
from flush_mysql_logs import FlushMySQLLogsCharmEvents, MySQLLogs
from hostname_resolution import MySQLMachineHostnameResolution
from ip_address_observer import IPAddressChangeCharmEvents
from mysql_vm_helpers import (
MySQL,
MySQLCreateCustomMySQLDConfigError,
Expand All @@ -116,6 +117,10 @@ class MySQLDNotRestartedError(Error):
"""Exception raised when MySQLD is not restarted after configuring instance."""


class MySQLCustomCharmEvents(FlushMySQLLogsCharmEvents, IPAddressChangeCharmEvents):
    """Custom event sources for the charm.

    FlushMySQLLogsCharmEvents needs to be defined on the charm object for
    logrotate (which runs juju-run/juju-exec to dispatch a custom event
    from cron); IPAddressChangeCharmEvents is merged in so both sets of
    custom events share the single `on` attribute of the charm.
    """


@trace_charm(
tracing_endpoint="tracing_endpoint",
extra_types=(
Expand All @@ -141,9 +146,7 @@ class MySQLOperatorCharm(MySQLCharmBase, TypedCharmBase[CharmConfig]):
"""Operator framework charm for MySQL."""

config_type = CharmConfig
# FlushMySQLLogsCharmEvents needs to be defined on the charm object for logrotate
# (which runs juju-run/juju-exec to dispatch a custom event from cron)
on = FlushMySQLLogsCharmEvents() # type: ignore
on = MySQLCustomCharmEvents() # type: ignore

def __init__(self, *args):
super().__init__(*args)
Expand Down
3 changes: 2 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def experimental_max_connections_validator(cls, value: int) -> Optional[int]:
"""Check experimental max connections."""
if value < MAX_CONNECTIONS_FLOOR:
raise ValueError(
f"experimental-max-connections must be greater than {MAX_CONNECTIONS_FLOOR}"
f"experimental-max-connections ({value=}) must be equal or greater "
+ f" than {MAX_CONNECTIONS_FLOOR}"
)

return value
Expand Down
10 changes: 9 additions & 1 deletion src/hostname_resolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, charm: "MySQLOperatorCharm"):
self.ip_address_observer = IPAddressObserver(charm)

self.framework.observe(self.charm.on.config_changed, self._update_host_details_in_databag)
self.framework.observe(self.on.ip_address_change, self._update_host_details_in_databag)
self.framework.observe(self.charm.on.ip_address_change, self._on_ip_address_change)

self.framework.observe(self.charm.on[PEER].relation_changed, self.update_etc_hosts)
self.framework.observe(self.charm.on[PEER].relation_departed, self.update_etc_hosts)
Expand All @@ -63,6 +63,14 @@ def _update_host_details_in_databag(self, _) -> None:

self.charm.unit_peer_data[HOSTNAME_DETAILS] = json.dumps(host_details)

def _on_ip_address_change(self, _) -> None:
    """Handle the ip_address_change custom event.

    Refreshes this unit's host details in the peer databag and then
    restarts mysqld: the admin_address is bound to the previous IP, so a
    restart is required for it to rebind to the new address.
    """
    self._update_host_details_in_databag(None)
    self.charm._mysql.restart_mysqld()

def _get_host_details(self) -> list[HostsEntry]:
host_details = []

Expand Down
21 changes: 13 additions & 8 deletions src/mysql_vm_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def wait_until_mysql_connection(self, check_port: bool = True) -> None:

logger.debug("MySQL connection possible")

def execute_backup_commands(
def execute_backup_commands( # type: ignore
self,
s3_directory: str,
s3_parameters: Dict[str, str],
Expand All @@ -402,7 +402,7 @@ def execute_backup_commands(
group=ROOT_SYSTEM_USER,
)

def delete_temp_backup_directory(
def delete_temp_backup_directory( # type: ignore
self, from_directory: str = CHARMED_MYSQL_COMMON_DIRECTORY
) -> None:
"""Delete the temp backup directory."""
Expand All @@ -412,20 +412,25 @@ def delete_temp_backup_directory(
group=ROOT_SYSTEM_USER,
)

def retrieve_backup_with_xbcloud(
def retrieve_backup_with_xbcloud(  # type: ignore
    self,
    backup_id: str,
    s3_parameters: Dict[str, str],
    temp_restore_directory: str = CHARMED_MYSQL_COMMON_DIRECTORY,
    xbcloud_location: str = CHARMED_MYSQL_XBCLOUD_LOCATION,
    xbstream_location: str = CHARMED_MYSQL_XBSTREAM_LOCATION,
    user=ROOT_SYSTEM_USER,
    group=ROOT_SYSTEM_USER,
) -> Tuple[str, str, str]:
    """Retrieve the provided backup with xbcloud.

    Thin override that defaults the locations/ownership to the charmed-mysql
    snap paths and the root system user, then delegates to the base class.

    Args:
        backup_id: Identifier of the backup to retrieve.
        s3_parameters: S3 connection parameters for xbcloud.
        temp_restore_directory: Directory to stream the backup into.
        xbcloud_location: Path to the xbcloud binary.
        xbstream_location: Path to the xbstream binary.
        user: System user to run the retrieval as.
        group: System group to run the retrieval as.

    Returns:
        The (stdout, stderr, restore directory) tuple from the base
        implementation — TODO confirm against the base class contract.
    """
    return super().retrieve_backup_with_xbcloud(
        backup_id,
        s3_parameters,
        temp_restore_directory,
        xbcloud_location,
        xbstream_location,
        user,
        group,
    )

def prepare_backup_for_restore(self, backup_location: str) -> Tuple[str, str]:
Expand Down
29 changes: 29 additions & 0 deletions tests/integration/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.

from time import sleep

import mysql.connector


Expand Down Expand Up @@ -37,3 +39,30 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.connection.commit()
self.cursor.close()
self.connection.close()


def create_db_connections(
    num_connections: int, host: str, username: str, password: str, database: str
) -> list[mysql.connector.MySQLConnection]:
    """Create a list of database connections.

    Args:
        num_connections: Number of connections to create.
        host: Hostname of the database.
        username: Username to connect to the database.
        password: Password to connect to the database.
        database: Database to connect to.

    Returns:
        The list of established connections. Connections that failed to
        establish are closed and excluded from the result.
    """
    connections = []
    for _ in range(num_connections):
        conn = mysql.connector.connect(
            host=host,
            user=username,
            password=password,
            database=database,
            use_pure=True,
        )
        if conn.is_connected():
            connections.append(conn)
        else:
            # Don't leak half-open connections that never established.
            conn.close()
        # Brief pause between connections: slows the build-up enough to
        # observe the connections in the database while debugging tests.
        sleep(0.5)
    return connections
31 changes: 6 additions & 25 deletions tests/integration/high_availability/high_availability_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,37 +355,18 @@ async def ensure_all_units_continuous_writes_incrementing(
ops_test, primary, server_config_credentials
)

select_all_continuous_writes_sql = [f"SELECT * FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"]

async with ops_test.fast_forward():
for unit in mysql_units:
for attempt in Retrying(
reraise=True, stop=stop_after_delay(5 * 60), wait=wait_fixed(10)
):
with attempt:
# ensure that all units are up to date (including the previous primary)
unit_address = await get_unit_ip(ops_test, unit.name)

async with ops_test.fast_forward(fast_interval="15s"):
for attempt in Retrying(reraise=True, stop=stop_after_delay(5 * 60), wait=wait_fixed(10)):
with attempt:
# ensure that all units are up to date (including the previous primary)
for unit in mysql_units:
# ensure the max written value is incrementing (continuous writes is active)
max_written_value = await get_max_written_value_in_database(
ops_test, unit, server_config_credentials
)
logger.info(f"{max_written_value=} on unit {unit.name}")
assert (
max_written_value > last_max_written_value
), "Continuous writes not incrementing"

# ensure that the unit contains all values up to the max written value
all_written_values = set(
await execute_queries_on_unit(
unit_address,
server_config_credentials["username"],
server_config_credentials["password"],
select_all_continuous_writes_sql,
)
)
numbers = set(range(1, max_written_value))
assert (
numbers <= all_written_values
), f"Missing numbers in database for unit {unit.name}"

last_max_written_value = max_written_value
2 changes: 1 addition & 1 deletion tests/integration/high_availability/test_self_healing.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ async def test_network_cut(ops_test: OpsTest, highly_available_cluster, continuo
lambda: primary_unit.workload_status == "active", timeout=40 * 60
)

await ensure_all_units_continuous_writes_incrementing(ops_test)
await ensure_all_units_continuous_writes_incrementing(ops_test)

# ensure that we are able to insert data into the primary and have it replicated to all units
database_name, table_name = "test-network-cut", "data"
Expand Down
95 changes: 95 additions & 0 deletions tests/integration/test_saturate_max_connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

import logging

import pytest
from mysql.connector.errors import OperationalError
from pytest_operator.plugin import OpsTest

from .connector import create_db_connections
from .helpers import get_unit_ip
from .juju_ import run_action

logger = logging.getLogger(__name__)

MYSQL_APP_NAME = "mysql"
TEST_APP_NAME = "app"
CONNECTIONS = 10


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest) -> None:
    """Build the charm and deploy a single unit to ensure a cluster is formed.

    experimental-max-connections is deliberately set to a small value
    (CONNECTIONS) so the saturation test below can exhaust the client
    connection pool quickly.
    """
    charm = await ops_test.build_charm(".")
    config = {"profile-limit-memory": "2000", "experimental-max-connections": CONNECTIONS}

    await ops_test.model.deploy(
        charm,
        application_name=MYSQL_APP_NAME,
        config=config,
        num_units=1,
        base="ubuntu@22.04",
    )


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_deploy_and_relate_test_app(ops_test: OpsTest) -> None:
    """Deploy the test app, relate it to mysql, and wait for active status."""
    # Writes are started manually later; keep the app idle on deploy.
    config = {"auto_start_writes": False, "sleep_interval": "500"}
    logger.info("Deploying test app")
    await ops_test.model.deploy(
        "mysql-test-app",
        application_name=TEST_APP_NAME,
        num_units=1,
        base="ubuntu@22.04",
        config=config,
        channel="latest/edge",
    )

    logger.info("Relating test app to mysql")
    await ops_test.model.relate(MYSQL_APP_NAME, f"{TEST_APP_NAME}:database")

    logger.info("Waiting all to be active")
    await ops_test.model.block_until(
        lambda: all(unit.workload_status == "active" for unit in ops_test.model.units.values()),
        timeout=60 * 10,
        wait_period=5,
    )


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_saturate_max_connections(ops_test: OpsTest) -> None:
    """Saturate the client connection limit and check the admin interface.

    Opens CONNECTIONS client connections (the configured
    experimental-max-connections), asserts one more is rejected, then runs
    get-cluster-status — which relies on the admin connection — to show the
    admin interface is unaffected by client saturation. The saturated
    connections are closed at the end so the server is usable afterwards.
    """
    app_unit = ops_test.model.applications[TEST_APP_NAME].units[0]
    mysql_unit = ops_test.model.applications[MYSQL_APP_NAME].units[0]

    host_ip = await get_unit_ip(ops_test, mysql_unit.name)
    logger.info("Running action to get app connection data")
    credentials = await run_action(app_unit, "get-client-connection-data")
    # Strip action metadata so only connector kwargs remain.
    # juju 2.9 dont have the return-code key; juju 3.x doesn't have Code.
    credentials.pop("return-code", None)
    credentials.pop("Code", None)
    credentials["host"] = host_ip

    logger.info(f"Creating {CONNECTIONS} connections")
    connections = create_db_connections(CONNECTIONS, **credentials)
    try:
        assert isinstance(connections, list), "Connections not created"

        logger.info("Ensure all connections are established")
        for conn in connections:
            assert conn.is_connected(), "Connection failed to establish"

        assert len(connections) == CONNECTIONS, "Not all connections were established"

        logger.info("Ensure no more client connections are possible")
        with pytest.raises(OperationalError):
            # exception raised when too many connections are attempted
            create_db_connections(1, **credentials)

        logger.info("Get cluster status while connections are saturated")
        # get-cluster-status uses the admin connection; if client saturation
        # affected it, the action (and run_action's assertion) would fail.
        _ = await run_action(mysql_unit, "get-cluster-status")
    finally:
        # Release the saturated pool — otherwise the server's client
        # connections stay exhausted after the test.
        for conn in connections:
            if conn.is_connected():
                conn.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also add a check that the admin port is unaffected? or at least admin connections are active?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting the cluster status rely on the admin connection. So when client connections are saturated, the action passing does just that. If the action fails, run_action will raise an assertion error

Loading
Loading