Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clear and apply commands #74

Merged
merged 19 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,4 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Added `ocean version` to get the framework version.
- Added `make new` to scaffold in the Ocean repository.

(PORT-4307)
(PORT-4307)
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions changelog/dock-clean-defaults.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the ability to create and clean the defaults of an integration using the following CLI commands: `ocean defaults dock` and `ocean defaults clean`
27 changes: 20 additions & 7 deletions integrations/pagerduty/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ class ObjectKind:
INCIDENTS = "incidents"


pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)


@ocean.on_resync(ObjectKind.INCIDENTS)
async def on_incidents_resync(kind: str) -> list[dict[str, Any]]:
logger.info(f"Listing Pagerduty resource: {kind}")
pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)

return await pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.INCIDENTS
Expand All @@ -30,6 +28,11 @@ async def on_incidents_resync(kind: str) -> list[dict[str, Any]]:
@ocean.on_resync(ObjectKind.SERVICES)
async def on_services_resync(kind: str) -> list[dict[str, Any]]:
logger.info(f"Listing Pagerduty resource: {kind}")
pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)

services = await pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.SERVICES
Expand All @@ -39,6 +42,11 @@ async def on_services_resync(kind: str) -> list[dict[str, Any]]:

@ocean.router.post("/webhook")
async def upsert_incident_webhook_handler(data: dict[str, Any]) -> None:
pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)
event_type = data["event"]["event_type"]
logger.info(f"Processing Pagerduty webhook for event type: {event_type}")
if event_type in pager_duty_client.service_delete_events:
Expand All @@ -63,5 +71,10 @@ async def upsert_incident_webhook_handler(data: dict[str, Any]) -> None:

@ocean.on_start()
async def on_start() -> None:
pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)
logger.info("Subscribing to Pagerduty webhooks")
await pager_duty_client.create_webhooks_if_not_exists()
39 changes: 39 additions & 0 deletions port_ocean/bootstrap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import sys
from inspect import getmembers, isclass
from types import ModuleType
from typing import Type

from pydantic import BaseModel

from port_ocean.core.integrations.base import BaseIntegration
from port_ocean.ocean import Ocean
from port_ocean.utils import load_module


def _get_base_integration_class_from_module(
module: ModuleType,
) -> Type[BaseIntegration]:
for name, obj in getmembers(module):
if (
isclass(obj)
and type(obj) == type
and issubclass(obj, BaseIntegration)
and obj != BaseIntegration
):
return obj

raise Exception(f"Failed to load integration from module: {module.__name__}")


def create_default_app(
path: str | None = None, config_factory: Type[BaseModel] | None = None
) -> Ocean:
sys.path.append(".")
try:
integration_path = f"{path}/integration.py" if path else "integration.py"
module = load_module(integration_path)
integration_class = _get_base_integration_class_from_module(module)
except Exception:
integration_class = None

return Ocean(integration_class=integration_class, config_factory=config_factory)
4 changes: 4 additions & 0 deletions port_ocean/cli/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from .pull import pull
from .sail import sail
from .version import version
from .defaults.dock import dock
from .defaults.clean import clean

__all__ = [
"cli_start",
Expand All @@ -12,4 +14,6 @@
"pull",
"sail",
"version",
"dock",
"clean",
]
7 changes: 7 additions & 0 deletions port_ocean/cli/commands/defaults/__init___.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .dock import dock
from .clean import clean

__all__ = [
"dock",
"clean",
]
54 changes: 54 additions & 0 deletions port_ocean/cli/commands/defaults/clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
import click

from inspect import getmembers

from port_ocean.utils import load_module
from .group import defaults
from port_ocean.cli.commands.main import print_logo, console
from port_ocean.ocean import Ocean
from port_ocean.core.defaults import clean_defaults
from port_ocean.bootstrap import create_default_app


@defaults.command()
@click.argument("path", default=".", type=click.Path(exists=True))
@click.option(
danielsinai marked this conversation as resolved.
Show resolved Hide resolved
"-f",
"--force",
"force",
is_flag=True,
help="Delete all the entities of the Blueprint as well as the blueprint itself.",
)
@click.option(
"-w",
"--wait",
"wait",
is_flag=True,
help="Wait for the migration to finish. when force is set to true.",
)
def clean(path: str, force: bool, wait: bool) -> None:
"""
Clean defaults of the integration from the .port/resources PATH.

PATH: Path to the integration. If not provided, the current directory will be used.
"""
print_logo()

console.print("Cleaning blueprints and configurations! ⚓️")

if force:
console.print(
"Deleting entities forcefully I sure hope you know what you are doing 🚨 🚨 🚨 ",
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
)
default_app = create_default_app(path)

main_path = f"{path}/main.py" if path else "main.py"

app_module = load_module(main_path)
app: Ocean = {name: item for name, item in getmembers(app_module)}.get(
"app",
default_app,
)

clean_defaults(app.integration.AppConfigHandlerClass.CONFIG_CLASS, force, wait)
39 changes: 39 additions & 0 deletions port_ocean/cli/commands/defaults/dock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
import click

from inspect import getmembers

