Skip to content

Commit

Permalink
add postgresql_telemetry_sink adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
dakaoctotech authored and bojeanson committed Feb 10, 2023
1 parent fedea57 commit d0aafc2
Show file tree
Hide file tree
Showing 32 changed files with 293 additions and 972 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/ci_edge_orchestrator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ jobs:
image: mongo:5.0.2
ports:
- 27017:27017
hub_monitoring_db:
image: postgres:15.1
ports:
- 5432:5432
env:
POSTGRES_DB: test
POSTGRES_USER: test
POSTGRES_PASSWORD: test
edge_model_serving:
image: ghcr.io/octo-technology/vio/edge_model_serving:latest
ports:
Expand All @@ -33,7 +41,8 @@ jobs:
ports:
- 8502:8501
env:
DATABASE_CONNECTION_URL: mongodb://localhost:27017
MONGO_DB_URI: mongodb://localhost:27017
POSTGRES_DB_URI: postgresql://test:test@localhost:5432/test
TENSORFLOW_SERVING_HOST: localhost
TENSORFLOW_SERVING_PORT: 8501
TFLITE_SERVING_HOST: localhost
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ help:
grep -E '^\.PHONY: [a-zA-Z0-9_-]+ .*?## .*$$' $(MAKEFILE_LIST) | \
awk 'BEGIN {FS = "(: |##)"}; {printf "\033[36m%-30s\033[0m %s\n", $$2, $$3}'

.PHONY: edge_model_serving ## 💁 Start model_serving service (Docker container)
model_serving:
.PHONY: edge_model_serving ## 💁 Start edge_model_serving service (Docker container)
edge_model_serving:
docker-compose up -d --build edge_model_serving

.PHONY: edge_orchestrator ## 🕵 Start edge_orchestrator service (Docker container)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@
login_password: "{{ postgres_admin_password }}"
postgresql_connection_url: "postgresql://{{ login_user }}:{{ login_password }}@{{ login_host }}:5432/{{ db }}"
shell: |
AZURE_POSTGRESQL_URL="{{ postgresql_connection_url }}" alembic -c $(pwd)/alembic.ini upgrade head
DB_CONNECTION_URL="{{ postgresql_connection_url }}" alembic -c $(pwd)/alembic.ini upgrade head
4 changes: 2 additions & 2 deletions deployment/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
alembic==1.6.5
alembic==1.9.2
ansible==4.3.0
packaging
requests[security]
Expand Down Expand Up @@ -49,4 +49,4 @@ azure-mgmt-recoveryservices==0.4.0
azure-mgmt-recoveryservicesbackup==0.6.0
azure-mgmt-notificationhubs==2.0.0
azure-mgmt-eventhub==2.0.0
psycopg2-binary==2.9.1
psycopg2-binary==2.9.5
26 changes: 24 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ services:
- ./edge_orchestrator/data/storage:/edge_orchestrator/data/storage
ports:
- 8000:8000
healthcheck:
test: [ "CMD-SHELL", "nc -zv 0.0.0.0 5432" ]
interval: 5s
timeout: 5s
retries: 5

edge_interface:
container_name: edge_interface
Expand All @@ -45,5 +50,22 @@ services:
environment:
- GF_INSTALL_PLUGINS=yesoreyeram-infinity-datasource
volumes:
- ./hub_monitoring/dashboards:/var/lib/grafana/dashboards
- ./hub_monitoring/provisioning:/etc/grafana/provisioning
- ./hub_monitoring/grafana/dashboards:/var/lib/grafana/dashboards
- ./hub_monitoring/grafana/provisioning:/etc/grafana/provisioning

hub_monitoring_db:
container_name: hub_monitoring_db
image: postgres:15.1
restart: always
environment:
POSTGRES_DB: vio
POSTGRES_USER: vio
POSTGRES_PASSWORD: vio
ports:
- 5432:5432
volumes:
- vio-postgresql-data:/var/lib/postgresql/data
- ./hub_monitoring/docker/init-db.sh:/docker-entrypoint-initdb.d/init-db.sh

volumes:
vio-postgresql-data:
1 change: 0 additions & 1 deletion edge_orchestrator/edge_orchestrator/api_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def load_config():
elif configuration == 'edge-lite':
from edge_orchestrator.environment.edge_with_azure_container_storage import EdgeWithAzureContainerStorage
configuration_class = EdgeWithAzureContainerStorage

return configuration_class()


