Skip to content

Commit

Permalink
Merge pull request #282 from bluelabsio/rm-124-airbyte-engine-healthc…
Browse files Browse the repository at this point in the history
…heck

RM-124: Add command to perform a healthcheck against airbyte server
  • Loading branch information
naswierczek authored Mar 13, 2024
2 parents e6f54f3 + e106732 commit 0703875
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 13 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,13 @@ docker compose run test
It's theoretically possible to build sufficient additional
containers to support running integration tests locally but
that has not been planned yet.

### In-progress airbyte integration

Integration of airbyte as an alternative engine for executing
source <-> destination transfers is currently in progress. This functionality
is behind a feature flag. To enable all airbyte features in your environment,
set the following environment variable:
```bash
export RECORDS_MOVER_AIRBYTE_ENABLED=1
```
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ classifiers = [
dependencies = [
"timeout_decorator",
"PyYAML>=3.10",
"db-facts>=4",
"db-facts>=5",
"chardet>=3",
"tenacity>=8.0.1",
# NOTE: library no longer under active development, don't want to be surprised
Expand Down Expand Up @@ -159,6 +159,7 @@ cli = [
"odictliteral",
"jsonschema",
"docstring_parser",
"requests",
]

vertica = [
Expand Down
16 changes: 14 additions & 2 deletions records_mover/creds/base_creds.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os
import logging
if TYPE_CHECKING:
# see the 'gsheets' extras_require option in setup.py - needed for this!
import google.auth.credentials # noqa
import boto3 # noqa

Expand Down Expand Up @@ -54,7 +53,10 @@ def __init__(self,
None] = PleaseInfer.token,
scratch_gcs_url: Union[PleaseInfer,
str,
None] = PleaseInfer.token) -> None:
None] = PleaseInfer.token,
default_airbyte_creds: Union[PleaseInfer,
Dict[str, Any],
None] = PleaseInfer.token) -> None:
self._default_db_creds_name = default_db_creds_name
self._default_aws_creds_name = default_aws_creds_name
self._default_gcp_creds_name = default_gcp_creds_name
Expand All @@ -67,6 +69,8 @@ def __init__(self,
self._scratch_s3_url = scratch_s3_url
self._scratch_gcs_url = scratch_gcs_url

self._default_airbyte_creds = default_airbyte_creds

def google_sheets(self, gcp_creds_name: str) -> 'google.auth.credentials.Credentials':
scopes = _GSHEETS_SCOPES
return self._gcp_creds(gcp_creds_name, scopes)
Expand Down Expand Up @@ -266,3 +270,11 @@ def default_scratch_gcs_url(self) -> Optional[str]:
if self._scratch_gcs_url is PleaseInfer.token:
self._scratch_gcs_url = self._infer_scratch_gcs_url()
return self._scratch_gcs_url

def _infer_airbyte_creds(self) -> Dict[str, Any]:
    """
    Hook that supplies airbyte connection details when none were passed in.

    airbyte() calls this when the stored airbyte creds are still
    PleaseInfer.token.  This base implementation always raises
    NotImplementedError.
    """
    raise NotImplementedError

def airbyte(self) -> Optional[Dict[str, Any]]:
    """
    Return the airbyte connection details, inferring them on first use.

    If the stored value is still PleaseInfer.token, _infer_airbyte_creds()
    is called once and its result is stored on the instance, so later calls
    return the stored value without inferring again.
    """
    creds = self._default_airbyte_creds
    if creds is PleaseInfer.token:
        creds = self._infer_airbyte_creds()
        self._default_airbyte_creds = creds
    return creds
18 changes: 14 additions & 4 deletions records_mover/creds/creds_via_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import base64
import json
from typing import Iterable, Optional
from typing import Iterable, Optional, Any, Dict
from .base_creds import BaseCreds
from typing import TYPE_CHECKING
from db_facts.db_facts_types import DBFacts
Expand All @@ -13,9 +13,19 @@


class CredsViaEnv(BaseCreds):
def _gcp_creds_from_env(self,
scopes: Iterable[str]) ->\
Optional['google.auth.credentials.Credentials']:
def _infer_airbyte_creds(self) -> Dict[str, Any]:
if 'AIRBYTE_CONNECTION' not in os.environ:
return {}
return {
'user': 'username',
'host': 'host',
'port': 0,
'endpoint': 'endpoint',
'password': 'password',
}

def _gcp_creds_from_env(self, scopes: Iterable[str]) \
-> Optional['google.auth.credentials.Credentials']:
if 'GCP_SERVICE_ACCOUNT_JSON_BASE64' not in os.environ:
return None
import google.oauth2.service_account
Expand Down
12 changes: 11 additions & 1 deletion records_mover/creds/creds_via_lastpass.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@
from .base_creds import BaseCreds
from typing import TYPE_CHECKING
if TYPE_CHECKING:
# see the 'gsheets' extras_require option in setup.py - needed for this!
import google.auth.credentials # noqa
import boto3 # noqa


class CredsViaLastPass(BaseCreds):
def _infer_airbyte_creds(self) -> dict:
    """
    Build the airbyte connection details from the 'airbyte' LastPass entry.

    Each field is fetched with lpass_field(); the port is converted to int.
    The entry name is hard-coded, so an entry called 'airbyte' is assumed to
    exist in the local password manager.
    """
    entry = 'airbyte'
    # Lookups are kept in the same order as the keys of the returned dict.
    username = lpass_field(entry, 'username')
    host = lpass_field(entry, 'host')
    port = int(lpass_field(entry, 'port'))
    endpoint = lpass_field(entry, 'endpoint')
    password = lpass_field(entry, 'password')
    return dict(
        user=username,
        host=host,
        port=port,
        endpoint=endpoint,
        password=password,
    )

def _gcp_creds(self, gcp_creds_name: str,
scopes: Iterable[str]) -> 'google.auth.credentials.Credentials':
import google.oauth2.service_account
Expand Down
1 change: 0 additions & 1 deletion records_mover/creds/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def db_facts_from_env() -> DBFacts:
}

