Skip to content

Commit

Permalink
✨ Source Sendgrid: Move contacts stream to async declarative component (
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Sep 11, 2024
1 parent e4fec50 commit e679191
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 503 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87
dockerImageTag: 1.0.18
dockerImageTag: 1.1.0
releases:
breakingChanges:
1.0.0:
Expand Down
496 changes: 369 additions & 127 deletions airbyte-integrations/connectors/source-sendgrid/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "1.0.18"
version = "1.1.0"
name = "source-sendgrid"
description = "Source implementation for Sendgrid."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -16,8 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_sendgrid"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "^0"
python = "^3.10,<3.12"
airbyte_cdk = "^5"
pandas = "^2.1.1"

[tool.poetry.scripts]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import pendulum
from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import AirbyteMessageSerializer
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from orjson import orjson

logger = logging.getLogger("airbyte_logger")

Expand Down Expand Up @@ -68,7 +70,8 @@ def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str,

@classmethod
def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
print(create_connector_config_control_message(migrated_config).json(exclude_unset=True))
message = create_connector_config_control_message(migrated_config)
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())

@classmethod
def migrate(cls, args: List[str], source: Source) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 0.81.1
version: 5.5.0
type: DeclarativeSource
check:
type: CheckStream
Expand Down Expand Up @@ -874,6 +874,62 @@ streams:
page_size: 100
cursor_value: '{{ response.get("_metadata", {}).get("next", {}) }}'
stop_condition: '{{ not response.get("_metadata", {}).get("next", {}) }}'

- type: DeclarativeStream
name: contacts
primary_key:
- contact_id
$parameters:
name: contacts
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_sendgrid/schemas/{{ parameters['name'] }}.json"
retriever:
type: AsyncRetriever
status_mapping:
running:
- pending
completed:
- ready
failed:
- failed
timeout:
- timeout
status_extractor:
type: DpathExtractor
field_path: ["status"]
urls_extractor:
type: DpathExtractor
field_path: ["urls"]
creation_requester:
type: HttpRequester
http_method: POST
url_base: https://api.sendgrid.com
path: /v3/marketing/contacts/exports
authenticator:
type: BearerAuthenticator
api_token: "{{ config['api_key'] }}"
polling_requester:
type: HttpRequester
http_method: GET
url_base: https://api.sendgrid.com
path: "/v3/marketing/contacts/exports/{{stream_slice['create_job_response'].json()['id'] }}"
authenticator:
type: BearerAuthenticator
api_token: "{{ config['api_key'] }}"
download_requester:
type: HttpRequester
http_method: GET
url_base: ""
path: "{{stream_slice['url']}}"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
transformations:
- type: KeysToLower

spec:
connection_specification:
$schema: http://json-schema.org/draft-07/schema#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

from .streams import Contacts


# Hybrid Declarative Source
class SourceSendgrid(YamlDeclarativeSource):
def __init__(self):
# this takes care of check and other methods
def __init__(self) -> None:
super().__init__(**{"path_to_yaml": "manifest.yaml"})

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# get all the lowcode streams
streams = super().streams(config)
authenticator = TokenAuthenticator(config["api_key"])
# this stream download a csv file from sendgrid and emits the records
# it's not currently easy to do in lowcode, so we do it in python
streams.append(Contacts(authenticator=authenticator))
return streams

This file was deleted.

Loading

0 comments on commit e679191

Please sign in to comment.