Skip to content

Commit

Permalink
Add clear and apply commands (#74)
Browse files Browse the repository at this point in the history
* Added clear and apply commands to ocean CLI

* bumped version of ocean framework

* Added group of defaults

* Added migration ids

* Reverted change log

* Added an option to wait for a migration to finish

* Fixed mypy + ruff errors

* Fixed CR comments

* Fixed CR comments

* Added towncrier changelog

* Fixed Pager Duty integration to use client inside the functions

* Fixed changelog fragment

* Fixed changelog fragment

* Fixed changelog fragment grammer
  • Loading branch information
danielsinai authored Aug 17, 2023
1 parent e6a00c0 commit bea8a5b
Show file tree
Hide file tree
Showing 20 changed files with 478 additions and 142 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,4 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Added `ocean version` to get the framework version.
- Added `make new` to scaffold in the Ocean repository.

(PORT-4307)
(PORT-4307)
1 change: 1 addition & 0 deletions changelog/dock-clean-defaults.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the ability to create and clean the defaults of an integration using the following CLI commands: `ocean defaults dock` and `ocean defaults clean`
27 changes: 20 additions & 7 deletions integrations/pagerduty/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ class ObjectKind:
INCIDENTS = "incidents"


pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)


@ocean.on_resync(ObjectKind.INCIDENTS)
async def on_incidents_resync(kind: str) -> list[dict[str, Any]]:
logger.info(f"Listing Pagerduty resource: {kind}")
pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)

return await pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.INCIDENTS
Expand All @@ -30,6 +28,11 @@ async def on_incidents_resync(kind: str) -> list[dict[str, Any]]:
@ocean.on_resync(ObjectKind.SERVICES)
async def on_services_resync(kind: str) -> list[dict[str, Any]]:
logger.info(f"Listing Pagerduty resource: {kind}")
pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)

services = await pager_duty_client.paginate_request_to_pager_duty(
data_key=ObjectKind.SERVICES
Expand All @@ -39,6 +42,11 @@ async def on_services_resync(kind: str) -> list[dict[str, Any]]:

@ocean.router.post("/webhook")
async def upsert_incident_webhook_handler(data: dict[str, Any]) -> None:
pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)
event_type = data["event"]["event_type"]
logger.info(f"Processing Pagerduty webhook for event type: {event_type}")
if event_type in pager_duty_client.service_delete_events:
Expand All @@ -63,5 +71,10 @@ async def upsert_incident_webhook_handler(data: dict[str, Any]) -> None:

@ocean.on_start()
async def on_start() -> None:
pager_duty_client = PagerDutyClient(
ocean.integration_config["token"],
ocean.integration_config["api_url"],
ocean.integration_config["app_host"],
)
logger.info("Subscribing to Pagerduty webhooks")
await pager_duty_client.create_webhooks_if_not_exists()
39 changes: 39 additions & 0 deletions port_ocean/bootstrap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import sys
from inspect import getmembers, isclass
from types import ModuleType
from typing import Type

from pydantic import BaseModel

from port_ocean.core.integrations.base import BaseIntegration
from port_ocean.ocean import Ocean
from port_ocean.utils import load_module


def _get_base_integration_class_from_module(
module: ModuleType,
) -> Type[BaseIntegration]:
for name, obj in getmembers(module):
if (
isclass(obj)
and type(obj) == type
and issubclass(obj, BaseIntegration)
and obj != BaseIntegration
):
return obj

raise Exception(f"Failed to load integration from module: {module.__name__}")


def create_default_app(
path: str | None = None, config_factory: Type[BaseModel] | None = None
) -> Ocean:
sys.path.append(".")
try:
integration_path = f"{path}/integration.py" if path else "integration.py"
module = load_module(integration_path)
integration_class = _get_base_integration_class_from_module(module)
except Exception:
integration_class = None

return Ocean(integration_class=integration_class, config_factory=config_factory)
4 changes: 4 additions & 0 deletions port_ocean/cli/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from .pull import pull
from .sail import sail
from .version import version
from .defaults.dock import dock
from .defaults.clean import clean

__all__ = [
"cli_start",
Expand All @@ -12,4 +14,6 @@
"pull",
"sail",
"version",
"dock",
"clean",
]
7 changes: 7 additions & 0 deletions port_ocean/cli/commands/defaults/__init___.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .dock import dock
from .clean import clean

__all__ = [
"dock",
"clean",
]
54 changes: 54 additions & 0 deletions port_ocean/cli/commands/defaults/clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
import click

from inspect import getmembers

from port_ocean.utils import load_module
from .group import defaults
from port_ocean.cli.commands.main import print_logo, console
from port_ocean.ocean import Ocean
from port_ocean.core.defaults import clean_defaults
from port_ocean.bootstrap import create_default_app


@defaults.command()
@click.argument("path", default=".", type=click.Path(exists=True))
@click.option(
"-f",
"--force",
"force",
is_flag=True,
help="Delete all the entities of the Blueprint as well as the blueprint itself.",
)
@click.option(
"-w",
"--wait",
"wait",
is_flag=True,
help="Wait for the migration to finish. when force is set to true.",
)
def clean(path: str, force: bool, wait: bool) -> None:
"""
Clean defaults of the integration from the .port/resources PATH.
PATH: Path to the integration. If not provided, the current directory will be used.
"""
print_logo()

console.print("Cleaning blueprints and configurations! ⚓️")

if force:
console.print(
"Deleting entities forcefully I sure hope you know what you are doing 🚨 🚨 🚨 ",
)
default_app = create_default_app(path)

main_path = f"{path}/main.py" if path else "main.py"