Expand Down
5 changes: 3 additions & 2 deletions edge_orchestrator/edge_orchestrator/environment/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
from edge_orchestrator.infrastructure.metadata_storage.mongodb_metadata_storage import MongoDbMetadataStorage
from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import TFServingWrapper
from edge_orchestrator.infrastructure.station_config.json_station_config import JsonStationConfig
from edge_orchestrator.infrastructure.telemetry_sink.fake_telemetry_sink import FakeTelemetrySink
from edge_orchestrator.infrastructure.telemetry_sink.postgresql_telemetry_sink import PostgresTelemetrySink


class Docker(Config):
MONGO_DB_URI = os.environ.get('MONGO_DB_URI', 'mongodb://edge_db:27017/')
POSTGRES_DB_URI = os.environ.get('POSTGRES_DB_URI', 'postgresql://vio:vio@hub_monitoring_db:5432/vio')
SERVING_MODEL_URL = os.environ.get('SERVING_MODEL_URL', 'http://edge_model_serving:8501')

def __init__(self):
Expand All @@ -22,4 +23,4 @@ def __init__(self):
self.inventory, self.ROOT_PATH / 'data')
self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / 'data')
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config)
self.telemetry_sink = FakeTelemetrySink()
self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI)
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from edge_orchestrator.domain.models.edge_station import EdgeStation
from edge_orchestrator.environment.config import Config
from edge_orchestrator.infrastructure.binary_storage.filesystem_binary_storage import FileSystemBinaryStorage
Expand All @@ -9,10 +11,11 @@


class EdgeWithMongoDbMetadataStorage(Config):
MONGO_DB_URI = os.environ.get('MONGO_DB_URI', 'mongodb://edge_db:27017/')
SERVING_MODEL_URL = 'http://edge_model_serving:8501'

def __init__(self):
self.metadata_storage = MongoDbMetadataStorage('mongodb://mongodb:27017/')
self.metadata_storage = MongoDbMetadataStorage(self.MONGO_DB_URI)
self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / 'data' / 'storage')
self.inventory = JsonInventory(self.ROOT_PATH / 'config' / 'inventory.json')
self.station_config = JsonStationConfig(self.ROOT_PATH / 'config' / 'station_configs',
Expand Down
5 changes: 3 additions & 2 deletions edge_orchestrator/edge_orchestrator/environment/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from edge_orchestrator.infrastructure.metadata_storage.mongodb_metadata_storage import MongoDbMetadataStorage
from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import TFServingWrapper
from edge_orchestrator.infrastructure.station_config.json_station_config import JsonStationConfig
from edge_orchestrator.infrastructure.telemetry_sink.fake_telemetry_sink import FakeTelemetrySink
from edge_orchestrator.infrastructure.telemetry_sink.postgresql_telemetry_sink import PostgresTelemetrySink
from tests.conftest import TEST_STATION_CONFIGS_FOLDER_PATH, TEST_INVENTORY_PATH, TEST_DATA_FOLDER_PATH


class Test(Config):
ROOT_PATH = Path('/tests')
MONGO_DB_URI = os.environ.get('MONGO_DB_URI', 'mongodb://edge_db:27017/')
POSTGRES_DB_URI = os.environ.get('POSTGRES_DB_URI', 'postgresql://vio:vio@hub_monitoring_db:5432/vio')
SERVING_MODEL_URL = os.environ.get('SERVING_MODEL_URL', 'http://edge_model_serving:8501')

def __init__(self):
Expand All @@ -25,4 +26,4 @@ def __init__(self):
self.inventory, TEST_DATA_FOLDER_PATH)
self.edge_station = EdgeStation(self.station_config, TEST_DATA_FOLDER_PATH)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config)
self.telemetry_sink = FakeTelemetrySink()
self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI)
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
from typing import List, Dict

import pymongo

from edge_orchestrator.domain.models.item import Item
from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage


class MongoDbMetadataStorage(MetadataStorage):
def __init__(self, mongodb_uri=None):
mongodb_uri = mongodb_uri or 'mongodb://localhost:27017/'
def __init__(self, mongodb_uri: str):
self.client = pymongo.MongoClient(mongodb_uri)
self.db = self.client['orchestratorDB']
self.items_metadata = self.db['items']
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from datetime import datetime
from random import randrange
from urllib.parse import urlparse
from uuid import uuid4

import psycopg2
import time
from typing import Dict

from edge_orchestrator import logger
from edge_orchestrator.domain.ports.telemetry_sink import TelemetrySink


class PostgresTelemetrySink(TelemetrySink):

def __init__(self, connection_url: str, timeout: int = 30, interval: int = 2):
self.connection_url = connection_url
self._connection = None
self._timeout = timeout
self._interval = interval
self._device_id = f'device_{randrange(42)}'

@property
def connection(self):
if self._connection:
return self._connection

