diff --git a/CHANGELOG.md b/CHANGELOG.md index 7027672246..dae6fee016 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -131,4 +131,4 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Added `ocean version` to get the framework version. - Added `make new` to scaffold in the Ocean repository. - (PORT-4307) + (PORT-4307) \ No newline at end of file diff --git a/changelog/dock-clean-defaults.feature.md b/changelog/dock-clean-defaults.feature.md new file mode 100644 index 0000000000..9e64f66c10 --- /dev/null +++ b/changelog/dock-clean-defaults.feature.md @@ -0,0 +1 @@ +Added the ability to create and clean the defaults of an integration using the following CLI commands: `ocean defaults dock` and `ocean defaults clean` \ No newline at end of file diff --git a/integrations/pagerduty/main.py b/integrations/pagerduty/main.py index 59a56072c0..711ed8a025 100644 --- a/integrations/pagerduty/main.py +++ b/integrations/pagerduty/main.py @@ -11,16 +11,14 @@ class ObjectKind: INCIDENTS = "incidents" -pager_duty_client = PagerDutyClient( - ocean.integration_config["token"], - ocean.integration_config["api_url"], - ocean.integration_config["app_host"], -) - - @ocean.on_resync(ObjectKind.INCIDENTS) async def on_incidents_resync(kind: str) -> list[dict[str, Any]]: logger.info(f"Listing Pagerduty resource: {kind}") + pager_duty_client = PagerDutyClient( + ocean.integration_config["token"], + ocean.integration_config["api_url"], + ocean.integration_config["app_host"], + ) return await pager_duty_client.paginate_request_to_pager_duty( data_key=ObjectKind.INCIDENTS @@ -30,6 +28,11 @@ async def on_incidents_resync(kind: str) -> list[dict[str, Any]]: @ocean.on_resync(ObjectKind.SERVICES) async def on_services_resync(kind: str) -> list[dict[str, Any]]: logger.info(f"Listing Pagerduty resource: {kind}") + pager_duty_client = PagerDutyClient( + ocean.integration_config["token"], + ocean.integration_config["api_url"], + ocean.integration_config["app_host"], + ) services = await pager_duty_client.paginate_request_to_pager_duty( data_key=ObjectKind.SERVICES @@ -39,6 +42,11 @@ async def on_services_resync(kind: str) -> list[dict[str, Any]]: @ocean.router.post("/webhook") async def upsert_incident_webhook_handler(data: dict[str, Any]) -> None: + pager_duty_client = PagerDutyClient( + ocean.integration_config["token"], + ocean.integration_config["api_url"], + ocean.integration_config["app_host"], + ) event_type = data["event"]["event_type"] logger.info(f"Processing Pagerduty webhook for event type: {event_type}") if event_type in pager_duty_client.service_delete_events: @@ -63,5 +71,10 @@ async def upsert_incident_webhook_handler(data: dict[str, Any]) -> None: @ocean.on_start() async def on_start() -> None: + pager_duty_client = PagerDutyClient( + ocean.integration_config["token"], + ocean.integration_config["api_url"], + ocean.integration_config["app_host"], + ) logger.info("Subscribing to Pagerduty webhooks") await pager_duty_client.create_webhooks_if_not_exists() diff --git a/port_ocean/bootstrap.py b/port_ocean/bootstrap.py new file mode 100644 index 0000000000..820fcdbe0e --- /dev/null +++ b/port_ocean/bootstrap.py @@ -0,0 +1,39 @@ +import sys +from inspect import getmembers, isclass +from types import ModuleType +from typing import Type + +from pydantic import BaseModel + +from port_ocean.core.integrations.base import BaseIntegration +from port_ocean.ocean import Ocean +from port_ocean.utils import load_module + + +def _get_base_integration_class_from_module( + module: ModuleType, +) -> Type[BaseIntegration]: + for name, obj in getmembers(module): + if ( + isclass(obj) + and type(obj) == type + and issubclass(obj, BaseIntegration) + and obj != BaseIntegration + ): + return obj + + raise Exception(f"Failed to load integration from module: {module.__name__}") + + +def create_default_app( + path: str | None = None, config_factory: Type[BaseModel] | None = None +) -> Ocean: + sys.path.append(".") + try: + integration_path = f"{path}/integration.py" if path else "integration.py" + module = load_module(integration_path) + integration_class = _get_base_integration_class_from_module(module) + except Exception: + integration_class = None + + return Ocean(integration_class=integration_class, config_factory=config_factory) diff --git a/port_ocean/cli/commands/__init__.py b/port_ocean/cli/commands/__init__.py index 2e46a59335..d49ce36451 100644 --- a/port_ocean/cli/commands/__init__.py +++ b/port_ocean/cli/commands/__init__.py @@ -4,6 +4,8 @@ from .pull import pull from .sail import sail from .version import version +from .defaults.dock import dock +from .defaults.clean import clean __all__ = [ "cli_start", @@ -12,4 +14,6 @@ "pull", "sail", "version", + "dock", + "clean", ] diff --git a/port_ocean/cli/commands/defaults/__init___.py b/port_ocean/cli/commands/defaults/__init___.py new file mode 100644 index 0000000000..335c55a42b --- /dev/null +++ b/port_ocean/cli/commands/defaults/__init___.py @@ -0,0 +1,7 @@ +from .dock import dock +from .clean import clean + +__all__ = [ + "dock", + "clean", +] diff --git a/port_ocean/cli/commands/defaults/clean.py b/port_ocean/cli/commands/defaults/clean.py new file mode 100644 index 0000000000..d1d8cacb53 --- /dev/null +++ b/port_ocean/cli/commands/defaults/clean.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +import click + +from inspect import getmembers + +from port_ocean.utils import load_module +from .group import defaults +from port_ocean.cli.commands.main import print_logo, console +from port_ocean.ocean import Ocean +from port_ocean.core.defaults import clean_defaults +from port_ocean.bootstrap import create_default_app + + +@defaults.command() +@click.argument("path", default=".", type=click.Path(exists=True)) +@click.option( + "-f", + "--force", + "force", + is_flag=True, + help="Delete all the entities of the Blueprint as well as the blueprint itself.", +) +@click.option( + "-w", + "--wait", + "wait", + is_flag=True, + help="Wait for the migration to finish. when force is set to true.", +) +def clean(path: str, force: bool, wait: bool) -> None: + """ + Clean defaults of the integration from the .port/resources PATH. + + PATH: Path to the integration. If not provided, the current directory will be used. + """ + print_logo() + + console.print("Cleaning blueprints and configurations! ⚓️") + + if force: + console.print( + "Deleting entities forcefully I sure hope you know what you are doing 🚨 🚨 🚨 ", + ) + default_app = create_default_app(path) + + main_path = f"{path}/main.py" if path else "main.py" + + app_module = load_module(main_path) + app: Ocean = {name: item for name, item in getmembers(app_module)}.get( + "app", + default_app, + ) + + clean_defaults(app.integration.AppConfigHandlerClass.CONFIG_CLASS, force, wait) diff --git a/port_ocean/cli/commands/defaults/dock.py b/port_ocean/cli/commands/defaults/dock.py new file mode 100644 index 0000000000..32c881bc3a --- /dev/null +++ b/port_ocean/cli/commands/defaults/dock.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +import click + +from inspect import getmembers + +from port_ocean.utils import load_module +from .group import defaults +from port_ocean.cli.commands.main import print_logo, console +from port_ocean.ocean import Ocean +from port_ocean.core.defaults.initialize import initialize_defaults +from port_ocean.bootstrap import create_default_app + + +@defaults.command() +@click.argument("path", default=".", type=click.Path(exists=True)) +def dock(path: str) -> None: + """ + Apply defaults of the integration from the .port/resources PATH. + + PATH: Path to the integration. If not provided, the current directory will be used. + """ + print_logo() + + console.print("Unloading cargo at the dock... 📦🚢") + + default_app = create_default_app(path) + + main_path = f"{path}/main.py" if path else "main.py" + + app_module = load_module(main_path) + app: Ocean = {name: item for name, item in getmembers(app_module)}.get( + "app", + default_app, + ) + + initialize_defaults( + app.integration.AppConfigHandlerClass.CONFIG_CLASS, + app.config, + ) diff --git a/port_ocean/cli/commands/defaults/group.py b/port_ocean/cli/commands/defaults/group.py new file mode 100644 index 0000000000..2d9243c58f --- /dev/null +++ b/port_ocean/cli/commands/defaults/group.py @@ -0,0 +1,6 @@ +from port_ocean.cli.commands.main import cli_start + + +@cli_start.group("defaults") +def defaults() -> None: + pass diff --git a/port_ocean/clients/port/client.py b/port_ocean/clients/port/client.py index d3560feb2e..a431e545f2 100644 --- a/port_ocean/clients/port/client.py +++ b/port_ocean/clients/port/client.py @@ -4,6 +4,8 @@ from port_ocean.clients.port.mixins.blueprints import BlueprintClientMixin from port_ocean.clients.port.mixins.entities import EntityClientMixin from port_ocean.clients.port.mixins.integrations import IntegrationClientMixin +from port_ocean.clients.port.mixins.migrations import MigrationClientMixin + from port_ocean.clients.port.types import ( KafkaCreds, ) @@ -11,7 +13,12 @@ from port_ocean.exceptions.clients import KafkaCredentialsNotFound -class PortClient(EntityClientMixin, IntegrationClientMixin, BlueprintClientMixin): +class PortClient( + EntityClientMixin, + IntegrationClientMixin, + BlueprintClientMixin, + MigrationClientMixin, +): def __init__( self, base_url: str, @@ -35,6 +42,7 @@ def __init__( self, integration_identifier, self.auth, self.client ) BlueprintClientMixin.__init__(self, self.auth, self.client) + MigrationClientMixin.__init__(self, self.auth, self.client) async def get_kafka_creds(self) -> KafkaCreds: logger.info("Fetching organization kafka credentials") diff --git a/port_ocean/clients/port/mixins/blueprints.py b/port_ocean/clients/port/mixins/blueprints.py index b57b1bd361..ba5e5babfb 100644 --- a/port_ocean/clients/port/mixins/blueprints.py +++ b/port_ocean/clients/port/mixins/blueprints.py @@ -45,15 +45,29 @@ async def patch_blueprint( handle_status_code(response) async def delete_blueprint( - self, identifier: str, should_raise: bool = False - ) -> None: - logger.info(f"Deleting blueprint with id: {identifier}") - headers = await self.auth.headers() - response = await self.client.delete( - f"{self.auth.api_url}/blueprints/{identifier}", - headers=headers, + self, identifier: str, should_raise: bool = False, delete_entities: bool = False + ) -> None | str: + logger.info( + f"Deleting blueprint with id: {identifier} with all entities: {delete_entities}" ) - handle_status_code(response, should_raise) + headers = await self.auth.headers() + response = None + + if not delete_entities: + response = await self.client.delete( + f"{self.auth.api_url}/blueprints/{identifier}", + headers=headers, + ) + handle_status_code(response, should_raise) + return None + else: + response = await self.client.delete( + f"{self.auth.api_url}/blueprints/{identifier}/all-entities?delete_blueprint=true", + headers=await self.auth.headers(), + ) + + handle_status_code(response, should_raise) + return response.json().get("migrationId", "") async def create_action( self, blueprint_identifier: str, action: dict[str, Any] diff --git a/port_ocean/clients/port/mixins/migrations.py b/port_ocean/clients/port/mixins/migrations.py new file mode 100644 index 0000000000..07d48d7f7d --- /dev/null +++ b/port_ocean/clients/port/mixins/migrations.py @@ -0,0 +1,46 @@ +import asyncio + +import httpx +from loguru import logger + +from port_ocean.clients.port.authentication import PortAuthentication +from port_ocean.clients.port.utils import handle_status_code +from port_ocean.core.models import Migration + + +class MigrationClientMixin: + def __init__(self, auth: PortAuthentication, client: httpx.AsyncClient): + self.auth = auth + self.client = client + + async def wait_for_migration_to_complete( + self, + migration_id: list[str], + interval: int = 5, + ) -> Migration: + logger.info( + f"Waiting for migration with id: {migration_id} to complete", + ) + + headers = await self.auth.headers() + response = await self.client.get( + f"{self.auth.api_url}/migrations/{migration_id}", + headers=headers, + ) + + handle_status_code(response, should_raise=True) + + migration_status = response.json().get("migration", {}).get("status", None) + if ( + migration_status == "RUNNING" + or migration_status == "INITIALIZING" + or migration_status == "PENDING" + ): + await asyncio.sleep(interval) + await self.wait_for_migration_to_complete(migration_id, interval) + else: + logger.info( + f"Migration with id: {migration_id} finished with status {migration_status}", + ) + + return Migration.parse_obj(response.json()["migration"]) diff --git a/port_ocean/config/base.py b/port_ocean/config/base.py index e8fa4e0205..830a1af242 100644 --- a/port_ocean/config/base.py +++ b/port_ocean/config/base.py @@ -20,7 +20,6 @@ def read_yaml_config_settings_source( yaml_file = getattr(settings.__config__, "yaml_file", "") assert yaml_file, "Settings.yaml_file not properly configured" - path = Path(base_path, yaml_file) if not path.exists(): diff --git a/port_ocean/core/defaults/__init__.py b/port_ocean/core/defaults/__init__.py new file mode 100644 index 0000000000..9beaf06ce5 --- /dev/null +++ b/port_ocean/core/defaults/__init__.py @@ -0,0 +1,7 @@ +from .clean import clean_defaults +from .initialize import initialize_defaults + +__all__ = [ + "clean_defaults", + "initialize_defaults", +] diff --git a/port_ocean/core/defaults/clean.py b/port_ocean/core/defaults/clean.py new file mode 100644 index 0000000000..4f8cada9a8 --- /dev/null +++ b/port_ocean/core/defaults/clean.py @@ -0,0 +1,73 @@ +import asyncio +from typing import Type + +import httpx +from loguru import logger + +from port_ocean.context.ocean import ocean +from port_ocean.core.handlers.port_app_config.models import PortAppConfig + + +from port_ocean.core.defaults.common import ( + get_port_integration_defaults, + is_integration_exists, +) + + +def clean_defaults( + config_class: Type[PortAppConfig], + force: bool, + wait: bool, +) -> None: + try: + asyncio.new_event_loop().run_until_complete( + _clean_defaults(config_class, force, wait) + ) + + except Exception as e: + logger.error(f"Failed to clear defaults, skipping... Error: {e}") + + +async def _clean_defaults( + config_class: Type[PortAppConfig], force: bool, wait: bool +) -> None: + port_client = ocean.port_client + is_exists = await is_integration_exists(port_client) + if not is_exists: + return None + defaults = get_port_integration_defaults(config_class) + if not defaults: + return None + + try: + migration_ids = await asyncio.gather( + *( + port_client.delete_blueprint( + blueprint["identifier"], should_raise=True, delete_entities=force + ) + for blueprint in defaults.blueprints + ) + ) + + if not force: + logger.info( + "Finished deleting blueprints and configurations! ⚓️", + ) + return None + + migration_ids = [migration_id for migration_id in migration_ids if migration_id] + + if migration_ids and len(migration_ids) > 0 and not wait: + logger.info( + f"Migration started. To check the status of the migration, track these ids using /migrations/:id route {migration_ids}", + ) + elif migration_ids and len(migration_ids) > 0 and wait: + await asyncio.gather( + *( + ocean.port_client.wait_for_migration_to_complete(migration_id) + for migration_id in migration_ids + ) + ) + except httpx.HTTPStatusError as e: + logger.error(f"Failed to delete blueprints: {e.response.text}.") + raise e diff --git a/port_ocean/core/defaults/common.py b/port_ocean/core/defaults/common.py new file mode 100644 index 0000000000..f9e208b0c5 --- /dev/null +++ b/port_ocean/core/defaults/common.py @@ -0,0 +1,114 @@ +import json +from pathlib import Path +from typing import Type, Any, TypedDict, Optional + +import httpx +import yaml +from pydantic import BaseModel, Field +from starlette import status + +from port_ocean.clients.port.client import PortClient +from port_ocean.core.handlers.port_app_config.models import PortAppConfig +from port_ocean.exceptions.port_defaults import ( + UnsupportedDefaultFileType, +) + +YAML_EXTENSIONS = [".yaml", ".yml"] +ALLOWED_FILE_TYPES = [".json", *YAML_EXTENSIONS] + + +class Preset(TypedDict): + blueprint: str + data: list[dict[str, Any]] + + +class Defaults(BaseModel): + blueprints: list[dict[str, Any]] = [] + actions: list[Preset] = [] + scorecards: list[Preset] = [] + port_app_config: Optional[PortAppConfig] = Field( + default=None, alias="port-app-config" + ) + + class Config: + allow_population_by_field_name = True + + +async def is_integration_exists(port_client: PortClient) -> bool: + try: + await port_client.get_current_integration(should_log=False) + return True + except httpx.HTTPStatusError as e: + if e.response.status_code != status.HTTP_404_NOT_FOUND: + raise e + + return False + + +def deconstruct_blueprints_to_creation_steps( + raw_blueprints: list[dict[str, Any]] +) -> tuple[list[dict[str, Any]], ...]: + """ + Deconstructing the blueprint into stages so the api wont fail to create a blueprint if there is a conflict + example: Preventing the failure of creating a blueprint with a relation to another blueprint + """ + ( + bare_blueprint, + with_relations, + full_blueprint, + ) = ([], [], []) + + for blueprint in raw_blueprints.copy(): + full_blueprint.append(blueprint.copy()) + + blueprint.pop("calculationProperties", {}) + blueprint.pop("mirrorProperties", {}) + with_relations.append(blueprint.copy()) + + blueprint.pop("teamInheritance", {}) + blueprint.pop("relations", {}) + bare_blueprint.append(blueprint) + + return ( + bare_blueprint, + with_relations, + full_blueprint, + ) + + +def get_port_integration_defaults( + port_app_config_class: Type[PortAppConfig], base_path: Path = Path(".") +) -> Defaults | None: + defaults_dir = base_path / ".port/resources" + if not defaults_dir.exists(): + return None + + if not defaults_dir.is_dir(): + raise UnsupportedDefaultFileType( + f"Defaults directory is not a directory: {defaults_dir}" + ) + + default_jsons = {} + allowed_file_names = [ + field_model.alias for _, field_model in Defaults.__fields__.items() + ] + for path in defaults_dir.iterdir(): + if path.stem in allowed_file_names: + if not path.is_file() or path.suffix not in ALLOWED_FILE_TYPES: + raise UnsupportedDefaultFileType( + f"Defaults directory should contain only one of the next types: {ALLOWED_FILE_TYPES}. Found: {path}" + ) + + if path.suffix in YAML_EXTENSIONS: + default_jsons[path.stem] = yaml.safe_load(path.read_text()) + else: + default_jsons[path.stem] = json.loads(path.read_text()) + + return Defaults( + blueprints=default_jsons.get("blueprints", []), + actions=default_jsons.get("actions", []), + scorecards=default_jsons.get("scorecards", []), + port_app_config=port_app_config_class( + **default_jsons.get("port-app-config", {}) + ), + ) diff --git a/port_ocean/port_defaults.py b/port_ocean/core/defaults/initialize.py similarity index 65% rename from port_ocean/port_defaults.py rename to port_ocean/core/defaults/initialize.py index 05c35cf5b4..403d0fa397 100644 --- a/port_ocean/port_defaults.py +++ b/port_ocean/core/defaults/initialize.py @@ -1,13 +1,8 @@ import asyncio -import json -from pathlib import Path -from typing import Type, Any, TypedDict, Optional +from typing import Type, Any import httpx -import yaml from loguru import logger -from pydantic import BaseModel, Field -from starlette import status from port_ocean.clients.port.client import PortClient from port_ocean.config.settings import IntegrationConfiguration @@ -15,39 +10,9 @@ from port_ocean.core.handlers.port_app_config.models import PortAppConfig from port_ocean.exceptions.port_defaults import ( AbortDefaultCreationError, - UnsupportedDefaultFileType, ) -YAML_EXTENSIONS = [".yaml", ".yml"] -ALLOWED_FILE_TYPES = [".json", *YAML_EXTENSIONS] - - -class Preset(TypedDict): - blueprint: str - data: list[dict[str, Any]] - - -class Defaults(BaseModel): - blueprints: list[dict[str, Any]] = [] - actions: list[Preset] = [] - scorecards: list[Preset] = [] - port_app_config: Optional[PortAppConfig] = Field( - default=None, alias="port-app-config" - ) - - class Config: - allow_population_by_field_name = True - - -async def _is_integration_exists(port_client: PortClient) -> bool: - try: - await port_client.get_current_integration(should_log=False) - return True - except httpx.HTTPStatusError as e: - if e.response.status_code != status.HTTP_404_NOT_FOUND: - raise e - - return False +from port_ocean.core.defaults.common import Defaults, get_port_integration_defaults def deconstruct_blueprints_to_creation_steps( @@ -152,11 +117,9 @@ async def _initialize_defaults( config_class: Type[PortAppConfig], integration_config: IntegrationConfiguration ) -> None: port_client = ocean.port_client - is_exists = await _is_integration_exists(port_client) - if is_exists: - return None defaults = get_port_integration_defaults(config_class) if not defaults: + logger.warning("No defaults found. Skipping...") return None try: @@ -186,41 +149,3 @@ def initialize_defaults( ) except Exception as e: logger.debug(f"Failed to initialize defaults, skipping... Error: {e}") - - -def get_port_integration_defaults( - port_app_config_class: Type[PortAppConfig], base_path: Path = Path(".") -) -> Defaults | None: - defaults_dir = base_path / ".port/resources" - if not defaults_dir.exists(): - return None - - if not defaults_dir.is_dir(): - raise UnsupportedDefaultFileType( - f"Defaults directory is not a directory: {defaults_dir}" - ) - - default_jsons = {} - allowed_file_names = [ - field_model.alias for _, field_model in Defaults.__fields__.items() - ] - for path in defaults_dir.iterdir(): - if path.stem in allowed_file_names: - if not path.is_file() or path.suffix not in ALLOWED_FILE_TYPES: - raise UnsupportedDefaultFileType( - f"Defaults directory should contain only one of the next types: {ALLOWED_FILE_TYPES}. Found: {path}" - ) - - if path.suffix in YAML_EXTENSIONS: - default_jsons[path.stem] = yaml.safe_load(path.read_text()) - else: - default_jsons[path.stem] = json.loads(path.read_text()) - - return Defaults( - blueprints=default_jsons.get("blueprints", []), - actions=default_jsons.get("actions", []), - scorecards=default_jsons.get("scorecards", []), - port_app_config=port_app_config_class( - **default_jsons.get("port-app-config", {}) - ), - ) diff --git a/port_ocean/core/models.py b/port_ocean/core/models.py index 191a26eb9f..5d7bd88331 100644 --- a/port_ocean/core/models.py +++ b/port_ocean/core/models.py @@ -28,3 +28,11 @@ class Blueprint(BaseModel): team: str | None properties_schema: dict[str, Any] = Field(alias="schema") relations: dict[str, BlueprintRelation] + + +class Migration(BaseModel): + id: str + actor: str + sourceBlueprint: str + mapping: dict[str, Any] + status: str diff --git a/port_ocean/run.py b/port_ocean/run.py index 7397e80e90..c63cb561b7 100644 --- a/port_ocean/run.py +++ b/port_ocean/run.py @@ -1,60 +1,25 @@ -import sys -from importlib.util import spec_from_file_location, module_from_spec -from inspect import getmembers, isclass -from types import ModuleType +from inspect import getmembers from typing import Type +from pydantic import BaseModel import uvicorn +from port_ocean.bootstrap import create_default_app from port_ocean.config.dynamic import default_config_factory -from port_ocean.config.settings import LogLevelType, ApplicationSettings -from port_ocean.core.integrations.base import BaseIntegration +from port_ocean.config.settings import ApplicationSettings, LogLevelType +from port_ocean.core.defaults.initialize import initialize_defaults from port_ocean.logger_setup import setup_logger from port_ocean.ocean import Ocean -from port_ocean.port_defaults import initialize_defaults -from port_ocean.utils import get_spec_file +from port_ocean.utils import get_spec_file, load_module -def _get_base_integration_class_from_module( - module: ModuleType, -) -> Type[BaseIntegration]: - for name, obj in getmembers(module): - if ( - isclass(obj) - and type(obj) == type - and issubclass(obj, BaseIntegration) - and obj != BaseIntegration - ): - return obj - - raise Exception(f"Failed to load integration from module: {module.__name__}") - - -def _load_module(file_path: str) -> ModuleType: - spec = spec_from_file_location("module_name", file_path) - if spec is None or spec.loader is None: - raise Exception(f"Failed to load integration from path: {file_path}") - - module = module_from_spec(spec) - spec.loader.exec_module(module) - - return module - - -def _create_default_app(path: str | None = None) -> Ocean: - sys.path.append(".") - try: - integration_path = f"{path}/integration.py" if path else "integration.py" - module = _load_module(integration_path) - integration_class = _get_base_integration_class_from_module(module) - except Exception: - integration_class = None - +def _get_default_config_factory() -> None | Type[BaseModel]: spec = get_spec_file() config_factory = None if spec is not None: config_factory = default_config_factory(spec.get("configurations", [])) - return Ocean(integration_class=integration_class, config_factory=config_factory) + + return config_factory def run( @@ -66,10 +31,11 @@ def run( application_settings = ApplicationSettings(log_level=log_level, port=port) setup_logger(application_settings.log_level) - default_app = _create_default_app(path) + config_factory = _get_default_config_factory() + default_app = create_default_app(path, config_factory) main_path = f"{path}/main.py" if path else "main.py" - app_module = _load_module(main_path) + app_module = load_module(main_path) app: Ocean = {name: item for name, item in getmembers(app_module)}.get( "app", default_app ) diff --git a/port_ocean/utils.py b/port_ocean/utils.py index 6e51cc28e4..2a71b79062 100644 --- a/port_ocean/utils.py +++ b/port_ocean/utils.py @@ -1,6 +1,8 @@ +from importlib.util import module_from_spec, spec_from_file_location import inspect from pathlib import Path from time import time +from types import ModuleType from typing import Callable, Any from uuid import uuid4 @@ -31,3 +33,14 @@ def get_spec_file(path: Path = Path(".")) -> dict[str, Any] | None: return yaml.safe_load((path / ".port/spec.yaml").read_text()) except FileNotFoundError: return None + + +def load_module(file_path: str) -> ModuleType: + spec = spec_from_file_location("module.name", file_path) + if spec is None or spec.loader is None: + raise Exception(f"Failed to load integration from path: {file_path}") + + module = module_from_spec(spec) + spec.loader.exec_module(module) + + return module