app_module = load_module(main_path)
app: Ocean = {name: item for name, item in getmembers(app_module)}.get(
"app",
default_app,
)

clean_defaults(app.integration.AppConfigHandlerClass.CONFIG_CLASS, force, wait)
39 changes: 39 additions & 0 deletions port_ocean/cli/commands/defaults/dock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
import click

from inspect import getmembers

from port_ocean.utils import load_module
from .group import defaults
from port_ocean.cli.commands.main import print_logo, console
from port_ocean.ocean import Ocean
from port_ocean.core.defaults.initialize import initialize_defaults
from port_ocean.bootstrap import create_default_app


@defaults.command()
@click.argument("path", default=".", type=click.Path(exists=True))
def dock(path: str) -> None:
"""
Apply defaults of the integration from the .port/resources PATH.
PATH: Path to the integration. If not provided, the current directory will be used.
"""
print_logo()

console.print("Unloading cargo at the dock... 📦🚢")

default_app = create_default_app(path)

main_path = f"{path}/main.py" if path else "main.py"

app_module = load_module(main_path)
app: Ocean = {name: item for name, item in getmembers(app_module)}.get(
"app",
default_app,
)

initialize_defaults(
app.integration.AppConfigHandlerClass.CONFIG_CLASS,
app.config,
)
6 changes: 6 additions & 0 deletions port_ocean/cli/commands/defaults/group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from port_ocean.cli.commands.main import cli_start


@cli_start.group("defaults")
def defaults() -> None:
pass
10 changes: 9 additions & 1 deletion port_ocean/clients/port/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@
from port_ocean.clients.port.mixins.blueprints import BlueprintClientMixin
from port_ocean.clients.port.mixins.entities import EntityClientMixin
from port_ocean.clients.port.mixins.integrations import IntegrationClientMixin
from port_ocean.clients.port.mixins.migrations import MigrationClientMixin

from port_ocean.clients.port.types import (
KafkaCreds,
)
from port_ocean.clients.port.utils import handle_status_code, async_client
from port_ocean.exceptions.clients import KafkaCredentialsNotFound


class PortClient(EntityClientMixin, IntegrationClientMixin, BlueprintClientMixin):
class PortClient(
EntityClientMixin,
IntegrationClientMixin,
BlueprintClientMixin,
MigrationClientMixin,
):
def __init__(
self,
base_url: str,
Expand All @@ -35,6 +42,7 @@ def __init__(
self, integration_identifier, self.auth, self.client
)
BlueprintClientMixin.__init__(self, self.auth, self.client)
MigrationClientMixin.__init__(self, self.auth, self.client)

async def get_kafka_creds(self) -> KafkaCreds:
logger.info("Fetching organization kafka credentials")
Expand Down
30 changes: 22 additions & 8 deletions port_ocean/clients/port/mixins/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,29 @@ async def patch_blueprint(
handle_status_code(response)

async def delete_blueprint(
self, identifier: str, should_raise: bool = False
) -> None:
logger.info(f"Deleting blueprint with id: {identifier}")
headers = await self.auth.headers()
response = await self.client.delete(
f"{self.auth.api_url}/blueprints/{identifier}",
headers=headers,
self, identifier: str, should_raise: bool = False, delete_entities: bool = False
) -> None | str:
logger.info(
f"Deleting blueprint with id: {identifier} with all entities: {delete_entities}"
)
handle_status_code(response, should_raise)
headers = await self.auth.headers()
response = None

if not delete_entities:
response = await self.client.delete(
f"{self.auth.api_url}/blueprints/{identifier}",
headers=headers,
)
handle_status_code(response, should_raise)
return None
else:
response = await self.client.delete(
f"{self.auth.api_url}/blueprints/{identifier}/all-entities?delete_blueprint=true",
headers=await self.auth.headers(),
)

handle_status_code(response, should_raise)
return response.json().get("migrationId", "")

async def create_action(
self, blueprint_identifier: str, action: dict[str, Any]
Expand Down
46 changes: 46 additions & 0 deletions port_ocean/clients/port/mixins/migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio

import httpx
from loguru import logger

from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_status_code
from port_ocean.core.models import Migration


class MigrationClientMixin:
def __init__(self, auth: PortAuthentication, client: httpx.AsyncClient):
self.auth = auth
self.client = client

async def wait_for_migration_to_complete(
self,
migration_id: list[str],
interval: int = 5,
) -> Migration:
logger.info(
f"Waiting for migration with id: {migration_id} to complete",
)

headers = await self.auth.headers()
response = await self.client.get(
f"{self.auth.api_url}/migrations/{migration_id}",
headers=headers,
)

handle_status_code(response, should_raise=True)

migration_status = response.json().get("migration", {}).get("status", None)
if (
migration_status == "RUNNING"
or migration_status == "INITIALIZING"
or migration_status == "PENDING"
):
await asyncio.sleep(interval)
await self.wait_for_migration_to_complete(migration_id, interval)
else:
logger.info(
f"Migration with id: {migration_id} finished with status {migration_status}",
)

return Migration.parse_obj(response.json()["migration"])
1 change: 0 additions & 1 deletion port_ocean/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def read_yaml_config_settings_source(
yaml_file = getattr(settings.__config__, "yaml_file", "")

assert yaml_file, "Settings.yaml_file not properly configured"

path = Path(base_path, yaml_file)

if not path.exists():
Expand Down
7 changes: 7 additions & 0 deletions port_ocean/core/defaults/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .clean import clean_defaults
from .initialize import initialize_defaults

__all__ = [
"clean_defaults",
"initialize_defaults",
]
Loading

0 comments on commit bea8a5b

Please sign in to comment.