Skip to content

Commit

Permalink
Sample Event Listener (#6)
Browse files Browse the repository at this point in the history
* Renamed trigger channel to event listener
* Added sample event listener
* Moved each event listener to its own dir
  • Loading branch information
yairsimantov20 authored Jul 13, 2023
1 parent 9266c10 commit c672c5e
Show file tree
Hide file tree
Showing 25 changed files with 297 additions and 153 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ port:
clientId: PORT_CLIENT_ID # Can be loaded via environment variable: PORT_CLIENT_ID
clientSecret: PORT_CLIENT_SECRET # Can be loaded via environment variable: PORT_CLIENT_SECRET
baseUrl: https://api.getport.io/v1
# The trigger channel to use for the integration service.
triggerChannel:
# The event listener to use for the integration service.
eventListener:
type: KAFKA / WEBHOOK
integration:
# The name of the integration.
Expand Down
4 changes: 2 additions & 2 deletions assets/ExportArchitecture.excalidraw
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,11 @@
"locked": false,
"fontSize": 16,
"fontFamily": 1,
"text": "Trigger channel",
"text": "Event listener",
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "Trigger channel",
"originalText": "Event listener",
"lineHeight": 1.25,
"baseline": 14
},
Expand Down
2 changes: 1 addition & 1 deletion assets/ExportArchitecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 3 additions & 4 deletions port_ocean/cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
try:
from .commands import cli_start # ruff: noqa: F401
except ImportError:
print("Failed to import commands.")
from .commands import cli_start