from port_ocean.utils import load_module
from .group import defaults
from port_ocean.cli.commands.main import print_logo, console
from port_ocean.ocean import Ocean
from port_ocean.core.defaults.initialize import initialize_defaults
from port_ocean.bootstrap import create_default_app


@defaults.command()
@click.argument("path", default=".", type=click.Path(exists=True))
def dock(path: str) -> None:
"""
Apply defaults of the integration from the .port/resources PATH.

PATH: Path to the integration. If not provided, the current directory will be used.
"""
print_logo()

console.print("Unloading cargo at the dock... 📦🚢")

default_app = create_default_app(path)

main_path = f"{path}/main.py" if path else "main.py"

app_module = load_module(main_path)
app: Ocean = {name: item for name, item in getmembers(app_module)}.get(
"app",
default_app,
)

initialize_defaults(
app.integration.AppConfigHandlerClass.CONFIG_CLASS,
app.config,
)
6 changes: 6 additions & 0 deletions port_ocean/cli/commands/defaults/group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from port_ocean.cli.commands.main import cli_start


@cli_start.group("defaults")
def defaults() -> None:
pass
10 changes: 9 additions & 1 deletion port_ocean/clients/port/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@
from port_ocean.clients.port.mixins.blueprints import BlueprintClientMixin
from port_ocean.clients.port.mixins.entities import EntityClientMixin
from port_ocean.clients.port.mixins.integrations import IntegrationClientMixin
from port_ocean.clients.port.mixins.migrations import MigrationClientMixin

from port_ocean.clients.port.types import (
KafkaCreds,
)
from port_ocean.clients.port.utils import handle_status_code, async_client
from port_ocean.exceptions.clients import KafkaCredentialsNotFound


class PortClient(EntityClientMixin, IntegrationClientMixin, BlueprintClientMixin):
class PortClient(
EntityClientMixin,
IntegrationClientMixin,
BlueprintClientMixin,
MigrationClientMixin,
):
def __init__(
self,
base_url: str,
Expand All @@ -35,6 +42,7 @@ def __init__(
self, integration_identifier, self.auth, self.client
)
BlueprintClientMixin.__init__(self, self.auth, self.client)
MigrationClientMixin.__init__(self, self.auth, self.client)

async def get_kafka_creds(self) -> KafkaCreds:
logger.info("Fetching organization kafka credentials")
Expand Down
30 changes: 22 additions & 8 deletions port_ocean/clients/port/mixins/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,29 @@ async def patch_blueprint(
handle_status_code(response)

async def delete_blueprint(
self, identifier: str, should_raise: bool = False
) -> None:
logger.info(f"Deleting blueprint with id: {identifier}")
headers = await self.auth.headers()
response = await self.client.delete(
f"{self.auth.api_url}/blueprints/{identifier}",
headers=headers,
self, identifier: str, should_raise: bool = False, delete_entities: bool = False
) -> None | str:
logger.info(
f"Deleting blueprint with id: {identifier} with all entities: {delete_entities}"
)
handle_status_code(response, should_raise)
headers = await self.auth.headers()
response = None

if not delete_entities:
response = await self.client.delete(
f"{self.auth.api_url}/blueprints/{identifier}",
headers=headers,
)
handle_status_code(response, should_raise)
return None
else:
response = await self.client.delete(
f"{self.auth.api_url}/blueprints/{identifier}/all-entities?delete_blueprint=true",
headers=await self.auth.headers(),
)

handle_status_code(response, should_raise)
return response.json().get("migrationId", "")

async def create_action(
self, blueprint_identifier: str, action: dict[str, Any]
Expand Down
46 changes: 46 additions & 0 deletions port_ocean/clients/port/mixins/migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio

import httpx
from loguru import logger

from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_status_code
from port_ocean.core.models import Migration


class MigrationClientMixin:
def __init__(self, auth: PortAuthentication, client: httpx.AsyncClient):
self.auth = auth
self.client = client

async def wait_for_migration_to_complete(
self,
migration_id: list[str],
interval: int = 5,
) -> Migration:
logger.info(
f"Waiting for migration with id: {migration_id} to complete",
)

headers = await self.auth.headers()
response = await self.client.get(
f"{self.auth.api_url}/migrations/{migration_id}",
headers=headers,
)

handle_status_code(response, should_raise=True)

migration_status = response.json().get("migration", {}).get("status", None)
if (
migration_status == "RUNNING"
or migration_status == "INITIALIZING"
or migration_status == "PENDING"
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
):
await asyncio.sleep(interval)
await self.wait_for_migration_to_complete(migration_id, interval)
else:
logger.info(
f"Migration with id: {migration_id} finished with status {migration_status}",
)

return Migration.parse_obj(response.json()["migration"])
1 change: 0 additions & 1 deletion port_ocean/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def read_yaml_config_settings_source(
yaml_file = getattr(settings.__config__, "yaml_file", "")

assert yaml_file, "Settings.yaml_file not properly configured"

path = Path(base_path, yaml_file)

if not path.exists():
Expand Down
7 changes: 7 additions & 0 deletions port_ocean/core/defaults/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .clean import clean_defaults
from .initialize import initialize_defaults

__all__ = [
"clean_defaults",
"initialize_defaults",
]
Loading