result = urlparse(self.connection_url)
username, password, hostname, port = result.username, result.password, result.hostname, result.port
database = result.path[1:]

nb_retry = self._timeout // self._interval
for i in range(nb_retry):
try:
self._connection = psycopg2.connect(dbname=database, user=username, password=password,
host=hostname, port=port)
logger.debug(f'Telemetry Postgres DB took ‘{i * self._interval}‘sec to start and be migrated')
return self._connection
except psycopg2.OperationalError:
time.sleep(self._interval)
else:
raise TimeoutError(f'Unable to connect to Telemetry Postgres DB using {self.connection_url} after {self._timeout:.0f} seconds') # noqa

async def send(self, message: Dict):
try:
_id = uuid4().__str__()
device_id = self._device_id
decision = message['decision']
timestamp = datetime.now()
item_id = message['item_id']
config = message['config']

self._insert_message(_id, device_id, decision, timestamp, item_id, config)
except psycopg2.DatabaseError as e:
logger.error(f'Message was not correctly inserted into telemetry table : {e}')

def _insert_message(self, _id: str, device_id: str, decision: str, timestamp: datetime, item_id: str,
config: str):
with self.connection.cursor() as curs:
curs.execute(
'INSERT INTO iothub.telemetry '
'(id, device_id, business_decision, timestamp, item_id, config) VALUES (%s, %s, %s, %s, %s, %s)',
(_id, device_id, decision, timestamp, item_id, config)
)
self.connection.commit()
logger.warning(f'Telemetry message for item ‘{item_id}‘ stored with id ‘{_id}‘')
2 changes: 2 additions & 0 deletions edge_orchestrator/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
'fastapi==0.80.0',
'numpy==1.24.1',
'Pillow==8.4.0',
'psycopg2-binary==2.9.5',
'pymongo==4.3.3',
'uvicorn==0.20.0',
'smart_open[azure]==6.3.0'
],
extras_require={
'dev': [
'alembic==1.9.2',
'autopep8==2.0.1',
'behave==1.2.6',
'flake8==6.0.0',
Expand Down
1 change: 1 addition & 0 deletions edge_orchestrator/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
'tests.fixtures.containers', 'tests.fixtures.items_config']

EDGE_DB_IMG = "mongo:5.0.2"
HUB_MONITORING_DB_IMG = "postgres:15.1"
EDGE_MODEL_SERVING_IMG = "ghcr.io/octo-technology/vio/edge_model_serving:latest"
EDGE_TFLITE_SERVING_IMG = "ghcr.io/octo-technology/vio/edge_tflite_serving:latest"
46 changes: 36 additions & 10 deletions edge_orchestrator/tests/fixtures/containers.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import logging
import os
from typing import Union, Tuple, Generator
from dotenv import load_dotenv

import docker
import pymongo
from _pytest.fixtures import SubRequest
from _pytest.fixtures import fixture
from alembic import command as alembic_command
from alembic.config import Config
from testcontainers.core import config
from testcontainers.core.container import DockerContainer
from testcontainers.mongodb import MongoDbContainer
from testcontainers.postgres import PostgresContainer
from typing import Union, Tuple, Generator, Optional

from tests.conftest import ROOT_REPOSITORY_PATH, EDGE_DB_IMG, EDGE_MODEL_SERVING_IMG, EDGE_TFLITE_SERVING_IMG
from tests.conftest import ROOT_REPOSITORY_PATH, EDGE_DB_IMG, EDGE_MODEL_SERVING_IMG, EDGE_TFLITE_SERVING_IMG, \
HUB_MONITORING_DB_IMG
from tests.tf_serving_container import TfServingContainer

load_dotenv()

config.MAX_TRIES = 5


Expand All @@ -31,12 +32,14 @@ def check_image_presence_or_pull_it_from_registry(image_name: str):
'password': os.environ.get('REGISTRY_PASSWORD')})


def start_test_mongo_db(image_name: str) -> Tuple[str, MongoDbContainer]:
connection_url = os.environ.get('DATABASE_CONNECTION_URL')
def start_test_db(image_name: str, connection_url: Optional[str]) -> Tuple[str, MongoDbContainer]:
container = None
if connection_url is None:
check_image_presence_or_pull_it_from_registry(image_name)
container = MongoDbContainer(image_name)
if 'mongo' in image_name:
container = MongoDbContainer(image_name)
else:
container = PostgresContainer(image_name)
container.start()
connection_url = container.get_connection_url()
return connection_url, container
Expand All @@ -49,8 +52,8 @@ def stop_test_container(container: DockerContainer):