__all__ = ["cli_start"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ port:
clientId: PORT_CLIENT_ID # Can be loaded via environment variable: PORT_CLIENT_ID
clientSecret: PORT_CLIENT_SECRET # Can be loaded via environment variable: PORT_CLIENT_SECRET
baseUrl: https://api.getport.io
# The trigger channel to use for the integration service.
triggerChannel:
# The event listener to use for the integration service.
eventListener:
type: KAFKA
integration:
# The identifier of this integration instance.
Expand Down
4 changes: 3 additions & 1 deletion port_ocean/clients/port/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ def __init__(
integration_type,
)
EntityClientMixin.__init__(self, self.auth, self.client)
IntegrationClientMixin.__init__(self, self.auth, self.client)
IntegrationClientMixin.__init__(
self, integration_identifier, self.auth, self.client
)

async def get_kafka_creds(self, silent: bool = False) -> KafkaCreds:
logger.info("Fetching organization kafka credentials")
Expand Down
16 changes: 11 additions & 5 deletions port_ocean/clients/port/mixins/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@


class IntegrationClientMixin:
def __init__(self, auth: PortAuthentication, client: httpx.AsyncClient):
def __init__(
self,
integration_identifier: str,
auth: PortAuthentication,
client: httpx.AsyncClient,
):
self.integration_identifier = integration_identifier
self.auth = auth
self.client = client

async def get_integration(self, identifier: str) -> dict[str, Any]:
logger.info(f"Fetching integration with id: {identifier}")
async def get_current_integration(self) -> dict[str, Any]:
logger.info(f"Fetching integration with id: {self.integration_identifier}")
response = await self.client.get(
f"{self.auth.api_url}/integration/{identifier}",
f"{self.auth.api_url}/integration/{self.integration_identifier}",
headers=await self.auth.headers(),
)
response.raise_for_status()
Expand Down Expand Up @@ -58,7 +64,7 @@ async def initiate_integration(
) -> None:
logger.info(f"Initiating integration with id: {identifier}")
try:
integration = await self.get_integration(identifier)
integration = await self.get_current_integration(identifier)

logger.info("Checking for diff in integration configuration")
if (
Expand Down
9 changes: 2 additions & 7 deletions port_ocean/config/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
from pydantic import BaseModel, Field, BaseSettings

from port_ocean.config.base import BaseOceanSettings
from port_ocean.core.trigger_channel.settings import (
HttpTriggerChannelSettings,
KafkaTriggerChannelSettings,
)
from port_ocean.core.event_listener import EventListenerSettingsType


class PortSettings(BaseSettings):
Expand All @@ -23,9 +20,7 @@ class IntegrationSettings(BaseSettings):

class IntegrationConfiguration(BaseOceanSettings):
port: PortSettings
trigger_channel: KafkaTriggerChannelSettings | HttpTriggerChannelSettings = Field(
alias="triggerChannel"
)
event_listener: EventListenerSettingsType = Field(alias="eventListener")
batch_work_size: int | None = Field(alias="batchWorkSize", default=None)
integration: IntegrationSettings

Expand Down
4 changes: 2 additions & 2 deletions port_ocean/context/ocean.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from port_ocean.clients.port.client import PortClient
from port_ocean.clients.port.types import UserAgentType
from port_ocean.config.integration import IntegrationConfiguration
from port_ocean.core.models import Entity
from port_ocean.core.types import (
RESYNC_EVENT_LISTENER,
Expand All @@ -16,6 +15,7 @@
from port_ocean.exceptions.context import PortOceanContextNotFoundError

if TYPE_CHECKING:
from port_ocean.config.integration import IntegrationConfiguration
from port_ocean.core.integrations.base import BaseIntegration
from port_ocean.ocean import Ocean

Expand All @@ -25,7 +25,7 @@ class PortOceanContext:
app: "Ocean"

@property
def config(self) -> IntegrationConfiguration:
def config(self) -> "IntegrationConfiguration":
return self.app.config

@property
Expand Down
17 changes: 17 additions & 0 deletions port_ocean/core/event_listener/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from .http.event_listener import HttpEventListener, HttpEventListenerSettings
from .kafka.event_listener import KafkaEventListener, KafkaEventListenerSettings
from .sample.event_listener import SampleEventListener, SampleEventListenerSettings

EventListenerSettingsType = (
HttpEventListenerSettings | KafkaEventListenerSettings | SampleEventListenerSettings
)

__all__ = [
"EventListenerSettingsType",
"HttpEventListener",
"HttpEventListenerSettings",
"KafkaEventListener",
"KafkaEventListenerSettings",
"SampleEventListener",
"SampleEventListenerSettings",
]
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
from abc import abstractmethod
from typing import TypedDict, Callable, Any, Awaitable

from pydantic import BaseSettings

class TriggerChannelEvents(TypedDict):

class EventListenerEvents(TypedDict):
on_resync: Callable[[dict[Any, Any]], Awaitable[None]]


class BaseTriggerChannel:
class BaseEventListener:
def __init__(
self,
events: TriggerChannelEvents,
events: EventListenerEvents,
):
self.events = events

@abstractmethod
async def start(self) -> None:
pass


class EventListenerSettings(BaseSettings):
type: str

def to_request(self) -> dict[str, Any]:
return {"type": self.type}
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,29 @@

from port_ocean.context.ocean import PortOceanContext
from port_ocean.core.base import BaseWithContext
from port_ocean.core.trigger_channel.base import (
BaseTriggerChannel,
TriggerChannelEvents,
from port_ocean.core.event_listener import (
HttpEventListener,
KafkaEventListener,
SampleEventListener,
)
from port_ocean.core.trigger_channel.http import (
HttpTriggerChannel,
from port_ocean.core.event_listener import (
HttpEventListenerSettings,
KafkaEventListenerSettings,
SampleEventListenerSettings,
)
from port_ocean.core.trigger_channel.kafka import (
KafkaTriggerChannel,
from port_ocean.core.event_listener.base import (
BaseEventListener,
EventListenerEvents,
)
from port_ocean.core.trigger_channel.settings import (
HttpTriggerChannelSettings,
KafkaTriggerChannelSettings,
)
from port_ocean.exceptions.core import UnsupportedTriggerChannelException
from port_ocean.exceptions.core import UnsupportedEventListenerTypeException


class TriggerChannelFactory(BaseWithContext):
class EventListenerFactory(BaseWithContext):
def __init__(
self,
context: PortOceanContext,
installation_id: str,
events: TriggerChannelEvents,
events: EventListenerEvents,
):
super().__init__(context)
self.installation_id = installation_id
Expand All @@ -45,23 +45,23 @@ async def wrapper(event: dict[Any, Any]) -> None:

return wrapper

async def create_trigger_channel(self) -> BaseTriggerChannel:
wrapped_events: TriggerChannelEvents = {
async def create_event_listener(self) -> BaseEventListener:
wrapped_events: EventListenerEvents = {
"on_resync": self.on_event(self.events["on_resync"])
}
trigger_channel: BaseTriggerChannel
config = self.context.config.trigger_channel
event_listener: BaseEventListener
config = self.context.config.event_listener
_type = config.type.lower()
assert_message = "Invalid trigger channel config, expected KafkaTriggerChannelSettings and got {0}"
logger.info(f"Found trigger channel type: {_type}")
assert_message = "Invalid event listener config, expected KafkaEventListenerSettings and got {0}"
logger.info(f"Found event listener type: {_type}")

match _type:
case "kafka":
assert isinstance(
config, KafkaTriggerChannelSettings
config, KafkaEventListenerSettings
), assert_message.format(type(config))
org_id = await self.context.port_client.get_org_id()
trigger_channel = KafkaTriggerChannel(
event_listener = KafkaEventListener(
wrapped_events,
config,
org_id,
Expand All @@ -71,13 +71,19 @@ async def create_trigger_channel(self) -> BaseTriggerChannel:

case "webhook":
assert isinstance(
config, HttpTriggerChannelSettings
config, HttpEventListenerSettings
), assert_message.format(type(config))
event_listener = HttpEventListener(wrapped_events, config)

case "sample":
assert isinstance(
config, SampleEventListenerSettings
), assert_message.format(type(config))
trigger_channel = HttpTriggerChannel(wrapped_events, config)
event_listener = SampleEventListener(wrapped_events, config)

case _:
raise UnsupportedTriggerChannelException(
f"Trigger channel {_type} not supported"
raise UnsupportedEventListenerTypeException(
f"Event listener {_type} not supported"
)

return trigger_channel
return event_listener
File renamed without changes.
43 changes: 43 additions & 0 deletions port_ocean/core/event_listener/http/event_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Literal, Any

from fastapi import APIRouter
from loguru import logger
from pydantic import AnyHttpUrl, Field

from port_ocean.context.ocean import ocean
from port_ocean.core.event_listener.base import (
BaseEventListener,
EventListenerEvents,
EventListenerSettings,
)


class HttpEventListenerSettings(EventListenerSettings):
type: Literal["WEBHOOK"]
app_host: AnyHttpUrl = Field(alias="appHost")

def to_request(self) -> dict[str, Any]:
return {
**super().to_request(),
"url": self.app_host + "/resync",
}


class HttpEventListener(BaseEventListener):
def __init__(
self,
events: EventListenerEvents,
event_listener_config: HttpEventListenerSettings,
):
super().__init__(events)
self.event_listener_config = event_listener_config

async def start(self) -> None:
logger.info("Setting up HTTP Event Listener")
target_channel_router = APIRouter()

@target_channel_router.post("/resync")
async def resync() -> None:
await self.events["on_resync"]({})

ocean.app.fast_api_app.include_router(target_channel_router)
Empty file.
Loading

0 comments on commit c672c5e

Please sign in to comment.