db_facts.update(redshift_base_url_values)

if None in db_facts.values():
raise NotImplementedError("Please run with with-db or set DB_* environment variables")
return db_facts # type: ignore
48 changes: 48 additions & 0 deletions records_mover/records/airbyte/airbyte.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging
from http import HTTPStatus
from typing import Optional

import requests

from records_mover import Session

logger = logging.getLogger(__name__)


class AirbyteEngine:
    """
    Entry point for talking to the configured airbyte server.

    At present the only operation is healthcheck().  The authors' stated
    intention is for this class to eventually encapsulate making requests to
    airbyte, including knowing how to authenticate them.
    """
    session: Session

    # Upper bound, in seconds, on how long the health check waits for the
    # server; without a timeout requests.get() may block indefinitely.
    _HEALTHCHECK_TIMEOUT_SECONDS = 10

    def __init__(self, session: Optional[Session] = None):
        """
        Args:
            session: Optional records mover Session, exposed for testing.
                If a session isn't provided, a new one is created.
        """
        self.session = Session() if session is None else session

    def healthcheck(self) -> bool:
        """
        Report whether the configured airbyte server answers its health URL.

        Sends an HTTP GET with basic auth to "<host>:<port>/health", built
        from the creds returned by session.creds.airbyte().

        NOTE(review): the URL is built as host:port/health with nothing
        prepended, so 'host' presumably already carries the scheme
        (e.g. "http://...") - confirm against the stored creds.

        Returns:
            True only when the response status is 200 OK.  False when no
            creds are available, when the request raises, or for any other
            status code.

        Raises:
            KeyError: if the creds dict is non-empty but lacks one of the
                'host', 'port', 'user' or 'password' keys.
        """
        self.session.set_stream_logging()
        data = self.session.creds.airbyte()
        # Treat an empty dict like None: some creds providers return {} when
        # nothing is configured, which would otherwise fail on data['host'].
        if not data:
            logger.error("Could not retrieve credentials from secrets store")
            return False

        url = f"{data['host']}:{data['port']}/health"
        auth = (data['user'], data['password'])
        try:
            response = requests.get(url, auth=auth,
                                    timeout=self._HEALTHCHECK_TIMEOUT_SECONDS)
        except Exception as e:
            # Deliberately broad: a health check reports "unhealthy" for any
            # failure to reach the server rather than propagating the error.
            logger.error("Exception encountered executing HTTP request against "
                         "configured airbyte instance: %s", e)
            return False

        logger.debug("Airbyte instance %s. HTTP status code %s",
                     data['host'], response.status_code)
        # Compare by value: 'is' on ints only works by accident of CPython's
        # small-integer caching.
        return response.status_code == HTTPStatus.OK.value
