Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Remote storage refactorings #5243

Merged
merged 29 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ebb5b20
refactor(fixtures): move methods to types
koivunej Sep 5, 2023
0c0706d
wip: split apart the remote storages and NeonEnvBuilder
koivunej Sep 6, 2023
7facf09
chore: mypy fixes
koivunej Sep 6, 2023
021f154
doc: comment minor reformat
koivunej Sep 6, 2023
a1b3c36
fix: mock_s3 bucket name truncation
koivunej Sep 6, 2023
5057f05
refactor: mark ext remote storage as not cleaned up
koivunej Sep 6, 2023
62a1884
refactor: remove duplication in configuring remote storage
koivunej Sep 6, 2023
44377e1
refactor: remove backed up code
koivunej Sep 6, 2023
ce556cc
fix: confusion about which config, pass mock_s3 env vars
koivunej Sep 6, 2023
c9a497e
fix: only ext should use the hardcoded bucket in eu-central-1
koivunej Sep 7, 2023
3e226d5
refactor: stop using enable_blah_blah_blah_storage
koivunej Sep 7, 2023
6a2c562
refactor: remove now unused enable_xxxx_remote_storage
koivunej Sep 7, 2023
e0c2173
refactor: separate enable_extensions_remote_storage
koivunej Sep 7, 2023
57b038b
refactor: remove unused force_enable
koivunej Sep 7, 2023
955ea6d
refactor(neon_fixtures): add assertion message
koivunej Sep 7, 2023
3d4880f
chore: reformat test_download_extensions
koivunej Sep 7, 2023
6146580
refactor: rename env.remote_storage as env.pageserver_remote_storage
koivunej Sep 7, 2023
6af9cff
refactor: rename enable_{,pageserver_}remote_storage
koivunej Sep 7, 2023
401a951
refactor: unify cleanup
koivunej Sep 7, 2023
16fcda9
refactor: remove unused self.pageserver_remote_storage_kind
koivunej Sep 7, 2023
2c37fc0
refactor: remove unused self.remote_storage_prefix
koivunej Sep 7, 2023
e3131d3
refactor: no need to use the long single arg name
koivunej Sep 6, 2023
aa8116e
refactor: remove extra lines
koivunej Sep 7, 2023
8ed246f
refactor: suggestions
koivunej Sep 8, 2023
77ec1fd
refactor: more suggestinos
koivunej Sep 8, 2023
989195f
test_tenant_relocation: rename fixes
koivunej Sep 7, 2023
59c51f8
test_compat: add new properties
koivunej Sep 8, 2023
e952ea3
test_compact: align directory
koivunej Sep 8, 2023
eaa4c82
refactor: single source of truth for local_fs path
koivunej Sep 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 86 additions & 217 deletions test_runner/fixtures/neon_fixtures.py

Large diffs are not rendered by default.

