Skip to content

Commit

Permalink
RM-124: Add command to perform a healthcheck against airbyte server
Browse files Browse the repository at this point in the history
- adds new optional arugment that can be passed to mvrec that will
  query the healthcheck endpoint on the configured airbyte instance
  and return either true or false depending on if that endpoint
  returns a 200 status code
- adds "feature flag" so this can dark launch
- Added new class to support argument
- Added initial methods for retieving airbyte credentials
- Added unit tests for new class method
  • Loading branch information
naswierczek committed Mar 13, 2024
1 parent e6f54f3 commit 10b8129
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 17 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,13 @@ docker compose run test
It's theoretically possible to build sufficient additional
containers to support running integration tests locally but
that has not been planned yet.

### In-progress airbyte integration

Currently in progress integration airbyte as an alternative engine for executing
source <-> destination transfers. This functionality is currently behind a
feature flag. To enable all airbyte features in your environment, set the
following environment variable:
```bash
export RECORDS_MOVER_AIRBYTE_ENABLED=1
```
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ classifiers = [
dependencies = [
"timeout_decorator",
"PyYAML>=3.10",
"db-facts>=4",
"db-facts>=5",
"chardet>=3",
"tenacity>=8.0.1",
# NOTE: library no longer under active development, don't want to be surprised
Expand Down Expand Up @@ -159,6 +159,7 @@ cli = [
"odictliteral",
"jsonschema",
"docstring_parser",
"requests",
]

vertica = [
Expand Down
16 changes: 14 additions & 2 deletions records_mover/creds/base_creds.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os
import logging
if TYPE_CHECKING:
# see the 'gsheets' extras_require option in setup.py - needed for this!
import google.auth.credentials # noqa
import boto3 # noqa

Expand Down Expand Up @@ -54,7 +53,10 @@ def __init__(self,
None] = PleaseInfer.token,
scratch_gcs_url: Union[PleaseInfer,
str,
None] = PleaseInfer.token) -> None:
None] = PleaseInfer.token,
default_airbyte_creds: Union[PleaseInfer,
Dict[str, Any],
None] = PleaseInfer.token) -> None:
self._default_db_creds_name = default_db_creds_name
self._default_aws_creds_name = default_aws_creds_name
self._default_gcp_creds_name = default_gcp_creds_name
Expand All @@ -67,6 +69,8 @@ def __init__(self,
self._scratch_s3_url = scratch_s3_url
self._scratch_gcs_url = scratch_gcs_url

self._default_airbyte_creds = default_airbyte_creds

def google_sheets(self, gcp_creds_name: str) -> 'google.auth.credentials.Credentials':
scopes = _GSHEETS_SCOPES
return self._gcp_creds(gcp_creds_name, scopes)
Expand Down Expand Up @@ -266,3 +270,11 @@ def default_scratch_gcs_url(self) -> Optional[str]:
if self._scratch_gcs_url is PleaseInfer.token:
self._scratch_gcs_url = self._infer_scratch_gcs_url()
return self._scratch_gcs_url

def _infer_airbyte_creds(self) -> Dict[str, Any]:
raise NotImplementedError

def airbyte(self) -> Optional[Dict[str, Any]]:
if self._default_airbyte_creds is PleaseInfer.token:
self._default_airbyte_creds = self._infer_airbyte_creds()
return self._default_airbyte_creds
18 changes: 14 additions & 4 deletions records_mover/creds/creds_via_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import base64
import json
from typing import Iterable, Optional
from typing import Iterable, Optional, Any, Dict
from .base_creds import BaseCreds
from typing import TYPE_CHECKING
from db_facts.db_facts_types import DBFacts
Expand All @@ -13,9 +13,19 @@


class CredsViaEnv(BaseCreds):
def _gcp_creds_from_env(self,
scopes: Iterable[str]) ->\
Optional['google.auth.credentials.Credentials']:
def _infer_airbyte_creds(self) -> Dict[str, Any]:
if 'AIRBYTE_CONNECTION' not in os.environ:
return {}
return {
'user': 'username',
'host': 'host',
'port': 0,
'endpoint': 'endpoint',
'password': 'password',
}

def _gcp_creds_from_env(self, scopes: Iterable[str]) \
-> Optional['google.auth.credentials.Credentials']:
if 'GCP_SERVICE_ACCOUNT_JSON_BASE64' not in os.environ:
return None
import google.oauth2.service_account
Expand Down
12 changes: 11 additions & 1 deletion records_mover/creds/creds_via_lastpass.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@
from .base_creds import BaseCreds
from typing import TYPE_CHECKING
if TYPE_CHECKING:
# see the 'gsheets' extras_require option in setup.py - needed for this!
import google.auth.credentials # noqa
import boto3 # noqa


class CredsViaLastPass(BaseCreds):
def _infer_airbyte_creds(self) -> dict:
# Magic string! Huzzah. Assumes you have this entry in your local password manager
cred_name = 'airbyte'
return {
'user': lpass_field(cred_name, 'username'),
'host': lpass_field(cred_name, 'host'),
'port': int(lpass_field(cred_name, 'port')),
'endpoint': lpass_field(cred_name, 'endpoint'),
'password': lpass_field(cred_name, 'password'),
}

def _gcp_creds(self, gcp_creds_name: str,
scopes: Iterable[str]) -> 'google.auth.credentials.Credentials':
import google.oauth2.service_account
Expand Down
1 change: 0 additions & 1 deletion records_mover/creds/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def db_facts_from_env() -> DBFacts:
}