26 changes: 22 additions & 4 deletions records_mover/records/cli.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""CLI to move records from place to place"""
import argparse
from odictliteral import odict
from .airbyte.airbyte import AirbyteEngine
from .job.schema import method_to_json_schema
from .job.mover import run_records_mover_job
from ..utils.json_schema import method_signature_to_json_schema
from .processing_instructions import ProcessingInstructions
from records_mover.cli.job_config_schema_as_args_parser import (JobConfigSchemaAsArgsParser,
arguments_output_to_config)
from records_mover.cli.job_config_schema_as_args_parser import (
JobConfigSchemaAsArgsParser, arguments_output_to_config
)
from records_mover.logging import set_stream_logging
from ..mover_types import JsonSchema, JobConfig
from ..version import __version__
import sys
import os
from typing import Callable, Dict, Any, TYPE_CHECKING
if TYPE_CHECKING:
from records_mover import Session
Expand Down Expand Up @@ -82,10 +85,15 @@ def build_parser() -> argparse.ArgumentParser:

# https://stackoverflow.com/questions/15405636/pythons-argparse-to-show-programs-version-with-prog-and-version-string-formatt
parser.add_argument('-V', '--version', action='version', version="%(prog)s ("+__version__+")")

airbyte_feature_flag = os.getenv('RECORDS_MOVER_AIRBYTE_ENABLED')
if airbyte_feature_flag is not None:
parser.add_argument('-hc', '--healthcheck', action='store_true', required=False,
help='Returns health of the configured airbyte instance')

subparsers = parser.add_subparsers(help='subcommand_help')
from records_mover import Session
bootstrap_session = Session()

for source in sources:
for target in targets:
name = f"{source}2{target}"
Expand Down Expand Up @@ -114,7 +122,17 @@ def main() -> None:
args = parser.parse_args()
raw_config = vars(args)
func = getattr(args, 'func', None)
if func is None:

if "healthcheck" in args:
from records_mover import Session
session = Session()
engine = AirbyteEngine(session)
result = engine.healthcheck()
if result:
print("Airbyte Status: OK!")
else:
print("Airbyte Status: Unhealthy")
elif func is None:
parser.print_help()
else:
set_stream_logging()
Expand Down
63 changes: 63 additions & 0 deletions tests/component/airbyte/test_airbyte_healthcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import unittest
from http import HTTPStatus
from unittest.mock import patch, MagicMock

from records_mover.creds.base_creds import BaseCreds
from records_mover.records.airbyte.airbyte import AirbyteEngine


class MockCreds(BaseCreds):
    """
    Stub creds object that hands fixed airbyte connection details to the engine
    """

    def airbyte(self):
        # Canned values only; note that the port is given as a string here.
        return dict(
            user='username',
            password='password',
            host='host',
            port='8000',
            endpoint='endpoint',
        )


class MockSession:
    """
    Minimal stand-in for a Session: exposes `creds` and a no-op
    set_stream_logging(), so the test doesn't have to worry about db-facts
    """

    def set_stream_logging(self):
        # Intentionally does nothing.
        return None

    def __init__(self):
        self.creds = MockCreds()


class AirbyteHealthCheckTest(unittest.TestCase):
    """
    Exercises AirbyteEngine.healthcheck() with requests.Session.send patched
    out, so no real HTTP request is made.
    """

    @staticmethod
    def _respond_with(send, status_code):
        # Make the patched send() hand back a response with this status code.
        reply = MagicMock()
        reply.status_code = status_code
        send.return_value = reply

    @patch('requests.Session.send')
    def test_airbyte_healthcheck_returns_true_when_healthy(self, send):
        # Given
        self._respond_with(send, HTTPStatus.OK.value)
        engine = AirbyteEngine(session=MockSession())

        # When
        outcome = engine.healthcheck()

        # Then
        self.assertTrue(outcome)

    @patch('requests.Session.send')
    def test_airbyte_healthcheck_returns_false_when_unhealthy(self, send):
        # Given
        self._respond_with(send, HTTPStatus.NOT_FOUND.value)
        engine = AirbyteEngine(session=MockSession())

        # When
        outcome = engine.healthcheck()

        # Then
        self.assertFalse(outcome)
5 changes: 5 additions & 0 deletions tests/unit/records/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import unittest

import mock
from mock import patch, call, Mock
from records_mover.records.cli import main

Expand Down Expand Up @@ -26,6 +28,9 @@ def test_main(self,
mock_google_auth_default.return_value = (mock_credentials, mock_project)
mock_parser = mock_argparse.ArgumentParser.return_value
mock_subparsers = mock_parser.add_subparsers.return_value
mock_args = mock.MagicMock()
mock_args.healthcheck = False
mock_parser.parse_args.return_value = mock_args
main()
# pick an example
mock_subparsers.add_parser.assert_has_calls([call('table2recordsdir',
Expand Down
Empty file.
2 changes: 2 additions & 0 deletions types/stubs/requests/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Minimal local type stub for the `requests` package: `get` is the only
# name declared here, with an untyped signature.
def get(url, params=None, **kwargs):
    ...

0 comments on commit 0703875

Please sign in to comment.