16 changes: 6 additions & 10 deletions test_runner/fixtures/pageserver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,11 @@ def list_prefix(
Note that this function takes into account prefix_in_bucket.
"""
# For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api.
assert neon_env_builder.remote_storage_kind in (
RemoteStorageKind.MOCK_S3,
RemoteStorageKind.REAL_S3,
)
# For mypy
assert isinstance(neon_env_builder.remote_storage, S3Storage)
assert neon_env_builder.remote_storage_client is not None
remote = neon_env_builder.pageserver_remote_storage
assert isinstance(remote, S3Storage), "localfs is currently not supported"
assert remote.client is not None

prefix_in_bucket = neon_env_builder.remote_storage.prefix_in_bucket or ""
prefix_in_bucket = remote.prefix_in_bucket or ""
if not prefix:
prefix = prefix_in_bucket
else:
Expand All @@ -277,9 +273,9 @@ def list_prefix(
prefix = "/".join((prefix_in_bucket, prefix))

# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
response = neon_env_builder.remote_storage_client.list_objects_v2(
response = remote.client.list_objects_v2(
Delimiter="/",
Bucket=neon_env_builder.remote_storage.bucket_name,
Bucket=remote.bucket_name,
Prefix=prefix,
)
return response
Expand Down
294 changes: 233 additions & 61 deletions test_runner/fixtures/remote_storage.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,36 @@
import enum
import hashlib
import json
import os
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

import boto3
from mypy_boto3_s3 import S3Client

from fixtures.log_helper import log
from fixtures.types import TenantId, TimelineId

TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"


@enum.unique
class RemoteStorageUser(str, enum.Enum):
"""
Instead of using strings for the users, use a more strict enum.
"""

PAGESERVER = "pageserver"
EXTENSIONS = "ext"
SAFEKEEPER = "safekeeper"

def __str__(self) -> str:
return self.value


class MockS3Server:
"""
Starts a mock S3 server for testing on a port given, errors if the server fails to start or exits prematurely.
Expand Down Expand Up @@ -58,49 +77,6 @@ def kill(self):
self.subprocess.kill()


@enum.unique
class RemoteStorageKind(str, enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"


def available_remote_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages


def available_s3_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages


def s3_storage() -> RemoteStorageKind:
"""
For tests that require a remote storage impl that exposes an S3
endpoint, but don't want to parametrize over multiple storage types.

Use real S3 if available, else use MockS3
"""
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
return RemoteStorageKind.REAL_S3
else:
return RemoteStorageKind.MOCK_S3


@dataclass
class LocalFsStorage:
root: Path
Expand All @@ -118,15 +94,28 @@ def index_content(self, tenant_id: TenantId, timeline_id: TimelineId):
with self.index_path(tenant_id, timeline_id).open("r") as f:
return json.load(f)

def to_toml_inline_table(self) -> str:
return f"local_path='{self.root}'"

def cleanup(self):
# no cleanup is done here, because there's NeonEnvBuilder.cleanup_local_storage which will remove everything, including localfs files
pass

@staticmethod
def component_path(repo_dir: Path, user: RemoteStorageUser) -> Path:
return repo_dir / "local_fs_remote_storage" / str(user)


@dataclass
class S3Storage:
bucket_name: str
bucket_region: str
access_key: str
secret_key: str
prefix_in_bucket: str
client: S3Client
cleanup: bool
endpoint: Optional[str] = None
prefix_in_bucket: Optional[str] = ""

def access_env_vars(self) -> Dict[str, str]:
return {
Expand All @@ -144,29 +133,212 @@ def to_string(self) -> str:
}
)

def to_toml_inline_table(self) -> str:
s = [
f"bucket_name='{self.bucket_name}'",
f"bucket_region='{self.bucket_region}'",
]

if self.prefix_in_bucket is not None:
s.append(f"prefix_in_bucket='{self.prefix_in_bucket}'")

if self.endpoint is not None:
s.append(f"endpoint='{self.endpoint}'")

return ",".join(s)

def do_cleanup(self):
if not self.cleanup:
# handles previous keep_remote_storage_contents
return

log.info(
"removing data from test s3 bucket %s by prefix %s",
self.bucket_name,
self.prefix_in_bucket,
)
paginator = self.client.get_paginator("list_objects_v2")
pages = paginator.paginate(
Bucket=self.bucket_name,
Prefix=self.prefix_in_bucket,
)

# Using Any because DeleteTypeDef (from boto3-stubs) doesn't fit our case
objects_to_delete: Any = {"Objects": []}
cnt = 0
for item in pages.search("Contents"):
# weirdly when nothing is found it returns [None]
if item is None:
break

objects_to_delete["Objects"].append({"Key": item["Key"]})

# flush once aws limit reached
if len(objects_to_delete["Objects"]) >= 1000:
self.client.delete_objects(
Bucket=self.bucket_name,
Delete=objects_to_delete,
)
objects_to_delete = {"Objects": []}
cnt += 1

# flush rest
if len(objects_to_delete["Objects"]):
self.client.delete_objects(
Bucket=self.bucket_name,
Delete=objects_to_delete,
)

log.info(f"deleted {cnt} objects from remote storage")


RemoteStorage = Union[LocalFsStorage, S3Storage]


# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if isinstance(remote_storage, LocalFsStorage):
remote_storage_config = f"local_path='{remote_storage.root}'"
elif isinstance(remote_storage, S3Storage):
remote_storage_config = f"bucket_name='{remote_storage.bucket_name}',\
bucket_region='{remote_storage.bucket_region}'"
@enum.unique
class RemoteStorageKind(str, enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"

def configure(
self,
repo_dir: Path,
mock_s3_server,
run_id: str,
test_name: str,
user: RemoteStorageUser,
bucket_name: Optional[str] = None,
bucket_region: Optional[str] = None,
) -> Optional[RemoteStorage]:
if self == RemoteStorageKind.NOOP:
return None

if self == RemoteStorageKind.LOCAL_FS:
return LocalFsStorage(LocalFsStorage.component_path(repo_dir, user))

# real_s3 uses this as part of prefix, mock_s3 uses this as part of
# bucket name, giving all users unique buckets because we have to
# create them
test_name = re.sub(r"[_\[\]]", "-", test_name)

def to_bucket_name(user: str, test_name: str) -> str:
s = f"{user}-{test_name}"

if len(s) > 63:
prefix = s[:30]
suffix = hashlib.sha256(test_name.encode()).hexdigest()[:32]
s = f"{prefix}-{suffix}"
assert len(s) == 63

return s

if self == RemoteStorageKind.MOCK_S3:
# there's a single mock_s3 server for each process running the tests
mock_endpoint = mock_s3_server.endpoint()
mock_region = mock_s3_server.region()

access_key, secret_key = mock_s3_server.access_key(), mock_s3_server.secret_key()

client = boto3.client(
"s3",
endpoint_url=mock_endpoint,
region_name=mock_region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
)

bucket_name = to_bucket_name(user, test_name)
log.info(
f"using mock_s3 bucket name {bucket_name} for user={user}, test_name={test_name}"
)

return S3Storage(
bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket="",
client=client,
cleanup=False,
)

assert self == RemoteStorageKind.REAL_S3

env_access_key = os.getenv("AWS_ACCESS_KEY_ID")
assert env_access_key, "no aws access key provided"
env_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
assert env_secret_key, "no aws access key provided"

# session token is needed for local runs with sso auth
session_token = os.getenv("AWS_SESSION_TOKEN")

bucket_name = bucket_name or os.getenv("REMOTE_STORAGE_S3_BUCKET")
assert bucket_name is not None, "no remote storage bucket name provided"
bucket_region = bucket_region or os.getenv("REMOTE_STORAGE_S3_REGION")
assert bucket_region is not None, "no remote storage region provided"

prefix_in_bucket = f"{run_id}/{test_name}/{user}"

client = boto3.client(
"s3",
region_name=bucket_region,
aws_access_key_id=env_access_key,
aws_secret_access_key=env_secret_key,
aws_session_token=session_token,
)
koivunej marked this conversation as resolved.
Show resolved Hide resolved

return S3Storage(
bucket_name=bucket_name,
bucket_region=bucket_region,
access_key=env_access_key,
secret_key=env_secret_key,
prefix_in_bucket=prefix_in_bucket,
client=client,
cleanup=True,
)


def available_remote_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages

if remote_storage.prefix_in_bucket is not None:
remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'"

if remote_storage.endpoint is not None:
remote_storage_config += f",endpoint='{remote_storage.endpoint}'"
def available_s3_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
raise Exception("invalid remote storage type")
log.info("Using mock implementations to test remote storage")
return remote_storages

return f"{{{remote_storage_config}}}"

def s3_storage() -> RemoteStorageKind:
"""
For tests that require a remote storage impl that exposes an S3
endpoint, but don't want to parametrize over multiple storage types.

Use real S3 if available, else use MockS3
"""
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
return RemoteStorageKind.REAL_S3
else:
return RemoteStorageKind.MOCK_S3


# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):
raise Exception("invalid remote storage type")

class RemoteStorageUsers(enum.Flag):
PAGESERVER = enum.auto()
SAFEKEEPER = enum.auto()
return f"{{{remote_storage.to_toml_inline_table()}}}"
Loading