Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(functions): remove unused codes #535

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/dashboard/apigateway/apigateway/apps/plugin/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ class PluginBindingManager(models.Manager):
def delete_by_gateway_id(self, gateway_id):
self.filter(gateway_id=gateway_id).delete()

def bulk_delete(self, objs):
return self.filter(id__in=[i.pk for i in objs]).delete()

def create_or_update_bindings(
self,
gateway,
Expand Down
29 changes: 0 additions & 29 deletions src/dashboard/apigateway/apigateway/apps/support/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,35 +105,6 @@ def clear_unreleased_resource_doc(self, gateway_id: int) -> None:
resource_version_ids = Release.objects.get_released_resource_version_ids(gateway_id)
self.filter(gateway_id=gateway_id).exclude(resource_version_id__in=resource_version_ids).delete()

def get_latest_released_resource_doc(self, gateway_id: int, resource_id: int) -> dict:
"""获取最新的已发布资源文档"""
# TODO: 暂时仅支持展示中文文档
resource_doc = (
self.filter(
gateway_id=gateway_id,
resource_id=resource_id,
language=DocLanguageEnum.ZH.value,
)
.order_by("-resource_version_id")
.first()
)
return resource_doc.data if resource_doc else {}

def get_released_resource_doc(
self,
gateway_id: int,
resource_version_id: int,
resource_id: int,
language=DocLanguageEnum.ZH.value,
) -> Optional[dict]:
doc = self.filter(
gateway_id=gateway_id,
resource_version_id=resource_version_id,
resource_id=resource_id,
language=language,
).first()
return doc.data if doc else {}

def get_doc_updated_time(self, gateway_id: int, resource_version_id: int, resource_id: int) -> Dict[str, str]:
qs = self.filter(gateway_id=gateway_id, resource_version_id=resource_version_id, resource_id=resource_id)
return {doc.language: doc.data["updated_time"] for doc in qs}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,6 @@ def test_filter_resource_version_public_latest_sdk(self):


class TestReleasedResourceDocManager:
def test_get_released_resource_doc(self, fake_gateway):
# doc exist
G(ReleasedResourceDoc, gateway=fake_gateway, resource_version_id=1, resource_id=1, data={"content": "test"})
result = ReleasedResourceDoc.objects.get_released_resource_doc(fake_gateway.id, 1, 1)
assert result == {"content": "test"}

# doc not exist
result = ReleasedResourceDoc.objects.get_released_resource_doc(fake_gateway.id, 1, 2)
assert result == {}

def test_get_doc_updated_time(self, fake_resource_version, fake_resource1):
fake_gateway = fake_resource_version.gateway
G(
Expand Down
85 changes: 0 additions & 85 deletions src/dashboard/apigateway/apigateway/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from copy import deepcopy
from functools import partial

import fakeredis
import pytest
from celery import shared_task
from ddf import G
Expand Down Expand Up @@ -65,7 +64,6 @@
from apigateway.schema import instances
from apigateway.schema.data.meta_schema import init_meta_schemas
from apigateway.tests.utils.testing import dummy_time, get_response_json
from apigateway.utils.redis_utils import REDIS_CLIENTS, get_default_redis_client
from apigateway.utils.yaml import yaml_dumps

UserModel = get_user_model()
Expand Down Expand Up @@ -101,17 +99,6 @@ def request_factory():
return APIRequestFactory()


# class FakeToken:
# access_token: str = "access_token"


# class FakeUser:
# is_active: bool = True
# username: str = "admin"
# is_authenticated: bool = True
# token = FakeToken


@pytest.fixture
def fake_admin_user(mocker):
return mocker.MagicMock(
Expand All @@ -124,17 +111,6 @@ def fake_admin_user(mocker):
)


@pytest.fixture
def fake_anonymous_user(mocker):
return mocker.MagicMock(
is_active=True,
username="test",
is_authenticated=False,
is_anonymous=True,
is_superuser=False,
)


@pytest.fixture
def fake_request(request_factory):
return request_factory.get("")
Expand Down Expand Up @@ -430,11 +406,6 @@ def fake_released_resource_doc(fake_gateway, fake_resource_version, fake_resourc
)


@pytest.fixture
def api_factory():
return partial(G, Gateway, _maintainers=FAKE_USERNAME)


@pytest.fixture(autouse=True)
def meta_schemas(db):
schemas = init_meta_schemas()
Expand All @@ -461,11 +432,6 @@ def unique_gateway_name(unique_id):
return f"a{unique_id[:19]}"


@pytest.fixture
def unique_backend_service_name(unique_id):
return f"a{unique_id[:20]}".lower()


@pytest.fixture
def fake_tgz_file(faker):
fp = tempfile.TemporaryFile()
Expand Down Expand Up @@ -540,40 +506,6 @@ def fn(method, view_name, path_params=None, gateway=None, user=None, app=None, *
return fn


class FakeRedis(fakeredis.FakeRedis):
REDIS_PREFIX: str

def __init__(self, connection_pool=None, *args, **kwargs):
super(FakeRedis, self).__init__(*args, **kwargs)

def execute_command(self, command, *args, **kwargs):
has_prefix = len(args) == 0

for i in args:
# test all redis command has key prefix
if isinstance(i, str) and i.startswith(self.REDIS_PREFIX):
has_prefix = True

if not has_prefix:
raise KeyError("Redis prefix not found")

return super(FakeRedis, self).execute_command(command, *args, **kwargs)


@pytest.fixture(autouse=True)
def patch_redis(mocker, settings):
class PatchedRedis(FakeRedis):
REDIS_PREFIX = settings.REDIS_PREFIX

REDIS_CLIENTS.clear()
mocker.patch("redis.Redis", PatchedRedis)


@pytest.fixture
def default_redis():
return get_default_redis_client()


@pytest.fixture
def skip_view_permissions_check(mocker):
mocker.patch("rest_framework.views.APIView.check_permissions")
Expand Down Expand Up @@ -769,23 +701,6 @@ def fake_plugin_bk_header_rewrite(fake_plugin_type_bk_header_rewrite, fake_gatew
)


@pytest.fixture()
def clone_model():
"""Clone a django model"""

def clone(model, **kwargs):
new_model = deepcopy(model)
new_model.pk = None

for k, v in kwargs.items():
setattr(new_model, k, v)

new_model.save()
return model._meta.model.objects.get(pk=new_model.pk)

return clone


@pytest.fixture()
def fake_rsa_private_key():
return """-----BEGIN RSA PRIVATE KEY-----
Expand Down
Loading