db_facts.update(redshift_base_url_values)

if None in db_facts.values():
raise NotImplementedError("Please run with with-db or set DB_* environment variables")
return db_facts # type: ignore
48 changes: 48 additions & 0 deletions records_mover/records/airbyte/airbyte.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging
from http import HTTPStatus
import requests
from records_mover import Session

logger = logging.getLogger(__name__)


class AirbyteEngine:
"""
Intention: Main engine of activity for connecting to and managing airflow "stuff."
General thoughts, maybe this should encapsulate making a request to airbyte
Thus we'd have a method for making a request which'd know how to authenticate
"""
session: Session

def __init__(self, session: Session):
"""
Args:
session: Optional records mover Session, exposed for testing.
If a session isn't provided, one will be requested
"""
if session is None:
self.session = Session()
else:
self.session = session

def healthcheck(self) -> bool:
self.session.set_stream_logging()
data = self.session.creds.airbyte()
if data is None:
logger.error("Could not retrieve credentials from secrets store")
return False
url = f"{data['host']}:{data['port']}/health"
username = data['user']
password = data['password']
try:
response = requests.get(url, auth=(username, password))
logger.debug(f"""Airbyte instance {data['host']}. HTTP status code
{response.status_code}""")
if response.status_code is HTTPStatus.OK.value:
return True
else:
return False
except Exception as e:
logger.error(f"""Exception encountered executing HTTP request against configured
airbyte instance: {e}""")
return False
31 changes: 23 additions & 8 deletions records_mover/records/cli.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""CLI to move records from place to place"""
import argparse
from odictliteral import odict
from .airbyte.airbyte import AirbyteEngine
from .job.schema import method_to_json_schema
from .job.mover import run_records_mover_job
from ..utils.json_schema import method_signature_to_json_schema
from .processing_instructions import ProcessingInstructions
from records_mover.cli.job_config_schema_as_args_parser import (JobConfigSchemaAsArgsParser,
arguments_output_to_config)
from records_mover.cli.job_config_schema_as_args_parser import (
JobConfigSchemaAsArgsParser, arguments_output_to_config
)
from records_mover.logging import set_stream_logging
from ..mover_types import JsonSchema, JobConfig
from ..version import __version__
import sys
import os
from typing import Callable, Dict, Any, TYPE_CHECKING
if TYPE_CHECKING:
from records_mover import Session
Expand Down Expand Up @@ -49,7 +52,7 @@ def job_fn(raw_config: Dict[str, Any]) -> None:
return job_fn


def build_parser() -> argparse.ArgumentParser:
def build_parser(bootstrap_session: 'Session') -> argparse.ArgumentParser:
# skip in-memory sources/targets like dataframes that don't make
# sense from the command-line
source_method_name_by_cli_name = {
Expand Down Expand Up @@ -82,10 +85,13 @@ def build_parser() -> argparse.ArgumentParser:

# https://stackoverflow.com/questions/15405636/pythons-argparse-to-show-programs-version-with-prog-and-version-string-formatt
parser.add_argument('-V', '--version', action='version', version="%(prog)s ("+__version__+")")
subparsers = parser.add_subparsers(help='subcommand_help')
from records_mover import Session
bootstrap_session = Session()

airbyte_feature_flag = os.getenv('RECORDS_MOVER_AIRBYTE_ENABLED')
if airbyte_feature_flag is not None:
parser.add_argument('-hc', '--healthcheck', action='store_true', required=False,
help='Returns health of the configured airbyte instance')

subparsers = parser.add_subparsers(help='subcommand_help')
for source in sources:
for target in targets:
name = f"{source}2{target}"
Expand All @@ -110,11 +116,20 @@ def main() -> None:
warnings.filterwarnings("ignore",
"Your application has authenticated using end user credentials")

parser = build_parser()
from records_mover import Session
session = Session()
parser = build_parser(session)
args = parser.parse_args()
raw_config = vars(args)
func = getattr(args, 'func', None)
if func is None:
if args.healthcheck:
engine = AirbyteEngine(session)
result = engine.healthcheck()
if result:
print("Airbyte Status: OK!")
else:
print("Airbyte Status: Unhealthy")
elif func is None:
parser.print_help()
else:
set_stream_logging()
Expand Down
63 changes: 63 additions & 0 deletions tests/component/airbyte/test_airbyte_healthcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import unittest
from http import HTTPStatus
from unittest.mock import patch, MagicMock

from records_mover.creds.base_creds import BaseCreds
from records_mover.records.airbyte.airbyte import AirbyteEngine


class MockCreds(BaseCreds):
"""
Helper class to provide credentials to the airbyte engine
"""

def airbyte(self):
return {
'user': 'username',
'password': 'password',
'host': 'host',
'port': '8000',
'endpoint': 'endpoint'
}


class MockSession:
"""
Helper class to provide credentials, just so we don't have to worry about db-facts
"""

def __init__(self):
self.creds = MockCreds()

def set_stream_logging(self):
pass


class AirbyteHealthCheckTest(unittest.TestCase):
@patch('requests.Session.send')
def test_airbyte_healthcheck_returns_true_when_healthy(self, send):
# Given
engine = AirbyteEngine(session=MockSession())
response = MagicMock()
response.status_code = HTTPStatus.OK.value
send.return_value = response

# When
result = engine.healthcheck()

# Then
self.assertTrue(result)

@patch('requests.Session.send')
def test_airbyte_healthcheck_returns_false_when_unhealthy(self, send):
# Given
engine = AirbyteEngine(session=MockSession())
response = MagicMock()
response.status_code = HTTPStatus.NOT_FOUND.value
send.return_value = response

# When
result = engine.healthcheck()

# Then
self.assertFalse(result)
5 changes: 5 additions & 0 deletions tests/unit/records/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import unittest

import mock
from mock import patch, call, Mock
from records_mover.records.cli import main

Expand Down Expand Up @@ -26,6 +28,9 @@ def test_main(self,
mock_google_auth_default.return_value = (mock_credentials, mock_project)
mock_parser = mock_argparse.ArgumentParser.return_value
mock_subparsers = mock_parser.add_subparsers.return_value
mock_args = mock.MagicMock()
mock_args.healthcheck = False
mock_parser.parse_args.return_value = mock_args
main()
# pick an example
mock_subparsers.add_parser.assert_has_calls([call('table2recordsdir',
Expand Down
Empty file.
2 changes: 2 additions & 0 deletions types/stubs/requests/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def get(url, params=None, **kwargs):
...

0 comments on commit 10b8129

Please sign in to comment.