@fixture(scope='session')
def setup_test_mongo_db() -> str:
image_name = EDGE_DB_IMG # noqa
connection_url, mongo_db_container = start_test_mongo_db(image_name=image_name)
connection_url, mongo_db_container = start_test_db(image_name=EDGE_DB_IMG,
connection_url=os.environ.get('MONGO_DB_URI'))
yield connection_url
stop_test_container(mongo_db_container)

Expand All @@ -62,6 +65,20 @@ def test_mongo_db_uri(setup_test_mongo_db) -> str:
client.drop_database('orchestratorDB')


@fixture(scope='session')
def setup_test_postgres_db() -> str:
connection_url, postgres_db_container = start_test_db(image_name=HUB_MONITORING_DB_IMG,
connection_url=os.environ.get('POSTGRES_DB_URI'))
apply_db_migrations(connection_url)
yield connection_url
stop_test_container(postgres_db_container)


@fixture(scope='function')
def test_postgres_db_uri(setup_test_postgres_db) -> str:
yield setup_test_postgres_db


def start_test_tf_serving(image_name: str, starting_log: str, exposed_model_name: str,
tf_serving_host: Union[str, None] = os.environ.get('TENSORFLOW_SERVING_HOST'),
tf_serving_port: Union[int, None] = os.environ.get('TENSORFLOW_SERVING_PORT'),
Expand Down Expand Up @@ -123,3 +140,12 @@ def test_tensorflow_serving_base_url(setup_test_tensorflow_serving) -> str:
@fixture(scope='function')
def test_tflite_serving_base_url(setup_test_tflite_serving) -> str:
return setup_test_tflite_serving


def apply_db_migrations(connection_url: str) -> bool:
os.environ['DB_CONNECTION_URL'] = connection_url
path_to_migration = ROOT_REPOSITORY_PATH / 'hub_monitoring/db_migrations'
alembic_cfg = Config(path_to_migration / 'alembic.ini')
alembic_cfg.set_main_option('script_location', (path_to_migration / 'alembic').as_posix())
alembic_command.upgrade(alembic_cfg, 'head')
return True
20 changes: 14 additions & 6 deletions edge_orchestrator/tests/functional_tests/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@
from behave.runner import Context
from fastapi.testclient import TestClient

from tests.conftest import ROOT_REPOSITORY_PATH, TEST_DATA_FOLDER_PATH, EDGE_DB_IMG, EDGE_MODEL_SERVING_IMG
from tests.fixtures.containers import start_test_mongo_db, start_test_tf_serving, stop_test_container
from tests.conftest import ROOT_REPOSITORY_PATH, TEST_DATA_FOLDER_PATH, EDGE_DB_IMG, EDGE_MODEL_SERVING_IMG, \
HUB_MONITORING_DB_IMG
from tests.fixtures.containers import start_test_db, start_test_tf_serving, stop_test_container, apply_db_migrations


def before_all(context: Context):
context.test_directory = Path(__file__).parent.parent
image_name = EDGE_DB_IMG
context.mongo_db_uri, context.mongo_db_container = start_test_mongo_db(image_name=image_name)
image_name = EDGE_MODEL_SERVING_IMG
context.mongo_db_uri, context.mongo_db_container = start_test_db(image_name=EDGE_DB_IMG,
connection_url=os.environ.get('MONGO_DB_URI'))
context.postgres_db_uri, context.postgres_db_container = start_test_db(image_name=HUB_MONITORING_DB_IMG,
connection_url=os.environ.get(
'POSTGRES_DB_URI'))
apply_db_migrations(context.postgres_db_uri)

context.tensorflow_serving_url, context.tensorflow_serving_container = start_test_tf_serving(
image_name=image_name,
image_name=EDGE_MODEL_SERVING_IMG,
starting_log=r'Entering the event loop ...',
exposed_model_name="marker_quality_control",
host_volume_path=((ROOT_REPOSITORY_PATH / 'edge_model_serving').as_posix()),
container_volume_path='/models')
os.environ['API_CONFIG'] = 'test'
os.environ['MONGO_DB_URI'] = context.mongo_db_uri
os.environ['POSTGRES_DB_URI'] = context.postgres_db_uri
os.environ['SERVING_MODEL_URL'] = context.tensorflow_serving_url
from edge_orchestrator.application.server import server
context.test_client = TestClient(server())
Expand All @@ -31,5 +37,7 @@ def after_all(context: Context):
rmtree(TEST_DATA_FOLDER_PATH / 'storage')
if context.mongo_db_container:
stop_test_container(context.mongo_db_container)
if context.postgres_db_container:
stop_test_container(context.postgres_db_container)
if context.tensorflow_serving_container:
stop_test_container(context.tensorflow_serving_container)
Loading

0 comments on commit d0aafc2

Please sign in to comment.